OStatus PuSH fixlets:
- set minimal error page output on PuSH callback - allow hub to retry ($config['ostatus']['hub_retries']), simplify internal iface a bit. Retries are pushed to end of queue but otherwise not delayed yet; makes delivery more robust to one-off transitory errors but not yet against downtime.
This commit is contained in:
parent
37179a91d5
commit
aa0b2ce81a
|
@ -29,6 +29,7 @@ class PushCallbackAction extends Action
|
||||||
{
|
{
|
||||||
function handle()
|
function handle()
|
||||||
{
|
{
|
||||||
|
StatusNet::setApi(true); // Minimize error messages to aid in debugging
|
||||||
parent::handle();
|
parent::handle();
|
||||||
if ($_SERVER['REQUEST_METHOD'] == 'POST') {
|
if ($_SERVER['REQUEST_METHOD'] == 'POST') {
|
||||||
$this->handlePost();
|
$this->handlePost();
|
||||||
|
|
|
@ -226,6 +226,26 @@ class HubSub extends Memcached_DataObject
|
||||||
return parent::insert();
|
return parent::insert();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Schedule delivery of a 'fat ping' to the subscriber's callback
|
||||||
|
* endpoint. If queues are disabled, this will run immediately.
|
||||||
|
*
|
||||||
|
* @param string $atom well-formed Atom feed
|
||||||
|
* @param int $retries optional count of retries if POST fails; defaults to hub_retries from config or 0 if unset
|
||||||
|
*/
|
||||||
|
function distribute($atom, $retries=null)
|
||||||
|
{
|
||||||
|
if ($retries === null) {
|
||||||
|
$retries = intval(common_config('ostatus', 'hub_retries'));
|
||||||
|
}
|
||||||
|
|
||||||
|
$data = array('sub' => clone($this),
|
||||||
|
'atom' => $atom,
|
||||||
|
'retries' => $retries);
|
||||||
|
$qm = QueueManager::get();
|
||||||
|
$qm->enqueue($data, 'hubout');
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send a 'fat ping' to the subscriber's callback endpoint
|
* Send a 'fat ping' to the subscriber's callback endpoint
|
||||||
* containing the given Atom feed chunk.
|
* containing the given Atom feed chunk.
|
||||||
|
@ -234,6 +254,7 @@ class HubSub extends Memcached_DataObject
|
||||||
* a higher level; don't just shove in a complete feed!
|
* a higher level; don't just shove in a complete feed!
|
||||||
*
|
*
|
||||||
* @param string $atom well-formed Atom feed
|
* @param string $atom well-formed Atom feed
|
||||||
|
* @throws Exception (HTTP or general)
|
||||||
*/
|
*/
|
||||||
function push($atom)
|
function push($atom)
|
||||||
{
|
{
|
||||||
|
@ -245,24 +266,18 @@ class HubSub extends Memcached_DataObject
|
||||||
$hmac = '(none)';
|
$hmac = '(none)';
|
||||||
}
|
}
|
||||||
common_log(LOG_INFO, "About to push feed to $this->callback for $this->topic, HMAC $hmac");
|
common_log(LOG_INFO, "About to push feed to $this->callback for $this->topic, HMAC $hmac");
|
||||||
try {
|
|
||||||
$request = new HTTPClient();
|
$request = new HTTPClient();
|
||||||
$request->setBody($atom);
|
$request->setBody($atom);
|
||||||
$response = $request->post($this->callback, $headers);
|
$response = $request->post($this->callback, $headers);
|
||||||
|
|
||||||
if ($response->isOk()) {
|
if ($response->isOk()) {
|
||||||
return true;
|
return true;
|
||||||
}
|
} else {
|
||||||
common_log(LOG_ERR, "Error sending PuSH content " .
|
throw new Exception("Callback returned status: " .
|
||||||
"to $this->callback for $this->topic: " .
|
$response->getStatus() .
|
||||||
$response->getStatus());
|
"; body: " .
|
||||||
return false;
|
trim($response->getBody()));
|
||||||
|
|
||||||
} catch (Exception $e) {
|
|
||||||
common_log(LOG_ERR, "Error sending PuSH content " .
|
|
||||||
"to $this->callback for $this->topic: " .
|
|
||||||
$e->getMessage());
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -124,10 +124,7 @@ class HubDistribQueueHandler extends QueueHandler
|
||||||
common_log(LOG_INFO, "Preparing $sub->N PuSH distribution(s) for $sub->topic");
|
common_log(LOG_INFO, "Preparing $sub->N PuSH distribution(s) for $sub->topic");
|
||||||
$qm = QueueManager::get();
|
$qm = QueueManager::get();
|
||||||
while ($sub->fetch()) {
|
while ($sub->fetch()) {
|
||||||
common_log(LOG_INFO, "Prepping PuSH distribution to $sub->callback for $sub->topic");
|
$sub->distribute($atom);
|
||||||
$data = array('sub' => clone($sub),
|
|
||||||
'atom' => $atom);
|
|
||||||
$qm->enqueue($data, 'hubout');
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -33,6 +33,7 @@ class HubOutQueueHandler extends QueueHandler
|
||||||
{
|
{
|
||||||
$sub = $data['sub'];
|
$sub = $data['sub'];
|
||||||
$atom = $data['atom'];
|
$atom = $data['atom'];
|
||||||
|
$retries = $data['retries'];
|
||||||
|
|
||||||
assert($sub instanceof HubSub);
|
assert($sub instanceof HubSub);
|
||||||
assert(is_string($atom));
|
assert(is_string($atom));
|
||||||
|
@ -40,13 +41,20 @@ class HubOutQueueHandler extends QueueHandler
|
||||||
try {
|
try {
|
||||||
$sub->push($atom);
|
$sub->push($atom);
|
||||||
} catch (Exception $e) {
|
} catch (Exception $e) {
|
||||||
common_log(LOG_ERR, "Failed PuSH to $sub->callback for $sub->topic: " .
|
$retries--;
|
||||||
$e->getMessage());
|
$msg = "Failed PuSH to $sub->callback for $sub->topic: " .
|
||||||
// @fixme Reschedule a later delivery?
|
$e->getMessage();
|
||||||
return true;
|
if ($retries > 0) {
|
||||||
|
common_log(LOG_ERR, "$msg; scheduling for $retries more tries");
|
||||||
|
|
||||||
|
// @fixme when we have infrastructure to schedule a retry
|
||||||
|
// after a delay, use it.
|
||||||
|
$sub->distribute($atom, $retries);
|
||||||
|
} else {
|
||||||
|
common_log(LOG_ERR, "$msg; discarding");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user