File "DatabaseConnection.php"

Full Path: /home/capoeirajd/www/wp-content/plugins/wp-migrate-db/class/Common/Queue/Connections/DatabaseConnection.php
File size: 6.1 KB
MIME-type: text/x-php
Charset: utf-8

<?php

namespace DeliciousBrains\WPMDB\Common\Queue\Connections;

use DeliciousBrains\WPMDB\Container\Brumann\Polyfill\Unserialize;
use DateTime;
use DeliciousBrains\WPMDB\Common\Queue\Exceptions\InvalidJobTypeException;
use Exception;
use DeliciousBrains\WPMDB\Common\Queue\Job;
use wpdb;

class DatabaseConnection implements ConnectionInterface {

	/**
	 * @var wpdb
	 */
	protected $database;

    /**
     * @var array
     */
    protected $allowed_job_classes = [];

	/**
	 * @var string
	 */
	protected $jobs_table;

	/**
	 * @var string
	 */
	protected $failures_table;

    /**
     * DatabaseQueue constructor.
     *
     * @param wpdb  $wpdb
     * @param array $allowed_job_classes Job classes that may be handled, default any Job subclass.
     */
    public function __construct(wpdb $wpdb, array $allowed_job_classes = [])
    {
        $this->database            = $wpdb;
        $this->allowed_job_classes = $allowed_job_classes;
        $this->jobs_table          = $this->database->prefix . 'queue_jobs';
        $this->failures_table      = $this->database->prefix . 'queue_failures';
    }

	/**
	 * Push a job onto the queue.
	 *
	 * @param Job $job
	 * @param int $delay
	 *
	 * @return bool|int
	 */
	public function push( Job $job, $delay = 0 ) {
		$result = $this->database->insert( $this->jobs_table, array(
			'job'          => serialize( $job ),
			'available_at' => $this->datetime( $delay ),
			'created_at'   => $this->datetime(),
		) );

		if ( ! $result ) {
			return false;
		}

		return $this->database->insert_id;
	}

	/**
	 * Retrieve a job from the queue.
	 *
	 * @return bool|Job
	 */
	public function pop() {
		$this->release_reserved();

		$sql = $this->database->prepare( "
			SELECT * FROM {$this->jobs_table}
			WHERE reserved_at IS NULL
			AND available_at <= %s
			ORDER BY available_at
			LIMIT 1
		", $this->datetime() );

		$raw_job = $this->database->get_row( $sql );

		if ( is_null( $raw_job ) ) {
			return false;
		}

		$job = $this->vitalize_job( $raw_job );

        if ($job && is_a($job, Job::class)) {
            $this->reserve($job);
        }

		return $job;
	}

	/**
	 * Delete a job from the queue.
	 *
	 * @param Job $job
	 *
	 * @return bool
	 */
	public function delete( $job ) {
        if (is_a($job, Job::class)) {
            $id = $job->id();
        } elseif (is_object($job) && property_exists($job, 'id')) {
            $raw_job = (object)$job;
            $id      = $raw_job->id;
        } else {
            return false;
        }

		$where = array(
			'id' => $id,
		);

		if ( $this->database->delete( $this->jobs_table, $where ) ) {
			return true;
		}

		return false;
	}

	/**
	 * Release a job back onto the queue.
	 *
	 * @param Job $job
	 *
	 * @return bool
	 */
	public function release( Job $job ) {
		$data  = array(
			'job'         => serialize( $job ),
			'attempts'    => $job->attempts(),
			'reserved_at' => null,
		);
		$where = array(
			'id' => $job->id(),
		);

		if ( $this->database->update( $this->jobs_table, $data, $where ) ) {
			return true;
		}

		return false;
	}

	/**
	 * Push a job onto the failure queue.
	 *
	 * @param Job       $job
	 * @param Exception $exception
	 *
	 * @return bool
	 */
	public function failure( $job, Exception $exception ) {
		$insert = $this->database->insert( $this->failures_table, array(
			'job'       => serialize( $job ),
			'error'     => $this->format_exception( $exception ),
			'failed_at' => $this->datetime(),
		) );

		if ( $insert ) {
			$this->delete( $job );

			return true;
		}

		return false;
	}

	/**
	 * Get total jobs in the queue.
	 *
	 * @return int
	 */
	public function jobs() {
		$sql = "SELECT COUNT(*) FROM {$this->jobs_table}";

		return (int) $this->database->get_var( $sql );
	}

	/**
	 * Get total jobs in the failures queue.
	 *
	 * @return int
	 */
	public function failed_jobs() {
		$sql = "SELECT COUNT(*) FROM {$this->failures_table}";

		return (int) $this->database->get_var( $sql );
	}

	/**
	 * Reserve a job in the queue.
	 *
	 * @param Job $job
	 */
	protected function reserve( $job ) {
		$data = array(
			'reserved_at' => $this->datetime(),
		);

		$this->database->update( $this->jobs_table, $data, array(
			'id' => $job->id(),
		) );
	}

	/**
	 * Release reserved jobs back onto the queue.
	 */
	protected function release_reserved() {
		$expired = $this->datetime( -300 );

		$sql = $this->database->prepare( "
				UPDATE {$this->jobs_table}
				SET attempts = attempts + 1, reserved_at = NULL
				WHERE reserved_at <= %s", $expired );

		$this->database->query( $sql );
	}

    /**
     * Vitalize Job with latest data.
     *
     * @param mixed $raw_job
     *
     * @return Job|bool
     */
    protected function vitalize_job($raw_job)
    {
        $options = [];
        if ( ! empty($this->allowed_job_classes)) {
            $options['allowed_classes'] = $this->allowed_job_classes;
        }

        // Because we support PHP versions less than 7.0 we need to use the polyfill.
        $job = Unserialize::unserialize($raw_job->job, $options);

        if ( ! is_a($job, Job::class)) {
            $this->failure($raw_job, new InvalidJobTypeException());

            return false;
        }

        $job->set_id($raw_job->id);
        $job->set_attempts($raw_job->attempts);
        $job->set_reserved_at(empty($raw_job->reserved_at) ? null : new DateTime($raw_job->reserved_at));
        $job->set_available_at(new DateTime($raw_job->available_at));
        $job->set_created_at(new DateTime($raw_job->created_at));

        return $job;
    }

	/**
	 * Get MySQL datetime.
	 *
	 * @param int $offset Seconds, can pass negative int.
	 *
	 * @return string
	 */
	protected function datetime( $offset = 0 ) {
		$timestamp = time() + $offset;

		return gmdate( 'Y-m-d H:i:s', $timestamp );
	}

	/**
	 * Format an exception error string.
	 *
	 * @param Exception $exception
	 *
	 * @return string
	 */
	protected function format_exception( Exception $exception ) {
		$string  = get_class( $exception );
		$message = $exception->getMessage();
		if ( ! empty( $message ) ) {
			$string .= " : {$exception->getMessage()}";
		}
		$code = $exception->getCode();
		if ( ! empty( $code ) ) {
			$string .= " (#{$code})";
		}

		return $string;
	}

}