OStatus PuSH fixes:

- hub now defers subscription state updates until after verification, per spec
- hub now supports synchronous verification when requested (if async is not requested after)
- client now requests synchronous verification (it's a bit safer)
- cleanup on subscription logging/error responses
This commit is contained in:
Brion Vibber 2010-02-21 14:46:26 -08:00
parent aa0b2ce81a
commit 78ca45c7a0
4 changed files with 185 additions and 140 deletions

View File

@ -72,7 +72,7 @@ class PushCallbackAction extends Action
} }
/** /**
* Handler for GET verification requests from the hub * Handler for GET verification requests from the hub.
*/ */
function handleGet() function handleGet()
{ {
@ -81,31 +81,37 @@ class PushCallbackAction extends Action
$challenge = $this->arg('hub_challenge'); $challenge = $this->arg('hub_challenge');
$lease_seconds = $this->arg('hub_lease_seconds'); $lease_seconds = $this->arg('hub_lease_seconds');
$verify_token = $this->arg('hub_verify_token'); $verify_token = $this->arg('hub_verify_token');
if ($mode != 'subscribe' && $mode != 'unsubscribe') { if ($mode != 'subscribe' && $mode != 'unsubscribe') {
common_log(LOG_WARNING, __METHOD__ . ": bogus hub callback with mode \"$mode\""); throw new ClientException("Bad hub.mode $mode", 404);
throw new ServerException("Bogus hub callback: bad mode", 404);
} }
$feedsub = FeedSub::staticGet('uri', $topic); $feedsub = FeedSub::staticGet('uri', $topic);
if (!$feedsub) { if (!$feedsub) {
common_log(LOG_WARNING, __METHOD__ . ": bogus hub callback for unknown feed $topic"); throw new ClientException("Bad hub.topic feed $topic", 404);
throw new ServerException("Bogus hub callback: unknown feed", 404);
} }
if ($feedsub->verify_token !== $verify_token) { if ($feedsub->verify_token !== $verify_token) {
common_log(LOG_WARNING, __METHOD__ . ": bogus hub callback with bad token \"$verify_token\" for feed $topic"); throw new ClientException("Bad hub.verify_token $token for $topic", 404);
throw new ServerException("Bogus hub callback: bad token", 404);
} }
if ($mode != $feedsub->sub_state) {
common_log(LOG_WARNING, __METHOD__ . ": bogus hub callback with bad mode \"$mode\" for feed $topic in state \"{$feedsub->sub_state}\"");
throw new ServerException("Bogus hub callback: mode doesn't match subscription state.", 404);
}
// OK!
if ($mode == 'subscribe') { if ($mode == 'subscribe') {
common_log(LOG_INFO, __METHOD__ . ': sub confirmed'); // We may get re-sub requests legitimately.
if ($feedsub->sub_state != 'subscribe' && $feedsub->sub_state != 'active') {
throw new ClientException("Unexpected subscribe request for $topic.", 404);
}
} else {
if ($feedsub->sub_state != 'unsubscribe') {
throw new ClientException("Unexpected unsubscribe request for $topic.", 404);
}
}
if ($mode == 'subscribe') {
if ($feedsub->sub_state == 'active') {
common_log(LOG_INFO, __METHOD__ . ': sub update confirmed');
} else {
common_log(LOG_INFO, __METHOD__ . ': sub confirmed');
}
$feedsub->confirmSubscribe($lease_seconds); $feedsub->confirmSubscribe($lease_seconds);
} else { } else {
common_log(LOG_INFO, __METHOD__ . ": unsub confirmed; deleting sub record for $topic"); common_log(LOG_INFO, __METHOD__ . ": unsub confirmed; deleting sub record for $topic");

View File

@ -59,102 +59,121 @@ class PushHubAction extends Action
$mode = $this->trimmed('hub.mode'); $mode = $this->trimmed('hub.mode');
switch ($mode) { switch ($mode) {
case "subscribe": case "subscribe":
$this->subscribe();
break;
case "unsubscribe": case "unsubscribe":
$this->unsubscribe(); $this->subunsub($mode);
break; break;
case "publish": case "publish":
throw new ServerException("Publishing outside feeds not supported.", 400); throw new ClientException("Publishing outside feeds not supported.", 400);
default: default:
throw new ServerException("Unrecognized mode '$mode'.", 400); throw new ClientException("Unrecognized mode '$mode'.", 400);
} }
} }
/** /**
* Process a PuSH feed subscription request. * Process a request for a new or modified PuSH feed subscription.
* If asynchronous verification is requested, updates won't be saved immediately.
* *
* HTTP return codes: * HTTP return codes:
* 202 Accepted - request saved and awaiting verification * 202 Accepted - request saved and awaiting verification
* 204 No Content - already subscribed * 204 No Content - already subscribed
* 403 Forbidden - rejecting this (not specifically spec'd) * 400 Bad Request - rejecting this (not specifically spec'd)
*/ */
function subscribe() function subunsub($mode)
{ {
$feed = $this->argUrl('hub.topic');
$callback = $this->argUrl('hub.callback'); $callback = $this->argUrl('hub.callback');
$topic = $this->argUrl('hub.topic');
if (!$this->recognizedFeed($topic)) {
throw new ClientException("Unsupported hub.topic $topic; this hub only serves local user and group Atom feeds.");
}
$verify = $this->arg('hub.verify'); // @fixme may be multiple
if ($verify != 'sync' && $verify != 'async') {
throw new ClientException("Invalid hub.verify $verify; must be sync or async.");
}
$lease = $this->arg('hub.lease_seconds', null);
if ($mode == 'subscribe' && $lease != '' && !preg_match('/^\d+$/', $lease)) {
throw new ClientException("Invalid hub.lease $lease; must be empty or positive integer.");
}
$token = $this->arg('hub.verify_token', null); $token = $this->arg('hub.verify_token', null);
common_log(LOG_DEBUG, __METHOD__ . ": checking sub'd to $feed $callback"); $secret = $this->arg('hub.secret', null);
if ($this->getSub($feed, $callback)) { if ($secret != '' && strlen($secret) >= 200) {
// Already subscribed; return 204 per spec. throw new ClientException("Invalid hub.secret $secret; must be under 200 bytes.");
}
$sub = HubSub::staticGet($sub->topic, $sub->callback);
if (!$sub) {
// Creating a new one!
$sub = new HubSub();
$sub->topic = $topic;
$sub->callback = $callback;
}
if ($mode == 'subscribe') {
if ($secret) {
$sub->secret = $secret;
}
if ($lease) {
$sub->setLease(intval($lease));
}
}
if (!common_config('queue', 'enabled')) {
// Won't be able to background it.
$verify = 'sync';
}
if ($verify == 'async') {
$sub->scheduleVerify($mode, $token);
header('HTTP/1.1 202 Accepted');
} else {
$sub->verify($mode, $token);
header('HTTP/1.1 204 No Content'); header('HTTP/1.1 204 No Content');
common_log(LOG_DEBUG, __METHOD__ . ': already subscribed');
return;
} }
common_log(LOG_DEBUG, __METHOD__ . ': setting up');
$sub = new HubSub();
$sub->topic = $feed;
$sub->callback = $callback;
$sub->secret = $this->arg('hub.secret', null);
if (strlen($sub->secret) > 200) {
throw new ClientException("hub.secret must be no longer than 200 chars", 400);
}
$sub->setLease(intval($this->arg('hub.lease_seconds')));
// @fixme check for feeds we don't manage
// @fixme check the verification mode, might want a return immediately?
common_log(LOG_DEBUG, __METHOD__ . ': inserting');
$ok = $sub->insert();
if (!$ok) {
throw new ServerException("Failed to save subscription record", 500);
}
// @fixme check errors ;)
$data = array('sub' => $sub, 'mode' => 'subscribe', 'token' => $token);
$qm = QueueManager::get();
$qm->enqueue($data, 'hubverify');
header('HTTP/1.1 202 Accepted');
common_log(LOG_DEBUG, __METHOD__ . ': done');
} }
/** /**
* Process a PuSH feed unsubscription request. * Check whether the given URL represents one of our canonical
* user or group Atom feeds.
* *
* HTTP return codes: * @param string $feed URL
* 202 Accepted - request saved and awaiting verification * @return boolean true if it matches
* 204 No Content - already subscribed
* 400 Bad Request - invalid params or rejected feed
*
* @fixme background this
*/ */
function unsubscribe() function recognizedFeed($feed)
{ {
$feed = $this->argUrl('hub.topic'); $matches = array();
$callback = $this->argUrl('hub.callback'); if (preg_match('!/(\d+)\.atom$!', $feed, $matches)) {
$sub = $this->getSub($feed, $callback); $id = $matches[1];
$params = array('id' => $id, 'format' => 'atom');
if ($sub) { $userFeed = common_local_url('ApiTimelineUser', $params);
$token = $this->arg('hub.verify_token', null); $groupFeed = common_local_url('ApiTimelineGroup', $params);
if ($sub->verify('unsubscribe', $token)) {
$sub->delete(); if ($feed == $userFeed) {
common_log(LOG_INFO, "PuSH unsubscribed $feed for $callback"); $user = User::staticGet('id', $id);
} else { if (!$user) {
throw new ServerException("Failed PuSH unsubscription: verification failed! $feed for $callback"); throw new ClientException("Invalid hub.topic $feed; user doesn't exist.");
} else {
return true;
}
} }
} else { if ($feed == $groupFeed) {
throw new ServerException("Failed PuSH unsubscription: not subscribed! $feed for $callback"); $user = User_group::staticGet('id', $id);
if (!$user) {
throw new ClientException("Invalid hub.topic $feed; group doesn't exist.");
} else {
return true;
}
}
common_log(LOG_DEBUG, "Not a user or group feed? $feed $userFeed $groupFeed");
} }
common_log(LOG_DEBUG, "LOST $feed");
return false;
} }
/** /**
* Grab and validate a URL from POST parameters. * Grab and validate a URL from POST parameters.
* @throws ServerException for malformed or non-http/https URLs * @throws ClientException for malformed or non-http/https URLs
*/ */
protected function argUrl($arg) protected function argUrl($arg)
{ {
@ -164,7 +183,7 @@ class PushHubAction extends Action
if (Validate::uri($url, $params)) { if (Validate::uri($url, $params)) {
return $url; return $url;
} else { } else {
throw new ServerException("Invalid URL passed for $arg: '$url'", 400); throw new ClientException("Invalid URL passed for $arg: '$url'");
} }
} }

View File

@ -291,10 +291,9 @@ class FeedSub extends Memcached_DataObject
$headers = array('Content-Type: application/x-www-form-urlencoded'); $headers = array('Content-Type: application/x-www-form-urlencoded');
$post = array('hub.mode' => $mode, $post = array('hub.mode' => $mode,
'hub.callback' => $callback, 'hub.callback' => $callback,
'hub.verify' => 'async', 'hub.verify' => 'sync',
'hub.verify_token' => $this->verify_token, 'hub.verify_token' => $this->verify_token,
'hub.secret' => $this->secret, 'hub.secret' => $this->secret,
//'hub.lease_seconds' => 0,
'hub.topic' => $this->uri); 'hub.topic' => $this->uri);
$client = new HTTPClient(); $client = new HTTPClient();
$response = $client->post($this->huburi, $headers, $post); $response = $client->post($this->huburi, $headers, $post);
@ -317,8 +316,8 @@ class FeedSub extends Memcached_DataObject
common_log(LOG_ERR, __METHOD__ . ": error \"{$e->getMessage()}\" hitting hub $this->huburi subscribing to $this->uri"); common_log(LOG_ERR, __METHOD__ . ": error \"{$e->getMessage()}\" hitting hub $this->huburi subscribing to $this->uri");
$orig = clone($this); $orig = clone($this);
$this->verify_token = null; $this->verify_token = '';
$this->sub_state = null; $this->sub_state = 'inactive';
$this->update($orig); $this->update($orig);
unset($orig); unset($orig);
@ -343,7 +342,7 @@ class FeedSub extends Memcached_DataObject
} else { } else {
$this->sub_end = null; $this->sub_end = null;
} }
$this->lastupdate = common_sql_now(); $this->modified = common_sql_now();
return $this->update($original); return $this->update($original);
} }
@ -362,7 +361,7 @@ class FeedSub extends Memcached_DataObject
$this->sub_state = ''; $this->sub_state = '';
$this->sub_start = ''; $this->sub_start = '';
$this->sub_end = ''; $this->sub_end = '';
$this->lastupdate = common_sql_now(); $this->modified = common_sql_now();
return $this->update($original); return $this->update($original);
} }

View File

@ -30,11 +30,11 @@ class HubSub extends Memcached_DataObject
public $topic; public $topic;
public $callback; public $callback;
public $secret; public $secret;
public $challenge;
public $lease; public $lease;
public $sub_start; public $sub_start;
public $sub_end; public $sub_end;
public $created; public $created;
public $modified;
public /*static*/ function staticGet($topic, $callback) public /*static*/ function staticGet($topic, $callback)
{ {
@ -61,11 +61,11 @@ class HubSub extends Memcached_DataObject
'topic' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL, 'topic' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL,
'callback' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL, 'callback' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL,
'secret' => DB_DATAOBJECT_STR, 'secret' => DB_DATAOBJECT_STR,
'challenge' => DB_DATAOBJECT_STR,
'lease' => DB_DATAOBJECT_INT, 'lease' => DB_DATAOBJECT_INT,
'sub_start' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME, 'sub_start' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME,
'sub_end' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME, 'sub_end' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME,
'created' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL); 'created' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL,
'modified' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL);
} }
static function schemaDef() static function schemaDef()
@ -82,8 +82,6 @@ class HubSub extends Memcached_DataObject
255, false), 255, false),
new ColumnDef('secret', 'text', new ColumnDef('secret', 'text',
null, true), null, true),
new ColumnDef('challenge', 'varchar',
32, true),
new ColumnDef('lease', 'int', new ColumnDef('lease', 'int',
null, true), null, true),
new ColumnDef('sub_start', 'datetime', new ColumnDef('sub_start', 'datetime',
@ -91,6 +89,8 @@ class HubSub extends Memcached_DataObject
new ColumnDef('sub_end', 'datetime', new ColumnDef('sub_end', 'datetime',
null, true), null, true),
new ColumnDef('created', 'datetime', new ColumnDef('created', 'datetime',
null, false),
new ColumnDef('modified', 'datetime',
null, false)); null, false));
} }
@ -148,84 +148,105 @@ class HubSub extends Memcached_DataObject
} }
/** /**
* Send a verification ping to subscriber * Schedule a future verification ping to the subscriber.
* If queues are disabled, will be immediate.
*
* @param string $mode 'subscribe' or 'unsubscribe' * @param string $mode 'subscribe' or 'unsubscribe'
* @param string $token hub.verify_token value, if provided by client * @param string $token hub.verify_token value, if provided by client
*/ */
function scheduleVerify($mode, $token=null, $retries=null)
{
if ($retries === null) {
$retries = intval(common_config('ostatus', 'hub_retries'));
}
$data = array('sub' => clone($this),
'mode' => $mode,
'token' => $token,
'retries' => $retries);
$qm = QueueManager::get();
$qm->enqueue($data, 'hubverify');
}
/**
* Send a verification ping to subscriber, and if confirmed apply the changes.
* This may create, update, or delete the database record.
*
* @param string $mode 'subscribe' or 'unsubscribe'
* @param string $token hub.verify_token value, if provided by client
* @throws ClientException on failure
*/
function verify($mode, $token=null) function verify($mode, $token=null)
{ {
assert($mode == 'subscribe' || $mode == 'unsubscribe'); assert($mode == 'subscribe' || $mode == 'unsubscribe');
// Is this needed? data object fun... $challenge = common_good_rand(32);
$clone = clone($this);
$clone->challenge = common_good_rand(16);
$clone->update($this);
$this->challenge = $clone->challenge;
unset($clone);
$params = array('hub.mode' => $mode, $params = array('hub.mode' => $mode,
'hub.topic' => $this->topic, 'hub.topic' => $this->topic,
'hub.challenge' => $this->challenge); 'hub.challenge' => $challenge);
if ($mode == 'subscribe') { if ($mode == 'subscribe') {
$params['hub.lease_seconds'] = $this->lease; $params['hub.lease_seconds'] = $this->lease;
} }
if ($token !== null) { if ($token !== null) {
$params['hub.verify_token'] = $token; $params['hub.verify_token'] = $token;
} }
$url = $this->callback . '?' . http_build_query($params, '', '&'); // @fixme ugly urls
try { // Any existing query string parameters must be preserved
$request = new HTTPClient(); $url = $this->callback;
$response = $request->get($url); if (strpos('?', $url) !== false) {
$status = $response->getStatus(); $url .= '&';
if ($status >= 200 && $status < 300) {
$fail = false;
} else {
// @fixme how can we schedule a second attempt?
// Or should we?
$fail = "Returned HTTP $status";
}
} catch (Exception $e) {
$fail = $e->getMessage();
}
if ($fail) {
// @fixme how can we schedule a second attempt?
// or save a fail count?
// Or should we?
common_log(LOG_ERR, "Failed to verify $mode for $this->topic at $this->callback: $fail");
return false;
} else { } else {
if ($mode == 'subscribe') { $url .= '?';
// Establish or renew the subscription! }
// This seems unnecessary... dataobject fun! $url .= http_build_query($params, '', '&');
$clone = clone($this);
$clone->challenge = null;
$clone->setLease($this->lease);
$clone->update($this);
unset($clone);
$this->challenge = null; $request = new HTTPClient();
$this->setLease($this->lease); $response = $request->get($url);
common_log(LOG_ERR, "Verified $mode of $this->callback:$this->topic for $this->lease seconds"); $status = $response->getStatus();
} else if ($mode == 'unsubscribe') {
common_log(LOG_ERR, "Verified $mode of $this->callback:$this->topic"); if ($status >= 200 && $status < 300) {
$this->delete(); common_log(LOG_INFO, "Verified $mode of $this->callback:$this->topic");
} else {
throw new ClientException("Hub subscriber verification returned HTTP $status");
}
$old = HubSub::staticGet($this->topic, $this->callback);
if ($mode == 'subscribe') {
if ($old) {
$this->update($old);
} else {
$ok = $this->insert();
}
} else if ($mode == 'unsubscribe') {
if ($old) {
$old->delete();
} else {
// That's ok, we're already unsubscribed.
} }
return true;
} }
} }
/** /**
* Insert wrapper; transparently set the hash key from topic and callback columns. * Insert wrapper; transparently set the hash key from topic and callback columns.
* @return boolean success * @return mixed success
*/ */
function insert() function insert()
{ {
$this->hashkey = self::hashkey($this->topic, $this->callback); $this->hashkey = self::hashkey($this->topic, $this->callback);
$this->created = common_sql_now();
$this->modified = common_sql_now();
return parent::insert(); return parent::insert();
} }
/**
* Update wrapper; transparently update modified column.
* @return boolean success
*/
function update($old=null)
{
$this->modified = common_sql_now();
return parent::update($old);
}
/** /**
* Schedule delivery of a 'fat ping' to the subscriber's callback * Schedule delivery of a 'fat ping' to the subscriber's callback
* endpoint. If queues are disabled, this will run immediately. * endpoint. If queues are disabled, this will run immediately.