Rework the push mechanism a bit to a less DB dependant queue

This commit is contained in:
Mikael Nordfeldth 2017-07-10 14:43:28 +02:00
parent 3bc2454e91
commit 74a60ab963
2 changed files with 100 additions and 58 deletions

View File

@ -234,15 +234,11 @@ class HubSub extends Managed_DataObject
$retries = intval(common_config('ostatus', 'hub_retries')); $retries = intval(common_config('ostatus', 'hub_retries'));
} }
// We dare not clone() as when the clone is discarded it'll $data = array('topic' => $this->getTopic(),
// destroy the result data for the parent query. 'callback' => $this->callback,
// @fixme use clone() again when it's safe to copy an
// individual item from a multi-item query again.
$sub = HubSub::getByHashkey($this->getTopic(), $this->callback);
$data = array('sub' => $sub,
'atom' => $atom, 'atom' => $atom,
'retries' => $retries); 'retries' => $retries);
common_log(LOG_INFO, "Queuing WebSub: {$this->getTopic()} to {$this->callback}"); common_log(LOG_INFO, sprintf('Queuing WebSub: %s to %s', _ve($data['topic']), _ve($data['callback'])));
$qm = QueueManager::get(); $qm = QueueManager::get();
$qm->enqueue($data, 'hubout'); $qm->enqueue($data, 'hubout');
} }
@ -272,41 +268,72 @@ class HubSub extends Managed_DataObject
} }
/** /**
* Send a 'fat ping' to the subscriber's callback endpoint * @return boolean true/false for HTTP response
* containing the given Atom feed chunk. * @throws Exception for lower-than-HTTP errors (such as NS lookup failure, connection refused...)
*
* Determination of which items to send should be done at
* a higher level; don't just shove in a complete feed!
*
* @param string $atom well-formed Atom feed
* @throws Exception (HTTP or general)
*/ */
function push($atom) public static function pushAtom($topic, $callback, $atom, $secret=null, $hashalg='sha1')
{ {
$headers = array('Content-Type: application/atom+xml'); $headers = array('Content-Type: application/atom+xml');
if ($this->secret) { if ($secret) {
$hmac = hash_hmac('sha1', $atom, $this->secret); $hmac = hash_hmac($hashalg, $atom, $secret);
$headers[] = "X-Hub-Signature: sha1=$hmac"; $headers[] = "X-Hub-Signature: {$hashalg}={$hmac}";
} else { } else {
$hmac = '(none)'; $hmac = '(none)';
} }
common_log(LOG_INFO, "About to push feed to $this->callback for {$this->getTopic()}, HMAC $hmac"); common_log(LOG_INFO, sprintf('About to WebSub-push feed to %s for %s, HMAC %s', _ve($callback), _ve($topic), _ve($hmac)));
$request = new HTTPClient(); $request = new HTTPClient();
$request->setConfig(array('follow_redirects' => false)); $request->setConfig(array('follow_redirects' => false));
$request->setBody($atom); $request->setBody($atom);
// This will throw exception on non-HTTP failures
try { try {
$response = $request->post($this->callback, $headers); $response = $request->post($callback, $headers);
if ($response->isOk()) {
return true;
}
} catch (Exception $e) { } catch (Exception $e) {
$response = null; common_debug(sprintf('WebSub callback to %s for %s failed with exception %s: %s', _ve($callback), _ve($topic), _ve(get_class($e)), _ve($e->getMessage())));
throw $e;
common_debug('WebSub callback to '._ve($this->callback).' for '._ve($this->getTopic()).' failed with exception: '._ve($e->getMessage()));
} }
return $response->isOk();
}
/**
* Send a 'fat ping' to the subscriber's callback endpoint
* containing the given Atom feed chunk.
*
* Determination of which feed items to send should be done at
* a higher level; don't just shove in a complete feed!
*
* FIXME: Add 'failed' incremental count.
*
* @param string $atom well-formed Atom feed
* @return boolean Whether the PuSH was accepted or not.
* @throws Exception (HTTP or general)
*/
function push($atom)
{
try {
$success = self::pushAtom($this->getTopic(), $this->callback, $atom, $this->secret);
if ($success) {
return true;
} elseif ('https' === parse_url($this->callback, PHP_URL_SCHEME)) {
// Already HTTPS, no need to check remote http/https migration issues
return false;
}
// if pushAtom returned false and we didn't try an HTTPS endpoint,
// let's try HTTPS too (assuming only http:// and https:// are used ;))
} catch (Exception $e) {
if ('https' === parse_url($this->callback, PHP_URL_SCHEME)) {
// Already HTTPS, no need to check remote http/https migration issues
throw $e;
}
}
// We failed the WebSub push, but it might be that the remote site has changed their configuration to HTTPS
common_debug('WebSub HTTPSFIX: push failed, so we need to see if it can be the remote http->https migration issue.');
// XXX: DO NOT trust a Location header here, _especially_ from 'http' protocols, // XXX: DO NOT trust a Location header here, _especially_ from 'http' protocols,
// but not 'https' either at least if we don't do proper CA verification. Trust that // but not 'https' either at least if we don't do proper CA verification. Trust that
// the most common change here is simply switching 'http' to 'https' and we will // the most common change here is simply switching 'http' to 'https' and we will
@ -314,37 +341,38 @@ class HubSub extends Managed_DataObject
// if we want to change the callback URLs, preferrably just manual resubscriptions // if we want to change the callback URLs, preferrably just manual resubscriptions
// from the remote side, combined with implemented WebSub subscription timeouts. // from the remote side, combined with implemented WebSub subscription timeouts.
// We failed the WebSub, but it might be that the remote site has changed their configuration to HTTPS // Test if the feed callback for this node has already been migrated to HTTPS in our database
if ('http' === parse_url($this->callback, PHP_URL_SCHEME)) { // (otherwise we'd get collisions when inserting it further down)
// Test if the feed callback for this node has migrated to HTTPS $httpscallback = preg_replace('/^http/', 'https', $this->callback, 1);
$httpscallback = preg_replace('/^http/', 'https', $this->callback, 1); $alreadyreplaced = self::getByHashKey($this->getTopic(), $httpscallback);
$alreadyreplaced = self::getByHashKey($this->getTopic(), $httpscallback); if ($alreadyreplaced instanceof HubSub) {
if ($alreadyreplaced instanceof HubSub) { // Let's remove the old HTTP callback object.
$this->delete(); $this->delete();
throw new AlreadyFulfilledException('The remote side has already established an HTTPS callback, deleting the legacy HTTP entry.');
}
common_debug('WebSub callback to '._ve($this->callback).' for '._ve($this->getTopic()).' trying HTTPS callback: '._ve($httpscallback)); // XXX: I think this means we might lose a message or two when
$response = $request->post($httpscallback, $headers); // remote side migrates to HTTPS because we only try _once_
if ($response->isOk()) { // for _one_ WebSub push. The rest of the posts already
$orig = clone($this); // stored in our queue (if any) will not find a HubSub
$this->callback = $httpscallback; // object. This could maybe be fixed by handling migration
// NOTE: hashkey will be set in $this->onUpdateKeys($orig) through updateWithKeys // in HubOutQueueHandler while handling the item there.
$this->updateWithKeys($orig); common_debug('WebSub HTTPSFIX: Pushing Atom to HTTPS callback instead of HTTP, because of switch to HTTPS since enrolled in queue.');
return true; return self::pushAtom($this->getTopic(), $httpscallback, $atom, $this->secret);
}
} }
// FIXME: Add 'failed' incremental count for this callback. common_debug('WebSub HTTPSFIX: callback to '._ve($this->callback).' for '._ve($this->getTopic()).' trying HTTPS callback: '._ve($httpscallback));
$success = self::pushAtom($this->getTopic(), $httpscallback, $atom, $this->secret);
if (is_null($response)) { if ($success) {
// This means we got a lower-than-HTTP level error, like domain not found or maybe connection refused // Yay, we made a successful push to https://, let's remember this in the future!
// This should be using a more distinguishable exception class, but for now this will do. $orig = clone($this);
throw new Exception(sprintf(_m('HTTP request failed without response to URL: %s'), _ve(isset($httpscallback) ? $httpscallback : $this->callback))); $this->callback = $httpscallback;
// NOTE: hashkey will be set in $this->onUpdateKeys($orig) through updateWithKeys
$this->updateWithKeys($orig);
return true;
} }
// TRANS: Exception. %1$s is a response status code, %2$s is the body of the response. // If there have been any exceptions thrown before, they're handled
throw new Exception(sprintf(_m('Callback returned status: %1$s. Body: %2$s'), // higher up. This function's return value is just whether the WebSub
$response->getStatus(),trim($response->getBody()))); // push was accepted or not.
return $success;
} }
} }

View File

@ -35,12 +35,26 @@ class HubOutQueueHandler extends QueueHandler
function handle($data) function handle($data)
{ {
$sub = $data['sub']; assert(array_key_exists('atom', $data));
assert(is_string($data['atom']));
$atom = $data['atom']; $atom = $data['atom'];
$retries = $data['retries'];
assert(array_key_exists('retries', $data));
$retries = intval($data['retries']);
if (array_key_exists('topic', $data) && array_key_exists('callback', $data)) {
assert(is_string($data['topic']));
assert(is_string($data['callback']));
$sub = HubSub::getByHashkey($data['topic'], $data['callback']);
} elseif (array_key_exists('sub', $data)) {
// queue behaviour changed 2017-07-09 to store topic/callback instead of sub object
common_debug('Legacy behaviour of storing HubSub objects found, this should go away when all objects are handled...');
$sub = $data['sub'];
} else {
throw new ServerException('No HubSub object available with queue item data.');
}
assert($sub instanceof HubSub); assert($sub instanceof HubSub);
assert(is_string($atom));
try { try {
$sub->push($atom); $sub->push($atom);