MOON
Server: Apache
System: Linux res.emeff.ca 3.10.0-962.3.2.lve1.5.24.10.el7.x86_64 #1 SMP Wed Mar 20 07:36:02 EDT 2019 x86_64
User: accemeff (1004)
PHP: 7.0.33
Disabled: NONE
Upload Files
File: /home/accemeff/vendor/craftcms/cms/src/queue/Queue.php
<?php
/**
 * @link https://craftcms.com/
 * @copyright Copyright (c) Pixel & Tonic, Inc.
 * @license https://craftcms.github.io/license/
 */

namespace craft\queue;

use Craft;
use craft\db\Table;
use craft\helpers\Db;
use craft\helpers\Json;
use craft\helpers\UrlHelper;
use yii\base\Exception;
use yii\db\Query;
use yii\queue\cli\Signal;
use yii\queue\ExecEvent;
use yii\web\Response;

/**
 * Craft Queue
 *
 * @author Pixel & Tonic, Inc. <support@pixelandtonic.com>
 * @author Roman Zhuravlev <zhuravljov@gmail.com>
 * @since 3.0
 */
class Queue extends \yii\queue\cli\Queue implements QueueInterface
{
    // Constants
    // =========================================================================

    /**
     * @see isFailed()
     */
    const STATUS_FAILED = 4;

    // Properties
    // =========================================================================

    /**
     * @var int The timeout passed to the mutex when acquiring the lock used to reserve a job
     */
    public $mutexTimeout = 3;

    /**
     * @var string command class name
     */
    public $commandClass = Command::class;

    /**
     * @var string|null The description of the job being pushed into the queue
     */
    private $_jobDescription;

    /**
     * @var string|null The currently-executing job ID
     */
    private $_executingJobId;

    /**
     * @var int The timestamp the last job was reserved
     */
    private $_reserveTime;

    /**
     * @var bool Whether we're already listening for the web response
     */
    private $_listeningForResponse = false;

    // Public Methods
    // =========================================================================

    /**
     * @inheritdoc
     */
    public function init()
    {
        parent::init();

        // Remember which job is executing, so setProgress() knows which row to update
        $trackExecutingJob = function(ExecEvent $event) {
            $this->_executingJobId = $event->id;
        };

        // ...and forget it again once the job is done executing
        $clearExecutingJob = function(ExecEvent $event) {
            $this->_executingJobId = null;
        };

        $this->on(self::EVENT_BEFORE_EXEC, $trackExecutingJob);
        $this->on(self::EVENT_AFTER_EXEC, $clearExecutingJob);
    }

    /**
     * @inheritdoc
     */
    public function run()
    {
        // Keep reserving jobs until an exit signal arrives or nothing is left to reserve
        while (!Signal::isExit()) {
            $payload = $this->reserve();

            if (!$payload) {
                break;
            }

            $shouldRelease = $this->handleMessage(
                $payload['id'],
                $payload['job'],
                $payload['ttr'],
                $payload['attempt']
            );

            // Only delete the job's row when the handler tells us to
            if ($shouldRelease) {
                $this->release($payload['id']);
            }
        }
    }

    /**
     * Listens to the queue and runs new jobs.
     *
     * Each pass processes the queue via [[run()]]. The loop ends once an exit signal has been
     * received, or when `sleep()` returns anything other than `0`.
     *
     * @param int $delay The number of seconds to wait between passes. If `0`, the next pass starts immediately.
     */
    public function listen(int $delay)
    {
        do {
            $this->run();

            // run() stops as soon as an exit signal has been received. Without this check the
            // loop would just call it again, and the listener would never shut down.
            if (Signal::isExit()) {
                break;
            }
        } while (!$delay || sleep($delay) === 0);
    }

    /**
     * Returns whether a job has failed.
     *
     * @param string $id The ID of the job message
     * @return bool Whether the job's status is [[STATUS_FAILED]]
     */
    public function isFailed(string $id): bool
    {
        $status = $this->status($id);

        return $status === self::STATUS_FAILED;
    }

    /**
     * Returns the status of a job.
     *
     * Only the job's `fail` and `timeUpdated` columns are fetched; the status is derived from them.
     * A job that has no row in the queue table is reported as done.
     *
     * @param string $id The ID of the job message
     * @return int The status code
     */
    public function status($id)
    {
        $query = $this->_createJobQuery();
        $query->select(['fail', 'timeUpdated']);
        $query->where(['id' => $id]);

        return $this->_status($query->one());
    }

    /**
     * @inheritdoc
     */
    public function push($job)
    {
        // Stash the description where pushMessage() can pick it up
        $this->_jobDescription = $job instanceof JobInterface ? $job->getDescription() : null;

        $id = parent::push($job);

        if ($id === null) {
            return null;
        }

        // Nothing more to do if the queue isn't run automatically, or we're already listening
        $runAutomatically = Craft::$app->getConfig()->getGeneral()->runQueueAutomatically;

        if (!$runAutomatically || $this->_listeningForResponse) {
            return $id;
        }

        // Only site requests that aren't Ajax get the queue-runner JS appended to the response
        $request = Craft::$app->getRequest();

        if (!$request->getIsSiteRequest() || $request->getIsAjax()) {
            return $id;
        }

        Craft::$app->getResponse()->on(Response::EVENT_AFTER_PREPARE, [$this, 'handleResponse']);
        $this->_listeningForResponse = true;

        return $id;
    }

    /**
     * @inheritdoc
     */
    public function retry(string $id)
    {
        // Clear the reservation, progress, attempt count and failure info
        $resetColumns = [
            'dateReserved' => null,
            'timeUpdated' => null,
            'progress' => 0,
            'attempt' => 0,
            'fail' => false,
            'dateFailed' => null,
            'error' => null,
        ];

        $command = Craft::$app->getDb()->createCommand()
            ->update(Table::QUEUE, $resetColumns, ['id' => $id], [], false);

        $command->execute();
    }

    /**
     * @inheritdoc
     */
    public function release(string $id)
    {
        // Releasing a job simply deletes its row from the queue table
        $command = Craft::$app->getDb()->createCommand()
            ->delete(Table::QUEUE, ['id' => $id]);

        $command->execute();
    }

    /**
     * @inheritdoc
     */
    public function setProgress(int $progress)
    {
        // Store the new progress and bump the timestamp on the currently-executing job
        $columns = [
            'progress' => $progress,
            'timeUpdated' => time(),
        ];
        $condition = ['id' => $this->_executingJobId];

        $command = Craft::$app->getDb()->createCommand()
            ->update(Table::QUEUE, $columns, $condition, [], false);

        $command->execute();
    }

    /**
     * @inheritdoc
     */
    public function getHasWaitingJobs(): bool
    {
        // Put expired reservations back into the waiting list first
        $this->_moveExpired();

        $waitingJobs = $this->_createWaitingJobQuery();

        return $waitingJobs->exists();
    }

    /**
     * @inheritdoc
     */
    public function getHasReservedJobs(): bool
    {
        // Put expired reservations back into the waiting list first
        $this->_moveExpired();

        $reservedJobs = $this->_createReservedJobQuery();

        return $reservedJobs->exists();
    }

    /**
     * Returns the total number of waiting jobs.
     *
     * @return int The number of rows matched by the waiting-job query
     */
    public function getTotalWaiting(): int
    {
        // Put expired reservations back into the waiting list first
        $this->_moveExpired();

        $waitingJobs = $this->_createWaitingJobQuery();

        return $waitingJobs->count();
    }

    /**
     * Returns the total number of delayed jobs.
     *
     * @return int The number of rows matched by the delayed-job query
     */
    public function getTotalDelayed(): int
    {
        // Put expired reservations back into the waiting list first
        $this->_moveExpired();

        $delayedJobs = $this->_createDelayedJobQuery();

        return $delayedJobs->count();
    }

    /**
     * Returns the total number of reserved jobs.
     *
     * @return int The number of rows matched by the reserved-job query
     */
    public function getTotalReserved(): int
    {
        // Put expired reservations back into the waiting list first
        $this->_moveExpired();

        $reservedJobs = $this->_createReservedJobQuery();

        return $reservedJobs->count();
    }

    /**
     * Returns the total number of failed jobs.
     *
     * @return int The number of rows matched by the failed-job query
     */
    public function getTotalFailed(): int
    {
        // Put expired reservations back into the waiting list first
        $this->_moveExpired();

        $failedJobs = $this->_createFailedJobQuery();

        return $failedJobs->count();
    }

    /**
     * @inheritdoc
     */
    public function getJobInfo(int $limit = null): array
    {
        // Put expired reservations back into the waiting list first
        $this->_moveExpired();

        // Fetch every job whose delay has elapsed, in the order they'd be processed
        $rows = $this->_createJobQuery()
            ->select(['id', 'description', 'progress', 'timeUpdated', 'fail', 'error'])
            ->where('[[timePushed]] <= :time - [[delay]]', [':time' => time()])
            ->orderBy(['priority' => SORT_ASC, 'id' => SORT_ASC])
            ->limit($limit)
            ->all();

        // Boil each row down to the info array, swapping the raw columns for a status code
        return array_map(function(array $row) {
            return [
                'id' => $row['id'],
                'status' => $this->_status($row),
                'progress' => (int)$row['progress'],
                'description' => $row['description'],
                'error' => $row['error'],
            ];
        }, $rows);
    }

    /**
     * @inheritdoc
     */
    public function handleError($id, $job, $ttr, $attempt, $error)
    {
        // The job is no longer executing, whatever happens next
        $this->_executingJobId = null;

        $shouldMarkFailed = parent::handleError($id, $job, $ttr, $attempt, $error);

        // Always return false, so run() never releases a job that errored
        if (!$shouldMarkFailed) {
            return false;
        }

        // Log the exception
        Craft::$app->getErrorHandler()->logException($error);

        // Flag the job as failed, along with when and why
        $failureColumns = [
            'fail' => true,
            'dateFailed' => Db::prepareDateForDb(new \DateTime()),
            'error' => $error->getMessage(),
        ];

        $command = Craft::$app->getDb()->createCommand()
            ->update(Table::QUEUE, $failureColumns, ['id' => $id], [], false);

        $command->execute();

        return false;
    }

    /**
     * Appends JavaScript to the response that makes the browser kick off a new queue runner.
     *
     * Nothing is appended if any jobs are currently reserved, or if the response's content type
     * isn't HTML/XHTML.
     */
    public function handleResponse()
    {
        $response = Craft::$app->getResponse();

        // Detach ourselves so this can't get called twice
        $response->off(Response::EVENT_AFTER_PREPARE, [$this, 'handleResponse']);

        // Bail if any jobs are currently reserved, or if this isn't an HTML/XHTML response
        $htmlContentTypes = ['text/html', 'application/xhtml+xml'];

        if ($this->getHasReservedJobs() || !in_array($response->getContentType(), $htmlContentTypes, true)) {
            return;
        }

        // Build the JS that tells the browser to fire an Ajax request to the queue/run action
        // (Ajax request code adapted from http://www.quirksmode.org/js/xmlhttp.html - thanks ppk!)
        $url = Json::encode(UrlHelper::actionUrl('queue/run'));
        $js = <<<EOD
<script type="text/javascript">
/*<![CDATA[*/
(function(){
    var XMLHttpFactories = [
        function () {return new XMLHttpRequest()},
        function () {return new ActiveXObject("Msxml2.XMLHTTP")},
        function () {return new ActiveXObject("Msxml3.XMLHTTP")},
        function () {return new ActiveXObject("Microsoft.XMLHTTP")}
    ];
    var req = false;
    for (var i = 0; i < XMLHttpFactories.length; i++) {
        try {
            req = XMLHttpFactories[i]();
        }
        catch (e) {
            continue;
        }
        break;
    }
    if (!req) return;
    req.open('GET', $url, true);
    if (req.readyState == 4) return;
    req.send();
})();
/*]]>*/
</script>
EOD;

        // Tack the script onto whatever content the response already has
        $response->content = $response->content === null ? $js : $response->content . $js;
    }

    // Protected Methods
    // =========================================================================

    /**
     * @inheritdoc
     */
    protected function pushMessage($message, $ttr, $delay, $priority)
    {
        // The description was captured by push() right before this gets called
        $row = [
            'job' => $message,
            'description' => $this->_jobDescription,
            'timePushed' => time(),
            'ttr' => $ttr,
            'delay' => $delay,
            'priority' => $priority ?: 1024,
        ];

        $db = Craft::$app->getDb();
        $command = $db->createCommand()->insert(Table::QUEUE, $row, false);
        $command->execute();

        return $db->getLastInsertID(Table::QUEUE);
    }

    /**
     * Reserves the next waiting job, if there is one.
     *
     * The lookup and the reservation update both happen while holding a mutex lock, which is
     * always released again, even if one of the queries throws.
     *
     * @return array|false The payload, or a falsy value if there aren't any jobs to reserve
     * @throws Exception if the mutex lock couldn't be acquired
     */
    protected function reserve()
    {
        $mutex = Craft::$app->getMutex();

        if (!$mutex->acquire(__CLASS__, $this->mutexTimeout)) {
            throw new Exception('Has not waited the lock.');
        }

        try {
            // Move expired messages into waiting list
            $this->_moveExpired();

            // Reserve one message
            $payload = $this->_createJobQuery()
                ->where(['and', ['fail' => false, 'timeUpdated' => null], '[[timePushed]] <= :time - [[delay]]'], [':time' => time()])
                ->orderBy(['priority' => SORT_ASC, 'id' => SORT_ASC])
                ->limit(1)
                ->one();

            if (is_array($payload)) {
                $payload['dateReserved'] = new \DateTime();
                $payload['timeUpdated'] = $payload['dateReserved']->getTimestamp();
                $payload['attempt'] = (int)$payload['attempt'] + 1;
                Craft::$app->getDb()->createCommand()
                    ->update(
                        Table::QUEUE,
                        [
                            'dateReserved' => Db::prepareDateForDb($payload['dateReserved']),
                            'timeUpdated' => $payload['timeUpdated'],
                            'attempt' => $payload['attempt']
                        ],
                        ['id' => $payload['id']],
                        [],
                        false
                    )
                    ->execute();
            }
        } finally {
            // Never leave the lock held if something above throws
            $mutex->release(__CLASS__);
        }

        // pgsql
        if (is_array($payload) && is_resource($payload['job'])) {
            $payload['job'] = stream_get_contents($payload['job']);
        }

        return $payload;
    }

    // Private Methods
    // =========================================================================

    /**
     * Moves expired messages back into the waiting list.
     *
     * The update is skipped if it has already run during the current second.
     */
    private function _moveExpired()
    {
        if ($this->_reserveTime === time()) {
            return;
        }

        $this->_reserveTime = time();

        // Clear the reservation on every job whose last update is more than its TTR ago
        $clearedColumns = [
            'dateReserved' => null,
            'timeUpdated' => null,
            'progress' => 0,
        ];

        $command = Craft::$app->getDb()->createCommand()
            ->update(
                Table::QUEUE,
                $clearedColumns,
                '[[timeUpdated]] < :time - [[ttr]]',
                [':time' => $this->_reserveTime],
                false
            );

        $command->execute();
    }

    /**
     * Returns a new query on the queue table, without any conditions applied.
     *
     * @return Query
     */
    private function _createJobQuery(): Query
    {
        $query = new Query();

        return $query->from(Table::QUEUE);
    }

    /**
     * Returns a new query for waiting jobs: not failed, not reserved, and with their delay elapsed.
     *
     * @return Query
     */
    private function _createWaitingJobQuery(): Query
    {
        $query = $this->_createJobQuery();
        $query->where(['fail' => false, 'timeUpdated' => null]);
        $query->andWhere('[[timePushed]] + [[delay]] <= :time', ['time' => time()]);

        return $query;
    }

    /**
     * Returns a new query for delayed jobs: not failed, not reserved, and with their delay still pending.
     *
     * @return Query
     */
    private function _createDelayedJobQuery(): Query
    {
        $query = $this->_createJobQuery();
        $query->where(['fail' => false, 'timeUpdated' => null]);
        $query->andWhere('[[timePushed]] + [[delay]] > :time', ['time' => time()]);

        return $query;
    }

    /**
     * Returns a new query for reserved jobs: not failed, and with `timeUpdated` set.
     *
     * @return Query
     */
    private function _createReservedJobQuery(): Query
    {
        $notFailed = ['fail' => false];
        $hasBeenUpdated = ['not', ['timeUpdated' => null]];

        return $this->_createJobQuery()
            ->where(['and', $notFailed, $hasBeenUpdated]);
    }

    /**
     * Returns a new query for jobs that are flagged as failed.
     *
     * @return Query
     */
    private function _createFailedJobQuery(): Query
    {
        $query = $this->_createJobQuery();
        $query->where(['fail' => true]);

        return $query;
    }

    /**
     * Works out a job's status from its queue table row.
     *
     * @param array|false $payload The job's row (must include `fail` and `timeUpdated`), or a falsy value if it has none
     * @return int The status code: done if there's no row, failed if `fail` is set, reserved if
     * `timeUpdated` is set, and waiting otherwise
     */
    private function _status($payload): int
    {
        // No row for the job is reported as done
        if (!$payload) {
            return self::STATUS_DONE;
        }

        if ($payload['fail']) {
            return self::STATUS_FAILED;
        }

        // An empty timeUpdated means waiting; anything else means reserved
        return $payload['timeUpdated'] ? self::STATUS_RESERVED : self::STATUS_WAITING;
    }
}