OStatus plugin: Rolling batch queueing for PuSH output to >50 subscribing sites. Keeps latency down for other things enqueued while we work...
This commit is contained in:
parent
8b9436e8ae
commit
41e9dba729
|
@ -87,6 +87,8 @@ class OStatusPlugin extends Plugin
|
||||||
|
|
||||||
// Outgoing from our internal PuSH hub
|
// Outgoing from our internal PuSH hub
|
||||||
$qm->connect('hubconf', 'HubConfQueueHandler');
|
$qm->connect('hubconf', 'HubConfQueueHandler');
|
||||||
|
$qm->connect('hubprep', 'HubPrepQueueHandler');
|
||||||
|
|
||||||
$qm->connect('hubout', 'HubOutQueueHandler');
|
$qm->connect('hubout', 'HubOutQueueHandler');
|
||||||
|
|
||||||
// Outgoing Salmon replies (when we don't need a return value)
|
// Outgoing Salmon replies (when we don't need a return value)
|
||||||
|
|
|
@ -304,6 +304,26 @@ class HubSub extends Memcached_DataObject
|
||||||
$qm->enqueue($data, 'hubout');
|
$qm->enqueue($data, 'hubout');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Queue up a large batch of pushes to multiple subscribers
|
||||||
|
* for this same topic update.
|
||||||
|
*
|
||||||
|
* If queues are disabled, this will run immediately.
|
||||||
|
*
|
||||||
|
* @param string $atom well-formed Atom feed
|
||||||
|
* @param array $pushCallbacks list of callback URLs
|
||||||
|
*/
|
||||||
|
function bulkDistribute($atom, $pushCallbacks)
|
||||||
|
{
|
||||||
|
$data = array('atom' => $atom,
|
||||||
|
'topic' => $this->topic,
|
||||||
|
'pushCallbacks' => $pushCallbacks);
|
||||||
|
common_log(LOG_INFO, "Queuing PuSH batch: $this->topic to " .
|
||||||
|
count($pushCallbacks) . " sites");
|
||||||
|
$qm = QueueManager::get();
|
||||||
|
$qm->enqueue($data, 'hubprep');
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send a 'fat ping' to the subscriber's callback endpoint
|
* Send a 'fat ping' to the subscriber's callback endpoint
|
||||||
* containing the given Atom feed chunk.
|
* containing the given Atom feed chunk.
|
||||||
|
|
|
@ -25,6 +25,18 @@
|
||||||
*/
|
*/
|
||||||
class OStatusQueueHandler extends QueueHandler
|
class OStatusQueueHandler extends QueueHandler
|
||||||
{
|
{
|
||||||
|
// If we have more than this many subscribing sites on a single feed,
|
||||||
|
// break up the PuSH distribution into smaller batches which will be
|
||||||
|
// rolled into the queue progressively. This reduces disruption to
|
||||||
|
// other, shorter activities being enqueued while we work.
|
||||||
|
const MAX_UNBATCHED = 50;
|
||||||
|
|
||||||
|
// Each batch (a 'hubprep' entry) will have this many items.
|
||||||
|
// Selected to provide a balance between queue packet size
|
||||||
|
// and number of batches that will end up getting processed.
|
||||||
|
// For 20,000 target sites, 1000 should work acceptably.
|
||||||
|
const BATCH_SIZE = 1000;
|
||||||
|
|
||||||
function transport()
|
function transport()
|
||||||
{
|
{
|
||||||
return 'ostatus';
|
return 'ostatus';
|
||||||
|
@ -147,14 +159,31 @@ class OStatusQueueHandler extends QueueHandler
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Queue up direct feed update pushes to subscribers on our internal hub.
|
* Queue up direct feed update pushes to subscribers on our internal hub.
|
||||||
|
* If there are a large number of subscriber sites, intermediate bulk
|
||||||
|
* distribution triggers may be queued.
|
||||||
|
*
|
||||||
* @param string $atom update feed, containing only new/changed items
|
* @param string $atom update feed, containing only new/changed items
|
||||||
* @param HubSub $sub open query of subscribers
|
* @param HubSub $sub open query of subscribers
|
||||||
*/
|
*/
|
||||||
function pushFeedInternal($atom, $sub)
|
function pushFeedInternal($atom, $sub)
|
||||||
{
|
{
|
||||||
common_log(LOG_INFO, "Preparing $sub->N PuSH distribution(s) for $sub->topic");
|
common_log(LOG_INFO, "Preparing $sub->N PuSH distribution(s) for $sub->topic");
|
||||||
|
$n = 0;
|
||||||
|
$batch = array();
|
||||||
while ($sub->fetch()) {
|
while ($sub->fetch()) {
|
||||||
$sub->distribute($atom);
|
$n++;
|
||||||
|
if ($n < self::MAX_UNBATCHED) {
|
||||||
|
$sub->distribute($atom);
|
||||||
|
} else {
|
||||||
|
$batch[] = $sub->callback;
|
||||||
|
if (count($batch) >= self::BATCH_SIZE) {
|
||||||
|
$sub->bulkDistribute($atom, $batch);
|
||||||
|
$batch = array();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (count($batch) >= 0) {
|
||||||
|
$sub->bulkDistribute($atom, $batch);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user