Retry using the waiting queue so as to preserve message ordering

This commit is contained in:
Luke Fitzgerald 2010-08-10 19:23:45 -07:00
parent 8005bdb421
commit 9da2368383
3 changed files with 31 additions and 10 deletions

View File

@ -164,6 +164,7 @@ class IrcPlugin extends ImPlugin {
false, 'PRI', null, null, true), false, 'PRI', null, null, true),
new ColumnDef('data', 'blob', null, false), new ColumnDef('data', 'blob', null, false),
new ColumnDef('prioritise', 'tinyint', 1, false), new ColumnDef('prioritise', 'tinyint', 1, false),
new ColumnDef('attempts', 'integer', null, false),
new ColumnDef('created', 'datetime', null, false), new ColumnDef('created', 'datetime', null, false),
new ColumnDef('claimed', 'datetime'))); new ColumnDef('claimed', 'datetime')));

View File

@ -10,6 +10,7 @@ class Irc_waiting_message extends Memcached_DataObject {
public $id; // int primary_key not_null auto_increment public $id; // int primary_key not_null auto_increment
public $data; // blob not_null public $data; // blob not_null
public $prioritise; // tinyint(1) not_null public $prioritise; // tinyint(1) not_null
public $attempts; // int not_null
public $created; // datetime() not_null public $created; // datetime() not_null
public $claimed; // datetime() public $claimed; // datetime()
@ -111,6 +112,22 @@ class Irc_waiting_message extends Memcached_DataObject {
return null; return null;
} }
/**
* Increment the attempts count
*
* @return void
* @throws Exception
*/
public function incAttempts() {
$orig = clone($this);
$this->attempts++;
$result = $this->update($orig);
if (!$result) {
throw Exception(sprintf(_m("Could not increment attempts count for %d"), $this->id));
}
}
/** /**
* Release a claimed item. * Release a claimed item.
*/ */

View File

@ -101,19 +101,21 @@ class IrcManager extends ImManager {
$this->messageWaiting = false; $this->messageWaiting = false;
return; return;
} }
$data = unserialize($wm->data); $data = unserialize($wm->data);
$wm->incAttempts();
if (!$this->send_raw_message($data)) { if ($this->send_raw_message($data)) {
$this->plugin->enqueue_outgoing_raw( $wm->delete();
array( } else {
'type' => 'message', if ($wm->attempts <= common_config('queue', 'max_retries')) {
'prioritise' => $data['prioritise'], // Try again next idle
'data' => $data['data'] $wm->releaseClaim();
) } else {
); // Exceeded the maximum number of retries
$wm->delete();
}
} }
$wm->delete();
} }
} }
} }
@ -276,6 +278,7 @@ class IrcManager extends ImManager {
$wm->data = serialize($data); $wm->data = serialize($data);
$wm->prioritise = $data['prioritise']; $wm->prioritise = $data['prioritise'];
$wm->attempts = 0;
$wm->created = common_sql_now(); $wm->created = common_sql_now();
$result = $wm->insert(); $result = $wm->insert();