File: /home/accemeff/vendor/yiisoft/yii2-queue/src/drivers/amqp/Queue.php
<?php
/**
* @link http://www.yiiframework.com/
* @copyright Copyright (c) 2008 Yii Software LLC
* @license http://www.yiiframework.com/license/
*/
namespace yii\queue\amqp;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use yii\base\Application as BaseApp;
use yii\base\Event;
use yii\base\NotSupportedException;
use yii\queue\cli\Queue as CliQueue;
/**
* Amqp Queue.
*
* @deprecated since 2.0.2 and will be removed in 2.1. Consider using amqp_interop driver instead.
*
* @author Roman Zhuravlev <zhuravljov@gmail.com>
*/
class Queue extends CliQueue
{
public $host = 'localhost';
public $port = 5672;
public $user = 'guest';
public $password = 'guest';
public $queueName = 'queue';
public $exchangeName = 'exchange';
public $vhost = '/';
/**
* @var string command class name
*/
public $commandClass = Command::class;
/**
* @var AMQPStreamConnection
*/
protected $connection;
/**
* @var AMQPChannel
*/
protected $channel;
/**
* @inheritdoc
*/
public function init()
{
parent::init();
Event::on(BaseApp::class, BaseApp::EVENT_AFTER_REQUEST, function () {
$this->close();
});
}
/**
* Listens amqp-queue and runs new jobs.
*/
public function listen()
{
$this->open();
$callback = function (AMQPMessage $payload) {
$id = $payload->get('message_id');
list($ttr, $message) = explode(';', $payload->body, 2);
if ($this->handleMessage($id, $message, $ttr, 1)) {
$payload->delivery_info['channel']->basic_ack($payload->delivery_info['delivery_tag']);
}
};
$this->channel->basic_qos(null, 1, null);
$this->channel->basic_consume($this->queueName, '', false, false, false, false, $callback);
while (count($this->channel->callbacks)) {
$this->channel->wait();
}
}
/**
* @inheritdoc
*/
protected function pushMessage($message, $ttr, $delay, $priority)
{
if ($delay) {
throw new NotSupportedException('Delayed work is not supported in the driver.');
}
if ($priority !== null) {
throw new NotSupportedException('Job priority is not supported in the driver.');
}
$this->open();
$id = uniqid('', true);
$this->channel->basic_publish(
new AMQPMessage("$ttr;$message", [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'message_id' => $id,
]),
$this->exchangeName
);
return $id;
}
/**
* @inheritdoc
*/
public function status($id)
{
throw new NotSupportedException('Status is not supported in the driver.');
}
/**
* Opens connection and channel.
*/
protected function open()
{
if ($this->channel) {
return;
}
$this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password, $this->vhost);
$this->channel = $this->connection->channel();
$this->channel->queue_declare($this->queueName, false, true, false, false);
$this->channel->exchange_declare($this->exchangeName, 'direct', false, true, false);
$this->channel->queue_bind($this->queueName, $this->exchangeName);
}
/**
* Closes connection and channel.
*/
protected function close()
{
if (!$this->channel) {
return;
}
$this->channel->close();
$this->connection->close();
}
}