浏览文章
文章信息
How to configure and use RabbitMQ in Magento 2.3 | Magento2 | Magento2怎样配置和使用RabbitMQ
10587
步骤 1# 要在 Magento 中配置 RabbitMQ,您需要在 Magento env.php 文件中添加 RabbitMQ 详细信息
magento 中的 env.php 文件位置:magento-root-dir/app/etc/env.php
'queue' => [ 'amqp' => [ 'host' => '34.222.345.76', //RabbitMQ的主机 'port' => '5672', //运行RabbitMQ的端口。5672 是默认端口 'user' => 'admin', // RabbitMQ 用户名 'password' => 'xxxxxxxxxxxxx', //RabbitMQ 密码 'virtualhost' => '/', //连接RabbitMQ的虚拟主机。默认值为 /。 'ssl' => '', ], ],现在配置完成后,我们将在自定义模块中使用 RabbitMQ
为此,我们需要在模块 etc 文件夹中创建以下四个文件
模块中 etc 文件夹的位置: magento-root/app/code/NameSpace/ModuleName/etc
Step 2# 1st 我们将在 magento-root/app/code/NameSpace/ModuleName/etc 文件夹中创建communication.xml
该文件包含一个主题列表。旨在包含实现之间共享的消息队列信息。
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/queue.xsd"> <!-- 你可以根据自己命名主题 --> <topic name="yourtopibname.topic" request="string"/> </config>步骤 3# 在 magento-root/app/code/NameSpace/ModuleName/etc 文件夹中创建 queue_topology.xml
此文件定义消息路由规则并声明队列和交换。
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/queue_topology.xsd"> <!-- name : 交换的唯一 ID --> <!-- type : 指定交换的类型。必须是主题 --> <!-- connection:对于 AMQP 连接,标识连接的字符串。对于 MySQL 连接,连接名称必须是 db --> <exchange name="magento" type="topic" connection="amqp"> <!-- id: 此绑定的唯一 ID --> <!-- topic: 主题名称 --> <!-- destinationType: 必须是队列 --> <!-- destination:目标队列名字--> <binding id="uniqueIdBinding" topic="yourtopibname.topic" destinationType="queue" destination="yourQueue"/> </exchange> </config>步骤 4# 在 magento-root/app/code/NameSpace/ModuleName/etc 文件夹中创建 queue_consumer.xml
此文件定义现有队列与其使用者之间的关系。
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/queue_consumer.xsd"> <!-- 名称: 消费者名称 --> <!-- queue: 定义将消息发送到的队列名称 --> <!-- connection: 对于AMQP连接,连接名称必须与queue_topology.xml中的connection属性匹配文件。否则,连接名必须是 db --> <!-- consumerInstance: 消费消息的 Magento 类名 --> <consumer name="consumerName" queue="yourQueue" connection="amqp" consumerInstance="NameSpace\ModuleName\Model\MassConsumer"/> </config>步骤 5# 在 magento-root/app/code/NameSpace/ModuleName/etc 文件夹中创建 queue_publisher.xml
此文件定义用于发布特定主题的消息的连接和交换。
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/queue_publisher.xsd"> <!-- topic: 主题名称--> <publisher topic="yourtopibname.topic"> <!-- name: 对于 AMQP 连接,连接名称必须与 queue_topology.xml 文件中的连接属性匹配。否则,连接名称必须是 db --> <!-- exchange: 要发布到的交换的名称。默认系统交换名是magento --> <connection name="amqp" exchange="magento" /> <!-- 高级消息队列协议 --> </publisher> </config>现在我将登录 RabbitMQ 面板并检查队列列表
现在面板中没有队列我将在 magento 中安装我们的模块
php bin/magento setup:upgrade安装后,我们在 RabbitMQ 面板中得到了我们的队列,如下所示
我们的队列“yourQueue”添加到 RabbitMQ 现在我们将在这个队列中编写用于发布消息的代码
步骤 6# 在这里,我将创建一个控制器,从中我将在我们的队列中添加消息
创建文件:NameSpace\ModuleName\Controller\Adminhtml\PublishMessage\InRabbitMQ.php
<?php namespace NameSpace\ModuleName\Controller\Adminhtml\PublishMessage; use Magento\Backend\App\Action\Context; use Magento\Framework\Controller\Result\JsonFactory; use Magento\Framework\Json\Helper\Data as JsonHelper; use NameSpace\ModuleName\Logger\Logger; class InRabbitMQ extends \Magento\Backend\App\Action { /** * @var \Magento\Framework\Controller\Result\JsonFactory */ private $resultJsonFactory; /** * @var \Magento\Framework\Json\Helper\Data */ private $jsonHelper; /** * @var \Magento\Framework\MessageQueue\PublisherInterface */ private $publisher; /** * @var \NameSpace\ModuleName\Logger\Logger */ private $logger; /** * @param Context $context * @param JsonFactory $resultJsonFactory * @param JsonHelper $jsonHelper * @param \Magento\Framework\MessageQueue\PublisherInterface $publisher * @param Logger $logger */ public function __construct( Context $context, JsonFactory $resultJsonFactory, JsonHelper $jsonHelper, \Magento\Framework\MessageQueue\PublisherInterface $publisher, // 用于在 RabbitMQ 中发布消息 Logger $logger ) { parent::__construct($context); $this->resultJsonFactory = $resultJsonFactory; $this->jsonHelper = $jsonHelper; $this->publisher = $publisher; $this->logger = $logger; } /** * @return \Magento\Backend\Model\View\Result\Page */ public function execute() { try { $resultJson = $this->resultJsonFactory->create(); /** * 这里我们use随机产品id和产品数据as消息发布的$item * @var int $productId * @var array $item */ $publishData = ['mage_pro_id' => $productId, 'item' => $item]; // yourtopibname.topic 与您在communication.xml 文件中添加的相同 $this->publisher->publish('yourtopibname.topic', $this->jsonHelper->jsonEncode($publishData)); $result = ['msg' => '成功']; return $resultJson->setData($result); } catch (\Exception $e) { $result = ['error' => $e->getMessage()]; return $resultJson->setData($result); } } /** * 检查产品导入权限。 * */ function _isAllowed() { return $this->_authorization->isAllowed('NameSpace_ModuleName::product_import'); } }步骤 7# 我们将为进程队列消息(数据)创建模型类并将结果返回给消费者。
创建文件:NameSpace\ModuleName\Model\ProcessQueueMsg.php
<?php namespace NameSpace\ModuleName\Model; /** * ProcessQueueMsg 模型 */ class ProcessQueueMsg { /** * process * @param $message * @return */ public function process($message) { // 在这里你可以处理你的数据并返回结果 } }步骤 8# 现在我们将为进程队列消息创建消费者类
创建文件:NameSpace\ModuleName\Model\MassConsumer.php
<?php namespace NameSpace\ModuleName\Model; use Magento\Framework\App\ResourceConnection; use NameSpace\ModuleName\Logger\Logger; use Magento\Framework\MessageQueue\MessageLockException; use Magento\Framework\MessageQueue\ConnectionLostException; use Magento\Framework\Exception\NotFoundException; use Magento\Framework\MessageQueue\CallbackInvoker; use Magento\Framework\MessageQueue\ConsumerConfigurationInterface; use Magento\Framework\MessageQueue\EnvelopeInterface; use Magento\Framework\MessageQueue\QueueInterface; use Magento\Framework\MessageQueue\LockInterface; use Magento\Framework\MessageQueue\MessageController; use Magento\Framework\MessageQueue\ConsumerInterface; /** * 类 Consumer 用于处理 OperationInterface 消息。 * * @SuppressWarnings(PHPMD.CouplingBetweenObjects) */ class MassConsumer implements ConsumerInterface { /** * @var \Magento\Framework\MessageQueue\CallbackInvoker */ private $invoker; /** * @var \Magento\Framework\App\ResourceConnection */ private $resource; /** * @var \Magento\Framework\MessageQueue\ConsumerConfigurationInterface */ private $configuration; /** * @var \Magento\Framework\MessageQueue\MessageController */ private $messageController; /** * @var LoggerInterface */ private $logger; /** * @var OperationProcessor */ private $operationProcessor; /** * 初始化依赖。 * * @param CallbackInvoker $invoker * @param ResourceConnection $resource * @param MessageController $messageController * @param ConsumerConfigurationInterface $configuration * @param OperationProcessorFactory $operationProcessorFactory * @param LoggerInterface $logger */ public function __construct( CallbackInvoker $invoker, ResourceConnection $resource, MessageController $messageController, ConsumerConfigurationInterface $configuration, \NameSpace\ModuleName\Model\ProcessQueueMsg $processQueueMsg, Logger $logger ) { $this->invoker = $invoker; $this->resource = $resource; $this->messageController = $messageController; $this->configuration = $configuration; $this->processQueueMsg = $processQueueMsg; $this->logger = $logger; } /** * {@inheritdoc} */ public function process($maxNumberOfMessages = null) { $queue = $this->configuration->getQueue(); if (!isset($maxNumberOfMessages)) { $queue->subscribe($this->getTransactionCallback($queue)); } else { $this->invoker->invoke($queue, $maxNumberOfMessages, $this->getTransactionCallback($queue)); } } /** * 获取交易回调。这处理异步的情况。 * * @param QueueInterface $queue * @return \Closure */ private function getTransactionCallback(QueueInterface $queue) { return function (EnvelopeInterface $message) use ($queue) { /** @var LockInterface $lock */ $lock = null; try { $lock = $this->messageController->lock($message, $this->configuration->getConsumerName()); $message = $message->getBody(); /** * $this->processQueueMsg->process() 用于处理您在队列中发布的消息 */ $data = $this->processQueueMsg->process($message); if ($data === false) { $queue->reject($message); // 如果在消息过程中出现错误 } $queue->acknowledge($message); // 向队列发送确认 } catch (MessageLockException $exception) { $queue->acknowledge($message); } catch (ConnectionLostException $e) { $queue->acknowledge($message); if ($lock) { $this->resource->getConnection() ->delete($this->resource->getTableName('queue_lock'), ['id = ?' => $lock->getId()]); } } catch (NotFoundException $e) { $queue->acknowledge($message); $this->logger->warning($e->getMessage()); $queue->reject($message, false, $e->getMessage()); $queue->acknowledge($message); if ($lock) { $this->resource->getConnection() ->delete($this->resource->getTableName('queue_lock'), ['id = ?' => $lock->getId()]); } } }; } }步骤 9# 现在我们将从终端检查我们的消费者
从 magento 根目录运行以下命令
php bin/magento queue:consumers:list运行上述命令后,您将在终端上获得以下结果
步骤 10# 现在对于运行消费者,我们需要从终端执行以下命令
php bin/magento queue:consumers:start consumerName要在后端运行此过程,您需要在上面的命令中添加“&”,如下所示
php bin/magento queue:consumers:start consumerName &
完毕!