move handling code into queuemanager

This commit is contained in:
Evan Prodromou 2009-07-04 00:31:28 -04:00
parent 741eb1a28b
commit 49c5c6f92b
4 changed files with 52 additions and 114 deletions

View File

@ -51,7 +51,36 @@ class DBQueueManager extends QueueManager
return true;
}
function nextItem($queue, $timeout=null)
function service($queue, $handler)
{
while (true) {
$this->_log(LOG_DEBUG, 'Checking for notices...');
$notice = $this->_nextItem($queue, null);
if (empty($notice)) {
$this->_log(LOG_DEBUG, 'No notices waiting; idling.');
// Nothing in the queue. Do you
// have other tasks, like servicing your
// XMPP connection, to do?
$handler->idle(QUEUE_HANDLER_MISS_IDLE);
} else {
$this->_log(LOG_INFO, 'Got notice '. $notice->id);
// Yay! Got one!
if ($handler->handle_notice($notice)) {
$this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id);
$this->_done($notice, $queue);
} else {
$this->_log(LOG_INFO, 'Failed to handle notice '. $notice->id);
$this->_fail($notice, $queue);
}
// Chance to e.g. service your XMPP connection
$this->_log(LOG_DEBUG, 'Idling after success.');
$handler->idle(QUEUE_HANDLER_HIT_IDLE);
}
// XXX: when do we give up?
}
}
function _nextItem($queue, $timeout=null)
{
$start = time();
$result = null;
@ -74,7 +103,7 @@ class DBQueueManager extends QueueManager
return $result;
}
function done($object, $queue)
function _done($object, $queue)
{
// XXX: right now, we only handle notices
@ -101,7 +130,7 @@ class DBQueueManager extends QueueManager
$notice = null;
}
function fail($object, $queue)
function _fail($object, $queue)
{
// XXX: right now, we only handle notices

View File

@ -25,7 +25,7 @@ require_once(INSTALLDIR.'/classes/Notice.php');
define('CLAIM_TIMEOUT', 1200);
define('QUEUE_HANDLER_MISS_IDLE', 10);
define('QUEUE_HANDLER_HIT_IDLE', 10);
define('QUEUE_HANDLER_HIT_IDLE', 0);
class QueueHandler extends Daemon
{
@ -42,7 +42,7 @@ class QueueHandler extends Daemon
function timeout()
{
return null;
return 60;
}
function class_name()
@ -96,31 +96,7 @@ class QueueHandler extends Daemon
$qm = QueueManager::get();
while (true) {
$this->log(LOG_DEBUG, 'Checking for notices...');
$notice = $qm->nextItem($queue, $timeout);
if (empty($notice)) {
$this->log(LOG_DEBUG, 'No notices waiting; idling.');
// Nothing in the queue. Do you
// have other tasks, like servicing your
// XMPP connection, to do?
$this->idle(QUEUE_HANDLER_MISS_IDLE);
} else {
$this->log(LOG_INFO, 'Got notice '. $notice->id);
// Yay! Got one!
if ($this->handle_notice($notice)) {
$this->log(LOG_INFO, 'Successfully handled notice '. $notice->id);
$qm->done($notice, $queue);
} else {
$this->log(LOG_INFO, 'Failed to handle notice '. $notice->id);
$qm->fail($notice, $queue);
}
// Chance to e.g. service your XMPP connection
$this->log(LOG_DEBUG, 'Idling after success.');
$this->idle(QUEUE_HANDLER_HIT_IDLE);
}
// XXX: when do we give up?
}
$qm->service($queue, $this);
if (!$this->finish()) {
return false;

View File

@ -67,23 +67,8 @@ class QueueManager
throw ServerException("Unimplemented function 'enqueue' called");
}
function peek($queue)
function service($queue, $handler)
{
throw ServerException("Unimplemented function 'peek' called");
}
function nextItem($queue, $timeout=null)
{
throw ServerException("Unimplemented function 'nextItem' called");
}
function done($object, $queue)
{
throw ServerException("Unimplemented function 'done' called");
}
function fail($object, $queue)
{
throw ServerException("Unimplemented function 'fail' called");
throw ServerException("Unimplemented function 'service' called");
}
}

View File

@ -84,85 +84,33 @@ class StompQueueManager
. $notice->id . ' for ' . $transport);
}
function nextItem($queue, $timeout=null)
function service($queue, $handler)
{
$result = null;
$this->_connect();
$frame = $this->con->readFrame();
$this->con->setReadTimeout($handler->timeout());
if ($frame) {
$this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($frame->headers['created']));
$this->con->subscribe($this->_queueName($queue));
// XXX: Now the queue handler receives only the ID of the
// notice, and it has to get it from the DB
// A massive improvement would be avoid DB query by transmitting
// all the notice details via queue server...
while (true) {
$notice = Notice::staticGet($frame->body);
$frame = $this->con->readFrame();
if ($notice) {
$this->_saveFrame($notice, $queue, $frame);
} else {
$this->log(LOG_WARNING, 'queue item for notice that does not exist');
if ($frame) {
$notice = Notice::staticGet($frame->body);
if ($handler->handle_notice($notice)) {
$this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id);
$this->con->ack($frame);
}
}
$handler->idle(0);
}
}
function done($object, $queue)
{
$notice = $object;
$this->_connect();
$frame = $this->_getFrame($notice, $queue);
if (empty($frame)) {
$this->log(LOG_ERR, 'Cannot find frame for notice '.$notice->id.' in queue '.$queue);
} else {
// if the msg has been handled positively, ack it
// and the queue server will remove it from the queue
$this->con->ack($frame);
$this->_clearFrame($notice, $queue);
$this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id);
}
}
function fail($object, $queue)
{
$notice = $object;
// STOMP server will requeue it after a while anyways,
// so no need to notify. Just get it out of our little
// array
$this->_clearFrame($notice, $queue);
}
function _frameKey($notice, $queue)
{
return ((string)$notice->id) . '-' . $queue;
}
function _saveFrame($notice, $queue, $frame)
{
$k = $this->_frameKey($notice, $queue);
$this->_frames[$k] = $frame;
return true;
}
function _getFrame($notice, $queue)
{
$k = $this->_frameKey($notice, $queue);
return $this->_frames[$k];
}
function _clearFrame($notice, $queue)
{
$k = $this->_frameKey($notice, $queue);
unset($this->_frames[$k]);
$this->con->unsubscribe($this->_queueName($queue));
}
function _queueName($queue)