214 lines
5.8 KiB
PHP
214 lines
5.8 KiB
PHP
<?php
|
|
|
|
declare(strict_types=1);
|
|
|
|
namespace Enqueue\Stomp;
|
|
|
|
use Interop\Queue\Consumer;
|
|
use Interop\Queue\Context;
|
|
use Interop\Queue\Destination;
|
|
use Interop\Queue\Exception\InvalidDestinationException;
|
|
use Interop\Queue\Exception\PurgeQueueNotSupportedException;
|
|
use Interop\Queue\Exception\SubscriptionConsumerNotSupportedException;
|
|
use Interop\Queue\Message;
|
|
use Interop\Queue\Producer;
|
|
use Interop\Queue\Queue;
|
|
use Interop\Queue\SubscriptionConsumer;
|
|
use Interop\Queue\Topic;
|
|
|
|
class StompContext implements Context
|
|
{
|
|
/**
|
|
* @var BufferedStompClient
|
|
*/
|
|
private $stomp;
|
|
|
|
/**
|
|
* @var bool
|
|
*/
|
|
private $useExchangePrefix;
|
|
|
|
/**
|
|
* @var callable
|
|
*/
|
|
private $stompFactory;
|
|
|
|
/**
|
|
* @param BufferedStompClient|callable $stomp
|
|
* @param bool $useExchangePrefix
|
|
*/
|
|
public function __construct($stomp, $useExchangePrefix = true)
|
|
{
|
|
if ($stomp instanceof BufferedStompClient) {
|
|
$this->stomp = $stomp;
|
|
} elseif (is_callable($stomp)) {
|
|
$this->stompFactory = $stomp;
|
|
} else {
|
|
throw new \InvalidArgumentException('The stomp argument must be either BufferedStompClient or callable that return BufferedStompClient.');
|
|
}
|
|
|
|
$this->useExchangePrefix = $useExchangePrefix;
|
|
}
|
|
|
|
/**
|
|
* @return StompMessage
|
|
*/
|
|
public function createMessage(string $body = '', array $properties = [], array $headers = []): Message
|
|
{
|
|
return new StompMessage($body, $properties, $headers);
|
|
}
|
|
|
|
/**
|
|
* @return StompDestination
|
|
*/
|
|
public function createQueue(string $name): Queue
|
|
{
|
|
if (0 !== strpos($name, '/')) {
|
|
$destination = new StompDestination();
|
|
$destination->setType(StompDestination::TYPE_QUEUE);
|
|
$destination->setStompName($name);
|
|
|
|
return $destination;
|
|
}
|
|
|
|
return $this->createDestination($name);
|
|
}
|
|
|
|
/**
|
|
* @return StompDestination
|
|
*/
|
|
public function createTemporaryQueue(): Queue
|
|
{
|
|
$queue = $this->createQueue(uniqid('', true));
|
|
$queue->setType(StompDestination::TYPE_TEMP_QUEUE);
|
|
|
|
return $queue;
|
|
}
|
|
|
|
/**
|
|
* @return StompDestination
|
|
*/
|
|
public function createTopic(string $name): Topic
|
|
{
|
|
if (0 !== strpos($name, '/')) {
|
|
$destination = new StompDestination();
|
|
$destination->setType($this->useExchangePrefix ? StompDestination::TYPE_EXCHANGE : StompDestination::TYPE_TOPIC);
|
|
$destination->setStompName($name);
|
|
|
|
return $destination;
|
|
}
|
|
|
|
return $this->createDestination($name);
|
|
}
|
|
|
|
public function createDestination(string $destination): StompDestination
|
|
{
|
|
$types = [
|
|
StompDestination::TYPE_TOPIC,
|
|
StompDestination::TYPE_EXCHANGE,
|
|
StompDestination::TYPE_QUEUE,
|
|
StompDestination::TYPE_AMQ_QUEUE,
|
|
StompDestination::TYPE_TEMP_QUEUE,
|
|
StompDestination::TYPE_REPLY_QUEUE,
|
|
];
|
|
|
|
$dest = $destination;
|
|
$type = null;
|
|
$name = null;
|
|
$routingKey = null;
|
|
|
|
foreach ($types as $_type) {
|
|
$typePrefix = '/'.$_type.'/';
|
|
if (0 === strpos($dest, $typePrefix)) {
|
|
$type = $_type;
|
|
$dest = substr($dest, strlen($typePrefix));
|
|
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (null === $type) {
|
|
throw new \LogicException(sprintf('Destination name is invalid, cant find type: "%s"', $destination));
|
|
}
|
|
|
|
$pieces = explode('/', $dest);
|
|
|
|
if (count($pieces) > 2) {
|
|
throw new \LogicException(sprintf('Destination name is invalid, found extra / char: "%s"', $destination));
|
|
}
|
|
|
|
if (empty($pieces[0])) {
|
|
throw new \LogicException(sprintf('Destination name is invalid, name is empty: "%s"', $destination));
|
|
}
|
|
|
|
$name = $pieces[0];
|
|
|
|
if (isset($pieces[1])) {
|
|
if (empty($pieces[1])) {
|
|
throw new \LogicException(sprintf('Destination name is invalid, routing key is empty: "%s"', $destination));
|
|
}
|
|
|
|
$routingKey = $pieces[1];
|
|
}
|
|
|
|
$destination = new StompDestination();
|
|
$destination->setType($type);
|
|
$destination->setStompName($name);
|
|
$destination->setRoutingKey($routingKey);
|
|
|
|
return $destination;
|
|
}
|
|
|
|
/**
|
|
* @param StompDestination $destination
|
|
*
|
|
* @return StompConsumer
|
|
*/
|
|
public function createConsumer(Destination $destination): Consumer
|
|
{
|
|
InvalidDestinationException::assertDestinationInstanceOf($destination, StompDestination::class);
|
|
|
|
return new StompConsumer($this->getStomp(), $destination);
|
|
}
|
|
|
|
/**
|
|
* @return StompProducer
|
|
*/
|
|
public function createProducer(): Producer
|
|
{
|
|
return new StompProducer($this->getStomp());
|
|
}
|
|
|
|
public function close(): void
|
|
{
|
|
$this->getStomp()->disconnect();
|
|
}
|
|
|
|
public function createSubscriptionConsumer(): SubscriptionConsumer
|
|
{
|
|
throw SubscriptionConsumerNotSupportedException::providerDoestNotSupportIt();
|
|
}
|
|
|
|
public function purgeQueue(Queue $queue): void
|
|
{
|
|
throw PurgeQueueNotSupportedException::providerDoestNotSupportIt();
|
|
}
|
|
|
|
public function getStomp(): BufferedStompClient
|
|
{
|
|
if (false == $this->stomp) {
|
|
$stomp = call_user_func($this->stompFactory);
|
|
if (false == $stomp instanceof BufferedStompClient) {
|
|
throw new \LogicException(sprintf(
|
|
'The factory must return instance of BufferedStompClient. It returns %s',
|
|
is_object($stomp) ? get_class($stomp) : gettype($stomp)
|
|
));
|
|
}
|
|
|
|
$this->stomp = $stomp;
|
|
}
|
|
|
|
return $this->stomp;
|
|
}
|
|
}
|