The Message Queue Framework (MQF) is a system that allows a module to publish messages to queues. It also defines consumers that receive those messages asynchronously. Message queues are used for various functionality in Magento 2, for example import/export, mass product attribute updates, and many other places.
What is Message Queue
A message queue is a communication mechanism that allows different parts of a software system to communicate and exchange information asynchronously. In this context, “asynchronous” means that the sender and receiver don’t need to interact with each other at the same time. Instead, the sender can post a message to the queue, and the receiver can retrieve and process the message later.
Three Components of a Message Queue
Sender (Producer): The component or system that generates a message and sends it to the message queue. This could be triggered by a specific event or as part of regular processing.
Message Queue: A storage system that holds messages sent by producers. Messages are typically stored in a first-in, first-out (FIFO) order, meaning the first message sent is the first one to be processed.
Receiver (Consumer): The component or system that retrieves messages from the message queue and processes them. The receiver can work independently of the sender, allowing for decoupling of components.
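The three roles above can be sketched in a few lines of plain PHP. This is only a conceptual, in-memory illustration built on PHP's SplQueue; it is not how Magento or RabbitMQ implement queues, and the produce() and consume() function names are made up for the example.

```php
<?php
// Conceptual sketch only: an in-memory FIFO queue, not Magento's MQF or RabbitMQ.

// Message queue: holds messages until a consumer is ready for them.
$queue = new SplQueue();

// Producer: posts a message and returns immediately, without waiting for processing.
function produce(SplQueue $queue, string $message): void
{
    $queue->enqueue($message);
}

// Consumer: later drains the queue in first-in, first-out order.
function consume(SplQueue $queue): array
{
    $processed = [];
    while (!$queue->isEmpty()) {
        // "Processing" here is just upper-casing the message.
        $processed[] = strtoupper($queue->dequeue());
    }
    return $processed;
}

produce($queue, 'first');
produce($queue, 'second');

// The consumer runs independently of the producer calls above.
$result = consume($queue);
print_r($result);
```

The producer never calls the consumer directly; the queue is the only thing the two share, which is what keeps them decoupled.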
Message queues are valuable in scenarios where different parts of a system need to communicate without being tightly connected. In Magento, we will use the RabbitMQ message broker.
What is RabbitMQ
RabbitMQ is a robust and widely-used open-source message broker software that facilitates communication between different parts of a distributed system. It is part of the message queue middleware family, serving as a reliable intermediary for asynchronous communication between applications and services.
Installing RabbitMQ
To install RabbitMQ on a Debian-based Linux server such as Ubuntu, run the following command.
sudo apt install -y rabbitmq-server
To confirm that it was installed successfully, run the following command to check whether the service is running.
/etc/init.d/rabbitmq-server status
The above command should give you information about the RabbitMQ server. If it returns an error, RabbitMQ was not installed correctly and you need to install it again.
Windows users can follow the guide How To Install RabbitMQ on Windows.
Configuring RabbitMQ in Magento 2
Next, open the app/etc/env.php file in your project, find the queue array, and add the code below to it.
'queue' => [
    'amqp' => [
        'host' => '127.0.0.1',
        'port' => '5672',
        'user' => 'root',
        'password' => '',
        'virtualhost' => '/',
        'ssl' => ''
    ]
]
After saving the env.php file, run the php bin/magento setup:upgrade command. If there are any errors in the configuration, they will be reported at this point.
Overview of the Example in Adobe Commerce/Magento
Now, let's create a simple module to see how message queues work. In this workflow, we use a web API to make the message handling easy to follow. Data is passed from the web API to the publisher. The publisher then routes the data to a queue based on the specified topic, where it is stored. Finally, running the command php bin/magento queue:consumers:start with the consumer name starts the consumer, which retrieves and processes the message and logs the result.
Create Module for Message Queue
You will need to create a custom module; in our case we will name it Learningmagento_MessageQueue. For the basic files needed in a Magento module, see the article Create Magento 2 Custom Module Development.
Apart from the base files, we need to create a webapi.xml file for our example project. Note that webapi.xml and the classes referenced in it are not part of the message queue itself; we use them only for the purpose of this example.
webapi.xml
<?xml version="1.0"?>
<routes xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:module:Magento_Webapi:etc/webapi.xsd">
    <route url="/V1/product/import" method="POST">
        <service class="Learningmagento\MessageQueue\Api\ProductImportInterface" method="update"/>
        <resources>
            <resource ref="anonymous"/>
        </resources>
    </route>
</routes>
The above file is the configuration for our API. To learn about the elements of webapi.xml, see the article Creating Custom API in Magento 2. Next, we will create the interface referenced in the webapi.xml file.
Api files | Interface Classes
<?php
/**
*
* @category Custom Development
* @email contactus@learningmagento.com
* @author Custom Development
* @website learningmagento.com
* @Date 29-02-2024
*/
namespace Learningmagento\MessageQueue\Api;
interface ProductImportInterface
{
    /**
     * @param \Learningmagento\MessageQueue\Api\ProductInterface $product
     * @return mixed
     */
    public function update(ProductInterface $product);
}
In the same way, we will create one more interface named ProductInterface inside the same directory.
<?php
/**
*
* @category Custom Development
* @email contactus@learningmagento.com
* @author Custom Development
* @website learningmagento.com
* @Date 29-02-2024
*/
namespace Learningmagento\MessageQueue\Api;
interface ProductInterface
{
    /**
     * @return string
     */
    public function getName();

    /**
     * @param string $name
     * @return $this
     */
    public function setName($name);
}
Model classes
For each of the above interfaces we will create a corresponding model class. Create a new directory named “Model” inside the module folder.
<?php
namespace Learningmagento\MessageQueue\Model;
class Product implements \Learningmagento\MessageQueue\Api\ProductInterface
{
    protected $name;

    /**
     * @return string
     */
    public function getName()
    {
        return $this->name;
    }

    /**
     * @param string $name
     * @return $this
     */
    public function setName($name)
    {
        $this->name = $name;
        return $this;
    }
}
<?php
namespace Learningmagento\MessageQueue\Model;
use Learningmagento\MessageQueue\Api\ProductInterface;
use Learningmagento\MessageQueue\Publisher\ProductImport as ModelProductImport;

class ProductImport implements \Learningmagento\MessageQueue\Api\ProductImportInterface
{
    /**
     * @var ModelProductImport
     */
    protected $publisher;

    /**
     * @param ModelProductImport $publisher
     */
    public function __construct(
        ModelProductImport $publisher
    ) {
        $this->publisher = $publisher;
    }

    /**
     * @param ProductInterface $product
     * @return mixed
     */
    public function update(ProductInterface $product)
    {
        // Pass plain data: JSON-encoding the object itself would drop its protected $name.
        return $this->publisher->publish(['name' => $product->getName()]);
    }
}
Abstraction-implementation mappings
The object manager employs abstraction-implementation mappings when a class constructor seeks an object by its interface. These mappings guide the object manager in identifying the default implementation for the class within a specific scope. This approach ensures seamless interaction between interfaces and implementations, enhancing the flexibility and adaptability of the system.
We will do this using the di.xml file. This file is not only used for the mapping above; the same preference approach can override any class, such as a controller, and di.xml also handles parameter configuration, registering custom commands, and more.
Now create a di.xml file inside the etc directory and place the code below in it. Whenever the interface is requested, the object manager supplies the model class instead. For example, if you call the setName method of ProductInterface, the code inside the Product class's setName method runs. This is dependency injection based on interface preferences: callers depend on the interface, and the object manager supplies the concrete class.
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:ObjectManager/etc/config.xsd">
    <preference for="Learningmagento\MessageQueue\Api\ProductImportInterface" type="Learningmagento\MessageQueue\Model\ProductImport" />
    <preference for="Learningmagento\MessageQueue\Api\ProductInterface" type="Learningmagento\MessageQueue\Model\Product" />
</config>
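To make the idea of a preference concrete, here is a minimal standalone sketch of an interface-to-class lookup. It is a simplified stand-in, not Magento's object manager: the resolve() function and the $preferences array are hypothetical, and the interface and class are reduced, un-namespaced versions of the ones in this module.

```php
<?php
// Conceptual sketch only: a hypothetical stand-in for the object manager's preference lookup.

interface ProductInterface
{
    public function getName();
    public function setName($name);
}

class Product implements ProductInterface
{
    protected $name;

    public function getName()
    {
        return $this->name;
    }

    public function setName($name)
    {
        $this->name = $name;
        return $this;
    }
}

// Plays the role of the <preference for="..." type="..."/> entries in di.xml.
$preferences = [
    ProductInterface::class => Product::class,
];

// Returns an instance of the class mapped to the requested type,
// falling back to the requested type itself when no preference exists.
function resolve(array $preferences, string $requested): object
{
    $concrete = $preferences[$requested] ?? $requested;
    return new $concrete();
}

// The caller asks for the interface and receives the mapped implementation.
$product = resolve($preferences, ProductInterface::class);
$product->setName('Sample product');
echo get_class($product), ': ', $product->getName(), PHP_EOL;
```

Because callers only name the interface, swapping the implementation means changing one entry in the mapping rather than every call site.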
communication.xml
This file defines the elements of the asynchronous messaging system that are common to all communication types. It contains the list of topics and their handlers, and is intended to hold message queue information shared between implementations.
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <topic name="product.import.queue" request="string" response="string" />
</config>
In the above file we have a topic named product.import.queue, to which we will send a request as a string and from which we will also receive a response as a string.
queue_publisher.xml
The next file is queue_publisher.xml, which defines the exchange to which a topic is published. The topic name must match the one declared in communication.xml; in our case it is product.import.queue.
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
    <publisher topic="product.import.queue">
        <connection name="amqp" exchange="product-import-exchange" />
    </publisher>
</config>
The connection we are using is Advanced Message Queuing Protocol (amqp); you can also use the database queue (db). This connection must match the connection attribute in the queue_topology.xml file.
Publisher class
Now we need to create a directory inside the module. The name of the directory is Publisher. Create a new class inside the Publisher folder named ProductImport.php. This class is used by the ProductImport model class.
<?php
namespace Learningmagento\MessageQueue\Publisher;
use Magento\Framework\MessageQueue\PublisherInterface;
use Magento\Framework\Serialize\Serializer\Json;
class ProductImport
{
    /**
     * Topic name this publisher sends messages to.
     */
    public const TOPIC_NAME = "product.import.queue";

    /**
     * @var PublisherInterface
     */
    protected $publisher;

    /**
     * @var Json
     */
    protected $json;

    /**
     * @param PublisherInterface $publisher
     * @param Json $json
     */
    public function __construct(
        PublisherInterface $publisher,
        Json $json
    ) {
        $this->publisher = $publisher;
        $this->json = $json;
    }

    /**
     * @param array $data
     * @return mixed|null
     */
    public function publish(array $data)
    {
        return $this->publisher->publish(self::TOPIC_NAME, $this->json->serialize($data));
    }
}
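Since the publisher serializes its data to JSON before sending, it helps to see what JSON encoding does with objects versus plain arrays. The sketch below uses PHP's built-in json_encode() directly, on the assumption that the Json serializer behaves the same way for this input; SketchProduct is a hypothetical stand-in for the Product model.

```php
<?php
// Standalone sketch using plain json_encode(); SketchProduct is a hypothetical stand-in class.

class SketchProduct
{
    protected $name;

    public function __construct(string $name)
    {
        $this->name = $name;
    }

    public function getName(): string
    {
        return $this->name;
    }
}

$product = new SketchProduct('Sample product');

// Encoding the object directly: protected properties are not exported,
// so the name is lost and an empty JSON object is produced.
$fromObject = json_encode([$product]);

// Encoding an explicit array built from the getters keeps the data.
$fromArray = json_encode(['name' => $product->getName()]);

echo $fromObject, PHP_EOL; // [{}]
echo $fromArray, PHP_EOL;  // {"name":"Sample product"}
```

For that reason it is safer to hand the publisher an array of scalar values than a model object with non-public properties.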
queue_topology.xml
The topology file defines the message routing rules and declares queues and exchanges. In this file, specify the exchange name, type, and connection attributes.
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="product-import-exchange" type="topic" connection="amqp">
        <binding id="productImportBinding" topic="product.import.queue" destinationType="queue" destination="productImport-queue"/>
    </exchange>
</config>
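The binding above can be read as a lookup from topic to destination queue. The following sketch illustrates that idea with plain arrays. It is a deliberate simplification: it only does exact topic matching, whereas a real topic exchange also supports wildcard patterns, and the route() function is hypothetical.

```php
<?php
// Conceptual sketch only: exact-match routing, a simplification of a topic exchange.

// Binding table: topic => destination queue (mirrors the <binding> element).
$bindings = [
    'product.import.queue' => 'productImport-queue',
];

// Queues declared by the topology, each holding its pending messages.
$queues = [
    'productImport-queue' => [],
];

// Routes a message to the queue bound to its topic.
// Returns false when no binding matches, in which case the message is not stored.
function route(array $bindings, array &$queues, string $topic, string $message): bool
{
    if (!isset($bindings[$topic])) {
        return false;
    }
    $queues[$bindings[$topic]][] = $message;
    return true;
}

$delivered = route($bindings, $queues, 'product.import.queue', '{"name":"Sample product"}');
$dropped = route($bindings, $queues, 'unknown.topic', '{}');

var_dump($delivered, $dropped);
```

This is why the topic in the binding has to match the topic used by the publisher: without a matching binding, the message has no queue to land in.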
queue_consumer.xml
The queue_consumer.xml file defines the relationship between an existing queue and its consumer. The consumer has a name; its queue must match the destination declared in queue_topology.xml, its connection must match as well, and its handler specifies the class and method that are called when a message is processed.
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <consumer name="product.import.consume" queue="productImport-queue" connection="amqp" handler="Learningmagento\MessageQueue\Consumer\ProductImport::process"/>
</config>
Handler class
<?php
/**
*
* @category Custom Development
* @email contactus@learningmagento.com
* @author Custom Development
* @website learningmagento.com
* @Date 12-03-2024
*/
namespace Learningmagento\MessageQueue\Consumer;
class ProductImport
{
    public function process($operation)
    {
        $writer = new \Zend_Log_Writer_Stream(BP . '/var/log/custom.log');
        $logger = new \Zend_Log();
        $logger->addWriter($writer);
        $logger->info('Your text message');
        $logger->info(print_r($operation, true));
    }
}
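The handler above only logs the raw message. Because the publisher sends a JSON string, a real handler would usually decode it first. Here is a minimal standalone sketch of that step; decodeMessage() is a hypothetical helper, not part of Magento, and it assumes the message body is the JSON string produced by the publisher.

```php
<?php
// Standalone sketch: decodes a JSON message body as a consumer handler might.
// decodeMessage() is a hypothetical helper, not part of Magento.

function decodeMessage(string $body): array
{
    // JSON_THROW_ON_ERROR (PHP 7.3+) turns malformed input into a JsonException.
    $data = json_decode($body, true, 512, JSON_THROW_ON_ERROR);
    if (!is_array($data)) {
        throw new InvalidArgumentException('Expected a JSON object or array.');
    }
    return $data;
}

$payload = decodeMessage('{"name":"Sample product"}');
echo $payload['name'], PHP_EOL;
```

Failing loudly on malformed input makes it easier to spot bad messages than silently working with a null value.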
Tables involved
Three tables are involved in the message queue inside Magento: queue, queue_message, and queue_message_status. The queue table contains the list of queues, queue_message contains the message data in JSON format, and queue_message_status records the status of each message in a queue, that is, whether the message has been processed or not.
Process of Sending a Message from Publisher to a Queue
To dispatch a message to a Magento 2 queue, some entry point, such as a controller or the web API used in this example, must call the publisher. When the db connection is used, these messages are stored in Magento's database (with amqp they are held by RabbitMQ instead). Here's the breakdown:
- The queue table manages queues.
- The queue_message table stores messages.
- The queue_message_status table tracks the processing status of each message in each queue.
Testing
Finally, we will test our message queue. Since we have created several XML files, first clear the cache or run setup:upgrade. Next, open the Postman application to call the API, pass a name attribute in the body, and send the request. This adds the task to the queue.
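As a sketch of what the request body might look like, the snippet below builds it with json_encode(). The product name is a placeholder, and the endpoint is assumed to be the route from webapi.xml under your store's REST base path (for example /rest/V1/product/import).

```php
<?php
// Sketch of the JSON body for the POST request; the product name is a placeholder.

// The top-level key matches the $product parameter of update(),
// and "name" corresponds to the setName() setter on ProductInterface.
$body = json_encode(['product' => ['name' => 'Sample product']]);

echo $body, PHP_EOL; // {"product":{"name":"Sample product"}}
```

In Postman, paste the printed JSON as a raw body and set the Content-Type header to application/json.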
Now open the command prompt and run php bin/magento queue:consumers:list. You will see your consumer, product.import.consume, in the list.
Now run the command php bin/magento queue:consumers:start product.import.consume. This executes the handler, which writes to our log file. In production, this should be done by cron jobs rather than by running the command manually.
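For the cron-based approach, env.php supports a cron_consumers_runner section that lets Magento's cron start consumers for you. The fragment below is a sketch of that section, to be placed alongside the queue array in app/etc/env.php; the max_messages value is illustrative, and limiting the list to this example's consumer is an assumption about what you want to run.

```php
'cron_consumers_runner' => [
    // Let the Magento cron job start consumers automatically.
    'cron_run' => true,
    // Number of messages a consumer processes before it exits (illustrative value).
    'max_messages' => 1000,
    // Restrict cron to specific consumers; here, only the one from this example.
    'consumers' => [
        'product.import.consume'
    ]
],
```

This only has an effect when the Magento cron is installed and running on the server.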