Message Queues
Message queues provide an asynchronous communications mechanism in which the sender and the receiver of a message do not contact each other. Nor do they need to communicate with the message queue at the same time. When a sender places a message onto a queue, it is stored until the recipient receives it.
In Magento Commerce, the Message Queue Framework (MQF) is a fully-functional system that allows a module to publish messages to queues. It also creates consumers to receive them asynchronously. The MQF primarily uses RabbitMQ as the messaging broker, which provides a scalable platform for sending and receiving messages. It also includes a mechanism for storing undelivered messages. RabbitMQ is based on the Advanced Message Queuing Protocol (AMQP) 0.9.1 specification.
A basic message queue system can also be set up without using RabbitMQ. In this system, a MySQL adapter stores messages in the database. Three database tables (queue, queue_message, and queue_message_status) manage the message queue workload. Cron jobs ensure the consumers are able to receive messages. This solution is not very scalable. RabbitMQ should be used whenever possible.
See Configure message queues for information about setting up the message queue system.
Send a message from the publisher to a queue
The following code sends a message to the queue. The publish method is defined in PublisherInterface.
$publisher->publish($topic, $message);
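As a rough illustration of where the publish call might live, the sketch below injects a publisher into a small service class. The local PublisherInterface is only a stand-in that mirrors the publish method so the example runs outside Magento; in a real module you would inject \Magento\Framework\MessageQueue\PublisherInterface instead. InMemoryPublisher, CustomerCreatedNotifier, and the message payload are hypothetical names chosen for this example.

```php
<?php
declare(strict_types=1);

// Stand-in for \Magento\Framework\MessageQueue\PublisherInterface so this
// sketch runs outside Magento; in a module, inject the framework interface.
interface PublisherInterface
{
    public function publish($topicName, $data);
}

// Hypothetical in-memory publisher, used only to make the sketch runnable.
final class InMemoryPublisher implements PublisherInterface
{
    /** @var array messages grouped by topic name */
    public array $published = [];

    public function publish($topicName, $data)
    {
        $this->published[$topicName][] = $data;
        return null;
    }
}

// Hypothetical service that announces newly created customers.
final class CustomerCreatedNotifier
{
    private PublisherInterface $publisher;

    public function __construct(PublisherInterface $publisher)
    {
        $this->publisher = $publisher;
    }

    public function notify(int $customerId): void
    {
        // The payload shape is an assumption made for this example.
        $this->publisher->publish('customer.created', ['customer_id' => $customerId]);
    }
}

$publisher = new InMemoryPublisher();
(new CustomerCreatedNotifier($publisher))->notify(42);
```

Depending on the interface rather than a concrete publisher keeps the calling code unaware of which adapter ultimately delivers the message.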
In a MySQL adapter environment, when a message is published to multiple queues, create a single record in queue_message and multiple records in queue_message_status, one for each queue. (A join on the queue, queue_message, and queue_message_status tables is required.)
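The record layout described above can be sketched with plain PHP arrays standing in for the three tables. This is not the adapter's actual code: the column names (id, name, topic_name, body, queue_id, message_id, status), the queue names, and the 'new' status value are assumptions made for illustration.

```php
<?php
declare(strict_types=1);

// In-memory stand-ins for the three tables; all column names are assumptions.
$tables = [
    'queue' => [
        ['id' => 1, 'name' => 'queue_a'],
        ['id' => 2, 'name' => 'queue_b'],
    ],
    'queue_message' => [],
    'queue_message_status' => [],
];

/**
 * Store one message row, then one status row per target queue.
 * Returns the id of the new message row.
 */
function publishToQueues(array &$tables, string $topic, string $body, array $queueNames): int
{
    // A single record in queue_message holds the topic and the message body.
    $messageId = count($tables['queue_message']) + 1;
    $tables['queue_message'][] = ['id' => $messageId, 'topic_name' => $topic, 'body' => $body];

    // One record in queue_message_status links the message to each queue.
    foreach ($tables['queue'] as $queue) {
        if (in_array($queue['name'], $queueNames, true)) {
            $tables['queue_message_status'][] = [
                'queue_id' => $queue['id'],
                'message_id' => $messageId,
                'status' => 'new', // hypothetical status value
            ];
        }
    }

    return $messageId;
}

$messageId = publishToQueues($tables, 'customer.created', '{"customer_id":42}', ['queue_a', 'queue_b']);
```

Because the body is stored once and only the status rows are duplicated, reading the messages for a given queue means joining all three tables.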
Instantiate a consumer
The procedure for instantiating a consumer differs, depending on which message queue system is being used.
RabbitMQ
The following code instantiates a consumer that is defined in a queue_consumer.xml file. The consumer (customer_created_listener) listens to the queue and receives all new messages. For every message, it invokes Magento\Some\Class::processMessage($message).
$this->consumerFactory->get('customer_created_listener')
    ->process();
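For orientation, a handler with a processMessage method might look like the sketch below. CustomerCreatedHandler is a hypothetical class standing in for the Magento\Some\Class placeholder, and the loop only simulates the consumer calling the handler once per message; it is not how the framework dispatches messages internally.

```php
<?php
declare(strict_types=1);

// Hypothetical handler standing in for the Magento\Some\Class placeholder.
final class CustomerCreatedHandler
{
    /** @var array messages seen so far, kept only so the sketch is observable */
    public array $handled = [];

    public function processMessage($message): void
    {
        // Real handlers would do their work here (send email, update an index, ...).
        $this->handled[] = $message;
    }
}

// Simulate the consumer invoking the handler once for every message.
$handler = new CustomerCreatedHandler();
foreach (['first message', 'second message'] as $message) {
    $handler->processMessage($message);
}
```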
MySQL adapter
Implement \Magento\Framework\MessageQueue\ConsumerInterface::process($maxNumberOfMessages) to instantiate a consumer.
Perform the following actions:
- Define the queue name associated with the current consumer using \Magento\Framework\MessageQueue\ConsumerConfigurationInterface::getQueueName.
- Select $maxNumberOfMessages message records, filtering on the queue_name field. You must join on all three tables. You may want to extract fewer records at a time to improve load distribution between multiple consumers.
- Decode the message using the topic name taken from \Magento\Framework\MessageQueue\ConsumerConfigurationInterface.
- Invoke the callback returned by \Magento\Framework\MessageQueue\ConsumerConfigurationInterface::getCallback and pass the decoded data as an argument.
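The actions listed above can be sketched as a single function. This is a simplified illustration, not the framework's implementation: the rows are hard-coded arrays shaped like the result of the three-table join, the queue names are made up, JSON is assumed as the message encoding, and processMessages is a hypothetical helper.

```php
<?php
declare(strict_types=1);

// Hypothetical rows, shaped like the result of joining queue, queue_message,
// and queue_message_status. Queue names and bodies are made up.
$joinedRows = [
    ['queue_name' => 'customer_created', 'topic_name' => 'customer.created', 'body' => '{"customer_id":1}'],
    ['queue_name' => 'order_created', 'topic_name' => 'order.created', 'body' => '{"order_id":7}'],
    ['queue_name' => 'customer_created', 'topic_name' => 'customer.created', 'body' => '{"customer_id":2}'],
];

/**
 * Process up to $maxNumberOfMessages messages for one queue.
 * Returns the number of messages handed to the callback.
 */
function processMessages(array $joinedRows, string $queueName, callable $callback, int $maxNumberOfMessages): int
{
    // Select at most $maxNumberOfMessages rows, filtering on queue_name.
    $rows = array_filter($joinedRows, fn(array $row): bool => $row['queue_name'] === $queueName);
    $rows = array_slice($rows, 0, $maxNumberOfMessages);

    foreach ($rows as $row) {
        // Decode the message body; JSON encoding is an assumption here.
        $data = json_decode($row['body'], true, 512, JSON_THROW_ON_ERROR);
        // Invoke the configured callback with the decoded data.
        $callback($data);
    }

    return count($rows);
}

$received = [];
$collect = function (array $data) use (&$received): void {
    $received[] = $data;
};
$processed = processMessages($joinedRows, 'customer_created', $collect, 10);
```

Passing a smaller value for $maxNumberOfMessages leaves the remaining rows available to other consumers, which is the load-distribution point made in the list.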
Override topic configuration
The following sample introduces a runtime configuration that allows you to redefine the adapter for a topic.
'queue' => [
    'topics' => [
        'customer.created' => ['publisher' => 'default-rabitmq'],
        'order.created' => ['publisher' => 'default-rabitmq'],
    ],
],