OStatus group delivery initial implementation.

- added rel="ostatus:attention" links for group delivery
- added events for plugins to override group profile/permalink pages
- pulled Notice::saveGroups up to save-time so we can override;
  it's relatively cheap and gives us a clean list of target
  groups for distrib time even with customized delivery.
- fixed notice::getGroups to return group objects as expected
- added some doc on new parameters to Notice::saveNew
 - 'groups' list of group IDs to push to in place of parsing
- messages that come in via PuSH and contain local group targets
  are delivered to local group members
- messages that come in via PuSH and contain remote group targets
  are delivered to local members of the remote group

Todo:
- handle group posts that only come through Salmon
- handle conflicts in case something comes in both through Salmon and PuSH
- better source verification
- need a cleaner interface to look up groups by URI
- need a way to handle remote groups with conflicting names
This commit is contained in:
Brion Vibber 2010-02-23 00:44:45 +00:00
parent 8aa8e124cb
commit d410df0406
7 changed files with 207 additions and 32 deletions

View File

@ -176,7 +176,8 @@ class ApiTimelineGroupAction extends ApiPrivateAuthAction
$atom->addEntryFromNotices($this->notices);
$this->raw($atom->getString());
//$this->raw($atom->getString());
print $atom->getString(); // temp hack until PuSH feeds are redone cleanly
} catch (Atom10FeedException $e) {
$this->serverError(

View File

@ -187,7 +187,14 @@ class Notice extends Memcached_DataObject
* int 'location_ns' geoname namespace to interpret location_id
* int 'reply_to'; notice ID this is a reply to
* int 'repeat_of'; notice ID this is a repeat of
* string 'uri' permalink to notice; defaults to local notice URL
* string 'uri' unique ID for notice; defaults to local notice URL
* string 'url' permalink to notice; defaults to local notice URL
* string 'rendered' rendered HTML version of content
* array 'replies' list of profile URIs for reply delivery in
* place of extracting @-replies from content.
* array 'groups' list of group IDs to deliver to, in place of
* extracting ! tags from content
* @fixme tag override
*
* @return Notice
* @throws ClientException
@ -342,6 +349,12 @@ class Notice extends Memcached_DataObject
$notice->saveReplies();
}
if (isset($groups)) {
$notice->saveKnownGroups($groups);
} else {
$notice->saveGroups();
}
$notice->distribute();
return $notice;
@ -692,7 +705,22 @@ class Notice extends Memcached_DataObject
return $ni;
}
function addToInboxes($groups, $recipients)
/**
* Adds this notice to the inboxes of each local user who should receive
* it, based on author subscriptions, group memberships, and @-replies.
*
* Warning: running a second time currently will make items appear
* multiple times in users' inboxes.
*
* @fixme make more robust against errors
* @fixme break up massive deliveries to smaller background tasks
*
* @param array $groups optional list of Group objects;
* if left empty, will be loaded from group_inbox records
* @param array $recipient optional list of reply profile ids
* if left empty, will be loaded from reply records
*/
function addToInboxes($groups=null, $recipients=null)
{
$ni = $this->whoGets($groups, $recipients);
@ -742,6 +770,42 @@ class Notice extends Memcached_DataObject
}
/**
* Record this notice to the given group inboxes for delivery.
* Overrides the regular parsing of !group markup.
*
* @param string $group_ids
* @fixme might prefer URIs as identifiers, as for replies?
* best with generalizations on user_group to support
* remote groups better.
*/
function saveKnownGroups($group_ids)
{
if (!is_array($group_ids)) {
throw new ServerException("Bad type provided to saveKnownGroups");
}
$groups = array();
foreach ($group_ids as $id) {
$group = User_group::staticGet('id', $id);
if ($group) {
common_log(LOG_ERR, "Local delivery to group id $id, $group->nickname");
$result = $this->addToGroupInbox($group);
if (!$result) {
common_log_db_error($gi, 'INSERT', __FILE__);
}
// @fixme should we save the tags here or not?
$groups[] = clone($group);
} else {
common_log(LOG_ERR, "Local delivery to group id $id skipped, doesn't exist");
}
}
return $groups;
}
/**
* Parse !group delivery and record targets into group_inbox.
* @return array of Group objects
*/
function saveGroups()
@ -824,6 +888,19 @@ class Notice extends Memcached_DataObject
return true;
}
/**
* Save reply records indicating that this notice needs to be
* delivered to the local users with the given URIs.
*
* Since this is expected to be used when saving foreign-sourced
* messages, we won't deliver to any remote targets as that's the
* source service's responsibility.
*
* @fixme Unlike saveReplies() there's no mail notification here.
* Move that to distrib queue handler?
*
* @param array of unique identifier URIs for recipients
*/
function saveKnownReplies($uris)
{
foreach ($uris as $uri) {
@ -845,6 +922,13 @@ class Notice extends Memcached_DataObject
}
/**
* Pull @-replies from this message's content in StatusNet markup format
* and save reply records indicating that this message needs to be
* delivered to those users.
*
* Side effect: local recipients get e-mail notifications here.
* @fixme move mail notifications to distrib?
*
* @return array of integer profile IDs
*/
@ -934,9 +1018,10 @@ class Notice extends Memcached_DataObject
}
/**
* Same calculation as saveGroups but without the saving
* @fixme merge the functions
* @return array of Group_inbox objects
* Pull list of groups this notice needs to be delivered to,
* as previously recorded by saveGroups() or saveKnownGroups().
*
* @return array of Group objects
*/
function getGroups()
{
@ -959,7 +1044,10 @@ class Notice extends Memcached_DataObject
if ($gi->find()) {
while ($gi->fetch()) {
$groups[] = clone($gi);
$group = User_group::staticGet('id', $gi->group_id);
if ($group) {
$groups[] = $group;
}
}
}
@ -1063,6 +1151,17 @@ class Notice extends Memcached_DataObject
}
}
$groups = $this->getGroups();
foreach ($groups as $group) {
$xs->element(
'link', array(
'rel' => 'ostatus:attention',
'href' => $group->permalink()
)
);
}
if (!empty($this->repeat_of)) {
$repeat = Notice::staticGet('id', $this->repeat_of);
if (!empty($repeat)) {

View File

@ -39,15 +39,25 @@ class User_group extends Memcached_DataObject
function homeUrl()
{
return common_local_url('showgroup',
$url = null;
if (Event::handle('StartUserGroupHomeUrl', array($this, &$url))) {
$url = common_local_url('showgroup',
array('nickname' => $this->nickname));
}
Event::handle('EndUserGroupHomeUrl', array($this, &$url));
return $url;
}
function permalink()
{
return common_local_url('groupbyid',
$url = null;
if (Event::handle('StartUserGroupPermalink', array($this, &$url))) {
$url = common_local_url('groupbyid',
array('id' => $this->id));
}
Event::handle('EndUserGroupPermalink', array($this, &$url));
return $url;
}
function getNotices($offset, $limit, $since_id=null, $max_id=null)
{

View File

@ -69,19 +69,7 @@ class DistribQueueHandler
}
try {
$groups = $notice->saveGroups();
} catch (Exception $e) {
$this->logit($notice, $e);
}
try {
$recipients = $notice->getReplies();
} catch (Exception $e) {
$this->logit($notice, $e);
}
try {
$notice->addToInboxes($groups, $recipients);
$notice->addToInboxes();
} catch (Exception $e) {
$this->logit($notice, $e);
}

View File

@ -591,6 +591,22 @@ class OStatusPlugin extends Plugin
return true;
}
function onStartUserGroupHomeUrl($group, &$url)
{
return $this->onStartUserGroupPermalink($group, &$url);
}
function onStartUserGroupPermalink($group, &$url)
{
$oprofile = Ostatus_profile::staticGet('group_id', $group->id);
if ($oprofile) {
// @fixme this should probably be in the user_group table
// @fixme this uri not guaranteed to be a profile page
$url = $oprofile->uri;
return false;
}
}
function onStartShowSubscriptionsContent($action)
{
$user = common_current_user();

View File

@ -501,6 +501,7 @@ class Ostatus_profile extends Memcached_DataObject
/**
* Process an incoming post activity from this remote feed.
* @param Activity $activity
* @fixme break up this function, it's getting nasty long
*/
protected function processPost($activity)
{
@ -518,7 +519,6 @@ class Ostatus_profile extends Memcached_DataObject
}
$oprofile = $this;
}
$sourceUri = $activity->object->id;
$dupe = Notice::staticGet('uri', $sourceUri);
@ -555,15 +555,76 @@ class Ostatus_profile extends Memcached_DataObject
}
}
// @fixme ensure that groups get handled correctly
$profile = $oprofile->localProfile();
$params['groups'] = array();
$params['replies'] = array();
if ($activity->context) {
foreach ($activity->context->attention as $recipient) {
$roprofile = Ostatus_profile::staticGet('uri', $recipient);
if ($roprofile) {
if ($roprofile->isGroup()) {
// Deliver to local recipients of this remote group.
// @fixme sender verification?
$params['groups'][] = $roprofile->group_id;
continue;
} else {
// Delivery to remote users is the source service's job.
continue;
}
}
$saved = Notice::saveNew($oprofile->localProfile()->id,
$user = User::staticGet('uri', $recipient);
if ($user) {
// An @-reply directed to a local user.
// @fixme sender verification, spam etc?
$params['replies'][] = $recipient;
continue;
}
// @fixme we need a uri on user_group
// $group = User_group::staticGet('uri', $recipient);
$template = common_local_url('groupbyid', array('id' => '31337'));
$template = preg_quote($template, '/');
$template = str_replace('31337', '(\d+)', $template);
common_log(LOG_DEBUG, $template);
if (preg_match("/$template/", $recipient, $matches)) {
$id = $matches[1];
$group = User_group::staticGet('id', $id);
if ($group) {
// Deliver to all members of this local group.
// @fixme sender verification?
if ($profile->isMember($group)) {
common_log(LOG_DEBUG, "delivering to group $id $group->nickname");
$params['groups'][] = $group->id;
} else {
common_log(LOG_DEBUG, "not delivering to group $id $group->nickname because sender $profile->nickname is not a member");
}
continue;
} else {
common_log(LOG_DEBUG, "not delivering to missing group $id");
}
} else {
common_log(LOG_DEBUG, "not delivering to groups for $recipient");
}
}
}
try {
$saved = Notice::saveNew($profile->id,
$content,
'ostatus',
$params);
} catch (Exception $e) {
common_log(LOG_ERR, "Failed saving notice entry for $sourceUri: " . $e->getMessage());
return;
}
// Record which feed this came through...
try {
Ostatus_source::saveNew($saved, $this, 'push');
} catch (Exception $e) {
common_log(LOG_ERR, "Failed saving ostatus_source entry for $saved->notice_id: " . $e->getMessage());
}
}
/**

View File

@ -36,7 +36,7 @@ class HubDistribQueueHandler extends QueueHandler
$this->pushUser($notice);
foreach ($notice->getGroups() as $group) {
$this->pushGroup($notice, $group->group_id);
$this->pushGroup($notice, $group->id);
}
return true;
}