Work in progress on site streams-aware TwitterDaemon

This commit is contained in:
Brion Vibber 2010-10-29 13:18:03 -07:00
parent 86adc575ec
commit 47eada3a95

View File

@ -114,7 +114,7 @@ class TwitterManager extends IoManager
* @fixme check their last-id and check whether we'll need to do a manual pull. * @fixme check their last-id and check whether we'll need to do a manual pull.
* @fixme abstract out the fetching so we can work over multiple sites. * @fixme abstract out the fetching so we can work over multiple sites.
*/ */
function initStreams() protected function initStreams()
{ {
// Pull Twitter user IDs for all users we want to pull data for // Pull Twitter user IDs for all users we want to pull data for
$flink = new Foreign_link(); $flink = new Foreign_link();
@ -146,7 +146,7 @@ class TwitterManager extends IoManager
* *
* @param $users array of Twitter-side user IDs * @param $users array of Twitter-side user IDs
*/ */
function spawnStream($users) protected function spawnStream($users)
{ {
$stream = $this->initSiteStream(); $stream = $this->initSiteStream();
$stream->followUsers($userIds); $stream->followUsers($userIds);
@ -168,7 +168,7 @@ class TwitterManager extends IoManager
* *
* @return TwitterStreamReader * @return TwitterStreamReader
*/ */
function initSiteStream() protected function initSiteStream()
{ {
$auth = $this->siteStreamAuth(); $auth = $this->siteStreamAuth();
$stream = new TwitterSiteStream($auth); $stream = new TwitterSiteStream($auth);
@ -190,7 +190,7 @@ class TwitterManager extends IoManager
* *
* @return TwitterOAuthClient * @return TwitterOAuthClient
*/ */
function siteStreamAuth() protected function siteStreamAuth()
{ {
$token = common_config('twitter', 'stream_token'); $token = common_config('twitter', 'stream_token');
$secret = common_config('twitter', 'stream_secret'); $secret = common_config('twitter', 'stream_secret');
@ -205,7 +205,7 @@ class TwitterManager extends IoManager
* *
* @return array of resources * @return array of resources
*/ */
function getSockets() public function getSockets()
{ {
$sockets = array(); $sockets = array();
foreach ($this->streams as $stream) { foreach ($this->streams as $stream) {
@ -223,7 +223,7 @@ class TwitterManager extends IoManager
* @param resource $socket * @param resource $socket
* @return boolean success * @return boolean success
*/ */
function handleInput($socket) public function handleInput($socket)
{ {
foreach ($this->streams as $stream) { foreach ($this->streams as $stream) {
foreach ($stream->getSockets() as $aSocket) { foreach ($stream->getSockets() as $aSocket) {
@ -236,11 +236,12 @@ class TwitterManager extends IoManager
} }
/** /**
* Start the system up! * Start the i/o system up! Prepare our connections and start opening them.
*
* @fixme do some rate-limiting on the stream setup * @fixme do some rate-limiting on the stream setup
* @fixme do some sensible backoff on failure etc * @fixme do some sensible backoff on failure etc
*/ */
function start() public function start()
{ {
$this->initStreams(); $this->initStreams();
foreach ($this->streams as $stream) { foreach ($this->streams as $stream) {
@ -249,7 +250,10 @@ class TwitterManager extends IoManager
return true; return true;
} }
function finish() /**
* Close down our connections when the daemon wraps up for business.
*/
public function finish()
{ {
foreach ($this->streams as $index => $stream) { foreach ($this->streams as $index => $stream) {
$stream->close(); $stream->close();
@ -280,33 +284,21 @@ class TwitterManager extends IoManager
/** /**
* Event callback notifying that a user has a new message in their home timeline. * Event callback notifying that a user has a new message in their home timeline.
* We store the incoming message into the queues for processing, keeping our own
* daemon running as shiny-fast as possible.
* *
* @param object $data JSON data: Twitter status update * @param object $status JSON data: Twitter status update
* @fixme in all-sites mode we may need to route queue items into another site's
* destination queues, or multiple sites.
*/ */
protected function onTwitterStatus($data, $context) protected function onTwitterStatus($status, $context)
{ {
$importer = new TwitterImport(); $data = array(
$notice = $importer->importStatus($data); 'status' => $status,
if ($notice) { 'for_user' => $context->for_user,
$user = $this->getTwitterUser($context); );
Inbox::insertNotice($user->id, $notice->id); $qm = QueueManager::get();
} $qm->enqueue($data, 'tweetin');
}
/**
* @fixme what about handling multiple sites?
*/
function getTwitterUser($context)
{
if ($context->source != 'sitestream') {
throw new ServerException("Unexpected stream source");
}
$flink = Foreign_link::getByForeignID(TWITTER_SERVICE, $context->for_user);
if ($flink) {
return $flink->getUser();
} else {
throw new ServerException("No local user for this Twitter ID");
}
} }
} }