Как мне использовать очередь отложенных сообщений RabbitMQ из PHP?


Я пытаюсь использовать Очередь отложенных сообщений для RabbitMQ с PHP, но мои сообщения просто исчезают.

Я объявляю обмен со следующим кодом:

$this->channel->exchange_declare(
    'delay',
    'x-delayed-message',
    false,  /* passive, create if exchange doesn't exist */
    true,   /* durable, persist through server reboots */
    false,  /* autodelete */
    false,  /* internal */
    false,  /* nowait */
    ['x-delayed-type' => ['S', 'direct']]);

Я связываю очередь этим кодом:

$this->channel->queue_declare(
    $queueName,
    false,  /* Passive */
    true,   /* Durable */
    false,  /* Exclusive */
    false   /* Auto Delete */
);
$this->channel->queue_bind($queueName, "delay", $queueName);

И я публикую сообщение с этим кодом:

$msg = new AMQPMessage(json_encode($msgData), [
    'delivery_mode' => 2,
    'x-delay' => 5000]);
$this->channel->basic_publish($msg, 'delay', $queueName);

Но сообщение не задерживается; оно все равно доставляется немедленно. Чего мне не хватает?

Author: Jesse Weigert, 0000-00-00

3 answers

Из здесь,

Создание сообщения должно быть

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$msg = new AMQPMessage($data,
            array(
                'delivery_mode' => 2, # make message persistent
                'application_headers' => new AMQPTable([
                    'x-delay' => 5000
                ])
            )
        );
 3
Author: Toosick, 2016-10-12 00:39:42

Ответ для тех, кому нужна задержка сообщения, но кто не хочет вдаваться в подробности. Вам нужно всего несколько вещей, чтобы заставить его работать:

Установите взаимодействие amqp совместимый транспорт, напримерenqueue/amqp-bunny и enqueue/amqp-tools.

composer require enqueue/amqp-bunny enqueue/amqp-tools

Создайте контекст amqp, добавьте стратегию задержки и отправляйте отложенные сообщения:

<?php
use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy;
use Enqueue\AmqpBunny\AmqpConnectionFactory;

$context = (new AmqpConnectionFactory('amqp://'))->createContext();
$context->setDelayStrategy(new RabbitMqDelayPluginDelayStrategy())

$queue = $context->createQueue('foo');
$context->declareQueue($queue);

$message = $context->createMessage('Hello world!');

$context->createProducer()
    ->setDeliveryDelay(5000) // 5 sec
    ->send($queue, $message)
;

Кстати, это не единственная доступная стратегия. существует один, основанный на очередях мертвых писем RabbitMQ + ttl. Его можно было бы использовать таким же образом.

 3
Author: Maksim Kotlyar, 2017-08-26 17:43:19

Вам нужен ключ маршрутизации для публикации с биржи в соответствующую очередь.

Причина, по которой публикация во встроенном прямом обмене работает, заключается в том, что этот обмен является особым случаем, в котором в качестве имени очереди назначения используется ключ маршрутизации.

Для всех создаваемых вами обменов и очередей необходимо создать привязку между обменом и очередью с ключом маршрутизации. затем вы публикуете сообщение с этим ключом маршрутизации вместо очереди назначения имя.

Я не знаю PHP-кода для создания привязки... но в целом это выглядит примерно так:

channel.bind(exhange_name, queue_name, routing_key)

Затем в вашей публикации сообщения:

$this->channel->basic_publish($msg, 'delay', $routing_key);

 1
Author: Derick Bailey, 2015-08-31 19:04:14