From e5b758dbbef6774943abf453a43114a2c3371b4a Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Sun, 28 Jun 2009 14:38:31 -0400 Subject: [PATCH 01/21] start of queuemanager code --- lib/dbqueuemanager.php | 106 +++++++++++++++++++++++++++++++++ lib/queuemanager.php | 78 ++++++++++++++++++++++++ lib/stompqueuemanager.php | 122 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 306 insertions(+) create mode 100644 lib/dbqueuemanager.php create mode 100644 lib/queuemanager.php create mode 100644 lib/stompqueuemanager.php diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php new file mode 100644 index 0000000000..c0d4dcd293 --- /dev/null +++ b/lib/dbqueuemanager.php @@ -0,0 +1,106 @@ +. + * + * @category QueueManager + * @package Laconica + * @author Evan Prodromou + * @author Sarven Capadisli + * @copyright 2009 Control Yourself, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://laconi.ca/ + */ + +class DBQueueManager extends QueueManager +{ + var $qis = array(); + + function enqueue($object, $queue) + { + $notice = (Notice)$object; + + $qi = new Queue_item(); + + $qi->notice_id = $notice->id; + $qi->transport = $queue; + $qi->created = $notice->created; + $result = $qi->insert(); + + if (!$result) { + common_log_db_error($qi, 'INSERT', __FILE__); + throw new ServerException('DB error inserting queue item'); + } + + return true; + } + + function nextItem($queue, $timeout=null) + { + $start = time(); + $result = null; + + do { + $qi = Queue_item::top($queue); + if (!empty($qi)) { + $notice = Notice::staticGet('id', $qi->notice_id); + if (!empty($notice)) { + $result = $notice; + } else { + $this->_log(LOG_INFO, 'dequeued non-existent notice ' . $notice->id); + $qi->delete(); + $qi->free(); + $qi = null; + } + } + } while (empty($result) && (is_null($timeout) || (time() - $start) < $timeout)); + + return $result; + } + + function done($object, $queue) + { + $notice = (Notice)$object; + + $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id, + 'transport' => $queue)); + + if (empty($qi)) { + $this->log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue); + } else { + if (empty($qi->claimed)) { + $this->log(LOG_WARNING, 'Reluctantly releasing unclaimed queue item '. + 'for '.$notice->id.', queue '.$queue); + } + $qi->delete(); + $qi->free(); + $qi = null; + } + + $this->log(LOG_INFO, 'done with notice ID = ' . $notice->id); + + $notice->free(); + $notice = null; + } + + function _log($level, $msg) + { + common_log($level, 'DBQueueManager: '.$msg); + } +} diff --git a/lib/queuemanager.php b/lib/queuemanager.php new file mode 100644 index 0000000000..64aca1bc12 --- /dev/null +++ b/lib/queuemanager.php @@ -0,0 +1,78 @@ +. + * + * @category QueueManager + * @package Laconica + * @author Evan Prodromou + * @author Sarven Capadisli + * @copyright 2009 Control Yourself, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://laconi.ca/ + */ + +class QueueManager +{ + static $qm = null; + + static function get() + { + if (empty(self::$qm)) { + + if (Event::handle('StartNewQueueManager', self::$qm)) { + + $type = common_config('queue', 'sub'); + + switch ($type) { + case 'db': + self::$qm = new DBQueueManager(); + break; + case 'stomp': + self::$qm = new StompQueueManager(); + break; + default: + throw new ServerException("No queue manager class for type '$type'"); + } + } + + return self::$qm; + } + } + + function enqueue($object, $queue) + { + throw ServerException("Unimplemented function 'enqueue' called"); + } + + function peek($queue) + { + throw ServerException("Unimplemented function 'peek' called"); + } + + function nextItem($queue, $timeout=null) + { + throw ServerException("Unimplemented function 'nextItem' called"); + } + + function done($object, $queue) + { + throw ServerException("Unimplemented function 'done' called"); + } +} diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php new file mode 100644 index 0000000000..20c6e7a341 --- /dev/null +++ b/lib/stompqueuemanager.php @@ -0,0 +1,122 @@ +. + * + * @category QueueManager + * @package Laconica + * @author Evan Prodromou + * @author Sarven Capadisli + * @copyright 2009 Control Yourself, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://laconi.ca/ + */ + +require_once 'Stomp.php'; + +class QueueManager +{ + var $server = null; + var $username = null; + var $password = null; + var $base = null; + var $con = null; + var $frames = array(); + + function __construct() + { + $this->server = common_config('queue', 'stomp_server'); + $this->username = common_config('queue', 'stomp_username'); + $this->password = common_config('queue', 'stomp_password'); + $this->base = common_config('queue', 'queue_basename'); + } + + function _connect() + { + if (empty($this->con)) { + $this->con = new Stomp($this->server); + + if (!$this->con->connect($this->username, $this->password)) { + $this->_log(LOG_ERR, 'Failed to connect to queue server'); + throw new ServerException('Failed to connect to queue server'); + } + } + } + + function enqueue($object, $queue) + { + $notice = (Notice)$object; + + $this->_connect(); + + $result = $this->con->send($this->_queueName($queue), + $notice->id, // BODY of the message + array ('created' => $notice->created)); + + if (!$result) { + common_log(LOG_ERR, 'Error sending to '.$transport.' queue'); + return false; + } + + common_log(LOG_DEBUG, 'complete remote queueing notice ID = ' + . $notice->id . ' for ' . $transport); + } + + function nextItem($queue, $timeout=null) + { + $result = null; + + $this->_connect(); + + $frame = $this->con->readFrame(); + + if ($frame) { + $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($frame->headers['created'])); + + // XXX: Now the queue handler receives only the ID of the + // notice, and it has to get it from the DB + // A massive improvement would be avoid DB query by transmitting + // all the notice details via queue server... + $notice = Notice::staticGet($frame->body); + + if ($notice) { + } else { + $this->log(LOG_WARNING, 'queue item for notice that does not exist'); + } + } + } + + function done($object, $queue) + { + $notice = (Notice)$object; + + $this->_connect(); + + $frame = $this->_getFrame($notice, $queue); + + if (empty($frame)) { + $this->log(LOG_ERR, 'Cannot find frame for notice '.$notice->id.' in queue '.$queue); + } else { + // if the msg has been handled positively, ack it + // and the queue server will remove it from the queue + $this->con->ack($frame); + $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); + } + } +} From 854c82cfd53cb071afa39259fb467b4730bd6494 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Sun, 28 Jun 2009 14:38:34 -0400 Subject: [PATCH 02/21] start of queuemanager code --- classes/Queue_item.php | 11 +++++-- lib/util.php | 75 ------------------------------------------ 2 files changed, 8 insertions(+), 78 deletions(-) diff --git a/classes/Queue_item.php b/classes/Queue_item.php index 9b909ec22b..295c321b57 100644 --- a/classes/Queue_item.php +++ b/classes/Queue_item.php @@ -4,7 +4,7 @@ */ require_once INSTALLDIR.'/classes/Memcached_DataObject.php'; -class Queue_item extends Memcached_DataObject +class Queue_item extends Memcached_DataObject { ###START_AUTOCODE /* the code below is auto generated do not remove the above tag */ @@ -13,7 +13,7 @@ class Queue_item extends Memcached_DataObject public $notice_id; // int(4) primary_key not_null public $transport; // varchar(8) primary_key not_null public $created; // datetime() not_null - public $claimed; // datetime() + public $claimed; // datetime() /* Static get */ function staticGet($k,$v=null) @@ -24,7 +24,7 @@ class Queue_item extends Memcached_DataObject function sequenceKey() { return array(false, false); } - + static function top($transport) { $qi = new Queue_item(); @@ -54,4 +54,9 @@ class Queue_item extends Memcached_DataObject $qi = null; return null; } + + function &pkeyGet($kv) + { + return Memcached_DataObject::pkeyGet('Queue_item', $kv); + } } diff --git a/lib/util.php b/lib/util.php index 9c1af7a0dc..3f924c8dec 100644 --- a/lib/util.php +++ b/lib/util.php @@ -889,69 +889,6 @@ function common_enqueue_notice($notice) return $result; } -function common_enqueue_notice_stomp($notice, $transports) -{ - // use an external message queue system via STOMP - require_once("Stomp.php"); - - $server = common_config('queue','stomp_server'); - $username = common_config('queue', 'stomp_username'); - $password = common_config('queue', 'stomp_password'); - - $con = new Stomp($server); - - if (!$con->connect($username, $password)) { - common_log(LOG_ERR, 'Failed to connect to queue server'); - return false; - } - - $queue_basename = common_config('queue','queue_basename'); - - foreach ($transports as $transport) { - $result = $con->send('/queue/'.$queue_basename.'-'.$transport, // QUEUE - $notice->id, // BODY of the message - array ('created' => $notice->created)); - if (!$result) { - common_log(LOG_ERR, 'Error sending to '.$transport.' queue'); - return false; - } - common_log(LOG_DEBUG, 'complete remote queueing notice ID = ' . $notice->id . ' for ' . $transport); - } - - //send tags as headers, so they can be used as JMS selectors - common_log(LOG_DEBUG, 'searching for tags ' . $notice->id); - $tags = array(); - $tag = new Notice_tag(); - $tag->notice_id = $notice->id; - if ($tag->find()) { - while ($tag->fetch()) { - common_log(LOG_DEBUG, 'tag found = ' . $tag->tag); - array_push($tags,$tag->tag); - } - } - $tag->free(); - - $con->send('/topic/laconica.'.$notice->profile_id, - $notice->content, - array( - 'profile_id' => $notice->profile_id, - 'created' => $notice->created, - 'tags' => implode($tags,' - ') - ) - ); - common_log(LOG_DEBUG, 'sent to personal topic ' . $notice->id); - $con->send('/topic/laconica.allusers', - $notice->content, - array( - 'profile_id' => $notice->profile_id, - 'created' => $notice->created, - 'tags' => implode($tags,' - ') - ) - ); - common_log(LOG_DEBUG, 'sent to catch-all topic ' . $notice->id); - $result = true; -} - function common_enqueue_notice_db($notice, $transports) { // in any other case, 'internal' @@ -962,18 +899,6 @@ function common_enqueue_notice_db($notice, $transports) function common_enqueue_notice_transport($notice, $transport) { - $qi = new Queue_item(); - $qi->notice_id = $notice->id; - $qi->transport = $transport; - $qi->created = $notice->created; - $result = $qi->insert(); - if (!$result) { - $last_error = &PEAR::getStaticProperty('DB_DataObject','lastError'); - common_log(LOG_ERR, 'DB error inserting queue item: ' . $last_error->message); - throw new ServerException('DB error inserting queue item: ' . $last_error->message); - } - common_log(LOG_DEBUG, 'complete queueing notice ID = ' . $notice->id . ' for ' . $transport); - return true; } function common_real_broadcast($notice, $remote=false) From 58b427869a001a91d66cff497f1563b8277f1a67 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Wed, 1 Jul 2009 11:09:42 -0400 Subject: [PATCH 03/21] compile errors in DBQueueManager --- lib/dbqueuemanager.php | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php index c0d4dcd293..46be54b30b 100644 --- a/lib/dbqueuemanager.php +++ b/lib/dbqueuemanager.php @@ -34,7 +34,7 @@ class DBQueueManager extends QueueManager function enqueue($object, $queue) { - $notice = (Notice)$object; + $notice = $object; $qi = new Queue_item(); @@ -76,7 +76,9 @@ class DBQueueManager extends QueueManager function done($object, $queue) { - $notice = (Notice)$object; + // XXX: right now, we only handle notices + + $notice = $object; $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id, 'transport' => $queue)); From 4c256a6d7ee287def5c26f401c8caa6bfe0b8dff Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Wed, 1 Jul 2009 11:09:58 -0400 Subject: [PATCH 04/21] better hook variables for StartQueueManager --- lib/queuemanager.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/queuemanager.php b/lib/queuemanager.php index 64aca1bc12..92f0e10de6 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -36,9 +36,9 @@ class QueueManager { if (empty(self::$qm)) { - if (Event::handle('StartNewQueueManager', self::$qm)) { + $type = common_config('queue', 'sub'); - $type = common_config('queue', 'sub'); + if (Event::handle('StartNewQueueManager', array($type, &self::$qm))) { switch ($type) { case 'db': From 7b66a129139d8c2f03677f6a5b71412a111f655d Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Wed, 1 Jul 2009 11:10:23 -0400 Subject: [PATCH 05/21] save frames for StompQueueManager --- lib/stompqueuemanager.php | 36 +++++++++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index 20c6e7a341..1b4a26f2ea 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -30,7 +30,7 @@ require_once 'Stomp.php'; -class QueueManager +class StompQueueManager { var $server = null; var $username = null; @@ -61,10 +61,12 @@ class QueueManager function enqueue($object, $queue) { - $notice = (Notice)$object; + $notice = $object; $this->_connect(); + // XXX: serialize and send entire notice + $result = $this->con->send($this->_queueName($queue), $notice->id, // BODY of the message array ('created' => $notice->created)); @@ -93,9 +95,11 @@ class QueueManager // notice, and it has to get it from the DB // A massive improvement would be avoid DB query by transmitting // all the notice details via queue server... + $notice = Notice::staticGet($frame->body); if ($notice) { + $this->_saveFrame($notice, $queue, $frame); } else { $this->log(LOG_WARNING, 'queue item for notice that does not exist'); } @@ -104,7 +108,7 @@ class QueueManager function done($object, $queue) { - $notice = (Notice)$object; + $notice = $object; $this->_connect(); @@ -116,7 +120,33 @@ class QueueManager // if the msg has been handled positively, ack it // and the queue server will remove it from the queue $this->con->ack($frame); + $this->_clearFrame($notice, $queue); + $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); } } + + function _frameKey($notice, $queue) + { + return ((string)$notice->id) . '-' . $queue; + } + + function _saveFrame($notice, $queue, $frame) + { + $k = $this->_frameKey($notice, $queue); + $this->_frames[$k] = $frame; + return true; + } + + function _getFrame($notice, $queue) + { + $k = $this->_frameKey($notice, $queue); + return $this->_frames[$k]; + } + + function _clearFrame($notice, $queue) + { + $k = $this->_frameKey($notice, $queue); + unset($this->_frames[$k]); + } } From e0bf8ad95b2d2ddc7b988c25e9cffa20075a5d8c Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Wed, 1 Jul 2009 11:34:12 -0400 Subject: [PATCH 06/21] Add UnQueueManager for immediate handling Perhaps it's a little precious, but I took out the switches in util.php to determine what's supposed to be sent when, and made a queuemanager class that will just do things when they're supposed to be done. --- lib/queuemanager.php | 10 ++++- lib/unqueuemanager.php | 85 +++++++++++++++++++++++++++++++++++++++ lib/util.php | 91 +++++++++--------------------------------- 3 files changed, 112 insertions(+), 74 deletions(-) create mode 100644 lib/unqueuemanager.php diff --git a/lib/queuemanager.php b/lib/queuemanager.php index 92f0e10de6..6bb21de9b6 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -36,9 +36,15 @@ class QueueManager { if (empty(self::$qm)) { - $type = common_config('queue', 'sub'); + if (Event::handle('StartNewQueueManager', array(&self::$qm))) { - if (Event::handle('StartNewQueueManager', array($type, &self::$qm))) { + $enabled = common_config('queue', 'enabled'); + $type = common_config('queue', 'sub'); + + if (!$enabled) { + // does everything immediately + return new UnQueueManager(); + } switch ($type) { case 'db': diff --git a/lib/unqueuemanager.php b/lib/unqueuemanager.php new file mode 100644 index 0000000000..5154610725 --- /dev/null +++ b/lib/unqueuemanager.php @@ -0,0 +1,85 @@ +. + * + * @category QueueManager + * @package Laconica + * @author Evan Prodromou + * @author Sarven Capadisli + * @copyright 2009 Control Yourself, Inc. + * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 + * @link http://laconi.ca/ + */ + +class UnQueueManager +{ + function enqueue($object, $queue) + { + $notice = $object; + + switch ($queue) + { + case 'omb': + if ($this->_isLocal($notice)) { + require_once(INSTALLDIR.'/lib/omb.php'); + omb_broadcast_remote_subscribers($notice); + } + break; + case 'public': + if ($this->_isLocal($notice)) { + require_once(INSTALLDIR.'/lib/jabber.php'); + jabber_public_notice($notice); + } + break; + case 'twitter': + if ($this->_isLocal($notice)) { + broadcast_twitter($notice); + } + break; + case 'facebook': + if ($this->_isLocal($notice)) { + require_once INSTALLDIR . '/lib/facebookutil.php'; + return facebookBroadcastNotice($notice); + } + break; + case 'ping': + if ($this->_isLocal($notice)) { + require_once INSTALLDIR . '/lib/ping.php'; + return ping_broadcast_notice($notice); + } + case 'sms': + require_once(INSTALLDIR.'/lib/mail.php'); + mail_broadcast_notice_sms($notice); + break; + case 'jabber': + require_once(INSTALLDIR.'/lib/jabber.php'); + jabber_broadcast_notice($notice); + break; + default: + throw ServerException("UnQueueManager: Unknown queue: $type"); + } + } + + function _isLocal($notice) + { + return ($notice->is_local == NOTICE_LOCAL_PUBLIC || + $notice->is_local == NOTICE_LOCAL_NONPUBLIC); + } +} \ No newline at end of file diff --git a/lib/util.php b/lib/util.php index 3f924c8dec..b1b4faa7e6 100644 --- a/lib/util.php +++ b/lib/util.php @@ -861,88 +861,35 @@ function common_redirect($url, $code=307) function common_broadcast_notice($notice, $remote=false) { - if (common_config('queue', 'enabled')) { - // Do it later! - return common_enqueue_notice($notice); - } else { - return common_real_broadcast($notice, $remote); - } + return common_enqueue_notice($notice); } // Stick the notice on the queue function common_enqueue_notice($notice) { - $transports = array('omb', 'sms', 'public', 'twitter', 'facebook', 'ping'); + static $localTransports = array('omb', + 'public', + 'twitter', + 'facebook', + 'ping'); + static $allTransports = array('sms', 'jabber'); - if (common_config('xmpp', 'enabled')) + $transports = $allTransports; + + if ($notice->is_local == NOTICE_LOCAL_PUBLIC || + $notice->is_local == NOTICE_LOCAL_NONPUBLIC) { + $transports = array_merge($transports, $localTransports); + } + + $qm = QueueManager::get(); + + foreach ($transports as $transport) { - $transports[] = 'jabber'; + $qm->enqueue($notice, $transport); } - if (common_config('queue','subsystem') == 'stomp') { - common_enqueue_notice_stomp($notice, $transports); - } - else { - common_enqueue_notice_db($notice, $transports); - } - return $result; -} - -function common_enqueue_notice_db($notice, $transports) -{ - // in any other case, 'internal' - foreach ($transports as $transport) { - common_enqueue_notice_transport($notice, $transport); - } -} - -function common_enqueue_notice_transport($notice, $transport) -{ -} - -function common_real_broadcast($notice, $remote=false) -{ - $success = true; - if (!$remote) { - // Make sure we have the OMB stuff - require_once(INSTALLDIR.'/lib/omb.php'); - $success = omb_broadcast_remote_subscribers($notice); - if (!$success) { - common_log(LOG_ERR, 'Error in OMB broadcast for notice ' . $notice->id); - } - } - if ($success) { - require_once(INSTALLDIR.'/lib/jabber.php'); - $success = jabber_broadcast_notice($notice); - if (!$success) { - common_log(LOG_ERR, 'Error in jabber broadcast for notice ' . $notice->id); - } - } - if ($success) { - require_once(INSTALLDIR.'/lib/mail.php'); - $success = mail_broadcast_notice_sms($notice); - if (!$success) { - common_log(LOG_ERR, 'Error in sms broadcast for notice ' . $notice->id); - } - } - if ($success) { - $success = jabber_public_notice($notice); - if (!$success) { - common_log(LOG_ERR, 'Error in public broadcast for notice ' . $notice->id); - } - } - if ($success) { - $success = broadcast_twitter($notice); - if (!$success) { - common_log(LOG_ERR, 'Error in Twitter broadcast for notice ' . $notice->id); - } - } - - // XXX: Do a real-time FB broadcast here? - - // XXX: broadcast notices to other IM - return $success; + return true; } function common_broadcast_profile($profile) From 887d35cfc8c1d42e5af67d0161b244545cda464a Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Wed, 1 Jul 2009 12:09:18 -0400 Subject: [PATCH 07/21] better queue manager detection, new method fail() --- lib/queuemanager.php | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/lib/queuemanager.php b/lib/queuemanager.php index 6bb21de9b6..1bf4d4decc 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -39,22 +39,22 @@ class QueueManager if (Event::handle('StartNewQueueManager', array(&self::$qm))) { $enabled = common_config('queue', 'enabled'); - $type = common_config('queue', 'sub'); + $type = common_config('queue', 'subsystem'); if (!$enabled) { // does everything immediately - return new UnQueueManager(); - } - - switch ($type) { - case 'db': - self::$qm = new DBQueueManager(); - break; - case 'stomp': - self::$qm = new StompQueueManager(); - break; - default: - throw new ServerException("No queue manager class for type '$type'"); + self::$qm = new UnQueueManager(); + } else { + switch ($type) { + case 'db': + self::$qm = new DBQueueManager(); + break; + case 'stomp': + self::$qm = new StompQueueManager(); + break; + default: + throw new ServerException("No queue manager class for type '$type'"); + } } } @@ -81,4 +81,9 @@ class QueueManager { throw ServerException("Unimplemented function 'done' called"); } + + function fail($object, $queue) + { + throw ServerException("Unimplemented function 'fail' called"); + } } From 557418bc1e4e9d8a06025910ad7be5f60557f71e Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Wed, 1 Jul 2009 12:09:41 -0400 Subject: [PATCH 08/21] better transport choices when xmpp is disabled --- lib/util.php | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/lib/util.php b/lib/util.php index b1b4faa7e6..6563745161 100644 --- a/lib/util.php +++ b/lib/util.php @@ -869,17 +869,25 @@ function common_broadcast_notice($notice, $remote=false) function common_enqueue_notice($notice) { static $localTransports = array('omb', - 'public', 'twitter', 'facebook', 'ping'); - static $allTransports = array('sms', 'jabber'); + static $allTransports = array('sms'); $transports = $allTransports; + $xmpp = common_config('xmpp', 'enabled'); + + if ($xmpp) { + $transports[] = 'jabber'; + } + if ($notice->is_local == NOTICE_LOCAL_PUBLIC || $notice->is_local == NOTICE_LOCAL_NONPUBLIC) { $transports = array_merge($transports, $localTransports); + if ($xmpp) { + $transports[] = 'public'; + } } $qm = QueueManager::get(); From 2325d934a8abfc611f455d4f0b816e2dd62c5ec4 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Wed, 1 Jul 2009 12:10:11 -0400 Subject: [PATCH 09/21] add fail() method to stompqueuemanager --- lib/stompqueuemanager.php | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index 1b4a26f2ea..badcd4abb0 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -126,6 +126,17 @@ class StompQueueManager } } + function fail($object, $queue) + { + $notice = $object; + + // STOMP server will requeue it after a while anyways, + // so no need to notify. Just get it out of our little + // array + + $this->_clearFrame($notice, $queue); + } + function _frameKey($notice, $queue) { return ((string)$notice->id) . '-' . $queue; From a35138b2684ec5275a1ffd7badfe7826cf2173b1 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Wed, 1 Jul 2009 12:10:25 -0400 Subject: [PATCH 10/21] add fail() method to dbqueuemanager and fix logging --- lib/dbqueuemanager.php | 35 ++++++++++++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php index 46be54b30b..c9e5ef243f 100644 --- a/lib/dbqueuemanager.php +++ b/lib/dbqueuemanager.php @@ -84,10 +84,10 @@ class DBQueueManager extends QueueManager 'transport' => $queue)); if (empty($qi)) { - $this->log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue); + $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue); } else { if (empty($qi->claimed)) { - $this->log(LOG_WARNING, 'Reluctantly releasing unclaimed queue item '. + $this->_log(LOG_WARNING, 'Reluctantly releasing unclaimed queue item '. 'for '.$notice->id.', queue '.$queue); } $qi->delete(); @@ -95,7 +95,36 @@ class DBQueueManager extends QueueManager $qi = null; } - $this->log(LOG_INFO, 'done with notice ID = ' . $notice->id); + $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id); + + $notice->free(); + $notice = null; + } + + function fail($object, $queue) + { + // XXX: right now, we only handle notices + + $notice = $object; + + $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id, + 'transport' => $queue)); + + if (empty($qi)) { + $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue); + } else { + if (empty($qi->claimed)) { + $this->_log(LOG_WARNING, 'Ignoring failure for unclaimed queue item '. + 'for '.$notice->id.', queue '.$queue); + } else { + $orig = clone($qi); + $qi->claimed = null; + $qi->update($orig); + $qi = null; + } + } + + $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id); $notice->free(); $notice = null; From e52997e52fe02960908eb6a9637a3349a2c74dad Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Wed, 1 Jul 2009 12:11:02 -0400 Subject: [PATCH 11/21] change queuehandler class to use queuemanager interface --- lib/queuehandler.php | 153 ++++++++++++------------------------------- 1 file changed, 42 insertions(+), 111 deletions(-) diff --git a/lib/queuehandler.php b/lib/queuehandler.php index ae403c65e2..045432ae52 100644 --- a/lib/queuehandler.php +++ b/lib/queuehandler.php @@ -17,17 +17,18 @@ * along with this program. If not, see . */ -define('CLAIM_TIMEOUT', 1200); - if (!defined('LACONICA')) { exit(1); } require_once(INSTALLDIR.'/lib/daemon.php'); require_once(INSTALLDIR.'/classes/Queue_item.php'); require_once(INSTALLDIR.'/classes/Notice.php'); +define('CLAIM_TIMEOUT', 1200); +define('QUEUE_HANDLER_MISS_IDLE', 10); +define('QUEUE_HANDLER_HIT_IDLE', 10); + class QueueHandler extends Daemon { - var $_id = 'generic'; function QueueHandler($id=null) @@ -37,6 +38,11 @@ class QueueHandler extends Daemon } } + function timeout() + { + return null; + } + function class_name() { return ucfirst($this->transport()) . 'Handler'; @@ -75,110 +81,45 @@ class QueueHandler extends Daemon return true; } - function db_dispatch() { - do { - $qi = Queue_item::top($this->transport()); - if ($qi) { - $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($qi->created)); - $notice = Notice::staticGet($qi->notice_id); - if ($notice) { - $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id); - # XXX: what to do if broadcast fails? - $result = $this->handle_notice($notice); - if (!$result) { - $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id); - $orig = $qi; - $qi->claimed = null; - $qi->update($orig); - $this->log(LOG_WARNING, 'Abandoned claim for notice ID = ' . $notice->id); - continue; - } - $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); - $notice->free(); - unset($notice); - $notice = null; - } else { - $this->log(LOG_WARNING, 'queue item for notice that does not exist'); - } - $qi->delete(); - $qi->free(); - unset($qi); - $this->idle(0); - } else { - $this->clear_old_claims(); - $this->idle(5); - } - } while (true); - } - - function stomp_dispatch() { - - // use an external message queue system via STOMP - require_once("Stomp.php"); - - $server = common_config('queue','stomp_server'); - $username = common_config('queue', 'stomp_username'); - $password = common_config('queue', 'stomp_password'); - - $con = new Stomp($server); - - if (!$con->connect($username, $password)) { - $this->log(LOG_ERR, 'Failed to connect to queue server'); - return false; - } - - $queue_basename = common_config('queue','queue_basename'); - // subscribe to the relevant queue (format: basename-transport) - $con->subscribe('/queue/'.$queue_basename.'-'.$this->transport()); - - do { - $frame = $con->readFrame(); - if ($frame) { - $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($frame->headers['created'])); - - // XXX: Now the queue handler receives only the ID of the - // notice, and it has to get it from the DB - // A massive improvement would be avoid DB query by transmitting - // all the notice details via queue server... - $notice = Notice::staticGet($frame->body); - - if ($notice) { - $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id); - $result = $this->handle_notice($notice); - if ($result) { - // if the msg has been handled positively, ack it - // and the queue server will remove it from the queue - $con->ack($frame); - $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); - } - else { - // no ack - $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id); - } - $notice->free(); - unset($notice); - $notice = null; - } else { - $this->log(LOG_WARNING, 'queue item for notice that does not exist'); - } - } - } while (true); - - $con->disconnect(); - } - function run() { if (!$this->start()) { return false; } + $this->log(LOG_INFO, 'checking for queued notices'); - if (common_config('queue','subsystem') == 'stomp') { - $this->stomp_dispatch(); - } - else { - $this->db_dispatch(); + + $queue = $this->transport(); + $timeout = $this->timeout(); + + $qm = QueueManager::get(); + + while (true) { + $this->log(LOG_DEBUG, 'Checking for notices...'); + $notice = $qm->nextItem($queue, $timeout); + if (empty($notice)) { + $this->log(LOG_DEBUG, 'No notices waiting; idling.'); + // Nothing in the queue. Do you + // have other tasks, like servicing your + // XMPP connection, to do? + $this->idle(QUEUE_HANDLER_MISS_IDLE); + } else { + $this->log(LOG_INFO, 'Got notice '. $notice->id); + // Yay! Got one! + if ($this->handle_notice($notice)) { + $this->log(LOG_INFO, 'Successfully handled notice '. $notice->id); + $qm->done($notice, $queue); + } else { + $this->log(LOG_INFO, 'Failed to handle notice '. $notice->id); + $qm->fail($notice, $queue); + } + // Chance to e.g. service your XMPP connection + $this->log(LOG_DEBUG, 'Idling after success.'); + $this->idle(QUEUE_HANDLER_HIT_IDLE); + } + // XXX: when do we give up? } + if (!$this->finish()) { return false; } @@ -187,21 +128,11 @@ class QueueHandler extends Daemon function idle($timeout=0) { - if ($timeout>0) { + if ($timeout > 0) { sleep($timeout); } } - function clear_old_claims() - { - $qi = new Queue_item(); - $qi->transport = $this->transport(); - $qi->whereAdd('now() - claimed > '.CLAIM_TIMEOUT); - $qi->update(DB_DATAOBJECT_WHEREADD_ONLY); - $qi->free(); - unset($qi); - } - function log($level, $msg) { common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg); From e8f27025ba7869057d86fe37a5264e1c742969f5 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Thu, 2 Jul 2009 12:43:09 -0400 Subject: [PATCH 12/21] more logging in stompqueuemanager --- lib/stompqueuemanager.php | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index badcd4abb0..08a5790d41 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -49,10 +49,14 @@ class StompQueueManager function _connect() { + $this->_log(LOG_DEBUG, "Connecting to $this->server..."); if (empty($this->con)) { + $this->_log(LOG_INFO, "Connecting to '$this->server' as '$this->username'..."); $this->con = new Stomp($this->server); - if (!$this->con->connect($this->username, $this->password)) { + if ($this->con->connect($this->username, $this->password)) { + $this->_log(LOG_INFO, "Connected."); + } else { $this->_log(LOG_ERR, 'Failed to connect to queue server'); throw new ServerException('Failed to connect to queue server'); } @@ -160,4 +164,9 @@ class StompQueueManager $k = $this->_frameKey($notice, $queue); unset($this->_frames[$k]); } + + function _log($level, $msg) + { + common_log($level, 'StompQueueManager: '.$msg); + } } From 3e4be98ff6de7a1044f0d7b0deef4f6054e64464 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Fri, 3 Jul 2009 10:05:07 -0400 Subject: [PATCH 13/21] add _queueName function --- lib/stompqueuemanager.php | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index 08a5790d41..1ad6870363 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -165,6 +165,11 @@ class StompQueueManager unset($this->_frames[$k]); } + function _queueName($queue) + { + return common_config('queue', 'queue_basename') . $queue; + } + function _log($level, $msg) { common_log($level, 'StompQueueManager: '.$msg); From 49c5c6f92bc1d06e6464eade81eead891d86f10d Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Sat, 4 Jul 2009 00:31:28 -0400 Subject: [PATCH 14/21] move handling code into queuemanager --- lib/dbqueuemanager.php | 35 +++++++++++++++-- lib/queuehandler.php | 30 ++------------ lib/queuemanager.php | 19 +-------- lib/stompqueuemanager.php | 82 +++++++-------------------------------- 4 files changed, 52 insertions(+), 114 deletions(-) diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php index c9e5ef243f..6e7172de00 100644 --- a/lib/dbqueuemanager.php +++ b/lib/dbqueuemanager.php @@ -51,7 +51,36 @@ class DBQueueManager extends QueueManager return true; } - function nextItem($queue, $timeout=null) + function service($queue, $handler) + { + while (true) { + $this->_log(LOG_DEBUG, 'Checking for notices...'); + $notice = $this->_nextItem($queue, null); + if (empty($notice)) { + $this->_log(LOG_DEBUG, 'No notices waiting; idling.'); + // Nothing in the queue. Do you + // have other tasks, like servicing your + // XMPP connection, to do? + $handler->idle(QUEUE_HANDLER_MISS_IDLE); + } else { + $this->_log(LOG_INFO, 'Got notice '. $notice->id); + // Yay! Got one! + if ($handler->handle_notice($notice)) { + $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id); + $this->_done($notice, $queue); + } else { + $this->_log(LOG_INFO, 'Failed to handle notice '. $notice->id); + $this->_fail($notice, $queue); + } + // Chance to e.g. service your XMPP connection + $this->_log(LOG_DEBUG, 'Idling after success.'); + $handler->idle(QUEUE_HANDLER_HIT_IDLE); + } + // XXX: when do we give up? + } + } + + function _nextItem($queue, $timeout=null) { $start = time(); $result = null; @@ -74,7 +103,7 @@ class DBQueueManager extends QueueManager return $result; } - function done($object, $queue) + function _done($object, $queue) { // XXX: right now, we only handle notices @@ -101,7 +130,7 @@ class DBQueueManager extends QueueManager $notice = null; } - function fail($object, $queue) + function _fail($object, $queue) { // XXX: right now, we only handle notices diff --git a/lib/queuehandler.php b/lib/queuehandler.php index ddb47a28e9..c0f38f4e35 100644 --- a/lib/queuehandler.php +++ b/lib/queuehandler.php @@ -25,7 +25,7 @@ require_once(INSTALLDIR.'/classes/Notice.php'); define('CLAIM_TIMEOUT', 1200); define('QUEUE_HANDLER_MISS_IDLE', 10); -define('QUEUE_HANDLER_HIT_IDLE', 10); +define('QUEUE_HANDLER_HIT_IDLE', 0); class QueueHandler extends Daemon { @@ -42,7 +42,7 @@ class QueueHandler extends Daemon function timeout() { - return null; + return 60; } function class_name() @@ -96,31 +96,7 @@ class QueueHandler extends Daemon $qm = QueueManager::get(); - while (true) { - $this->log(LOG_DEBUG, 'Checking for notices...'); - $notice = $qm->nextItem($queue, $timeout); - if (empty($notice)) { - $this->log(LOG_DEBUG, 'No notices waiting; idling.'); - // Nothing in the queue. Do you - // have other tasks, like servicing your - // XMPP connection, to do? - $this->idle(QUEUE_HANDLER_MISS_IDLE); - } else { - $this->log(LOG_INFO, 'Got notice '. $notice->id); - // Yay! Got one! - if ($this->handle_notice($notice)) { - $this->log(LOG_INFO, 'Successfully handled notice '. $notice->id); - $qm->done($notice, $queue); - } else { - $this->log(LOG_INFO, 'Failed to handle notice '. $notice->id); - $qm->fail($notice, $queue); - } - // Chance to e.g. service your XMPP connection - $this->log(LOG_DEBUG, 'Idling after success.'); - $this->idle(QUEUE_HANDLER_HIT_IDLE); - } - // XXX: when do we give up? - } + $qm->service($queue, $this); if (!$this->finish()) { return false; diff --git a/lib/queuemanager.php b/lib/queuemanager.php index 1bf4d4decc..f36e99d16a 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -67,23 +67,8 @@ class QueueManager throw ServerException("Unimplemented function 'enqueue' called"); } - function peek($queue) + function service($queue, $handler) { - throw ServerException("Unimplemented function 'peek' called"); - } - - function nextItem($queue, $timeout=null) - { - throw ServerException("Unimplemented function 'nextItem' called"); - } - - function done($object, $queue) - { - throw ServerException("Unimplemented function 'done' called"); - } - - function fail($object, $queue) - { - throw ServerException("Unimplemented function 'fail' called"); + throw ServerException("Unimplemented function 'service' called"); } } diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index 1ad6870363..b8731d5439 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -84,85 +84,33 @@ class StompQueueManager . $notice->id . ' for ' . $transport); } - function nextItem($queue, $timeout=null) + function service($queue, $handler) { $result = null; $this->_connect(); - $frame = $this->con->readFrame(); + $this->con->setReadTimeout($handler->timeout()); - if ($frame) { - $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($frame->headers['created'])); + $this->con->subscribe($this->_queueName($queue)); - // XXX: Now the queue handler receives only the ID of the - // notice, and it has to get it from the DB - // A massive improvement would be avoid DB query by transmitting - // all the notice details via queue server... + while (true) { - $notice = Notice::staticGet($frame->body); + $frame = $this->con->readFrame(); - if ($notice) { - $this->_saveFrame($notice, $queue, $frame); - } else { - $this->log(LOG_WARNING, 'queue item for notice that does not exist'); + if ($frame) { + $notice = Notice::staticGet($frame->body); + + if ($handler->handle_notice($notice)) { + $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id); + $this->con->ack($frame); + } } + + $handler->idle(0); } - } - function done($object, $queue) - { - $notice = $object; - - $this->_connect(); - - $frame = $this->_getFrame($notice, $queue); - - if (empty($frame)) { - $this->log(LOG_ERR, 'Cannot find frame for notice '.$notice->id.' in queue '.$queue); - } else { - // if the msg has been handled positively, ack it - // and the queue server will remove it from the queue - $this->con->ack($frame); - $this->_clearFrame($notice, $queue); - - $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); - } - } - - function fail($object, $queue) - { - $notice = $object; - - // STOMP server will requeue it after a while anyways, - // so no need to notify. Just get it out of our little - // array - - $this->_clearFrame($notice, $queue); - } - - function _frameKey($notice, $queue) - { - return ((string)$notice->id) . '-' . $queue; - } - - function _saveFrame($notice, $queue, $frame) - { - $k = $this->_frameKey($notice, $queue); - $this->_frames[$k] = $frame; - return true; - } - - function _getFrame($notice, $queue) - { - $k = $this->_frameKey($notice, $queue); - return $this->_frames[$k]; - } - - function _clearFrame($notice, $queue) - { - $k = $this->_frameKey($notice, $queue); - unset($this->_frames[$k]); + $this->con->unsubscribe($this->_queueName($queue)); } function _queueName($queue) From f63702579a672d35c5db262873a4a22835301074 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Sat, 4 Jul 2009 01:16:58 -0400 Subject: [PATCH 15/21] don't say we're connecting if we're not --- lib/stompqueuemanager.php | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index b8731d5439..a6bac861b3 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -49,7 +49,6 @@ class StompQueueManager function _connect() { - $this->_log(LOG_DEBUG, "Connecting to $this->server..."); if (empty($this->con)) { $this->_log(LOG_INFO, "Connecting to '$this->server' as '$this->username'..."); $this->con = new Stomp($this->server); From 6d72864618b73271a83aa566f35838bb1a5c57c7 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Sat, 4 Jul 2009 01:17:37 -0400 Subject: [PATCH 16/21] don't try to show non-object --- lib/util.php | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/util.php b/lib/util.php index a40cd3d54b..9e8ec41d25 100644 --- a/lib/util.php +++ b/lib/util.php @@ -1028,6 +1028,9 @@ function common_log_objstring(&$object) if (is_null($object)) { return "null"; } + if (!($object instanceof DB_DataObject)) { + return "(unknown)"; + } $arr = $object->toArray(); $fields = array(); foreach ($arr as $k => $v) { From 9dee9e1612ebe6d6f28c21bce8c426658d60f171 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Sat, 4 Jul 2009 01:20:39 -0400 Subject: [PATCH 17/21] new default daemon jid --- lib/xmppqueuehandler.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/xmppqueuehandler.php b/lib/xmppqueuehandler.php index 986e09c25e..c8b5ad1fb4 100644 --- a/lib/xmppqueuehandler.php +++ b/lib/xmppqueuehandler.php @@ -91,7 +91,7 @@ class XmppQueueHandler extends QueueHandler if (common_config('xmpp', 'listener')) { return common_config('xmpp', 'listener'); } else { - return jabber_daemon_address() . '/' . common_config('xmpp','resource') . '-listener'; + return jabber_daemon_address() . '/' . common_config('xmpp','resource') . 'daemon'; } } } From 49eaa04b508f6e27533f494dedd4997416670bef Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Sat, 4 Jul 2009 01:42:42 -0400 Subject: [PATCH 18/21] return singleton if initialized --- lib/queuemanager.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/queuemanager.php b/lib/queuemanager.php index f36e99d16a..582c247901 100644 --- a/lib/queuemanager.php +++ b/lib/queuemanager.php @@ -57,9 +57,9 @@ class QueueManager } } } - - return self::$qm; } + + return self::$qm; } function enqueue($object, $queue) From 66a4a60e0bb67ba9094cd94be5992c70e5352e54 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Sat, 4 Jul 2009 01:43:18 -0400 Subject: [PATCH 19/21] better debug logging in stomp queue manager --- lib/stompqueuemanager.php | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index a6bac861b3..5f0b88d8ad 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -75,12 +75,12 @@ class StompQueueManager array ('created' => $notice->created)); if (!$result) { - common_log(LOG_ERR, 'Error sending to '.$transport.' queue'); + common_log(LOG_ERR, 'Error sending to '.$queue.' queue'); return false; } common_log(LOG_DEBUG, 'complete remote queueing notice ID = ' - . $notice->id . ' for ' . $transport); + . $notice->id . ' for ' . $queue); } function service($queue, $handler) @@ -101,7 +101,7 @@ class StompQueueManager $notice = Notice::staticGet($frame->body); if ($handler->handle_notice($notice)) { - $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id); + $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id .' posted at ' . $frame->headers['created']); $this->con->ack($frame); } } From cb019f7aad9c4a618316fb3c2e4a36bc013c8da3 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Sat, 4 Jul 2009 01:43:35 -0400 Subject: [PATCH 20/21] don't send unused variable for streams --- classes/Notice.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/classes/Notice.php b/classes/Notice.php index 8a018068ae..5ec0692d90 100644 --- a/classes/Notice.php +++ b/classes/Notice.php @@ -1210,7 +1210,7 @@ class Notice extends Memcached_DataObject $window = explode(',', $laststr); $last_id = $window[0]; $new_ids = call_user_func_array($fn, array_merge($args, array(0, NOTICE_CACHE_WINDOW, - $last_id, 0, null, $tag))); + $last_id, 0, null))); $new_window = array_merge($new_ids, $window); @@ -1225,7 +1225,7 @@ class Notice extends Memcached_DataObject } $window = call_user_func_array($fn, array_merge($args, array(0, NOTICE_CACHE_WINDOW, - 0, 0, null, $tag))); + 0, 0, null))); $windowstr = implode(',', $window); From 23e6dafff6d82492aa7ab2addc2fae99bd609b57 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Sun, 5 Jul 2009 11:01:07 -0400 Subject: [PATCH 21/21] better handling of frames and notices --- lib/stompqueuemanager.php | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index 5f0b88d8ad..e7e1e00dd9 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -37,7 +37,6 @@ class StompQueueManager var $password = null; var $base = null; var $con = null; - var $frames = array(); function __construct() { @@ -97,13 +96,19 @@ class StompQueueManager $frame = $this->con->readFrame(); - if ($frame) { - $notice = Notice::staticGet($frame->body); + if (!empty($frame)) { + $notice = Notice::staticGet('id', $frame->body); - if ($handler->handle_notice($notice)) { + if (empty($notice)) { + $this->_log(LOG_WARNING, 'Got ID '. $frame->body .' for non-existent notice'); + $this->con->ack($frame); + } else if ($handler->handle_notice($notice)) { $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id .' posted at ' . $frame->headers['created']); $this->con->ack($frame); + unset($notice); } + + unset($frame); } $handler->idle(0);