Enable json-specified objects in queue_item frames
This commit is contained in:
parent
2ef9beb4b1
commit
e7a4ccb7b5
|
@ -79,27 +79,28 @@ class DBQueueManager extends QueueManager
|
||||||
}
|
}
|
||||||
|
|
||||||
$queue = $qi->transport;
|
$queue = $qi->transport;
|
||||||
$item = $this->decode($qi->frame);
|
try {
|
||||||
|
$item = $this->decode($qi->frame);
|
||||||
|
} catch (Exception $e) {
|
||||||
|
$this->_log(LOG_INFO, "[$queue] Discarding: ".$e->getMessage());
|
||||||
|
$this->_done($qi);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
if ($item) {
|
$rep = $this->logrep($item);
|
||||||
$rep = $this->logrep($item);
|
$this->_log(LOG_DEBUG, "Got $rep for transport $queue");
|
||||||
$this->_log(LOG_DEBUG, "Got $rep for transport $queue");
|
|
||||||
|
$handler = $this->getHandler($queue);
|
||||||
$handler = $this->getHandler($queue);
|
if ($handler) {
|
||||||
if ($handler) {
|
if ($handler->handle($item)) {
|
||||||
if ($handler->handle($item)) {
|
$this->_log(LOG_INFO, "[$queue:$rep] Successfully handled item");
|
||||||
$this->_log(LOG_INFO, "[$queue:$rep] Successfully handled item");
|
|
||||||
$this->_done($qi);
|
|
||||||
} else {
|
|
||||||
$this->_log(LOG_INFO, "[$queue:$rep] Failed to handle item");
|
|
||||||
$this->_fail($qi);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
$this->_log(LOG_INFO, "[$queue:$rep] No handler for queue $queue; discarding.");
|
|
||||||
$this->_done($qi);
|
$this->_done($qi);
|
||||||
|
} else {
|
||||||
|
$this->_log(LOG_INFO, "[$queue:$rep] Failed to handle item");
|
||||||
|
$this->_fail($qi);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
$this->_log(LOG_INFO, "[$queue] Got empty/deleted item, discarding");
|
$this->_log(LOG_INFO, "[$queue:$rep] No handler for queue $queue; discarding.");
|
||||||
$this->_done($qi);
|
$this->_done($qi);
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -177,7 +177,39 @@ abstract class QueueManager extends IoManager
|
||||||
*/
|
*/
|
||||||
protected function decode($frame)
|
protected function decode($frame)
|
||||||
{
|
{
|
||||||
return unserialize($frame);
|
$object = unserialize($frame);
|
||||||
|
|
||||||
|
// If it is a string, we really store a JSON object in there
|
||||||
|
if (is_string($object)) {
|
||||||
|
$json = json_decode($object);
|
||||||
|
if ($json === null) {
|
||||||
|
throw new Exception('Bad frame in queue item');
|
||||||
|
}
|
||||||
|
|
||||||
|
// The JSON object has a type parameter which contains the class
|
||||||
|
if (empty($json->type)) {
|
||||||
|
throw new Exception('Type not specified for queue item');
|
||||||
|
}
|
||||||
|
if (!is_a($json->type, 'Managed_DataObject', true)) {
|
||||||
|
throw new Exception('Managed_DataObject class does not exist for queue item');
|
||||||
|
}
|
||||||
|
|
||||||
|
// And each of these types should have a unique id (or uri)
|
||||||
|
if (isset($json->id) && !empty($json->id)) {
|
||||||
|
$object = call_user_func(array($json->type, 'getKV'), 'id', $json->id);
|
||||||
|
} elseif (isset($json->uri) && !empty($json->uri)) {
|
||||||
|
$object = call_user_func(array($json->type, 'getKV'), 'uri', $json->uri);
|
||||||
|
}
|
||||||
|
|
||||||
|
// But if no object was found, there's nothing we can handle
|
||||||
|
if (!$object instanceof Managed_DataObject) {
|
||||||
|
throw new Exception('Queue item frame referenced a non-existant object');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the frame was not a string, it's either an array or an object.
|
||||||
|
|
||||||
|
return $object;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -498,8 +498,9 @@ class StompQueueManager extends QueueManager
|
||||||
// @fixme detect failing site switches
|
// @fixme detect failing site switches
|
||||||
$this->switchSite($site);
|
$this->switchSite($site);
|
||||||
|
|
||||||
$item = $this->decode($message['payload']);
|
try {
|
||||||
if (empty($item)) {
|
$item = $this->decode($message['payload']);
|
||||||
|
} catch (Exception $e) {
|
||||||
$this->_log(LOG_ERR, "Skipping empty or deleted item in queue $queue from $host");
|
$this->_log(LOG_ERR, "Skipping empty or deleted item in queue $queue from $host");
|
||||||
$this->stats('baditem', $queue);
|
$this->stats('baditem', $queue);
|
||||||
return false;
|
return false;
|
||||||
|
|
Loading…
Reference in New Issue
Block a user