From c36bdc1ba535dc3e2dc9098dbe40735b1955d96d Mon Sep 17 00:00:00 2001 From: Brion Vibber Date: Wed, 24 Feb 2010 20:36:36 +0000 Subject: [PATCH] - break OMB profile update pings to a background queue - add event hooks to profile update pings - send Salmon pings with custom update-profile event to OStatus subscribees and groups (subscribers will see it on your next post) - fix OStatus queues with overlong transport names, should work on DB queues now - Ostatus_profile::notifyActivity() and ::notifyDeferred() now can take XML, Notice, or Activity for convenience --- lib/activity.php | 3 ++ lib/profilequeuehandler.php | 48 +++++++++++++++++ lib/queuemanager.php | 3 ++ lib/util.php | 14 +++-- plugins/OStatus/OStatusPlugin.php | 53 +++++++++++++++++-- plugins/OStatus/actions/pushcallback.php | 2 +- plugins/OStatus/classes/HubSub.php | 2 +- plugins/OStatus/classes/Ostatus_profile.php | 52 +++++++++++++++--- ...euehandler.php => hubconfqueuehandler.php} | 4 +- plugins/OStatus/lib/ostatusqueuehandler.php | 22 ++------ ...ueuehandler.php => pushinqueuehandler.php} | 4 +- ...ueuehandler.php => salmonqueuehandler.php} | 4 +- 12 files changed, 170 insertions(+), 41 deletions(-) create mode 100644 lib/profilequeuehandler.php rename plugins/OStatus/lib/{hubverifyqueuehandler.php => hubconfqueuehandler.php} (95%) rename plugins/OStatus/lib/{pushinputqueuehandler.php => pushinqueuehandler.php} (94%) rename plugins/OStatus/lib/{salmonoutqueuehandler.php => salmonqueuehandler.php} (94%) diff --git a/lib/activity.php b/lib/activity.php index fa4ae02748..33932ad0ef 100644 --- a/lib/activity.php +++ b/lib/activity.php @@ -691,6 +691,9 @@ class ActivityVerb const UNFAVORITE = 'http://ostatus.org/schema/1.0/unfavorite'; const UNFOLLOW = 'http://ostatus.org/schema/1.0/unfollow'; const LEAVE = 'http://ostatus.org/schema/1.0/leave'; + + // For simple profile-update pings; no content to share. + const UPDATE_PROFILE = 'http://ostatus.org/schema/1.0/update-profile'; } class ActivityContext diff --git a/lib/profilequeuehandler.php b/lib/profilequeuehandler.php new file mode 100644 index 0000000000..e8a00aef30 --- /dev/null +++ b/lib/profilequeuehandler.php @@ -0,0 +1,48 @@ +. + */ + +/** + * @package QueueHandler + * @maintainer Brion Vibber + */ + +class ProfileQueueHandler extends QueueHandler +{ + + function transport() + { + return 'profile'; + } + + function handle($profile) + { + if (!($profile instanceof Profile)) { + common_log(LOG_ERR, "Got a bogus profile, not broadcasting"); + return true; + } + + if (Event::handle('StartBroadcastProfile', array($profile))) { + require_once(INSTALLDIR.'/lib/omb.php'); + omb_broadcast_profile($profile); + } + Event::handle('EndBroadcastProfile', array($profile)); + return true; + } + +} diff --git a/lib/queuemanager.php b/lib/queuemanager.php index 8f8c8f133f..9fdc801100 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -262,6 +262,9 @@ abstract class QueueManager extends IoManager $this->connect('sms', 'SmsQueueHandler'); } + // Broadcasting profile updates to OMB remote subscribers + $this->connect('profile', 'ProfileQueueHandler'); + // XMPP output handlers... if (common_config('xmpp', 'enabled')) { // Delivery prep, read by queuedaemon.php: diff --git a/lib/util.php b/lib/util.php index 7fb2c6c4b0..9354431f27 100644 --- a/lib/util.php +++ b/lib/util.php @@ -1119,12 +1119,16 @@ function common_enqueue_notice($notice) return true; } -function common_broadcast_profile($profile) +/** + * Broadcast profile updates to OMB and other remote subscribers. + * + * Since this may be slow with a lot of subscribers or bad remote sites, + * this is run through the background queues if possible. + */ +function common_broadcast_profile(Profile $profile) { - // XXX: optionally use a queue system like http://code.google.com/p/microapps/wiki/NQDQ - require_once(INSTALLDIR.'/lib/omb.php'); - omb_broadcast_profile($profile); - // XXX: Other broadcasts...? + $qm = QueueManager::get(); + $qm->enqueue($profile, "profile"); return true; } diff --git a/plugins/OStatus/OStatusPlugin.php b/plugins/OStatus/OStatusPlugin.php index 9376c048de..90abe034de 100644 --- a/plugins/OStatus/OStatusPlugin.php +++ b/plugins/OStatus/OStatusPlugin.php @@ -82,14 +82,14 @@ class OStatusPlugin extends Plugin $qm->connect('ostatus', 'OStatusQueueHandler'); // Outgoing from our internal PuSH hub - $qm->connect('hubverify', 'HubVerifyQueueHandler'); + $qm->connect('hubconf', 'HubConfQueueHandler'); $qm->connect('hubout', 'HubOutQueueHandler'); // Outgoing Salmon replies (when we don't need a return value) - $qm->connect('salmonout', 'SalmonOutQueueHandler'); + $qm->connect('salmon', 'SalmonQueueHandler'); // Incoming from a foreign PuSH hub - $qm->connect('pushinput', 'PushInputQueueHandler'); + $qm->connect('pushin', 'PushInQueueHandler'); return true; } @@ -656,4 +656,51 @@ class OStatusPlugin extends Plugin return true; } + + /** + * Ping remote profiles with updates to this profile. + * Salmon pings are queued for background processing. + */ + function onEndBroadcastProfile(Profile $profile) + { + $user = User::staticGet('id', $profile->id); + + // Find foreign accounts I'm subscribed to that support Salmon pings. + // + // @fixme we could run updates through the PuSH feed too, + // in which case we can skip Salmon pings to folks who + // are also subscribed to me. + $sql = "SELECT * FROM ostatus_profile " . + "WHERE profile_id IN " . + "(SELECT subscribed FROM subscription WHERE subscriber=%d) " . + "OR group_id IN " . + "(SELECT group_id FROM group_member WHERE profile_id=%d)"; + $oprofile = new Ostatus_profile(); + $oprofile->query(sprintf($sql, $profile->id, $profile->id)); + + if ($oprofile->N == 0) { + common_log(LOG_DEBUG, "No OStatus remote subscribees for $profile->nickname"); + return true; + } + + $act = new Activity(); + + $act->verb = ActivityVerb::UPDATE_PROFILE; + $act->id = TagURI::mint('update-profile:%d:%s', + $profile->id, + common_date_iso8601(time())); + $act->time = time(); + $act->title = _m("Profile update"); + $act->content = sprintf(_m("%s has updated their profile page."), + $profile->getBestName()); + + $act->actor = ActivityObject::fromProfile($profile); + $act->object = $act->actor; + + while ($oprofile->fetch()) { + $oprofile->notifyDeferred($act); + } + + return true; + } } diff --git a/plugins/OStatus/actions/pushcallback.php b/plugins/OStatus/actions/pushcallback.php index 4184f0e0c0..9a2067b8ca 100644 --- a/plugins/OStatus/actions/pushcallback.php +++ b/plugins/OStatus/actions/pushcallback.php @@ -68,7 +68,7 @@ class PushCallbackAction extends Action 'post' => $post, 'hmac' => $hmac); $qm = QueueManager::get(); - $qm->enqueue($data, 'pushinput'); + $qm->enqueue($data, 'pushin'); } /** diff --git a/plugins/OStatus/classes/HubSub.php b/plugins/OStatus/classes/HubSub.php index eae2928c32..1ac181feeb 100644 --- a/plugins/OStatus/classes/HubSub.php +++ b/plugins/OStatus/classes/HubSub.php @@ -164,7 +164,7 @@ class HubSub extends Memcached_DataObject 'token' => $token, 'retries' => $retries); $qm = QueueManager::get(); - $qm->enqueue($data, 'hubverify'); + $qm->enqueue($data, 'hubconf'); } /** diff --git a/plugins/OStatus/classes/Ostatus_profile.php b/plugins/OStatus/classes/Ostatus_profile.php index 9f9efb96ee..61505206ec 100644 --- a/plugins/OStatus/classes/Ostatus_profile.php +++ b/plugins/OStatus/classes/Ostatus_profile.php @@ -431,21 +431,57 @@ class Ostatus_profile extends Memcached_DataObject return false; } - public function notifyActivity($activity) + /** + * Send a Salmon notification ping immediately, and confirm that we got + * an acceptable response from the remote site. + * + * @param mixed $entry XML string, Notice, or Activity + * @return boolean success + */ + public function notifyActivity($entry) { if ($this->salmonuri) { - - $xml = '' . - $activity->asString(true); - - $salmon = new Salmon(); // ? - - return $salmon->post($this->salmonuri, $xml); + $salmon = new Salmon(); + return $salmon->post($this->salmonuri, $this->notifyPrepXml($entry)); } return false; } + /** + * Queue a Salmon notification for later. If queues are disabled we'll + * send immediately but won't get the return value. + * + * @param mixed $entry XML string, Notice, or Activity + * @return boolean success + */ + public function notifyDeferred($entry) + { + if ($this->salmonuri) { + $data = array('salmonuri' => $this->salmonuri, + 'entry' => $this->notifyPrepXml($entry)); + + $qm = QueueManager::get(); + return $qm->enqueue($data, 'salmon'); + } + + return false; + } + + protected function notifyPrepXml($entry) + { + $preamble = ''; + if (is_string($entry)) { + return $entry; + } else if ($entry instanceof Activity) { + return $preamble . $entry->asString(true); + } else if ($entry instanceof Notice) { + return $preamble . $entry->asAtomEntry(true, true); + } else { + throw new ServerException("Invalid type passed to Ostatus_profile::notify; must be XML string or Activity entry"); + } + } + function getBestName() { if ($this->isGroup()) { diff --git a/plugins/OStatus/lib/hubverifyqueuehandler.php b/plugins/OStatus/lib/hubconfqueuehandler.php similarity index 95% rename from plugins/OStatus/lib/hubverifyqueuehandler.php rename to plugins/OStatus/lib/hubconfqueuehandler.php index 7ce9e14312..c8e0b72fee 100644 --- a/plugins/OStatus/lib/hubverifyqueuehandler.php +++ b/plugins/OStatus/lib/hubconfqueuehandler.php @@ -22,11 +22,11 @@ * @package Hub * @author Brion Vibber */ -class HubVerifyQueueHandler extends QueueHandler +class HubConfQueueHandler extends QueueHandler { function transport() { - return 'hubverify'; + return 'hubconf'; } function handle($data) diff --git a/plugins/OStatus/lib/ostatusqueuehandler.php b/plugins/OStatus/lib/ostatusqueuehandler.php index c1e50bffa1..0da85600fb 100644 --- a/plugins/OStatus/lib/ostatusqueuehandler.php +++ b/plugins/OStatus/lib/ostatusqueuehandler.php @@ -83,23 +83,11 @@ class OStatusQueueHandler extends QueueHandler function pingReply($oprofile) { if ($this->user) { - if (!empty($oprofile->salmonuri)) { - // For local posts, send a Salmon ping to the mentioned - // remote user or group. - // @fixme as an optimization we can skip this if the - // remote profile is subscribed to the author. - - common_log(LOG_INFO, "Prepping to send notice '{$this->notice->uri}' to remote profile '{$oprofile->uri}'."); - - $xml = ''; - $xml .= $this->notice->asAtomEntry(true, true); - - $data = array('salmonuri' => $oprofile->salmonuri, - 'entry' => $xml); - - $qm = QueueManager::get(); - $qm->enqueue($data, 'salmonout'); - } + // For local posts, send a Salmon ping to the mentioned + // remote user or group. + // @fixme as an optimization we can skip this if the + // remote profile is subscribed to the author. + $oprofile->notifyDeferred($this->notice); } } diff --git a/plugins/OStatus/lib/pushinputqueuehandler.php b/plugins/OStatus/lib/pushinqueuehandler.php similarity index 94% rename from plugins/OStatus/lib/pushinputqueuehandler.php rename to plugins/OStatus/lib/pushinqueuehandler.php index cbd9139b50..a90f52df26 100644 --- a/plugins/OStatus/lib/pushinputqueuehandler.php +++ b/plugins/OStatus/lib/pushinqueuehandler.php @@ -23,11 +23,11 @@ * @author Brion Vibber */ -class PushInputQueueHandler extends QueueHandler +class PushInQueueHandler extends QueueHandler { function transport() { - return 'pushinput'; + return 'pushin'; } function handle($data) diff --git a/plugins/OStatus/lib/salmonoutqueuehandler.php b/plugins/OStatus/lib/salmonqueuehandler.php similarity index 94% rename from plugins/OStatus/lib/salmonoutqueuehandler.php rename to plugins/OStatus/lib/salmonqueuehandler.php index 536ff94af6..aa97018dc9 100644 --- a/plugins/OStatus/lib/salmonoutqueuehandler.php +++ b/plugins/OStatus/lib/salmonqueuehandler.php @@ -22,11 +22,11 @@ * @package OStatusPlugin * @author Brion Vibber */ -class SalmonOutQueueHandler extends QueueHandler +class SalmonQueueHandler extends QueueHandler { function transport() { - return 'salmonout'; + return 'salmon'; } function handle($data)