Merge remote branch 'statusnet/testing' into testing

This commit is contained in:
James Walker 2010-02-22 01:40:03 -05:00
commit 7b2ea7aa22
16 changed files with 444 additions and 421 deletions

View File

@ -66,10 +66,12 @@ class SupAction extends Action
$divider = common_sql_date(time() - $seconds); $divider = common_sql_date(time() - $seconds);
$notice->query('SELECT profile_id, max(id) AS max_id ' . $notice->query('SELECT profile_id, max(id) AS max_id ' .
'FROM notice ' . 'FROM ( ' .
'SELECT profile_id, id FROM notice ' .
((common_config('db','type') == 'pgsql') ? ((common_config('db','type') == 'pgsql') ?
'WHERE extract(epoch from created) > (extract(epoch from now()) - ' . $seconds . ') ' : 'WHERE extract(epoch from created) > (extract(epoch from now()) - ' . $seconds . ') ' :
'WHERE created > "'.$divider.'" ' ) . 'WHERE created > "'.$divider.'" ' ) .
') AS latest ' .
'GROUP BY profile_id'); 'GROUP BY profile_id');
$updates = array(); $updates = array();

View File

@ -333,8 +333,15 @@ class Notice extends Memcached_DataObject
# Clear the cache for subscribed users, so they'll update at next request # Clear the cache for subscribed users, so they'll update at next request
# XXX: someone clever could prepend instead of clearing the cache # XXX: someone clever could prepend instead of clearing the cache
$notice->blowOnInsert(); $notice->blowOnInsert();
if (isset($replies)) {
$notice->saveKnownReplies($replies);
} else {
$notice->saveReplies();
}
$notice->distribute(); $notice->distribute();
return $notice; return $notice;
@ -817,6 +824,26 @@ class Notice extends Memcached_DataObject
return true; return true;
} }
function saveKnownReplies($uris)
{
foreach ($uris as $uri) {
$user = User::staticGet('uri', $uri);
if (!empty($user)) {
$reply = new Reply();
$reply->notice_id = $this->id;
$reply->profile_id = $user->id;
$id = $reply->insert();
}
}
return;
}
/** /**
* @return array of integer profile IDs * @return array of integer profile IDs
*/ */

View File

@ -841,28 +841,22 @@ class Profile extends Memcached_DataObject
{ {
$uri = null; $uri = null;
// check for a local user first // give plugins a chance to set the URI
$user = User::staticGet('id', $this->id); if (Event::handle('StartGetProfileUri', array($this, &$uri))) {
if (!empty($user)) { // check for a local user first
$uri = common_local_url( $user = User::staticGet('id', $this->id);
'userbyid',
array('id' => $user->id)
);
} else {
// give plugins a chance to set the URI
if (Event::handle('StartGetProfileUri', array($this, &$uri))) {
if (!empty($user)) {
$uri = $user->uri;
} else {
// return OMB profile if any // return OMB profile if any
$remote = Remote_profile::staticGet('id', $this->id); $remote = Remote_profile::staticGet('id', $this->id);
if (!empty($remote)) { if (!empty($remote)) {
$uri = $remote->uri; $uri = $remote->uri;
} }
Event::handle('EndGetProfileUri', array($this, &$uri));
} }
Event::handle('EndGetProfileUri', array($this, &$uri));
} }
return $uri; return $uri;

View File

@ -75,7 +75,7 @@ class DistribQueueHandler
} }
try { try {
$recipients = $notice->saveReplies(); $recipients = $notice->getReplies();
} catch (Exception $e) { } catch (Exception $e) {
$this->logit($notice, $e); $this->logit($notice, $e);
} }
@ -107,7 +107,7 @@ class DistribQueueHandler
return true; return true;
} }
protected function logit($notice, $e) protected function logit($notice, $e)
{ {
common_log(LOG_ERR, "Distrib queue exception saving notice $notice->id: " . common_log(LOG_ERR, "Distrib queue exception saving notice $notice->id: " .

View File

@ -29,11 +29,9 @@ require_once 'Auth/Yadis/Yadis.php';
function omb_oauth_consumer() function omb_oauth_consumer()
{ {
static $con = null; // Don't try to make this static. Leads to issues in
if (is_null($con)) { // multi-site setups - Z
$con = new OAuthConsumer(common_root_url(), ''); return new OAuthConsumer(common_root_url(), '');
}
return $con;
} }
function omb_oauth_server() function omb_oauth_server()

View File

@ -60,8 +60,6 @@ class OStatusPlugin extends Plugin
$m->connect('main/push/callback/:feed', $m->connect('main/push/callback/:feed',
array('action' => 'pushcallback'), array('action' => 'pushcallback'),
array('feed' => '[0-9]+')); array('feed' => '[0-9]+'));
$m->connect('settings/feedsub',
array('action' => 'feedsubsettings'));
// Salmon endpoint // Salmon endpoint
$m->connect('main/salmon/user/:id', $m->connect('main/salmon/user/:id',
@ -203,16 +201,18 @@ class OStatusPlugin extends Plugin
{ {
$mentioned = $notice->getReplies(); $mentioned = $notice->getReplies();
foreach ($mentioned as $profile) { foreach ($mentioned as $profile_id) {
$oprofile = Ostatus_profile::staticGet('profile_id', $profile->id); $oprofile = Ostatus_profile::staticGet('profile_id', $profile_id);
if (!empty($oprofile) && !empty($oprofile->salmonuri)) { if (!empty($oprofile) && !empty($oprofile->salmonuri)) {
common_log(LOG_INFO, "Sending notice '{$notice->uri}' to remote profile '{$oprofile->uri}'.");
// FIXME: this needs to go out in a queue handler // FIXME: this needs to go out in a queue handler
$xml = '<?xml version="1.0" encoding="UTF-8" ?>'; $xml = '<?xml version="1.0" encoding="UTF-8" ?>';
$xml .= $notice->asAtomEntry(); $xml .= $notice->asAtomEntry(true, true);
$salmon = new Salmon(); $salmon = new Salmon();
$salmon->post($oprofile->salmonuri, $xml); $salmon->post($oprofile->salmonuri, $xml);
@ -309,6 +309,7 @@ class OStatusPlugin extends Plugin
function onCheckSchema() { function onCheckSchema() {
$schema = Schema::get(); $schema = Schema::get();
$schema->ensureTable('ostatus_profile', Ostatus_profile::schemaDef()); $schema->ensureTable('ostatus_profile', Ostatus_profile::schemaDef());
$schema->ensureTable('ostatus_source', Ostatus_source::schemaDef());
$schema->ensureTable('feedsub', FeedSub::schemaDef()); $schema->ensureTable('feedsub', FeedSub::schemaDef());
$schema->ensureTable('hubsub', HubSub::schemaDef()); $schema->ensureTable('hubsub', HubSub::schemaDef());
return true; return true;
@ -488,4 +489,14 @@ class OStatusPlugin extends Plugin
return true; return true;
} }
function onStartGetProfileUri($profile, &$uri)
{
$oprofile = Ostatus_profile::staticGet('profile_id', $profile->id);
if (!empty($oprofile)) {
$uri = $oprofile->uri;
return false;
}
return true;
}
} }

View File

@ -1,230 +0,0 @@
<?php
/*
* StatusNet - the distributed open-source microblogging tool
* Copyright (C) 2009, StatusNet, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/**
* @package FeedSubPlugin
* @maintainer Brion Vibber <brion@status.net>
*/
if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); }
class FeedSubSettingsAction extends ConnectSettingsAction
{
protected $profile_uri;
protected $preview;
protected $munger;
/**
* Title of the page
*
* @return string Title of the page
*/
function title()
{
return _m('Feed subscriptions');
}
/**
* Instructions for use
*
* @return instructions for use
*/
function getInstructions()
{
return _m('You can subscribe to feeds from other sites; ' .
'updates will appear in your personal timeline.');
}
/**
* Content area of the page
*
* Shows a form for associating a Twitter account with this
* StatusNet account. Also lets the user set preferences.
*
* @return void
*/
function showContent()
{
$user = common_current_user();
$profile = $user->getProfile();
$this->elementStart('form', array('method' => 'post',
'id' => 'form_settings_feedsub',
'class' => 'form_settings',
'action' =>
common_local_url('feedsubsettings')));
$this->hidden('token', common_session_token());
$this->elementStart('fieldset', array('id' => 'settings_feeds'));
$this->elementStart('ul', 'form_data');
$this->elementStart('li', array('id' => 'settings_twitter_login_button'));
$this->input('profile_uri',
_m('Feed URL'),
$this->profile_uri,
_m('Enter the profile URL of a PubSubHubbub-enabled feed'));
$this->elementEnd('li');
$this->elementEnd('ul');
if ($this->preview) {
$this->submit('subscribe', _m('Subscribe'));
} else {
$this->submit('validate', _m('Continue'));
}
$this->elementEnd('fieldset');
$this->elementEnd('form');
if ($this->preview) {
$this->previewFeed();
}
}
/**
* Handle posts to this form
*
* Based on the button that was pressed, muxes out to other functions
* to do the actual task requested.
*
* All sub-functions reload the form with a message -- success or failure.
*
* @return void
*/
function handlePost()
{
// CSRF protection
$token = $this->trimmed('token');
if (!$token || $token != common_session_token()) {
$this->showForm(_('There was a problem with your session token. '.
'Try again, please.'));
return;
}
if ($this->arg('validate')) {
$this->validateAndPreview();
} else if ($this->arg('subscribe')) {
$this->saveFeed();
} else {
$this->showForm(_('Unexpected form submission.'));
}
}
/**
* Set up and add a feed
*
* @return boolean true if feed successfully read
* Sends you back to input form if not.
*/
function validateFeed()
{
$profile_uri = trim($this->arg('profile_uri'));
if ($profile_uri == '') {
$this->showForm(_m('Empty remote profile URL!'));
return;
}
$this->profile_uri = $profile_uri;
// @fixme validate, normalize bla bla
try {
$oprofile = Ostatus_profile::ensureProfile($this->profile_uri);
$this->oprofile = $oprofile;
return true;
} catch (FeedSubBadURLException $e) {
$err = _m('Invalid URL or could not reach server.');
} catch (FeedSubBadResponseException $e) {
$err = _m('Cannot read feed; server returned error.');
} catch (FeedSubEmptyException $e) {
$err = _m('Cannot read feed; server returned an empty page.');
} catch (FeedSubBadHTMLException $e) {
$err = _m('Bad HTML, could not find feed link.');
} catch (FeedSubNoFeedException $e) {
$err = _m('Could not find a feed linked from this URL.');
} catch (FeedSubUnrecognizedTypeException $e) {
$err = _m('Not a recognized feed type.');
} catch (FeedSubException $e) {
// Any new ones we forgot about
$err = sprintf(_m('Bad feed URL: %s %s'), get_class($e), $e->getMessage());
}
$this->showForm($err);
return false;
}
function saveFeed()
{
if ($this->validateFeed()) {
$this->preview = true;
// And subscribe the current user to the local profile
$user = common_current_user();
if (!$this->oprofile->subscribe()) {
$this->showForm(_m("Failed to set up server-to-server subscription."));
return;
}
if ($this->oprofile->isGroup()) {
$group = $this->oprofile->localGroup();
if ($user->isMember($group)) {
$this->showForm(_m('Already a member!'));
} elseif (Group_member::join($this->profile->group_id, $user->id)) {
$this->showForm(_m('Joined remote group!'));
} else {
$this->showForm(_m('Remote group join failed!'));
}
} else {
$local = $this->oprofile->localProfile();
if ($user->isSubscribed($local)) {
$this->showForm(_m('Already subscribed!'));
} elseif ($this->oprofile->subscribeLocalToRemote($user)) {
$this->showForm(_m('Remote user subscribed!'));
} else {
$this->showForm(_m('Remote subscription failed!'));
}
}
}
}
function validateAndPreview()
{
if ($this->validateFeed()) {
$this->preview = true;
$this->showForm(_m('Previewing feed:'));
}
}
function previewFeed()
{
$this->text('Profile preview should go here');
}
function showScripts()
{
parent::showScripts();
$this->autofocus('feedurl');
}
}

View File

@ -119,7 +119,7 @@ class OStatusInitAction extends Action
} else { } else {
$this->connectProfile($this->acct); $this->connectProfile($this->acct);
} }
} elseif (strpos('@', $this->acct) !== false) { } elseif (strpos($this->acct, '@') !== false) {
$this->connectWebfinger($this->acct); $this->connectWebfinger($this->acct);
} }
} }
@ -139,7 +139,7 @@ class OStatusInitAction extends Action
$user = User::staticGet('nickname', $this->nickname); $user = User::staticGet('nickname', $this->nickname);
$target_profile = common_local_url('userbyid', array('id' => $user->id)); $target_profile = common_local_url('userbyid', array('id' => $user->id));
$url = $w->applyTemplate($link['template'], $feed_url); $url = $w->applyTemplate($link['template'], $target_profile);
common_redirect($url, 303); common_redirect($url, 303);
} }

View File

@ -72,7 +72,7 @@ class PushCallbackAction extends Action
} }
/** /**
* Handler for GET verification requests from the hub * Handler for GET verification requests from the hub.
*/ */
function handleGet() function handleGet()
{ {
@ -81,31 +81,37 @@ class PushCallbackAction extends Action
$challenge = $this->arg('hub_challenge'); $challenge = $this->arg('hub_challenge');
$lease_seconds = $this->arg('hub_lease_seconds'); $lease_seconds = $this->arg('hub_lease_seconds');
$verify_token = $this->arg('hub_verify_token'); $verify_token = $this->arg('hub_verify_token');
if ($mode != 'subscribe' && $mode != 'unsubscribe') { if ($mode != 'subscribe' && $mode != 'unsubscribe') {
common_log(LOG_WARNING, __METHOD__ . ": bogus hub callback with mode \"$mode\""); throw new ClientException("Bad hub.mode $mode", 404);
throw new ServerException("Bogus hub callback: bad mode", 404);
} }
$feedsub = FeedSub::staticGet('uri', $topic); $feedsub = FeedSub::staticGet('uri', $topic);
if (!$feedsub) { if (!$feedsub) {
common_log(LOG_WARNING, __METHOD__ . ": bogus hub callback for unknown feed $topic"); throw new ClientException("Bad hub.topic feed $topic", 404);
throw new ServerException("Bogus hub callback: unknown feed", 404);
} }
if ($feedsub->verify_token !== $verify_token) { if ($feedsub->verify_token !== $verify_token) {
common_log(LOG_WARNING, __METHOD__ . ": bogus hub callback with bad token \"$verify_token\" for feed $topic"); throw new ClientException("Bad hub.verify_token $token for $topic", 404);
throw new ServerException("Bogus hub callback: bad token", 404);
} }
if ($mode != $feedsub->sub_state) {
common_log(LOG_WARNING, __METHOD__ . ": bogus hub callback with bad mode \"$mode\" for feed $topic in state \"{$feedsub->sub_state}\"");
throw new ServerException("Bogus hub callback: mode doesn't match subscription state.", 404);
}
// OK!
if ($mode == 'subscribe') { if ($mode == 'subscribe') {
common_log(LOG_INFO, __METHOD__ . ': sub confirmed'); // We may get re-sub requests legitimately.
if ($feedsub->sub_state != 'subscribe' && $feedsub->sub_state != 'active') {
throw new ClientException("Unexpected subscribe request for $topic.", 404);
}
} else {
if ($feedsub->sub_state != 'unsubscribe') {
throw new ClientException("Unexpected unsubscribe request for $topic.", 404);
}
}
if ($mode == 'subscribe') {
if ($feedsub->sub_state == 'active') {
common_log(LOG_INFO, __METHOD__ . ': sub update confirmed');
} else {
common_log(LOG_INFO, __METHOD__ . ': sub confirmed');
}
$feedsub->confirmSubscribe($lease_seconds); $feedsub->confirmSubscribe($lease_seconds);
} else { } else {
common_log(LOG_INFO, __METHOD__ . ": unsub confirmed; deleting sub record for $topic"); common_log(LOG_INFO, __METHOD__ . ": unsub confirmed; deleting sub record for $topic");

View File

@ -59,102 +59,121 @@ class PushHubAction extends Action
$mode = $this->trimmed('hub.mode'); $mode = $this->trimmed('hub.mode');
switch ($mode) { switch ($mode) {
case "subscribe": case "subscribe":
$this->subscribe();
break;
case "unsubscribe": case "unsubscribe":
$this->unsubscribe(); $this->subunsub($mode);
break; break;
case "publish": case "publish":
throw new ServerException("Publishing outside feeds not supported.", 400); throw new ClientException("Publishing outside feeds not supported.", 400);
default: default:
throw new ServerException("Unrecognized mode '$mode'.", 400); throw new ClientException("Unrecognized mode '$mode'.", 400);
} }
} }
/** /**
* Process a PuSH feed subscription request. * Process a request for a new or modified PuSH feed subscription.
* If asynchronous verification is requested, updates won't be saved immediately.
* *
* HTTP return codes: * HTTP return codes:
* 202 Accepted - request saved and awaiting verification * 202 Accepted - request saved and awaiting verification
* 204 No Content - already subscribed * 204 No Content - already subscribed
* 403 Forbidden - rejecting this (not specifically spec'd) * 400 Bad Request - rejecting this (not specifically spec'd)
*/ */
function subscribe() function subunsub($mode)
{ {
$feed = $this->argUrl('hub.topic');
$callback = $this->argUrl('hub.callback'); $callback = $this->argUrl('hub.callback');
$topic = $this->argUrl('hub.topic');
if (!$this->recognizedFeed($topic)) {
throw new ClientException("Unsupported hub.topic $topic; this hub only serves local user and group Atom feeds.");
}
$verify = $this->arg('hub.verify'); // @fixme may be multiple
if ($verify != 'sync' && $verify != 'async') {
throw new ClientException("Invalid hub.verify $verify; must be sync or async.");
}
$lease = $this->arg('hub.lease_seconds', null);
if ($mode == 'subscribe' && $lease != '' && !preg_match('/^\d+$/', $lease)) {
throw new ClientException("Invalid hub.lease $lease; must be empty or positive integer.");
}
$token = $this->arg('hub.verify_token', null); $token = $this->arg('hub.verify_token', null);
common_log(LOG_DEBUG, __METHOD__ . ": checking sub'd to $feed $callback"); $secret = $this->arg('hub.secret', null);
if ($this->getSub($feed, $callback)) { if ($secret != '' && strlen($secret) >= 200) {
// Already subscribed; return 204 per spec. throw new ClientException("Invalid hub.secret $secret; must be under 200 bytes.");
}
$sub = HubSub::staticGet($sub->topic, $sub->callback);
if (!$sub) {
// Creating a new one!
$sub = new HubSub();
$sub->topic = $topic;
$sub->callback = $callback;
}
if ($mode == 'subscribe') {
if ($secret) {
$sub->secret = $secret;
}
if ($lease) {
$sub->setLease(intval($lease));
}
}
if (!common_config('queue', 'enabled')) {
// Won't be able to background it.
$verify = 'sync';
}
if ($verify == 'async') {
$sub->scheduleVerify($mode, $token);
header('HTTP/1.1 202 Accepted');
} else {
$sub->verify($mode, $token);
header('HTTP/1.1 204 No Content'); header('HTTP/1.1 204 No Content');
common_log(LOG_DEBUG, __METHOD__ . ': already subscribed');
return;
} }
common_log(LOG_DEBUG, __METHOD__ . ': setting up');
$sub = new HubSub();
$sub->topic = $feed;
$sub->callback = $callback;
$sub->secret = $this->arg('hub.secret', null);
if (strlen($sub->secret) > 200) {
throw new ClientException("hub.secret must be no longer than 200 chars", 400);
}
$sub->setLease(intval($this->arg('hub.lease_seconds')));
// @fixme check for feeds we don't manage
// @fixme check the verification mode, might want a return immediately?
common_log(LOG_DEBUG, __METHOD__ . ': inserting');
$ok = $sub->insert();
if (!$ok) {
throw new ServerException("Failed to save subscription record", 500);
}
// @fixme check errors ;)
$data = array('sub' => $sub, 'mode' => 'subscribe', 'token' => $token);
$qm = QueueManager::get();
$qm->enqueue($data, 'hubverify');
header('HTTP/1.1 202 Accepted');
common_log(LOG_DEBUG, __METHOD__ . ': done');
} }
/** /**
* Process a PuSH feed unsubscription request. * Check whether the given URL represents one of our canonical
* user or group Atom feeds.
* *
* HTTP return codes: * @param string $feed URL
* 202 Accepted - request saved and awaiting verification * @return boolean true if it matches
* 204 No Content - already subscribed
* 400 Bad Request - invalid params or rejected feed
*
* @fixme background this
*/ */
function unsubscribe() function recognizedFeed($feed)
{ {
$feed = $this->argUrl('hub.topic'); $matches = array();
$callback = $this->argUrl('hub.callback'); if (preg_match('!/(\d+)\.atom$!', $feed, $matches)) {
$sub = $this->getSub($feed, $callback); $id = $matches[1];
$params = array('id' => $id, 'format' => 'atom');
if ($sub) { $userFeed = common_local_url('ApiTimelineUser', $params);
$token = $this->arg('hub.verify_token', null); $groupFeed = common_local_url('ApiTimelineGroup', $params);
if ($sub->verify('unsubscribe', $token)) {
$sub->delete(); if ($feed == $userFeed) {
common_log(LOG_INFO, "PuSH unsubscribed $feed for $callback"); $user = User::staticGet('id', $id);
} else { if (!$user) {
throw new ServerException("Failed PuSH unsubscription: verification failed! $feed for $callback"); throw new ClientException("Invalid hub.topic $feed; user doesn't exist.");
} else {
return true;
}
} }
} else { if ($feed == $groupFeed) {
throw new ServerException("Failed PuSH unsubscription: not subscribed! $feed for $callback"); $user = User_group::staticGet('id', $id);
if (!$user) {
throw new ClientException("Invalid hub.topic $feed; group doesn't exist.");
} else {
return true;
}
}
common_log(LOG_DEBUG, "Not a user or group feed? $feed $userFeed $groupFeed");
} }
common_log(LOG_DEBUG, "LOST $feed");
return false;
} }
/** /**
* Grab and validate a URL from POST parameters. * Grab and validate a URL from POST parameters.
* @throws ServerException for malformed or non-http/https URLs * @throws ClientException for malformed or non-http/https URLs
*/ */
protected function argUrl($arg) protected function argUrl($arg)
{ {
@ -164,7 +183,7 @@ class PushHubAction extends Action
if (Validate::uri($url, $params)) { if (Validate::uri($url, $params)) {
return $url; return $url;
} else { } else {
throw new ServerException("Invalid URL passed for $arg: '$url'", 400); throw new ClientException("Invalid URL passed for $arg: '$url'");
} }
} }

View File

@ -55,6 +55,8 @@ class UsersalmonAction extends SalmonAction
*/ */
function handlePost() function handlePost()
{ {
common_log(LOG_INFO, "Received post of '{$this->act->object->id}' from '{$this->act->actor->id}'");
switch ($this->act->object->type) { switch ($this->act->object->type) {
case ActivityObject::ARTICLE: case ActivityObject::ARTICLE:
case ActivityObject::BLOGENTRY: case ActivityObject::BLOGENTRY:
@ -80,13 +82,21 @@ class UsersalmonAction extends SalmonAction
throw new ClientException("In reply to a notice not by this user"); throw new ClientException("In reply to a notice not by this user");
} }
} else if (!empty($context->attention)) { } else if (!empty($context->attention)) {
if (!in_array($context->attention, $this->user->uri)) { if (!in_array($this->user->uri, $context->attention)) {
common_log(LOG_ERR, "{$this->user->uri} not in attention list (".implode(',', $context->attention).")");
throw new ClientException("To the attention of user(s) not including this one!"); throw new ClientException("To the attention of user(s) not including this one!");
} }
} else { } else {
throw new ClientException("Not to anyone in reply to anything!"); throw new ClientException("Not to anyone in reply to anything!");
} }
$existing = Notice::staticGet('uri', $this->act->object->id);
if (!empty($existing)) {
common_log(LOG_ERR, "Not saving notice '{$existing->uri}'; already exists.");
return;
}
$this->saveNotice(); $this->saveNotice();
} }

View File

@ -291,10 +291,9 @@ class FeedSub extends Memcached_DataObject
$headers = array('Content-Type: application/x-www-form-urlencoded'); $headers = array('Content-Type: application/x-www-form-urlencoded');
$post = array('hub.mode' => $mode, $post = array('hub.mode' => $mode,
'hub.callback' => $callback, 'hub.callback' => $callback,
'hub.verify' => 'async', 'hub.verify' => 'sync',
'hub.verify_token' => $this->verify_token, 'hub.verify_token' => $this->verify_token,
'hub.secret' => $this->secret, 'hub.secret' => $this->secret,
//'hub.lease_seconds' => 0,
'hub.topic' => $this->uri); 'hub.topic' => $this->uri);
$client = new HTTPClient(); $client = new HTTPClient();
$response = $client->post($this->huburi, $headers, $post); $response = $client->post($this->huburi, $headers, $post);
@ -317,8 +316,8 @@ class FeedSub extends Memcached_DataObject
common_log(LOG_ERR, __METHOD__ . ": error \"{$e->getMessage()}\" hitting hub $this->huburi subscribing to $this->uri"); common_log(LOG_ERR, __METHOD__ . ": error \"{$e->getMessage()}\" hitting hub $this->huburi subscribing to $this->uri");
$orig = clone($this); $orig = clone($this);
$this->verify_token = null; $this->verify_token = '';
$this->sub_state = null; $this->sub_state = 'inactive';
$this->update($orig); $this->update($orig);
unset($orig); unset($orig);
@ -343,7 +342,7 @@ class FeedSub extends Memcached_DataObject
} else { } else {
$this->sub_end = null; $this->sub_end = null;
} }
$this->lastupdate = common_sql_now(); $this->modified = common_sql_now();
return $this->update($original); return $this->update($original);
} }
@ -362,7 +361,7 @@ class FeedSub extends Memcached_DataObject
$this->sub_state = ''; $this->sub_state = '';
$this->sub_start = ''; $this->sub_start = '';
$this->sub_end = ''; $this->sub_end = '';
$this->lastupdate = common_sql_now(); $this->modified = common_sql_now();
return $this->update($original); return $this->update($original);
} }

View File

@ -30,11 +30,11 @@ class HubSub extends Memcached_DataObject
public $topic; public $topic;
public $callback; public $callback;
public $secret; public $secret;
public $challenge;
public $lease; public $lease;
public $sub_start; public $sub_start;
public $sub_end; public $sub_end;
public $created; public $created;
public $modified;
public /*static*/ function staticGet($topic, $callback) public /*static*/ function staticGet($topic, $callback)
{ {
@ -61,11 +61,11 @@ class HubSub extends Memcached_DataObject
'topic' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL, 'topic' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL,
'callback' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL, 'callback' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL,
'secret' => DB_DATAOBJECT_STR, 'secret' => DB_DATAOBJECT_STR,
'challenge' => DB_DATAOBJECT_STR,
'lease' => DB_DATAOBJECT_INT, 'lease' => DB_DATAOBJECT_INT,
'sub_start' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME, 'sub_start' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME,
'sub_end' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME, 'sub_end' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME,
'created' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL); 'created' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL,
'modified' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL);
} }
static function schemaDef() static function schemaDef()
@ -82,8 +82,6 @@ class HubSub extends Memcached_DataObject
255, false), 255, false),
new ColumnDef('secret', 'text', new ColumnDef('secret', 'text',
null, true), null, true),
new ColumnDef('challenge', 'varchar',
32, true),
new ColumnDef('lease', 'int', new ColumnDef('lease', 'int',
null, true), null, true),
new ColumnDef('sub_start', 'datetime', new ColumnDef('sub_start', 'datetime',
@ -91,6 +89,8 @@ class HubSub extends Memcached_DataObject
new ColumnDef('sub_end', 'datetime', new ColumnDef('sub_end', 'datetime',
null, true), null, true),
new ColumnDef('created', 'datetime', new ColumnDef('created', 'datetime',
null, false),
new ColumnDef('modified', 'datetime',
null, false)); null, false));
} }
@ -148,84 +148,105 @@ class HubSub extends Memcached_DataObject
} }
/** /**
* Send a verification ping to subscriber * Schedule a future verification ping to the subscriber.
* If queues are disabled, will be immediate.
*
* @param string $mode 'subscribe' or 'unsubscribe' * @param string $mode 'subscribe' or 'unsubscribe'
* @param string $token hub.verify_token value, if provided by client * @param string $token hub.verify_token value, if provided by client
*/ */
function scheduleVerify($mode, $token=null, $retries=null)
{
if ($retries === null) {
$retries = intval(common_config('ostatus', 'hub_retries'));
}
$data = array('sub' => clone($this),
'mode' => $mode,
'token' => $token,
'retries' => $retries);
$qm = QueueManager::get();
$qm->enqueue($data, 'hubverify');
}
/**
* Send a verification ping to subscriber, and if confirmed apply the changes.
* This may create, update, or delete the database record.
*
* @param string $mode 'subscribe' or 'unsubscribe'
* @param string $token hub.verify_token value, if provided by client
* @throws ClientException on failure
*/
function verify($mode, $token=null) function verify($mode, $token=null)
{ {
assert($mode == 'subscribe' || $mode == 'unsubscribe'); assert($mode == 'subscribe' || $mode == 'unsubscribe');
// Is this needed? data object fun... $challenge = common_good_rand(32);
$clone = clone($this);
$clone->challenge = common_good_rand(16);
$clone->update($this);
$this->challenge = $clone->challenge;
unset($clone);
$params = array('hub.mode' => $mode, $params = array('hub.mode' => $mode,
'hub.topic' => $this->topic, 'hub.topic' => $this->topic,
'hub.challenge' => $this->challenge); 'hub.challenge' => $challenge);
if ($mode == 'subscribe') { if ($mode == 'subscribe') {
$params['hub.lease_seconds'] = $this->lease; $params['hub.lease_seconds'] = $this->lease;
} }
if ($token !== null) { if ($token !== null) {
$params['hub.verify_token'] = $token; $params['hub.verify_token'] = $token;
} }
$url = $this->callback . '?' . http_build_query($params, '', '&'); // @fixme ugly urls
try { // Any existing query string parameters must be preserved
$request = new HTTPClient(); $url = $this->callback;
$response = $request->get($url); if (strpos('?', $url) !== false) {
$status = $response->getStatus(); $url .= '&';
if ($status >= 200 && $status < 300) {
$fail = false;
} else {
// @fixme how can we schedule a second attempt?
// Or should we?
$fail = "Returned HTTP $status";
}
} catch (Exception $e) {
$fail = $e->getMessage();
}
if ($fail) {
// @fixme how can we schedule a second attempt?
// or save a fail count?
// Or should we?
common_log(LOG_ERR, "Failed to verify $mode for $this->topic at $this->callback: $fail");
return false;
} else { } else {
if ($mode == 'subscribe') { $url .= '?';
// Establish or renew the subscription! }
// This seems unnecessary... dataobject fun! $url .= http_build_query($params, '', '&');
$clone = clone($this);
$clone->challenge = null;
$clone->setLease($this->lease);
$clone->update($this);
unset($clone);
$this->challenge = null; $request = new HTTPClient();
$this->setLease($this->lease); $response = $request->get($url);
common_log(LOG_ERR, "Verified $mode of $this->callback:$this->topic for $this->lease seconds"); $status = $response->getStatus();
} else if ($mode == 'unsubscribe') {
common_log(LOG_ERR, "Verified $mode of $this->callback:$this->topic"); if ($status >= 200 && $status < 300) {
$this->delete(); common_log(LOG_INFO, "Verified $mode of $this->callback:$this->topic");
} else {
throw new ClientException("Hub subscriber verification returned HTTP $status");
}
$old = HubSub::staticGet($this->topic, $this->callback);
if ($mode == 'subscribe') {
if ($old) {
$this->update($old);
} else {
$ok = $this->insert();
}
} else if ($mode == 'unsubscribe') {
if ($old) {
$old->delete();
} else {
// That's ok, we're already unsubscribed.
} }
return true;
} }
} }
/** /**
* Insert wrapper; transparently set the hash key from topic and callback columns. * Insert wrapper; transparently set the hash key from topic and callback columns.
* @return boolean success * @return mixed success
*/ */
function insert() function insert()
{ {
$this->hashkey = self::hashkey($this->topic, $this->callback); $this->hashkey = self::hashkey($this->topic, $this->callback);
$this->created = common_sql_now();
$this->modified = common_sql_now();
return parent::insert(); return parent::insert();
} }
/**
* Update wrapper; transparently update modified column.
* @return boolean success
*/
function update($old=null)
{
$this->modified = common_sql_now();
return parent::update($old);
}
/** /**
* Schedule delivery of a 'fat ping' to the subscriber's callback * Schedule delivery of a 'fat ping' to the subscriber's callback
* endpoint. If queues are disabled, this will run immediately. * endpoint. If queues are disabled, this will run immediately.

View File

@ -508,13 +508,15 @@ class Ostatus_profile extends Memcached_DataObject
} }
} }
// @fixme save detailed ostatus source info
// @fixme ensure that groups get handled correctly // @fixme ensure that groups get handled correctly
$saved = Notice::saveNew($oprofile->localProfile()->id, $saved = Notice::saveNew($oprofile->localProfile()->id,
$content, $content,
'ostatus', 'ostatus',
$params); $params);
// Record which feed this came through...
Ostatus_source::saveNew($saved, $this, 'push');
} }
/** /**
@ -522,7 +524,7 @@ class Ostatus_profile extends Memcached_DataObject
* @return Ostatus_profile * @return Ostatus_profile
* @throws FeedSubException * @throws FeedSubException
*/ */
public static function ensureProfile($profile_uri) public static function ensureProfile($profile_uri, $hints=array())
{ {
// Get the canonical feed URI and check it // Get the canonical feed URI and check it
$discover = new FeedDiscovery(); $discover = new FeedDiscovery();
@ -545,7 +547,7 @@ class Ostatus_profile extends Memcached_DataObject
if (!empty($subject)) { if (!empty($subject)) {
$subjObject = new ActivityObject($subject); $subjObject = new ActivityObject($subject);
return self::ensureActivityObjectProfile($subjObject, $feeduri, $salmonuri); return self::ensureActivityObjectProfile($subjObject, $feeduri, $salmonuri, $hints);
} }
// Otherwise, try the feed author // Otherwise, try the feed author
@ -554,7 +556,7 @@ class Ostatus_profile extends Memcached_DataObject
if (!empty($author)) { if (!empty($author)) {
$authorObject = new ActivityObject($author); $authorObject = new ActivityObject($author);
return self::ensureActivityObjectProfile($authorObject, $feeduri, $salmonuri); return self::ensureActivityObjectProfile($authorObject, $feeduri, $salmonuri, $hints);
} }
// Sheesh. Not a very nice feed! Let's try fingerpoken in the // Sheesh. Not a very nice feed! Let's try fingerpoken in the
@ -570,7 +572,7 @@ class Ostatus_profile extends Memcached_DataObject
if (!empty($actor)) { if (!empty($actor)) {
$actorObject = new ActivityObject($actor); $actorObject = new ActivityObject($actor);
return self::ensureActivityObjectProfile($actorObject, $feeduri, $salmonuri); return self::ensureActivityObjectProfile($actorObject, $feeduri, $salmonuri, $hints);
} }
@ -578,7 +580,7 @@ class Ostatus_profile extends Memcached_DataObject
if (!empty($author)) { if (!empty($author)) {
$authorObject = new ActivityObject($author); $authorObject = new ActivityObject($author);
return self::ensureActivityObjectProfile($authorObject, $feeduri, $salmonuri); return self::ensureActivityObjectProfile($authorObject, $feeduri, $salmonuri, $hints);
} }
} }
@ -688,11 +690,11 @@ class Ostatus_profile extends Memcached_DataObject
return self::ensureActivityObjectProfile($activity->actor, $feeduri, $salmonuri); return self::ensureActivityObjectProfile($activity->actor, $feeduri, $salmonuri);
} }
public static function ensureActivityObjectProfile($object, $feeduri=null, $salmonuri=null) public static function ensureActivityObjectProfile($object, $feeduri=null, $salmonuri=null, $hints=array())
{ {
$profile = self::getActivityObjectProfile($object); $profile = self::getActivityObjectProfile($object);
if (!$profile) { if (!$profile) {
$profile = self::createActivityObjectProfile($object, $feeduri, $salmonuri); $profile = self::createActivityObjectProfile($object, $feeduri, $salmonuri, $hints);
} }
return $profile; return $profile;
} }
@ -745,10 +747,10 @@ class Ostatus_profile extends Memcached_DataObject
self::createActivityObjectProfile($actor, $feeduri, $salmonuri); self::createActivityObjectProfile($actor, $feeduri, $salmonuri);
} }
protected static function createActivityObjectProfile($object, $feeduri=null, $salmonuri=null) protected static function createActivityObjectProfile($object, $feeduri=null, $salmonuri=null, $hints=array())
{ {
$homeuri = $object->id; $homeuri = $object->id;
$nickname = self::getActivityObjectNickname($object); $nickname = self::getActivityObjectNickname($object, $hints);
$avatar = self::getActivityObjectAvatar($object); $avatar = self::getActivityObjectAvatar($object);
if (!$homeuri) { if (!$homeuri) {
@ -756,6 +758,18 @@ class Ostatus_profile extends Memcached_DataObject
throw new ServerException("No profile URI"); throw new ServerException("No profile URI");
} }
if (empty($feeduri)) {
if (array_key_exists('feedurl', $hints)) {
$feeduri = $hints['feedurl'];
}
}
if (empty($salmonuri)) {
if (array_key_exists('salmon', $hints)) {
$salmonuri = $hints['salmon'];
}
}
if (!$feeduri || !$salmonuri) { if (!$feeduri || !$salmonuri) {
// Get the canonical feed URI and check it // Get the canonical feed URI and check it
$discover = new FeedDiscovery(); $discover = new FeedDiscovery();
@ -773,7 +787,11 @@ class Ostatus_profile extends Memcached_DataObject
$profile = new Profile(); $profile = new Profile();
$profile->nickname = $nickname; $profile->nickname = $nickname;
$profile->fullname = $object->title; $profile->fullname = $object->title;
$profile->profileurl = $object->link; if (!empty($object->link)) {
$profile->profileurl = $object->link;
} else if (array_key_exists('profileurl', $hints)) {
$profile->profileurl = $hints['profileurl'];
}
$profile->created = common_sql_now(); $profile->created = common_sql_now();
// @fixme bio // @fixme bio
@ -812,12 +830,24 @@ class Ostatus_profile extends Memcached_DataObject
} }
} }
protected static function getActivityObjectNickname($object) protected static function getActivityObjectNickname($object, $hints=array())
{ {
// XXX: check whatever PoCo calls a nickname first // XXX: check whatever PoCo calls a nickname first
// Try the definitive ID
$nickname = self::nicknameFromURI($object->id); $nickname = self::nicknameFromURI($object->id);
// Try a Webfinger if one was passed (way) down
if (empty($nickname)) {
if (array_key_exists('webfinger', $hints)) {
$nickname = self::nicknameFromURI($hints['webfinger']);
}
}
// Try the name
if (empty($nickname)) { if (empty($nickname)) {
$nickname = common_nicknamize($object->title); $nickname = common_nicknamize($object->title);
} }
@ -883,11 +913,16 @@ class Ostatus_profile extends Memcached_DataObject
} }
} }
$hints = array('webfinger' => $addr,
'profileurl' => $profileUrl,
'feedurl' => $feedUrl,
'salmon' => $salmonEndpoint);
// If we got a feed URL, try that // If we got a feed URL, try that
if (isset($feedUrl)) { if (isset($feedUrl)) {
try { try {
$oprofile = self::ensureProfile($feedUrl); $oprofile = self::ensureProfile($feedUrl, $hints);
return $oprofile; return $oprofile;
} catch (Exception $e) { } catch (Exception $e) {
common_log(LOG_WARNING, "Failed creating profile from feed URL '$feedUrl': " . $e->getMessage()); common_log(LOG_WARNING, "Failed creating profile from feed URL '$feedUrl': " . $e->getMessage());
@ -899,7 +934,7 @@ class Ostatus_profile extends Memcached_DataObject
if (isset($profileUrl)) { if (isset($profileUrl)) {
try { try {
$oprofile = self::ensureProfile($profileUrl); $oprofile = self::ensureProfile($profileUrl, $hints);
return $oprofile; return $oprofile;
} catch (Exception $e) { } catch (Exception $e) {
common_log(LOG_WARNING, "Failed creating profile from profile URL '$profileUrl': " . $e->getMessage()); common_log(LOG_WARNING, "Failed creating profile from profile URL '$profileUrl': " . $e->getMessage());
@ -922,6 +957,10 @@ class Ostatus_profile extends Memcached_DataObject
$profile->nickname = self::nicknameFromUri($uri); $profile->nickname = self::nicknameFromUri($uri);
$profile->created = common_sql_now(); $profile->created = common_sql_now();
if (isset($profileUrl)) {
$profile->profileurl = $profileUrl;
}
$profile_id = $profile->insert(); $profile_id = $profile->insert();
if (!$profile_id) { if (!$profile_id) {
@ -936,6 +975,10 @@ class Ostatus_profile extends Memcached_DataObject
$oprofile->profile_id = $profile_id; $oprofile->profile_id = $profile_id;
$oprofile->created = common_sql_now(); $oprofile->created = common_sql_now();
if (isset($feedUrl)) {
$profile->feeduri = $feedUrl;
}
$result = $oprofile->insert(); $result = $oprofile->insert();
if (!$result) { if (!$result) {

View File

@ -0,0 +1,114 @@
<?php
/*
* StatusNet - the distributed open-source microblogging tool
* Copyright (C) 2010, StatusNet, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/**
* @package OStatusPlugin
* @maintainer Brion Vibber <brion@status.net>
*/
class Ostatus_source extends Memcached_DataObject
{
public $__table = 'ostatus_source';
public $notice_id; // notice we're referring to
public $profile_uri; // uri of the ostatus_profile this came through -- may be a group feed
public $method; // push or salmon
public /*static*/ function staticGet($k, $v=null)
{
return parent::staticGet(__CLASS__, $k, $v);
}
/**
* return table definition for DB_DataObject
*
* DB_DataObject needs to know something about the table to manipulate
* instances. This method provides all the DB_DataObject needs to know.
*
* @return array array of column definitions
*/
function table()
{
return array('notice_id' => DB_DATAOBJECT_INT + DB_DATAOBJECT_NOTNULL,
'profile_uri' => DB_DATAOBJECT_STR,
'method' => DB_DATAOBJECT_STR);
}
static function schemaDef()
{
return array(new ColumnDef('notice_id', 'integer',
null, false, 'PRI'),
new ColumnDef('profile_uri', 'varchar',
255, false),
new ColumnDef('method', "ENUM('push','salmon')",
null, false));
}
/**
* return key definitions for DB_DataObject
*
* DB_DataObject needs to know about keys that the table has; this function
* defines them.
*
* @return array key definitions
*/
function keys()
{
return array_keys($this->keyTypes());
}
/**
* return key definitions for Memcached_DataObject
*
* Our caching system uses the same key definitions, but uses a different
* method to get them.
*
* @return array key definitions
*/
function keyTypes()
{
return array('notice_id' => 'K');
}
function sequenceKey()
{
return array(false, false, false);
}
/**
* Save a remote notice source record; this helps indicate how trusted we are.
* @param string $method
*/
public static function saveNew(Notice $notice, Ostatus_profile $oprofile, $method)
{
$osource = new Ostatus_source();
$osource->notice_id = $notice->id;
$osource->profile_uri = $oprofile->uri;
$osource->method = $method;
if ($osource->insert()) {
return true;
} else {
common_log_db_error($osource, 'INSERT', __FILE__);
return false;
}
}
}

View File

@ -173,13 +173,17 @@ class SalmonAction extends Action
$html = $this->act->object->content; $html = $this->act->object->content;
$rendered = HTMLPurifier::purify($html); $purifier = new HTMLPurifier();
$rendered = $purifier->purify($html);
$content = html_entity_decode(strip_tags($rendered)); $content = html_entity_decode(strip_tags($rendered));
$options = array('is_local' => Notice::REMOTE_OMB, $options = array('is_local' => Notice::REMOTE_OMB,
'uri' => $this->act->object->id, 'uri' => $this->act->object->id,
'url' => $this->act->object->link, 'url' => $this->act->object->link,
'rendered' => $rendered); 'rendered' => $rendered,
'replies' => $this->act->context->attention);
if (!empty($this->act->context->location)) { if (!empty($this->act->context->location)) {
$options['lat'] = $location->lat; $options['lat'] = $location->lat;
@ -199,12 +203,17 @@ class SalmonAction extends Action
} }
if (!empty($this->act->time)) { if (!empty($this->act->time)) {
$options['created'] = common_sql_time($this->act->time); $options['created'] = common_sql_date($this->act->time);
} }
return Notice::saveNew($oprofile->profile_id, $saved = Notice::saveNew($oprofile->profile_id,
$content, $content,
'ostatus+salmon', 'ostatus+salmon',
$options); $options);
// Record that this was saved through a validated Salmon source
// @fixme actually do the signature validation!
Ostatus_source::saveNew($saved, $oprofile, 'salmon');
return $saved;
} }
} }