这篇文章主要为大家详细介绍了PHP如何使用enqueue/amqp-lib实现rabbitmq任务处理,文中的示例代码讲解详细,感兴趣的小伙伴可以学习一下
目录
一:拓展安装二:方法介绍1:连接rabbitmq2:声明主题3:声明队列4:将队列绑定到主题5:发送消息6:消费消息【接收消息】三:简单实现1:发送消息2:消费消息一:拓展安装
composer require enqueue/amqp-lib
文档地址:https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/amqp_lib.md
二:方法介绍
1:连接rabbitmq
$factory = new AmqpConnectionFactory([ 'host' => '192.168.6.88',//host 'port' => '5672',//端口 'vhost' => '/',//虚拟主机 'user' => 'admin',//账号 'pass' => 'admin',//密码]);$context = $factory->createContext();
2:声明主题
//声明并创建主题$exchangeName = 'exchange';$fooTopic = $context->createTopic($exchangeName);$fooTopic->setType(AmqpTopic::TYPE_FANOUT);$context->declareTopic($fooTopic); //删除主题$context->deleteTopic($fooTopic);
3:声明队列
//声明并创建队列$queueName = 'rabbitmq';$fooQueue = $context->createQueue($queueName);$fooQueue->addFlag(AmqpQueue::FLAG_DURABLE);$context->declareQueue($fooQueue); //删除队列$context->deleteQueue($fooQueue);
4:将队列绑定到主题
$context->bind(new AmqpBind($fooTopic, $fooQueue));
5:发送消息
//向队列发送消息$message = $context->createMessage('Hello world!');$context->createProducer()->send($fooQueue, $message); //向队列发送优先消息$queueName = 'rabbitmq';$fooQueue = $context->createQueue(queueName);$fooQueue->addFlag(AmqpQueue::FLAG_DURABLE);//设置队列的最大优先级$fooQueue->setArguments(['x-max-priority' => 10]);$context->declareQueue($fooQueue); $message = $context->createMessage('Hello world!'); $context->createProducer() ->setPriority(5) //设置优先级,优先级越高,消息越快到达消费者 ->send($fooQueue, $message); //向队列发送延时消息$message = $context->createMessage('Hello world!'); $context->createProducer() ->setDelayStrategy(new RabbitMqDlxDelayStrategy()) ->setDeliveryDelay(5000) //消息延时5秒 ->send($fooQueue, $message);
6:消费消息【接收消息】
//消费消息$consumer = $context->createConsumer($fooQueue); $message = $consumer->receive(); // process a message//业务代码 $consumer->acknowledge($message);//ack应答,通知rabbitmq成功,删除对应任务// $consumer->reject($message);ack应答,通知rabbitmq失败,不删除对应任务 //订阅消费者$fooConsumer = $context->createConsumer($fooQueue); $subscriptionConsumer = $context->createSubscriptionConsumer();$subscriptionConsumer->subscribe($fooConsumer, function(Message $message, Consumer $consumer) { // process message //业务代码 $consumer->acknowledge($message);//ack应答,通知rabbitmq成功,删除对应任务 // $consumer->reject($message);ack应答,通知rabbitmq失败,不删除对应任务 return true;});$subscriptionConsumer->consume(); //清除队列消息$queueName = 'rabbitmq';$queue = $context->createQueue($queueName);$context->purgeQueue($queue);
三:简单实现
1:发送消息
//连接rabbitmq$factory = new AmqpConnectionFactory([ 'host' => '192.168.6.88', 'port' => '5672', 'vhost' => '/', 'user' => 'admin', 'pass' => 'admin', 'persisted' => false,]); $context = $factory->createContext();//声明主题$exchangeName = 'exchange';$fooTopic = $context->createTopic($exchangeName);$fooTopic->setType(AmqpTopic::TYPE_FANOUT);$context->declareTopic($fooTopic); //声明队列$queueName = 'rabbitmq';$fooQueue = $context->createQueue($queueName);$fooQueue->addFlag(AmqpQueue::FLAG_DURABLE);$context->declareQueue($fooQueue); //将队列绑定到主题$context->bind(new AmqpBind($fooTopic, $fooQueue)); //发送消息到队列$message = $context->createMessage('Hello world!'); $context->createProducer()->send($fooQueue, $message);
2:消费消息
$factory = new AmqpConnectionFactory([ 'host' => '192.168.6.88', 'port' => '5672', 'vhost' => '/', 'user' => 'admin', 'pass' => 'admin', 'persisted' => false,]);$context = $factory->createContext(); $queueName = 'rabbitmq';$fooQueue = $context->createQueue($queueName); $fooConsumer = $context->createConsumer($fooQueue); $subscriptionConsumer = $context->createSubscriptionConsumer();$subscriptionConsumer->subscribe($fooConsumer, function(Message $message, Consumer $consumer) { // process message //业务代码 $consumer->acknowledge($message);//ack应答,通知rabbitmq成功,删除对应任务 // $consumer->reject($message);ack应答,通知rabbitmq失败,不删除对应任务 return true;});$subscriptionConsumer->consume();