Как мне использовать очередь отложенных сообщений RabbitMQ из PHP?
Я пытаюсь использовать Очередь отложенных сообщений для RabbitMQ с PHP, но мои сообщения просто исчезают.
Я объявляю обмен со следующим кодом:
$this->channel->exchange_declare(
'delay',
'x-delayed-message',
false, /* passive, create if exchange doesn't exist */
true, /* durable, persist through server reboots */
false, /* autodelete */
false, /* internal */
false, /* nowait */
['x-delayed-type' => ['S', 'direct']]);
Я связываю очередь этим кодом:
$this->channel->queue_declare(
$queueName,
false, /* Passive */
true, /* Durable */
false, /* Exclusive */
false /* Auto Delete */
);
$this->channel->queue_bind($queueName, "delay", $queueName);
И я публикую сообщение с этим кодом:
$msg = new AMQPMessage(json_encode($msgData), [
'delivery_mode' => 2,
'x-delay' => 5000]);
$this->channel->basic_publish($msg, 'delay', $queueName);
Но сообщение не задерживается; оно все равно доставляется немедленно. Чего мне не хватает?
3 answers
Из здесь,
Создание сообщения должно быть
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$msg = new AMQPMessage($data,
array(
'delivery_mode' => 2, # make message persistent
'application_headers' => new AMQPTable([
'x-delay' => 5000
])
)
);
Ответ для тех, кому нужна задержка сообщения, но кто не хочет вдаваться в подробности. Вам нужно всего несколько вещей, чтобы заставить его работать:
Установите взаимодействие amqp совместимый транспорт, напримерenqueue/amqp-bunny
и enqueue/amqp-tools
.
composer require enqueue/amqp-bunny enqueue/amqp-tools
Создайте контекст amqp, добавьте стратегию задержки и отправляйте отложенные сообщения:
<?php
use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy;
use Enqueue\AmqpBunny\AmqpConnectionFactory;
$context = (new AmqpConnectionFactory('amqp://'))->createContext();
$context->setDelayStrategy(new RabbitMqDelayPluginDelayStrategy())
$queue = $context->createQueue('foo');
$context->declareQueue($queue);
$message = $context->createMessage('Hello world!');
$context->createProducer()
->setDeliveryDelay(5000) // 5 sec
->send($queue, $message)
;
Кстати, это не единственная доступная стратегия. существует один, основанный на очередях мертвых писем RabbitMQ + ttl. Его можно было бы использовать таким же образом.
Вам нужен ключ маршрутизации для публикации с биржи в соответствующую очередь.
Причина, по которой публикация во встроенном прямом обмене работает, заключается в том, что этот обмен является особым случаем, в котором в качестве имени очереди назначения используется ключ маршрутизации.
Для всех создаваемых вами обменов и очередей необходимо создать привязку между обменом и очередью с ключом маршрутизации. затем вы публикуете сообщение с этим ключом маршрутизации вместо очереди назначения имя.
Я не знаю PHP-кода для создания привязки... но в целом это выглядит примерно так:
channel.bind(exhange_name, queue_name, routing_key)
Затем в вашей публикации сообщения:
$this->channel->basic_publish($msg, 'delay', $routing_key);