
1069 lines
33 KiB
Raw Normal View History

// This file is part of GNU social -
// GNU social is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// GNU social is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with GNU social. If not, see <>.
* @copyright 2008, 2009 StatusNet, Inc.
* @license GNU AGPL v3 or later
defined('GNUSOCIAL') || die();
class Memcached_DataObject extends Safe_DataObject
* Wrapper for DB_DataObject's static lookup using memcached
* as backing instead of an in-process cache array.
* @param string $cls classname of object type to load
* @param mixed $k key field name, or value for primary key
* @param mixed $v key field value, or leave out for primary key lookup
* @return mixed Memcached_DataObject subtype or false
public static function getClassKV($cls, $k, $v = null)
if (is_null($v)) {
$v = $k;
$keys = static::pkeyCols();
if (count($keys) > 1) {
// FIXME: maybe call pkeyGetClass() ourselves?
throw new Exception('Use pkeyGetClass() for compound primary keys');
$k = $keys[0];
$i = self::getcached($cls, $k, $v);
if ($i === false) { // false == cache miss
$i = new $cls;
$result = $i->get($k, $v);
if ($result) {
// Hit!
} else {
// save the fact that no such row exists
$c = self::memcache();
if (!empty($c)) {
$ck = self::cachekey($cls, $k, $v);
$c->set($ck, null);
$i = false;
return $i;
* Get multiple items from the database by key
* @param string $cls Class to fetch
* @param string $keyCol name of column for key
* @param array $keyVals key values to fetch
* @param bool $skipNulls return only non-null results
* @param bool $preserve return the same tuples as input
* @return object An object with tuples to be fetched, in order
public static function multiGetClass(
string $cls,
string $keyCol,
array $keyVals,
bool $skipNulls,
bool $preserve
): object {
$obj = new $cls();
// Do not select anything extra
$obj->selectAdd($obj->escapedTableName() . '.*');
// A PHP-compatible datatype to check against
$col_type = $obj->columnType($keyCol);
// The code below assumes one of the two results
if (!in_array($col_type, ['int', 'string'])) {
throw new ServerException(
'Cannot do multiGet on anything but integer or string columns'
// Actually need to know if MariaDB or Oracle MySQL this time
$db_type = common_config('db', 'type');
if ($db_type === 'mysql') {
$tmp_obj = new $cls();
$tmp_obj->query('SELECT 0 /*M! + 1 */ AS is_mariadb;');
if ($tmp_obj->fetch() && $tmp_obj->is_mariadb) {
$db_type = 'mariadb';
// Since we're inputting straight to a query: format and escape
$vals_escaped = [];
foreach (array_values($keyVals) as $i => $val) {
if (is_null($val)) {
$val_escaped = 'NULL';
} elseif ($col_type === 'int') {
$val_escaped = (string)(int) $val;
} else {
$val_escaped = "'{$obj->escape($val)}'";
if ($db_type !== 'mariadb') {
$vals_escaped[] = $val_escaped;
} else {
// A completely different approach for MariaDB (see below)
$vals_escaped[] = "({$val_escaped},{$i})";
// One way to guarantee that there is no name collision
$join_tablename = common_database_tablename(
$obj->tableName() . '_vals'
$join_keyword = ($preserve ? 'RIGHT' : 'LEFT') . ' JOIN';
$vals_cast_type = ($col_type === 'int') ? 'INTEGER' : 'TEXT';
// A lot of magic to ensure we get an ordered reply with the same exact
// values as on input.
switch ($db_type) {
case 'pgsql':
// Explicit casting is done to cast empty arrays
$obj->_join = "\n" . sprintf(
{$join_keyword} unnest(
CAST(ARRAY[%s] AS {$vals_cast_type}[])
AS {$join_tablename} ({$keyCol}, {$keyCol}_pos)
USING ({$keyCol})
implode(',', $vals_escaped)
case 'mariadb':
// Delivers an empty set
if (count($vals_escaped) == 0) {
$vals_escaped[] = '(NULL,0) LIMIT 0';
// MariaDB doesn't support JSON_TABLE, but Oracle MySQL does,
// which doesn't support VALUES without a ROW keyword though.
$obj->_join = "\n" . sprintf(
{$join_keyword} (
WITH t1 ({$keyCol}, {$keyCol}_pos) AS (VALUES %s)
) AS {$join_tablename} USING ({$keyCol})
implode(',', $vals_escaped)
case 'mysql':
$obj->_join = "\n" . sprintf(
{$join_keyword} JSON_TABLE(
JSON_ARRAY(%s), '$[*]' COLUMNS (
{$keyCol} {$vals_cast_type} PATH '$',
{$keyCol}_pos FOR ORDINALITY
) AS {$join_tablename} USING ({$keyCol})
implode(',', $vals_escaped)
if (!$preserve) {
// Implements a left semi-join
$obj->whereAdd("{$join_tablename}.{$keyCol}_pos IS NOT NULL");
// Filters both NULLs requested and non-matching NULLs
if ($skipNulls) {
$obj->whereAdd("{$obj->escapedTableName()}.{$keyCol} IS NOT NULL");
return $obj;
* Get multiple items from the database by key
* @param string $cls Class to fetch
* @param string $keyCol name of column for key
* @param array $keyVals key values to fetch
* @param boolean $otherCols Other columns to hold fixed
* @return array Array mapping $keyVals to objects, or null if not found
public static function pivotGetClass(
array $keyVals,
array $otherCols = []
) {
if (is_array($keyCol)) {
foreach ($keyVals as $keyVal) {
if (!is_array($keyVal)) {
throw new ServerException(
'keyVals passed to pivotGet must be an array of arrays '
. 'if keyCol is an array'
$result[implode(',', $keyVal)] = null;
} else {
$result = array_fill_keys($keyVals, null);
$toFetch = array();
foreach ($keyVals as $keyVal) {
if (is_array($keyCol)) {
$kv = array_combine($keyCol, $keyVal);
} else {
$kv = array($keyCol => $keyVal);
$kv = array_merge($otherCols, $kv);
$i = self::multicache($cls, $kv);
if ($i !== false) {
if (is_array($keyCol)) {
$result[implode(',', $keyVal)] = $i;
} else {
$result[$keyVal] = $i;
} elseif (!empty($keyVal)) {
$toFetch[] = $keyVal;
if (count($toFetch) > 0) {
$i = new $cls;
foreach ($otherCols as $otherKeyCol => $otherKeyVal) {
$i->$otherKeyCol = $otherKeyVal;
if (is_array($keyCol)) {
$i->whereAdd(self::_inMultiKey($i, $keyCol, $toFetch));
} else {
$i->whereAddIn($keyCol, $toFetch, $i->columnType($keyCol));
if ($i->find()) {
while ($i->fetch()) {
$copy = clone($i);
if (is_array($keyCol)) {
$vals = array();
foreach ($keyCol as $k) {
$vals[] = $i->$k;
$result[implode(',', $vals)] = $copy;
} else {
$result[$i->$keyCol] = $copy;
// Save state of DB misses
foreach ($toFetch as $keyVal) {
$r = null;
if (is_array($keyCol)) {
$r = $result[implode(',', $keyVal)];
} else {
$r = $result[$keyVal];
if (empty($r)) {
if (is_array($keyCol)) {
$kv = array_combine($keyCol, $keyVal);
} else {
$kv = array($keyCol => $keyVal);
$kv = array_merge($otherCols, $kv);
// save the fact that no such row exists
$c = self::memcache();
if (!empty($c)) {
$ck = self::multicacheKey($cls, $kv);
$c->set($ck, null);
return $result;
public static function _inMultiKey($i, $cols, $values)
$types = array();
foreach ($cols as $col) {
$types[$col] = $i->columnType($col);
$first = true;
$query = '';
foreach ($values as $value) {
if ($first) {
$query .= '( ';
$first = false;
} else {
$query .= ' OR ';
$query .= '( ';
$i = 0;
$firstc = true;
foreach ($cols as $col) {
if (!$firstc) {
$query .= ' AND ';
} else {
$firstc = false;
switch ($types[$col]) {
case 'string':
case 'datetime':
$query .= sprintf("%s = %s", $col, $i->_quote($value[$i]));
$query .= sprintf("%s = %s", $col, $value[$i]);
$query .= ') ';
if (!$first) {
$query .= ' )';
return $query;
public static function pkeyColsClass($cls)
$i = new $cls;
$types = $i->keyTypes();
$pkey = array();
foreach ($types as $key => $type) {
if ($type == 'K' || $type == 'N') {
$pkey[] = $key;
return $pkey;
public static function listFindClass($cls, $keyCol, array $keyVals)
$i = new $cls;
$i->whereAddIn($keyCol, $keyVals, $i->columnType($keyCol));
if (!$i->find()) {
throw new NoResultException($i);
return $i;
public static function listGetClass($cls, $keyCol, array $keyVals)
2011-08-03 06:20:51 +09:00
$pkeyMap = array_fill_keys($keyVals, array());
2011-08-09 01:01:15 +09:00
$result = array_fill_keys($keyVals, array());
$pkeyCols = static::pkeyCols();
2011-08-09 01:01:15 +09:00
$toFetch = array();
$allPkeys = array();
// We only cache keys -- not objects!
foreach ($keyVals as $keyVal) {
$l = self::cacheGet(sprintf('%s:list-ids:%s:%s', strtolower($cls), $keyCol, $keyVal));
if ($l !== false) {
$pkeyMap[$keyVal] = $l;
2011-08-09 01:01:15 +09:00
foreach ($l as $pkey) {
$allPkeys[] = $pkey;
} else {
$toFetch[] = $keyVal;
2011-08-09 01:01:15 +09:00
if (count($allPkeys) > 0) {
2013-08-29 17:13:07 +09:00
$keyResults = self::pivotGetClass($cls, $pkeyCols, $allPkeys);
2011-08-09 01:01:15 +09:00
foreach ($pkeyMap as $keyVal => $pkeyList) {
foreach ($pkeyList as $pkeyVal) {
$i = $keyResults[implode(',', $pkeyVal)];
2011-08-09 01:01:15 +09:00
if (!empty($i)) {
$result[$keyVal][] = $i;
if (count($toFetch) > 0) {
try {
$i = self::listFindClass($cls, $keyCol, $toFetch);
while ($i->fetch()) {
$copy = clone($i);
$result[$i->$keyCol][] = $copy;
$pkeyVal = array();
foreach ($pkeyCols as $pkeyCol) {
$pkeyVal[] = $i->$pkeyCol;
$pkeyMap[$i->$keyCol][] = $pkeyVal;
} catch (NoResultException $e) {
// no results found for our keyVals, so we leave them as empty arrays
foreach ($toFetch as $keyVal) {
sprintf("%s:list-ids:%s:%s", strtolower($cls), $keyCol, $keyVal),
2011-08-03 06:20:51 +09:00
2011-08-09 01:01:15 +09:00
return $result;
public function escapedTableName()
return common_database_tablename($this->tableName());
public function columnType($columnName)
$keys = $this->table();
if (!array_key_exists($columnName, $keys)) {
throw new Exception('Unknown key column ' . $columnName . ' in ' . join(',', array_keys($keys)));
$def = $keys[$columnName];
if ($def & DB_DATAOBJECT_INT) {
return 'int';
} else {
return 'string';
2011-08-03 06:20:51 +09:00
* @todo FIXME: Should this return false on lookup fail to match getKV?
public static function pkeyGetClass($cls, array $kv)
$i = self::multicache($cls, $kv);
if ($i !== false) { // false == cache miss
return $i;
} else {
$i = new $cls;
foreach ($kv as $k => $v) {
if (is_null($v)) {
// XXX: possible SQL injection...? Don't
// pass keys from the browser, eh.
$i->whereAdd("$k is null");
} else {
$i->$k = $v;
if ($i->find(true)) {
} else {
$i = null;
$c = self::memcache();
if (!empty($c)) {
$ck = self::multicacheKey($cls, $kv);
$c->set($ck, null);
return $i;
public function insert()
$result = parent::insert();
if ($result !== false) {
// In case of cached negative lookups
return $result;
public function update($dataObject = false)
if (is_object($dataObject) && $dataObject instanceof Memcached_DataObject) {
$dataObject->decache(); // might be different keys
$result = parent::update($dataObject);
if ($result !== false) {
// Cannot encache yet, so decache instead
return $result;
public function delete($useWhere = false)
$this->decache(); # while we still have the values!
return parent::delete($useWhere);
public static function memcache()
2010-09-06 22:56:45 +09:00
return Cache::instance();
public static function cacheKey($cls, $k, $v)
if (is_object($cls) || is_object($k) || (is_object($v) && !($v instanceof DB_DataObject_Cast))) {
$e = new Exception();
common_log(LOG_ERR, __METHOD__ . ' object in param: ' .
str_replace("\n", " ", $e->getTraceAsString()));
$vstr = self::valueString($v);
2010-09-06 23:07:43 +09:00
return Cache::key(strtolower($cls).':'.$k.':'.$vstr);
public static function getcached($cls, $k, $v)
$c = self::memcache();
if (!$c) {
return false;
} else {
$obj = $c->get(self::cacheKey($cls, $k, $v));
if (0 == strcasecmp($cls, 'User')) {
// Special case for User
if (is_object($obj) && is_object($obj->id)) {
common_log(LOG_ERR, "User " . $obj->nickname . " was cached with User as ID; deleting");
$c->delete(self::cacheKey($cls, $k, $v));
return false;
return $obj;
public function keyTypes()
// ini-based classes return number-indexed arrays. handbuilt
// classes return column => keytype. Make this uniform.
$keys = $this->keys();
$keyskeys = array_keys($keys);
if (is_string($keyskeys[0])) {
return $keys;
if (!isset($_DB_DATAOBJECT['INI'][$this->_database][$this->tableName()."__keys"])) {
return $_DB_DATAOBJECT['INI'][$this->_database][$this->tableName()."__keys"];
public function encache()
if ($this->N < 1) {
// Caching breaks when it is too early.
$e = new Exception();
'DataObject must be the result of a query (N>=1) before encache() '
. str_replace("\n", ' ', $e->getTraceAsString())
return false;
$c = self::memcache();
if (!$c) {
return false;
} elseif ($this->tableName() === 'user' && is_object($this->id)) {
// Special case for User bug
$e = new Exception();
common_log(LOG_ERR, __METHOD__ . ' caching user with User object as ID ' .
str_replace("\n", " ", $e->getTraceAsString()));
return false;
} else {
$keys = $this->_allCacheKeys();
foreach ($keys as $key) {
$c->set($key, $this);
public function decache()
$c = self::memcache();
if (!$c) {
return false;
$keys = $this->_allCacheKeys();
foreach ($keys as $key) {
$c->delete($key, $this);
public function _allCacheKeys()
$ckeys = array();
$types = $this->keyTypes();
$pkey = array();
$pval = array();
foreach ($types as $key => $type) {
if ($type == 'U') {
if (empty($this->$key)) {
$ckeys[] = self::cacheKey($this->tableName(), $key, self::valueString($this->$key));
} elseif (in_array($type, ['K', 'N'])) {
$pkey[] = $key;
$pval[] = self::valueString($this->$key);
} else {
// Low level exception. No need for i18n as discussed with Brion.
throw new Exception("Unknown key type $key => $type for " . $this->tableName());
assert(count($pkey) > 0);
// XXX: should work for both compound and scalar pkeys
$pvals = implode(',', $pval);
$pkeys = implode(',', $pkey);
$ckeys[] = self::cacheKey($this->tableName(), $pkeys, $pvals);
return $ckeys;
public static function multicache($cls, array $kv)
$c = self::memcache();
if (!$c) {
return false;
} else {
return $c->get(self::multicacheKey($cls, $kv));
public static function multicacheKey($cls, array $kv)
$pkeys = implode(',', array_keys($kv));
$pvals = implode(',', array_values($kv));
return self::cacheKey($cls, $pkeys, $pvals);
public function getSearchEngine($table)
2019-08-23 21:36:02 +09:00
require_once INSTALLDIR . '/lib/search/search_engines.php';
2011-04-12 07:59:58 +09:00
if (Event::handle('GetSearchEngine', [$this, $table, &$search_engine])) {
$type = common_config('search', 'type');
if ($type === 'like') {
$search_engine = new SQLLikeSearch($this, $table);
} elseif ($type === 'fulltext') {
switch (common_config('db', 'type')) {
case 'pgsql':
$search_engine = new PostgreSQLSearch($this, $table);
case 'mysql':
$search_engine = new MySQLSearch($this, $table);
throw new ServerException('Unknown DB type selected.');
2011-04-12 07:59:58 +09:00
} else {
// Low level exception. No need for i18n as discussed with Brion.
throw new ServerException('Unknown search type: ' . $type);
2011-04-12 07:59:58 +09:00
return $search_engine;
public static function cachedQuery($cls, $qry, $expiry = 3600)
$c = self::memcache();
if (!$c) {
$inst = new $cls();
return $inst;
2010-09-06 23:03:51 +09:00
$key_part = Cache::keyize($cls).':'.md5($qry);
2010-09-06 23:07:43 +09:00
$ckey = Cache::key($key_part);
$stored = $c->get($ckey);
if ($stored !== false) {
return new ArrayWrapper($stored);
$inst = new $cls();
$cached = array();
while ($inst->fetch()) {
$cached[] = clone($inst);
$c->set($ckey, $cached, Cache::COMPRESSED, $expiry);
2009-01-23 05:16:19 +09:00
return new ArrayWrapper($cached);
* sends query to database - this is the private one that must work
* - internal functions use this rather than $this->query()
* Overridden to do logging.
* @param string $string
* @access private
* @return mixed none or PEAR_Error
public function _query($string)
if (common_config('db', 'annotate_queries')) {
$string = $this->annotateQuery($string);
$start = hrtime(true);
$fail = false;
2010-12-18 04:46:11 +09:00
$result = null;
if (Event::handle('StartDBQuery', array($this, $string, &$result))) {
common_perf_counter('query', $string);
try {
$result = parent::_query($string);
} catch (Exception $e) {
$fail = $e;
2010-12-18 04:46:11 +09:00
Event::handle('EndDBQuery', array($this, $string, &$result));
$delta = (hrtime(true) - $start) / 1000000000;
$limit = common_config('db', 'log_slow_queries');
if (($limit > 0 && $delta >= $limit) || common_config('db', 'log_queries')) {
$clean = $this->sanitizeQuery($string);
if ($fail) {
$msg = sprintf("FAILED DB query (%0.3fs): %s - %s", $delta, $fail->getMessage(), $clean);
} else {
$msg = sprintf("DB query (%0.3fs): %s", $delta, $clean);
common_log(LOG_DEBUG, $msg);
if ($fail) {
throw $fail;
return $result;
* Find the first caller in the stack trace that's not a
* low-level database function and add a comment to the
* query string. This should then be visible in process lists
* and slow query logs, to help identify problem areas.
* Also marks whether this was a web GET/POST or which daemon
* was running it.
* @param string $string SQL query string
* @return string SQL query string, with a comment in it
public function annotateQuery($string)
$ignore = array('annotateQuery',
$ignoreStatic = array('getKV',
$here = get_class($this); // if we get confused
$bt = debug_backtrace();
// Find the first caller that's not us?
foreach ($bt as $frame) {
$func = $frame['function'];
if (isset($frame['type']) && $frame['type'] == '::') {
if (in_array($func, $ignoreStatic)) {
$here = $frame['class'] . '::' . $func;
} elseif (isset($frame['type']) && $frame['type'] === '->') {
if ($frame['object'] === $this && in_array($func, $ignore)) {
if (in_array($func, $ignoreStatic)) {
continue; // @todo FIXME: This shouldn't be needed?
$here = get_class($frame['object']) . '->' . $func;
$here = $func;
if (php_sapi_name() == 'cli') {
$context = basename($_SERVER['PHP_SELF']);
} else {
$context = $_SERVER['REQUEST_METHOD'];
// Slip the comment in after the first command,
// or DB_DataObject gets confused about handling inserts and such.
$parts = explode(' ', $string, 2);
$parts[0] .= " /* $context $here */";
return implode(' ', $parts);
// Sanitize a query for logging
// @fixme don't trim spaces in string literals
public function sanitizeQuery($string)
$string = preg_replace('/\s+/', ' ', $string);
$string = trim($string);
return $string;
// We overload so that 'SET NAMES "utf8mb4"' is called for
// each connection
public function _connect()
$sum = $this->_getDbDsnMD5();
if (!empty($_DB_DATAOBJECT['CONNECTIONS'][$sum]) &&
!$_PEAR->isError($_DB_DATAOBJECT['CONNECTIONS'][$sum])) {
$exists = true;
} else {
$exists = false;
Major refactoring of queue handlers to support running multiple sites in one daemon. Key changes: * Initialization code moved from common.php to StatusNet class; can now switch configurations during runtime. * As a consequence, configuration files must now be idempotent... Be careful with constant, function or class definitions. * Control structure for daemons/QueueManager/QueueHandler has been refactored; the run loop is now managed by IoMaster run via scripts/queuedaemon.php IoManager subclasses are woken to handle socket input or polling, and may cover multiple sites. * Plugins can implement notice queue handlers more easily by registering a QueueHandler class; no more need to add a daemon. The new QueueDaemon runs from scripts/queuedaemon.php: * This replaces most of the old *handler.php scripts; they've been refactored to the bare handler classes. * Spawns multiple child processes to spread load; defaults to CPU count on Linux and Mac OS X systems, or override with --threads=N * When multithreaded, child processes are automatically respawned on failure. * Threads gracefully shut down and restart when passing a soft memory limit (defaults to 90% of memory_limit), limiting damage from memory leaks. * Support for UDP-based monitoring: Rough control flow diagram: QueueDaemon -> IoMaster -> IoManager QueueManager [listen or poll] -> QueueHandler XmppManager [ping & keepalive] XmppConfirmManager [poll updates] Todo: * Respawning features not currently available running single-threaded. * When running single-site, configuration changes aren't picked up. * New sites or config changes affecting queue subscriptions are not yet handled without a daemon restart. * SNMP monitoring output to integrate with general tools (nagios, ganglia) * Convert XMPP confirmation message sends to use stomp queue instead of polling * Convert xmppdaemon.php to IoManager? * Convert Twitter status, friends import polling daemons to IoManager * Clean up some error reporting and failure modes * May need to adjust queue priorities for best perf in backlog/flood cases Detailed code history available in my daemon-work branch:
2010-01-13 12:57:15 +09:00
// @fixme horrible evil hack!
// In multisite configuration we don't want to keep around a separate
// connection for every database; we could end up with thousands of
// connections open per thread. In an ideal world we might keep
// a connection per server and select different databases, but that'd
// be reliant on having the same db username/pass as well.
// MySQL connections are cheap enough we're going to try just
// closing out the old connection and reopening when we encounter
// a new DSN.
// WARNING WARNING if we end up actually using multiple DBs at a time
// we'll need some fancier logic here.
if (!$exists && !empty($_DB_DATAOBJECT['CONNECTIONS']) && php_sapi_name() == 'cli') {
Major refactoring of queue handlers to support running multiple sites in one daemon. Key changes: * Initialization code moved from common.php to StatusNet class; can now switch configurations during runtime. * As a consequence, configuration files must now be idempotent... Be careful with constant, function or class definitions. * Control structure for daemons/QueueManager/QueueHandler has been refactored; the run loop is now managed by IoMaster run via scripts/queuedaemon.php IoManager subclasses are woken to handle socket input or polling, and may cover multiple sites. * Plugins can implement notice queue handlers more easily by registering a QueueHandler class; no more need to add a daemon. The new QueueDaemon runs from scripts/queuedaemon.php: * This replaces most of the old *handler.php scripts; they've been refactored to the bare handler classes. * Spawns multiple child processes to spread load; defaults to CPU count on Linux and Mac OS X systems, or override with --threads=N * When multithreaded, child processes are automatically respawned on failure. * Threads gracefully shut down and restart when passing a soft memory limit (defaults to 90% of memory_limit), limiting damage from memory leaks. * Support for UDP-based monitoring: Rough control flow diagram: QueueDaemon -> IoMaster -> IoManager QueueManager [listen or poll] -> QueueHandler XmppManager [ping & keepalive] XmppConfirmManager [poll updates] Todo: * Respawning features not currently available running single-threaded. * When running single-site, configuration changes aren't picked up. * New sites or config changes affecting queue subscriptions are not yet handled without a daemon restart. * SNMP monitoring output to integrate with general tools (nagios, ganglia) * Convert XMPP confirmation message sends to use stomp queue instead of polling * Convert xmppdaemon.php to IoManager? * Convert Twitter status, friends import polling daemons to IoManager * Clean up some error reporting and failure modes * May need to adjust queue priorities for best perf in backlog/flood cases Detailed code history available in my daemon-work branch:
2010-01-13 12:57:15 +09:00
foreach ($_DB_DATAOBJECT['CONNECTIONS'] as $index => $conn) {
if ($_PEAR->isError($conn)) {
common_log(LOG_WARNING, __METHOD__ . " cannot disconnect failed DB connection: '".$conn->getMessage()."'.");
} elseif (!empty($conn)) {
Major refactoring of queue handlers to support running multiple sites in one daemon. Key changes: * Initialization code moved from common.php to StatusNet class; can now switch configurations during runtime. * As a consequence, configuration files must now be idempotent... Be careful with constant, function or class definitions. * Control structure for daemons/QueueManager/QueueHandler has been refactored; the run loop is now managed by IoMaster run via scripts/queuedaemon.php IoManager subclasses are woken to handle socket input or polling, and may cover multiple sites. * Plugins can implement notice queue handlers more easily by registering a QueueHandler class; no more need to add a daemon. The new QueueDaemon runs from scripts/queuedaemon.php: * This replaces most of the old *handler.php scripts; they've been refactored to the bare handler classes. * Spawns multiple child processes to spread load; defaults to CPU count on Linux and Mac OS X systems, or override with --threads=N * When multithreaded, child processes are automatically respawned on failure. * Threads gracefully shut down and restart when passing a soft memory limit (defaults to 90% of memory_limit), limiting damage from memory leaks. * Support for UDP-based monitoring: Rough control flow diagram: QueueDaemon -> IoMaster -> IoManager QueueManager [listen or poll] -> QueueHandler XmppManager [ping & keepalive] XmppConfirmManager [poll updates] Todo: * Respawning features not currently available running single-threaded. * When running single-site, configuration changes aren't picked up. * New sites or config changes affecting queue subscriptions are not yet handled without a daemon restart. * SNMP monitoring output to integrate with general tools (nagios, ganglia) * Convert XMPP confirmation message sends to use stomp queue instead of polling * Convert xmppdaemon.php to IoManager? * Convert Twitter status, friends import polling daemons to IoManager * Clean up some error reporting and failure modes * May need to adjust queue priorities for best perf in backlog/flood cases Detailed code history available in my daemon-work branch:
2010-01-13 12:57:15 +09:00
$result = parent::_connect();
if ($result && !$exists) {
// Required to make timestamp values usefully comparable.
// And set the character set to UTF-8.
if (common_config('db', 'type') !== 'mysql') {
parent::_query("SET TIME ZONE INTERVAL '+00:00' HOUR TO MINUTE");
parent::_query("SET NAMES 'UTF8'");
} else {
parent::_query("SET time_zone = '+0:00'");
parent::_query("SET NAMES 'utf8mb4'");
return $result;
// XXX: largely cadged from DB_DataObject
public function _getDbDsnMD5()
if ($this->_database_dsn_md5) {
return $this->_database_dsn_md5;
$dsn = $this->_getDbDsn();
if (is_string($dsn)) {
$sum = md5($dsn);
} else {
/// support array based dsn's
$sum = md5(serialize($dsn));
return $sum;
public function _getDbDsn()
if (empty($_DB_DATAOBJECT['CONFIG'])) {
$options = &$_DB_DATAOBJECT['CONFIG'];
// if the databse dsn dis defined in the object..
$dsn = isset($this->_database_dsn) ? $this->_database_dsn : null;
if (!$dsn) {
if (!$this->_database) {
$this->_database = isset($options["table_{$this->tableName()}"]) ? $options["table_{$this->tableName()}"] : null;
if ($this->_database && !empty($options["database_{$this->_database}"])) {
$dsn = $options["database_{$this->_database}"];
} elseif (!empty($options['database'])) {
$dsn = $options['database'];
if (!$dsn) {
// TRANS: Exception thrown when database name or Data Source Name could not be found.
throw new Exception(_('No database name or DSN found anywhere.'));
return $dsn;
Offload inbox updates to a queue handler to speed up posting online Moved much of the writing that happens when posting a notice to a new queuehandler, distribqueuehandler. This updates tags, groups, replies and inboxes at queue time (or at Web time, if queues are disabled). To make this work well, I had to break up the monolithic Notice::blowCaches() and make cache blowing happen closer to where data is updated. Squashed commit of the following: commit 5257626c62750ac4ac1db0ce2b71410c5711cfa3 Author: Evan Prodromou <> Date: Mon Jan 25 14:56:41 2010 -0500 slightly better handling of blowing tag memory cache commit 8a22a3cdf6ec28685da129a0313e7b2a0837c9ef Author: Evan Prodromou <> Date: Mon Jan 25 01:42:56 2010 -0500 change 'distribute' to 'distrib' so not too long for dbqueue commit 7a063315b0f7fad27cb6fbd2bdd74e253af83e4f Author: Evan Prodromou <> Date: Mon Jan 25 01:39:15 2010 -0500 change handle_notice() to handle() in distributqueuehandler commit 1a39ccd28b9994137d7bfd21bb4f230546938e77 Author: Evan Prodromou <> Date: Mon Jan 25 16:05:25 2010 -0500 error with queuemanager commit e6b3bb93f305cfd2de71a6340b8aa6fb890049b7 Author: Evan Prodromou <> Date: Mon Jan 25 01:11:34 2010 -0500 Blow memcache at different point rather than one big function for Notice class commit 94d557cdc016187d1d0647ae1794cd94d6fb8ac8 Author: Evan Prodromou <> Date: Mon Jan 25 00:48:44 2010 -0500 Blow memcache at different point rather than one big function for Notice class commit 1c781dd08c88a35dafc5c01230b4872fd6b95182 Author: Evan Prodromou <> Date: Wed Jan 20 08:54:18 2010 -0500 move broadcasting and distributing to new queuehandler commit da3e46d26b84e4f028f34a13fd2ee373e4c1b954 Author: Evan Prodromou <> Date: Wed Jan 20 08:53:12 2010 -0500 Move distribution of notices to new distribute queue handler
2010-01-26 08:08:21 +09:00
public static function blow()
Offload inbox updates to a queue handler to speed up posting online Moved much of the writing that happens when posting a notice to a new queuehandler, distribqueuehandler. This updates tags, groups, replies and inboxes at queue time (or at Web time, if queues are disabled). To make this work well, I had to break up the monolithic Notice::blowCaches() and make cache blowing happen closer to where data is updated. Squashed commit of the following: commit 5257626c62750ac4ac1db0ce2b71410c5711cfa3 Author: Evan Prodromou <> Date: Mon Jan 25 14:56:41 2010 -0500 slightly better handling of blowing tag memory cache commit 8a22a3cdf6ec28685da129a0313e7b2a0837c9ef Author: Evan Prodromou <> Date: Mon Jan 25 01:42:56 2010 -0500 change 'distribute' to 'distrib' so not too long for dbqueue commit 7a063315b0f7fad27cb6fbd2bdd74e253af83e4f Author: Evan Prodromou <> Date: Mon Jan 25 01:39:15 2010 -0500 change handle_notice() to handle() in distributqueuehandler commit 1a39ccd28b9994137d7bfd21bb4f230546938e77 Author: Evan Prodromou <> Date: Mon Jan 25 16:05:25 2010 -0500 error with queuemanager commit e6b3bb93f305cfd2de71a6340b8aa6fb890049b7 Author: Evan Prodromou <> Date: Mon Jan 25 01:11:34 2010 -0500 Blow memcache at different point rather than one big function for Notice class commit 94d557cdc016187d1d0647ae1794cd94d6fb8ac8 Author: Evan Prodromou <> Date: Mon Jan 25 00:48:44 2010 -0500 Blow memcache at different point rather than one big function for Notice class commit 1c781dd08c88a35dafc5c01230b4872fd6b95182 Author: Evan Prodromou <> Date: Wed Jan 20 08:54:18 2010 -0500 move broadcasting and distributing to new queuehandler commit da3e46d26b84e4f028f34a13fd2ee373e4c1b954 Author: Evan Prodromou <> Date: Wed Jan 20 08:53:12 2010 -0500 Move distribution of notices to new distribute queue handler
2010-01-26 08:08:21 +09:00
$c = self::memcache();
if (empty($c)) {
return false;
$args = func_get_args();
$format = array_shift($args);
$keyPart = vsprintf($format, $args);
2010-09-06 23:07:43 +09:00
$cacheKey = Cache::key($keyPart);
Offload inbox updates to a queue handler to speed up posting online Moved much of the writing that happens when posting a notice to a new queuehandler, distribqueuehandler. This updates tags, groups, replies and inboxes at queue time (or at Web time, if queues are disabled). To make this work well, I had to break up the monolithic Notice::blowCaches() and make cache blowing happen closer to where data is updated. Squashed commit of the following: commit 5257626c62750ac4ac1db0ce2b71410c5711cfa3 Author: Evan Prodromou <> Date: Mon Jan 25 14:56:41 2010 -0500 slightly better handling of blowing tag memory cache commit 8a22a3cdf6ec28685da129a0313e7b2a0837c9ef Author: Evan Prodromou <> Date: Mon Jan 25 01:42:56 2010 -0500 change 'distribute' to 'distrib' so not too long for dbqueue commit 7a063315b0f7fad27cb6fbd2bdd74e253af83e4f Author: Evan Prodromou <> Date: Mon Jan 25 01:39:15 2010 -0500 change handle_notice() to handle() in distributqueuehandler commit 1a39ccd28b9994137d7bfd21bb4f230546938e77 Author: Evan Prodromou <> Date: Mon Jan 25 16:05:25 2010 -0500 error with queuemanager commit e6b3bb93f305cfd2de71a6340b8aa6fb890049b7 Author: Evan Prodromou <> Date: Mon Jan 25 01:11:34 2010 -0500 Blow memcache at different point rather than one big function for Notice class commit 94d557cdc016187d1d0647ae1794cd94d6fb8ac8 Author: Evan Prodromou <> Date: Mon Jan 25 00:48:44 2010 -0500 Blow memcache at different point rather than one big function for Notice class commit 1c781dd08c88a35dafc5c01230b4872fd6b95182 Author: Evan Prodromou <> Date: Wed Jan 20 08:54:18 2010 -0500 move broadcasting and distributing to new queuehandler commit da3e46d26b84e4f028f34a13fd2ee373e4c1b954 Author: Evan Prodromou <> Date: Wed Jan 20 08:53:12 2010 -0500 Move distribution of notices to new distribute queue handler
2010-01-26 08:08:21 +09:00
return $c->delete($cacheKey);
public function raiseError($message, $type = null, $behavior = null)
2010-01-29 06:26:55 +09:00
$id = get_class($this);
if (!empty($this->id)) {
$id .= ':' . $this->id;
if ($message instanceof PEAR_Error) {
$message = $message->getMessage();
// Low level exception. No need for i18n as discussed with Brion.
throw new ServerException("[$id] DB_DataObject error [$type]: $message");
2010-01-29 06:26:55 +09:00
public static function cacheGet($keyPart)
$c = self::memcache();
if (empty($c)) {
return false;
2010-09-06 23:07:43 +09:00
$cacheKey = Cache::key($keyPart);
return $c->get($cacheKey);
public static function cacheSet($keyPart, $value, $flag = null, $expiry = null)
$c = self::memcache();
if (empty($c)) {
return false;
2010-09-06 23:07:43 +09:00
$cacheKey = Cache::key($keyPart);
return $c->set($cacheKey, $value, $flag, $expiry);
public static function valueString($v)
$vstr = null;
if (is_object($v) && $v instanceof DB_DataObject_Cast) {
switch ($v->type) {
case 'date':
$vstr = "{$v->year} - {$v->month} - {$v->day}";
case 'sql':
if (strcasecmp($v->value, 'NULL') == 0) {
// Very selectively handling NULLs.
$vstr = '';
// no break
case 'blob':
case 'string':
case 'datetime':
case 'time':
// Low level exception. No need for i18n as discussed with Brion.
throw new ServerException("Unhandled DB_DataObject_Cast type passed as cacheKey value: '$v->type'");
// Low level exception. No need for i18n as discussed with Brion.
throw new ServerException("Unknown DB_DataObject_Cast type passed as cacheKey value: '$v->type'");
} else {
$vstr = strval($v);
return $vstr;