Twitter streaming API reader: Cleanup input handling & split from HTTP headers to body

This commit is contained in:
Brion Vibber 2010-10-05 12:17:16 -07:00
parent 3b304fc0ef
commit 5058e8fd14
2 changed files with 49 additions and 19 deletions

View File

@ -93,18 +93,24 @@ abstract class JsonStreamReader
$this->state = 'waiting'; $this->state = 'waiting';
} }
/**
* Send some fun data off to the server.
*
* @param string $buffer
*/
function send($buffer) function send($buffer)
{ {
echo "Writing...\n";
var_dump($buffer);
fwrite($this->socket, $buffer); fwrite($this->socket, $buffer);
} }
/**
* Read next packet of data from the socket.
*
* @return string
*/
function read() function read()
{ {
echo "Reading...\n";
$buffer = fread($this->socket, 65536); $buffer = fread($this->socket, 65536);
var_dump($buffer);
return $buffer; return $buffer;
} }
@ -195,12 +201,16 @@ abstract class JsonStreamReader
{ {
$lines = explode(self::CRLF, $buffer); $lines = explode(self::CRLF, $buffer);
foreach ($lines as $line) { foreach ($lines as $line) {
if ($line == '') { if ($this->state == 'headers') {
$this->state = 'active'; if ($line == '') {
common_log(LOG_DEBUG, "$this->id connection is active!"); $this->state = 'active';
} else { common_log(LOG_DEBUG, "$this->id connection is active!");
common_log(LOG_DEBUG, "$this->id read HTTP header: $line"); } else {
$this->responseHeaders[] = $line; common_log(LOG_DEBUG, "$this->id read HTTP header: $line");
$this->responseHeaders[] = $line;
}
} else if ($this->state == 'active') {
$this->handleLineActive($line);
} }
} }
} }
@ -211,12 +221,21 @@ abstract class JsonStreamReader
// Will we always deliver on packet boundaries? // Will we always deliver on packet boundaries?
$lines = explode("\n", $buffer); $lines = explode("\n", $buffer);
foreach ($lines as $line) { foreach ($lines as $line) {
$data = json_decode($line, true); $this->handleLineActive($line);
if ($data) { }
$this->handleJson($data); }
} else {
common_log(LOG_ERR, "$this->id received bogus JSON data: " . $line); function handleLineActive($line)
} {
if ($line == '') {
// Server sends empty lines as keepalive.
return;
}
$data = json_decode($line, true);
if ($data) {
$this->handleJson($data);
} else {
common_log(LOG_ERR, "$this->id received bogus JSON data: " . $line);
} }
} }

View File

@ -94,11 +94,22 @@ abstract class TwitterStreamReader extends JsonStreamReader
abstract function routeMessage($data); abstract function routeMessage($data);
function handleMessage($data, $forUserId=null) /**
* Send the decoded JSON object out to any event listeners.
*
* @param array $data
* @param int $forUserId
*/
function handleMessage(array $data, $forUserId=null)
{ {
$this->fireEvent('raw', $data, $forUserId); $this->fireEvent('raw', $data, $forUserId);
$known = array('friends');
foreach ($known as $key) { if (isset($data['id']) && isset($data['text']) && isset($data['user'])) {
$this->fireEvent('status', $data);
}
$knownMeta = array('friends', 'delete', 'scrubgeo', 'limit', 'event', 'direct_message');
foreach ($knownMeta as $key) {
if (isset($data[$key])) { if (isset($data[$key])) {
$this->fireEvent($key, $data[$key], $forUserId); $this->fireEvent($key, $data[$key], $forUserId);
} }