Basic function to store forwards and redistribute

This commit is contained in:
Evan Prodromou 2009-12-08 16:30:33 -05:00
parent 6bc6af667e
commit 72c82a2e29
3 changed files with 106 additions and 34 deletions

View File

@ -42,4 +42,68 @@ class Forward extends Memcached_DataObject
/* the code above is auto generated do not remove the tag below */
###END_AUTOCODE
static function saveNew($profile_id, $notice_id)
{
$forward = new Forward();
$forward->profile_id = $profile_id;
$forward->notice_id = $notice_id;
$forward->created = common_sql_now();
$forward->query('BEGIN');
if (!$forward->insert()) {
throw new ServerException(_("Couldn't insert forward."));
}
$ni = $forward->addToInboxes();
$forward->query('COMMIT');
$forward->blowCache($ni);
}
function addToInboxes()
{
$inbox = new Notice_inbox();
$user = new User();
$user->query('SELECT id FROM user JOIN subscription ON user.id = subscription.subscriber '.
'WHERE subscription.subscribed = '.$this->profile_id);
$ni = array();
while ($user->fetch()) {
$inbox = Notice_inbox::pkeyGet(array('user_id' => $user->id,
'notice_id' => $this->notice_id));
if (empty($inbox)) {
$ni[$user->id] = NOTICE_INBOX_SOURCE_FORWARD;
} else {
$inbox->free();
}
}
$user->free();
Notice_inbox::bulkInsert($this->notice_id, $this->created, $ni);
return $ni;
}
function blowCache($ni)
{
$cache = common_memcache();
if (!empty($cache)) {
foreach ($ni as $id => $source) {
$cache->delete(common_cache_key('notice_inbox:by_user:'.$id));
$cache->delete(common_cache_key('notice_inbox:by_user_own:'.$id));
$cache->delete(common_cache_key('notice_inbox:by_user:'.$id.';last'));
$cache->delete(common_cache_key('notice_inbox:by_user_own:'.$id.';last'));
}
}
}
}

View File

@ -948,39 +948,7 @@ class Notice extends Memcached_DataObject
}
}
$cnt = 0;
$qryhdr = 'INSERT INTO notice_inbox (user_id, notice_id, source, created) VALUES ';
$qry = $qryhdr;
foreach ($ni as $id => $source) {
if ($cnt > 0) {
$qry .= ', ';
}
$qry .= '('.$id.', '.$this->id.', '.$source.", '".$this->created. "') ";
$cnt++;
if (rand() % NOTICE_INBOX_SOFT_LIMIT == 0) {
// FIXME: Causes lag in replicated servers
// Notice_inbox::gc($id);
}
if ($cnt >= MAX_BOXCARS) {
$inbox = new Notice_inbox();
$result = $inbox->query($qry);
if (PEAR::isError($result)) {
common_log_db_error($inbox, $qry);
}
$qry = $qryhdr;
$cnt = 0;
}
}
if ($cnt > 0) {
$inbox = new Notice_inbox();
$result = $inbox->query($qry);
if (PEAR::isError($result)) {
common_log_db_error($inbox, $qry);
}
}
Notice_inbox::bulkInsert($this->id, $this->created, $ni);
return;
}

View File

@ -32,6 +32,7 @@ define('NOTICE_INBOX_SOFT_LIMIT', 1000);
define('NOTICE_INBOX_SOURCE_SUB', 1);
define('NOTICE_INBOX_SOURCE_GROUP', 2);
define('NOTICE_INBOX_SOURCE_REPLY', 3);
define('NOTICE_INBOX_SOURCE_FORWARD', 4);
define('NOTICE_INBOX_SOURCE_GATEWAY', -1);
class Notice_inbox extends Memcached_DataObject
@ -83,7 +84,7 @@ class Notice_inbox extends Memcached_DataObject
$inbox->whereAdd('created > \'' . date('Y-m-d H:i:s', $since) . '\'');
}
$inbox->orderBy('notice_id DESC');
$inbox->orderBy('created DESC');
if (!is_null($offset)) {
$inbox->limit($offset, $limit);
@ -141,4 +142,43 @@ class Notice_inbox extends Memcached_DataObject
'WHERE user_id = ' . $user_id . ' ' .
'AND notice_id in ('.implode(',', $notices).')');
}
static function bulkInsert($notice_id, $created, $ni)
{
$cnt = 0;
$qryhdr = 'INSERT INTO notice_inbox (user_id, notice_id, source, created) VALUES ';
$qry = $qryhdr;
foreach ($ni as $id => $source) {
if ($cnt > 0) {
$qry .= ', ';
}
$qry .= '('.$id.', '.$notice_id.', '.$source.", '".$created. "') ";
$cnt++;
if (rand() % NOTICE_INBOX_SOFT_LIMIT == 0) {
// FIXME: Causes lag in replicated servers
// Notice_inbox::gc($id);
}
if ($cnt >= MAX_BOXCARS) {
$inbox = new Notice_inbox();
$result = $inbox->query($qry);
if (PEAR::isError($result)) {
common_log_db_error($inbox, $qry);
}
$qry = $qryhdr;
$cnt = 0;
}
}
if ($cnt > 0) {
$inbox = new Notice_inbox();
$result = $inbox->query($qry);
if (PEAR::isError($result)) {
common_log_db_error($inbox, $qry);
}
}
return;
}
}