Allow for instances as well as class names to be passed as queue handlers and iomanagers.

Introduce IoManager::GLOBAL_SINGLE_ONLY which indicates that only one instance of this iomanager will be run, regardless of how many threads/processes and sites there are.
This commit is contained in:
Craig Andrews 2010-01-15 22:28:43 -05:00
parent 77ea02cac3
commit bd72e8b96e
4 changed files with 32 additions and 13 deletions

View File

@ -31,6 +31,7 @@
abstract class IoManager abstract class IoManager
{ {
const GLOBAL_SINGLE_ONLY = -1;
const SINGLE_ONLY = 0; const SINGLE_ONLY = 0;
const INSTANCE_PER_SITE = 1; const INSTANCE_PER_SITE = 1;
const INSTANCE_PER_PROCESS = 2; const INSTANCE_PER_PROCESS = 2;

View File

@ -32,6 +32,7 @@ class IoMaster
public $id; public $id;
protected $multiSite = false; protected $multiSite = false;
protected $includeGlobalSingletons = true;
protected $managers = array(); protected $managers = array();
protected $singletons = array(); protected $singletons = array();
@ -47,8 +48,9 @@ class IoMaster
$this->monitor = new QueueMonitor(); $this->monitor = new QueueMonitor();
} }
public function init($multiSite=null) public function init($multiSite=null, $includeGlobalSingletons = true)
{ {
$this->includeGlobalSingletons = $includeGlobalSingletons;
if ($multiSite !== null) { if ($multiSite !== null) {
$this->multiSite = $multiSite; $this->multiSite = $multiSite;
} }
@ -107,7 +109,7 @@ class IoMaster
*/ */
protected function instantiate($class) protected function instantiate($class)
{ {
if (isset($this->singletons[$class])) { if (is_string($class) && isset($this->singletons[$class])) {
// Already instantiated a multi-site-capable handler. // Already instantiated a multi-site-capable handler.
// Just let it know it should listen to this site too! // Just let it know it should listen to this site too!
$this->singletons[$class]->addSite(common_config('site', 'server')); $this->singletons[$class]->addSite(common_config('site', 'server'));
@ -116,25 +118,34 @@ class IoMaster
$manager = $this->getManager($class); $manager = $this->getManager($class);
$caps = $manager->multiSite();
if ($this->multiSite) { if ($this->multiSite) {
$caps = $manager->multiSite();
if ($caps == IoManager::SINGLE_ONLY) { if ($caps == IoManager::SINGLE_ONLY) {
throw new Exception("$class can't run with --all; aborting."); throw new Exception("$class can't run with --all; aborting.");
} }
if ($caps == IoManager::INSTANCE_PER_PROCESS) { if ($caps == IoManager::INSTANCE_PER_PROCESS ||
( $this->includeGlobalSingletons && $caps == IoManager::GLOBAL_SINGLE_ONLY )) {
// Save this guy for later! // Save this guy for later!
// We'll only need the one to cover multiple sites. // We'll only need the one to cover multiple sites.
$this->singletons[$class] = $manager; if (is_string($class)){
$this->singletons[$class] = $manager;
}
$manager->addSite(common_config('site', 'server')); $manager->addSite(common_config('site', 'server'));
} }
} }
$this->managers[] = $manager; if( $this->includeGlobalSingletons || $caps != IoManager::GLOBAL_SINGLE_ONLY ) {
$this->managers[] = $manager;
}
} }
protected function getManager($class) protected function getManager($class)
{ {
return call_user_func(array($class, 'get')); if(is_object($class)){
return $class;
}else{
return call_user_func(array($class, 'get'));
}
} }
/** /**

View File

@ -119,7 +119,9 @@ abstract class QueueManager extends IoManager
{ {
if (isset($this->handlers[$queue])) { if (isset($this->handlers[$queue])) {
$class = $this->handlers[$queue]; $class = $this->handlers[$queue];
if (class_exists($class)) { if(is_object($class)) {
return $class;
} else if (class_exists($class)) {
return new $class(); return new $class();
} else { } else {
common_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'"); common_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'");
@ -182,7 +184,7 @@ abstract class QueueManager extends IoManager
* Only registered transports will be reliably picked up! * Only registered transports will be reliably picked up!
* *
* @param string $transport * @param string $transport
* @param string $class * @param string $class class name or object instance
*/ */
public function connect($transport, $class) public function connect($transport, $class)
{ {

View File

@ -122,7 +122,7 @@ class QueueDaemon extends Daemon
if ($this->threads > 1) { if ($this->threads > 1) {
return $this->runThreads(); return $this->runThreads();
} else { } else {
return $this->runLoop(); return $this->runLoop(true);
} }
} }
@ -176,7 +176,8 @@ class QueueDaemon extends Daemon
{ {
$this->set_id($this->get_id() . "." . $thread); $this->set_id($this->get_id() . "." . $thread);
$this->resetDb(); $this->resetDb();
$this->runLoop(); //only include global singletons on the first thread
$this->runLoop($thread == 1);
} }
/** /**
@ -213,14 +214,18 @@ class QueueDaemon extends Daemon
* *
* Most of the time this won't need to be overridden in a subclass. * Most of the time this won't need to be overridden in a subclass.
* *
* @param boolean $includeGlobalSingletons Include IoManagers that are
* global singletons (should only be one instance - regardless of how
* many processes or sites there are)
*
* @return boolean true on success, false on failure * @return boolean true on success, false on failure
*/ */
function runLoop() function runLoop($includeGlobalSingletons)
{ {
$this->log(LOG_INFO, 'checking for queued notices'); $this->log(LOG_INFO, 'checking for queued notices');
$master = new IoMaster($this->get_id()); $master = new IoMaster($this->get_id());
$master->init($this->all); $master->init($this->all, $includeGlobalSingletons);
$master->service(); $master->service();
$this->log(LOG_INFO, 'finished servicing the queue'); $this->log(LOG_INFO, 'finished servicing the queue');