Detect when queuedaemon/xmppdaemon parent processes die and kill the child processes.
Keeps stray daemon subprocesses from floating around when we kill the parents via a signal! Accomplished by opening a bidirectional pipe in the parent process; the children close out the writer end and keep the reader in their open sockets list. When the parent dies, the children see that the socket's been closed out and can perform an orderly shutdown.
This commit is contained in:
parent
55e8473a7a
commit
532e486a93
84
lib/processmanager.php
Normal file
84
lib/processmanager.php
Normal file
|
@ -0,0 +1,84 @@
|
|||
<?php
|
||||
/**
|
||||
* StatusNet, the distributed open-source microblogging tool
|
||||
*
|
||||
* i/o manager to watch for a dead parent process
|
||||
*
|
||||
* PHP version 5
|
||||
*
|
||||
* LICENCE: This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*
|
||||
* @category QueueManager
|
||||
* @package StatusNet
|
||||
* @author Brion Vibber <brion@status.net>
|
||||
* @copyright 2010 StatusNet, Inc.
|
||||
* @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
|
||||
* @link http://status.net/
|
||||
*/
|
||||
|
||||
class ProcessManager extends IoManager
|
||||
{
|
||||
protected $socket;
|
||||
|
||||
public static function get()
|
||||
{
|
||||
throw new Exception("Must pass ProcessManager per-instance");
|
||||
}
|
||||
|
||||
public function __construct($socket)
|
||||
{
|
||||
$this->socket = $socket;
|
||||
}
|
||||
|
||||
/**
|
||||
* Tell the i/o queue master if and how we can handle multi-site
|
||||
* processes.
|
||||
*
|
||||
* Return one of:
|
||||
* IoManager::SINGLE_ONLY
|
||||
* IoManager::INSTANCE_PER_SITE
|
||||
* IoManager::INSTANCE_PER_PROCESS
|
||||
*/
|
||||
public static function multiSite()
|
||||
{
|
||||
return IoManager::INSTANCE_PER_PROCESS;
|
||||
}
|
||||
|
||||
/**
|
||||
* We won't get any input on it, but if it's broken we'll
|
||||
* know something's gone horribly awry.
|
||||
*
|
||||
* @return array of resources
|
||||
*/
|
||||
function getSockets()
|
||||
{
|
||||
return array($this->socket);
|
||||
}
|
||||
|
||||
/**
|
||||
* See if the parent died and request a shutdown...
|
||||
*
|
||||
* @param resource $socket
|
||||
* @return boolean success
|
||||
*/
|
||||
function handleInput($socket)
|
||||
{
|
||||
if (feof($socket)) {
|
||||
common_log(LOG_INFO, "Parent process exited; shutting down child.");
|
||||
$this->master->requestShutdown();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
@ -71,6 +71,8 @@ abstract class SpawningDaemon extends Daemon
|
|||
*/
|
||||
function run()
|
||||
{
|
||||
$this->initPipes();
|
||||
|
||||
$children = array();
|
||||
for ($i = 1; $i <= $this->threads; $i++) {
|
||||
$pid = pcntl_fork();
|
||||
|
@ -128,6 +130,34 @@ abstract class SpawningDaemon extends Daemon
|
|||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an IPC socket pair which child processes can use to detect
|
||||
* if the parent process has been killed.
|
||||
*/
|
||||
function initPipes()
|
||||
{
|
||||
$sockets = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0);
|
||||
if ($sockets) {
|
||||
$this->parentWriter = $sockets[0];
|
||||
$this->parentReader = $sockets[1];
|
||||
} else {
|
||||
$this->log(LOG_ERROR, "Couldn't create inter-process sockets");
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Build an IOManager that simply ensures that we have a connection
|
||||
* to the parent process open. If it breaks, the child process will
|
||||
* die.
|
||||
*
|
||||
* @return ProcessManager
|
||||
*/
|
||||
public function processManager()
|
||||
{
|
||||
return new ProcessManager($this->parentReader);
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine whether to respawn an exited subprocess based on its exit code.
|
||||
* Otherwise we'll respawn all exits by default.
|
||||
|
@ -152,6 +182,8 @@ abstract class SpawningDaemon extends Daemon
|
|||
*/
|
||||
protected function initAndRunChild($thread)
|
||||
{
|
||||
// Close the writer end of our parent<->children pipe.
|
||||
fclose($this->parentWriter);
|
||||
$this->set_id($this->get_id() . "." . $thread);
|
||||
$this->resetDb();
|
||||
$exitCode = $this->runThread();
|
||||
|
|
|
@ -105,7 +105,7 @@ class QueueDaemon extends SpawningDaemon
|
|||
{
|
||||
$this->log(LOG_INFO, 'checking for queued notices');
|
||||
|
||||
$master = new QueueMaster($this->get_id());
|
||||
$master = new QueueMaster($this->get_id(), $this->processManager());
|
||||
$master->init($this->allsites);
|
||||
try {
|
||||
$master->service();
|
||||
|
@ -125,6 +125,14 @@ class QueueDaemon extends SpawningDaemon
|
|||
|
||||
class QueueMaster extends IoMaster
|
||||
{
|
||||
protected $processManager;
|
||||
|
||||
function __construct($id, $processManager)
|
||||
{
|
||||
parent::__construct($id);
|
||||
$this->processManager = $processManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize IoManagers which are appropriate to this instance.
|
||||
*/
|
||||
|
@ -135,6 +143,7 @@ class QueueMaster extends IoMaster
|
|||
$qm = QueueManager::get();
|
||||
$qm->setActiveGroup('main');
|
||||
$managers[] = $qm;
|
||||
$managers[] = $this->processManager;
|
||||
}
|
||||
Event::handle('EndQueueDaemonIoManagers', array(&$managers));
|
||||
|
||||
|
|
|
@ -55,7 +55,7 @@ class XMPPDaemon extends SpawningDaemon
|
|||
{
|
||||
common_log(LOG_INFO, 'Waiting to listen to XMPP and queues');
|
||||
|
||||
$master = new XmppMaster($this->get_id());
|
||||
$master = new XmppMaster($this->get_id(), $this->processManager());
|
||||
$master->init($this->allsites);
|
||||
$master->service();
|
||||
|
||||
|
@ -68,6 +68,14 @@ class XMPPDaemon extends SpawningDaemon
|
|||
|
||||
class XmppMaster extends IoMaster
|
||||
{
|
||||
protected $processManager;
|
||||
|
||||
function __construct($id, $processManager)
|
||||
{
|
||||
parent::__construct($id);
|
||||
$this->processManager = $processManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize IoManagers for the currently configured site
|
||||
* which are appropriate to this instance.
|
||||
|
@ -79,6 +87,7 @@ class XmppMaster extends IoMaster
|
|||
$qm->setActiveGroup('xmpp');
|
||||
$this->instantiate($qm);
|
||||
$this->instantiate(XmppManager::get());
|
||||
$this->instantiate($this->processManager);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user