浏览文章

文章信息

How to configure and use RabbitMQ in Magento 2.3 | Magento2 | Magento2怎样配置和使用RabbitMQ 10587

步骤 1# 要在 Magento 中配置 RabbitMQ,您需要在 Magento env.php 文件中添加 RabbitMQ 详细信息

magento 中的 env.php 文件位置:magento-root-dir/app/etc/env.php

'queue' => [
        'amqp' => [
            'host' => '34.222.345.76', //RabbitMQ的主机
            'port' => '5672', //运行RabbitMQ的端口。5672 是默认端口
            'user' => 'admin', // RabbitMQ 用户名
            'password' => 'xxxxxxxxxxxxx', //RabbitMQ 密码
            'virtualhost' => '/', //连接RabbitMQ的虚拟主机。默认值为 /。
            'ssl' => '',
        ],
    ],

现在配置完成后,我们将在自定义模块中使用 RabbitMQ

为此,我们需要在模块 etc 文件夹中创建以下四个文件

模块中 etc 文件夹的位置: magento-root/app/code/NameSpace/ModuleName/etc

Step 2# 1st 我们将在 magento-root/app/code/NameSpace/ModuleName/etc 文件夹中创建communication.xml

该文件包含一个主题列表。旨在包含实现之间共享的消息队列信息。

<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/queue.xsd">
    <!-- 你可以根据自己命名主题 -->
    <topic name="yourtopibname.topic" request="string"/>
</config>

步骤 3# 在 magento-root/app/code/NameSpace/ModuleName/etc 文件夹中创建 queue_topology.xml

此文件定义消息路由规则并声明队列和交换。

<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/queue_topology.xsd">
    <!-- name : 交换的唯一 ID -->
    <!-- type : 指定交换的类型。必须是主题 -->
    <!-- connection:对于 AMQP 连接,标识连接的字符串。对于 MySQL 连接,连接名称必须是 db -->
    <exchange name="magento" type="topic" connection="amqp">
        <!-- id: 此绑定的唯一 ID -->
        <!-- topic: 主题名称 -->
        <!-- destinationType: 必须是队列 -->
        <!-- destination:目标队列名字-->
        <binding id="uniqueIdBinding" topic="yourtopibname.topic" destinationType="queue" destination="yourQueue"/>
    </exchange>
</config>

步骤 4# 在 magento-root/app/code/NameSpace/ModuleName/etc 文件夹中创建 queue_consumer.xml

此文件定义现有队列与其使用者之间的关系。

<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/queue_consumer.xsd">
    <!-- 名称: 消费者名称 -->
    <!-- queue: 定义将消息发送到的队列名称 -->
    <!-- connection: 对于AMQP连接,连接名称必须与queue_topology.xml中的connection属性匹配文件。否则,连接名必须是 db -->
    <!-- consumerInstance: 消费消息的 Magento 类名 -->
    <consumer name="consumerName" queue="yourQueue" connection="amqp" consumerInstance="NameSpace\ModuleName\Model\MassConsumer"/>
</config>

步骤 5# 在 magento-root/app/code/NameSpace/ModuleName/etc 文件夹中创建 queue_publisher.xml

此文件定义用于发布特定主题的消息的连接和交换。

<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/queue_publisher.xsd">
    <!-- topic: 主题名称-->
    <publisher topic="yourtopibname.topic">
        <!-- name: 对于 AMQP 连接,连接名称必须与 queue_topology.xml 文件中的连接属性匹配。否则,连接名称必须是 db -->
        <!-- exchange: 要发布到的交换的名称。默认系统交换名是magento -->
        <connection name="amqp" exchange="magento" /> <!-- 高级消息队列协议 -->
    </publisher>
</config>

现在我将登录 RabbitMQ 面板并检查队列列表


现在面板中没有队列我将在 magento 中安装我们的模块

 php bin/magento setup:upgrade

安装后,我们在 RabbitMQ 面板中得到了我们的队列,如下所示


我们的队列“yourQueue”添加到 RabbitMQ 现在我们将在这个队列中编写用于发布消息的代码

步骤 6# 在这里,我将创建一个控制器,从中我将在我们的队列中添加消息

创建文件:NameSpace\ModuleName\Controller\Adminhtml\PublishMessage\InRabbitMQ.php

<?php
namespace NameSpace\ModuleName\Controller\Adminhtml\PublishMessage;
use Magento\Backend\App\Action\Context;
use Magento\Framework\Controller\Result\JsonFactory;
use Magento\Framework\Json\Helper\Data as JsonHelper;
use NameSpace\ModuleName\Logger\Logger;
class InRabbitMQ extends \Magento\Backend\App\Action
{
    /**
     * @var \Magento\Framework\Controller\Result\JsonFactory
     */
    private $resultJsonFactory;
    /**
     * @var \Magento\Framework\Json\Helper\Data
     */
    private $jsonHelper;
    /**
     * @var \Magento\Framework\MessageQueue\PublisherInterface
     */
    private $publisher;
    /**
     * @var \NameSpace\ModuleName\Logger\Logger
     */
    private $logger;
    /**
     * @param Context $context
     * @param JsonFactory $resultJsonFactory
     * @param JsonHelper $jsonHelper
     * @param \Magento\Framework\MessageQueue\PublisherInterface $publisher
     * @param Logger $logger
     */
    public function __construct(
        Context                                            $context,
        JsonFactory                                        $resultJsonFactory,
        JsonHelper                                         $jsonHelper,
        \Magento\Framework\MessageQueue\PublisherInterface $publisher, // 用于在 RabbitMQ 中发布消息
        Logger                                             $logger
    )
    {
        parent::__construct($context);
        $this->resultJsonFactory = $resultJsonFactory;
        $this->jsonHelper = $jsonHelper;
        $this->publisher = $publisher;
        $this->logger = $logger;
    }
    /**
     * @return \Magento\Backend\Model\View\Result\Page
     */
    public function execute()
    {
        try {
            $resultJson = $this->resultJsonFactory->create();
            /**
             * 这里我们use随机产品id和产品数据as消息发布的$item
             * @var int $productId
             * @var array $item
             */
            $publishData = ['mage_pro_id' => $productId, 'item' => $item];
            // yourtopibname.topic 与您在communication.xml 文件中添加的相同
            $this->publisher->publish('yourtopibname.topic', $this->jsonHelper->jsonEncode($publishData));
            $result = ['msg' => '成功'];
            return $resultJson->setData($result);
        } catch (\Exception $e) {
            $result = ['error' => $e->getMessage()];
            return $resultJson->setData($result);
        }
    }
    /**
     * 检查产品导入权限。
     * */
    function _isAllowed()
    {
        return $this->_authorization->isAllowed('NameSpace_ModuleName::product_import');
    }
}

步骤 7# 我们将为进程队列消息(数据)创建模型类并将结果返回给消费者。

创建文件:NameSpace\ModuleName\Model\ProcessQueueMsg.php

<?php
namespace NameSpace\ModuleName\Model;
/**
 * ProcessQueueMsg 模型
 */
class ProcessQueueMsg
{
    /**
     * process
     * @param $message
     * @return
     */
    public function process($message)
    {
        // 在这里你可以处理你的数据并返回结果
    }
}

步骤 8# 现在我们将为进程队列消息创建消费者类

创建文件:NameSpace\ModuleName\Model\MassConsumer.php

<?php
namespace NameSpace\ModuleName\Model;
use Magento\Framework\App\ResourceConnection;
use NameSpace\ModuleName\Logger\Logger;
use Magento\Framework\MessageQueue\MessageLockException; 
use Magento\Framework\MessageQueue\ConnectionLostException; 
use Magento\Framework\Exception\NotFoundException; 
use Magento\Framework\MessageQueue\CallbackInvoker; 
use Magento\Framework\MessageQueue\ConsumerConfigurationInterface;
use Magento\Framework\MessageQueue\EnvelopeInterface; 
use Magento\Framework\MessageQueue\QueueInterface;
use Magento\Framework\MessageQueue\LockInterface;
use Magento\Framework\MessageQueue\MessageController;
use Magento\Framework\MessageQueue\ConsumerInterface;
/**
 * 类 Consumer 用于处理 OperationInterface 消息。
 *
 * @SuppressWarnings(PHPMD.CouplingBetweenObjects)
 */ 
class MassConsumer implements ConsumerInterface
{
    /**
     * @var \Magento\Framework\MessageQueue\CallbackInvoker
     */
    private $invoker;
    /**
     * @var \Magento\Framework\App\ResourceConnection
     */
    private $resource;
    /**
     * @var \Magento\Framework\MessageQueue\ConsumerConfigurationInterface
     */
    private $configuration;
    /**
     * @var \Magento\Framework\MessageQueue\MessageController
     */
    private $messageController;
    /**
     * @var LoggerInterface
     */
    private $logger;
    /**
     * @var OperationProcessor
     */
    private $operationProcessor;
    /**
     * 初始化依赖。
     *
     * @param CallbackInvoker $invoker
     * @param ResourceConnection $resource
     * @param MessageController $messageController
     * @param ConsumerConfigurationInterface $configuration
     * @param OperationProcessorFactory $operationProcessorFactory
     * @param LoggerInterface $logger
     */
    public function __construct(
        CallbackInvoker                             $invoker,
        ResourceConnection                          $resource,
        MessageController                           $messageController,
        ConsumerConfigurationInterface              $configuration,
        \NameSpace\ModuleName\Model\ProcessQueueMsg $processQueueMsg,
        Logger                                      $logger
    )
    {
        $this->invoker = $invoker;
        $this->resource = $resource;
        $this->messageController = $messageController;
        $this->configuration = $configuration;
        $this->processQueueMsg = $processQueueMsg;
        $this->logger = $logger;
    }
    /**
     * {@inheritdoc}
     */
    public function process($maxNumberOfMessages = null)
    {
        $queue = $this->configuration->getQueue();
        if (!isset($maxNumberOfMessages)) {
            $queue->subscribe($this->getTransactionCallback($queue));
        } else {
            $this->invoker->invoke($queue, $maxNumberOfMessages, $this->getTransactionCallback($queue));
        }
    }
    /**
     * 获取交易回调。这处理异步的情况。
     *
     * @param QueueInterface $queue
     * @return \Closure
     */
    private function getTransactionCallback(QueueInterface $queue)
    {
        return function (EnvelopeInterface $message) use ($queue) {
            /** @var LockInterface $lock */
            $lock = null;
            try {
                $lock = $this->messageController->lock($message, $this->configuration->getConsumerName());
                $message = $message->getBody();
                /**
                 * $this->processQueueMsg->process() 用于处理您在队列中发布的消息
                 */
                $data = $this->processQueueMsg->process($message);
                if ($data === false) {
                    $queue->reject($message); // 如果在消息过程中出现错误
                }
                $queue->acknowledge($message); // 向队列发送确认
            } catch (MessageLockException $exception) {
                $queue->acknowledge($message);
            } catch (ConnectionLostException $e) {
                $queue->acknowledge($message);
                if ($lock) {
                    $this->resource->getConnection()
                        ->delete($this->resource->getTableName('queue_lock'), ['id = ?' => $lock->getId()]);
                }
            } catch (NotFoundException $e) {
                $queue->acknowledge($message);
                $this->logger->warning($e->getMessage());
                $queue->reject($message, false, $e->getMessage());
                $queue->acknowledge($message);
                if ($lock) {
                    $this->resource->getConnection()
                        ->delete($this->resource->getTableName('queue_lock'), ['id = ?' => $lock->getId()]);
                }
            }
        };
    }
}

步骤 9# 现在我们将从终端检查我们的消费者

从 magento 根目录运行以下命令

php bin/magento queue:consumers:list

运行上述命令后,您将在终端上获得以下结果


步骤 10# 现在对于运行消费者,我们需要从终端执行以下命令

php bin/magento queue:consumers:start consumerName

要在后端运行此过程,您需要在上面的命令中添加“&”,如下所示

php bin/magento queue:consumers:start consumerName &



完毕!

原创