[StompQueue] Changed StompQueueManager to use polling rather than sockets
This commit is contained in:
parent
7cdd64f594
commit
b0e10f01cb
42
plugins/StompQueue/README
Normal file
42
plugins/StompQueue/README
Normal file
|
@ -0,0 +1,42 @@
|
||||||
|
StompQueuePlugin wraps the StompQueueManager class which is a queue manager
|
||||||
|
that uses STOMP as a communication method to some form of backing storage.
|
||||||
|
|
||||||
|
Installation
|
||||||
|
============
|
||||||
|
|
||||||
|
This plugin is replaces other queue manager plugins, such as UnQueue,
|
||||||
|
which enabled by default and which should, but is not required to be
|
||||||
|
disabled.
|
||||||
|
|
||||||
|
addPlugin('StompQueue', ['servers' => ['your-redis-instance-and-port'],
|
||||||
|
'vhost' => 'your-vhost',
|
||||||
|
'username' => 'your-username',
|
||||||
|
'password' => 'your-password']);
|
||||||
|
|
||||||
|
Options
|
||||||
|
=======
|
||||||
|
|
||||||
|
servers (default: null) - array of server addresses to use
|
||||||
|
vhost (default: '') - configured vhost -- required
|
||||||
|
username (default: 'guest') -- configured username -- don't use the default
|
||||||
|
password (default: 'guest') -- configured password -- don't use the default
|
||||||
|
basename (default: "queue:gnusocial-{$site_name}") -- prefix for all queue names,
|
||||||
|
useful to avoid collisions. Cannot contain `/`
|
||||||
|
control (default: 'gnusocial:control') -- control channel name. Cannot contain `/`
|
||||||
|
breakout (default: null) -- array of queue names which should be broken out into a previously unused server
|
||||||
|
useTransactions (default: false) -- whether to use transactions, allowing rollbacks in case of failure
|
||||||
|
useAcks (default: false) -- whether to explicitly use acknowledgements when receiving a message.
|
||||||
|
Usefull to avoid timeouts and possibly reduce load on the STOMP server
|
||||||
|
manualFailover (default: false) -- whether to coordinate failover in PHP or to let all servers act
|
||||||
|
as one coordinated unit
|
||||||
|
defaultIdx (default: 0) -- index in the servers array which is used by default. Will be updated in case of an error
|
||||||
|
persistent (default: []) -- list of queues which should be persistent
|
||||||
|
|
||||||
|
Example
|
||||||
|
=======
|
||||||
|
|
||||||
|
In config.php
|
||||||
|
|
||||||
|
addPlugin('StompQueue', ['servers' => 'tcp://localhost:61613', 'vhost' => '/',
|
||||||
|
// Please don't actually use the default credentials
|
||||||
|
'username' => 'guest', 'password' => 'guest']);
|
|
@ -36,7 +36,7 @@ class StompQueuePlugin extends Plugin
|
||||||
public $password = 'guest';
|
public $password = 'guest';
|
||||||
public $basename = '';
|
public $basename = '';
|
||||||
public $control = 'gnusocial:control';
|
public $control = 'gnusocial:control';
|
||||||
public $breakout;
|
public $breakout = [];
|
||||||
public $useTransactions = false;
|
public $useTransactions = false;
|
||||||
public $useAcks = false;
|
public $useAcks = false;
|
||||||
public $manualFailover = false;
|
public $manualFailover = false;
|
||||||
|
@ -62,7 +62,7 @@ class StompQueuePlugin extends Plugin
|
||||||
public function onPluginVersion(array &$versions): bool
|
public function onPluginVersion(array &$versions): bool
|
||||||
{
|
{
|
||||||
$versions[] = array('name' => 'StompQueue',
|
$versions[] = array('name' => 'StompQueue',
|
||||||
'version' => self::VERSION,
|
'version' => self::PLUGIN_VERSION,
|
||||||
'author' => 'Miguel Dantas',
|
'author' => 'Miguel Dantas',
|
||||||
'description' =>
|
'description' =>
|
||||||
// TRANS: Plugin description.
|
// TRANS: Plugin description.
|
||||||
|
|
|
@ -35,7 +35,6 @@ class StompQueueManager extends QueueManager
|
||||||
public function start($master)
|
public function start($master)
|
||||||
{
|
{
|
||||||
parent::start($master);
|
parent::start($master);
|
||||||
common_debug("Starting STOMP queue manager");
|
|
||||||
return $this->_ensureConn();
|
return $this->_ensureConn();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -172,6 +171,7 @@ class StompQueueManager extends QueueManager
|
||||||
*/
|
*/
|
||||||
protected function subscribe(StatefulStomp $st)
|
protected function subscribe(StatefulStomp $st)
|
||||||
{
|
{
|
||||||
|
$this->_ensureConn();
|
||||||
$host = $st->getClient()->getConnection()->getHost();
|
$host = $st->getClient()->getConnection()->getHost();
|
||||||
foreach ($this->subscriptions() as $sub) {
|
foreach ($this->subscriptions() as $sub) {
|
||||||
if (!in_array($sub, $this->subscriptions)) {
|
if (!in_array($sub, $this->subscriptions)) {
|
||||||
|
@ -293,40 +293,19 @@ class StompQueueManager extends QueueManager
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send any sockets we're listening on to the IO manager
|
* Poll every 10 seconds for new events during idle periods.
|
||||||
* to wait for input.
|
* We'll look in more often when there's data available.
|
||||||
|
* Must be greater than 0 for the poll method to be called
|
||||||
*
|
*
|
||||||
* @return array of resources
|
* @return int seconds
|
||||||
*/
|
*/
|
||||||
public function getSockets()
|
public function pollInterval()
|
||||||
{
|
{
|
||||||
$sockets = [];
|
return 10;
|
||||||
foreach ($this->stomps as $st) {
|
|
||||||
if ($st) {
|
|
||||||
$sockets[] = $st->getClient()->getConnection();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return $sockets;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the Stomp connection object associated with the given socket.
|
* Poll a queue and Handle an event
|
||||||
* @param resource $socket
|
|
||||||
* @return int index into connections list
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
|
||||||
protected function connectionFromSocket($socket)
|
|
||||||
{
|
|
||||||
foreach ($this->stomps as $i => $st) {
|
|
||||||
if ($st && $st->getConnection() === $socket) {
|
|
||||||
return $i;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
throw new Exception(__CLASS__ . " asked to read from unrecognized socket");
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Handle and acknowledge an event that's come in through a queue.
|
|
||||||
*
|
*
|
||||||
* If the queue handler reports failure, the message is requeued for later.
|
* If the queue handler reports failure, the message is requeued for later.
|
||||||
* Missing notices or handler classes will drop the message.
|
* Missing notices or handler classes will drop the message.
|
||||||
|
@ -334,15 +313,22 @@ class StompQueueManager extends QueueManager
|
||||||
* Side effects: in multi-site mode, may reset site configuration to
|
* Side effects: in multi-site mode, may reset site configuration to
|
||||||
* match the site that queued the event.
|
* match the site that queued the event.
|
||||||
*
|
*
|
||||||
* @param Frame $frame
|
|
||||||
* @return bool success
|
* @return bool success
|
||||||
* @throws ConfigException
|
* @throws ConfigException
|
||||||
* @throws NoConfigException
|
* @throws NoConfigException
|
||||||
* @throws ServerException
|
* @throws ServerException
|
||||||
* @throws StompException
|
* @throws StompException
|
||||||
*/
|
*/
|
||||||
protected function handleItem($frame): bool
|
public function poll(): bool
|
||||||
{
|
{
|
||||||
|
$this->_ensureConn();
|
||||||
|
|
||||||
|
$frame = $this->stomps[$this->pl->defaultIdx]->getClient()->readFrame();
|
||||||
|
|
||||||
|
if (empty($frame)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
$host = $this->stomps[$this->pl->defaultIdx]->getHost();
|
$host = $this->stomps[$this->pl->defaultIdx]->getHost();
|
||||||
$message = unserialize(base64_decode($frame->body));
|
$message = unserialize(base64_decode($frame->body));
|
||||||
|
|
||||||
|
@ -396,7 +382,7 @@ class StompQueueManager extends QueueManager
|
||||||
$this->stats('requeued', $queue);
|
$this->stats('requeued', $queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
return $ok;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -584,7 +570,7 @@ class StompQueueManager extends QueueManager
|
||||||
* @throws NoConfigException
|
* @throws NoConfigException
|
||||||
* @throws ServerException
|
* @throws ServerException
|
||||||
*/
|
*/
|
||||||
function switchSite($site)
|
private function switchSite(string $site): void
|
||||||
{
|
{
|
||||||
if ($site != GNUsocial::currentSite()) {
|
if ($site != GNUsocial::currentSite()) {
|
||||||
$this->stats('switch');
|
$this->stats('switch');
|
||||||
|
@ -601,12 +587,12 @@ class StompQueueManager extends QueueManager
|
||||||
* files might not.
|
* files might not.
|
||||||
*
|
*
|
||||||
* @param $nickname
|
* @param $nickname
|
||||||
* @return void true to continue; false to stop further processing.
|
* @return bool true to continue; false to stop further processing.
|
||||||
* @throws ConfigException
|
* @throws ConfigException
|
||||||
* @throws NoConfigException
|
* @throws NoConfigException
|
||||||
* @throws ServerException
|
* @throws ServerException
|
||||||
*/
|
*/
|
||||||
protected function updateSiteConfig($nickname)
|
protected function updateSiteConfig($nickname): bool
|
||||||
{
|
{
|
||||||
$sn = Status_network::getKV('nickname', $nickname);
|
$sn = Status_network::getKV('nickname', $nickname);
|
||||||
if ($sn) {
|
if ($sn) {
|
||||||
|
|
Loading…
Reference in New Issue
Block a user