File: /home/marketing.cfbon.ru/public_html/vendor/google/cloud-core/src/Batch/BatchJob.php
<?php
/**
 * Copyright 2017 Google Inc. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
namespace Google\Cloud\Core\Batch;
use Google\Cloud\Core\SysvTrait;
/**
 * Represent batch jobs.
 *
 * @experimental The experimental flag means that while we believe this method
 *      or class is ready for use, it may change before release in backwards-
 *      incompatible ways. Please use with caution, and test thoroughly when
 *      upgrading.
 */
class BatchJob implements JobInterface
{
    /** Default number of items that triggers a batch execution. */
    const DEFAULT_BATCH_SIZE = 100;
    /** Default maximum time in seconds between two executions while items are pending. */
    const DEFAULT_CALL_PERIOD = 2.0;
    /** Default number of worker processes. */
    const DEFAULT_WORKERS = 1;
    use JobTrait;
    use SysvTrait;
    use InterruptTrait;
    use HandleFailureTrait;
    /**
     * @var callable The batch job handler. This callable accepts an array
     *      of items and should return a boolean.
     */
    private $func;
    /**
     * @var int The size of the batch.
     */
    private $batchSize;
    /**
     * @var float The period in seconds from the last execution to force
     *      executing the job.
     */
    private $callPeriod;
    /**
     * Creates the job, applies defaults for missing options and initializes
     * the failure file.
     *
     * @param string $identifier Unique identifier of the job.
     * @param callable $func Any Callable except for Closure. The callable
     *        should accept an array of items as the first argument.
     * @param int $idNum A numeric id for the job.
     * @param array $options [optional] {
     *     Configuration options.
     *
     *     @type int $batchSize The size of the batch. **Defaults to** `100`.
     *     @type float $callPeriod The period in seconds from the last execution
     *                 to force executing the job. **Defaults to** `2.0`.
     *     @type int $numWorkers The number of child processes. It only takes
     *               effect with the {@see \Google\Cloud\Core\Batch\BatchDaemon}.
     *               **Defaults to** `1`.
     *     @type string $bootstrapFile A file to load before executing the
     *                  job. It's needed for registering global functions.
     * }
     */
    public function __construct(
        $identifier,
        $func,
        $idNum,
        array $options = []
    ) {
        // Array union keeps caller-supplied values and only fills in the
        // keys that are missing.
        $options += [
            'batchSize' => self::DEFAULT_BATCH_SIZE,
            'callPeriod' => self::DEFAULT_CALL_PERIOD,
            'bootstrapFile' => null,
            'numWorkers' => self::DEFAULT_WORKERS
        ];
        // NOTE(review): identifier, id, bootstrapFile and numWorkers are not
        // declared in this class; presumably they come from JobTrait.
        $this->identifier = $identifier;
        $this->func = $func;
        $this->id = $idNum;
        $this->batchSize = $options['batchSize'];
        $this->callPeriod = $options['callPeriod'];
        $this->bootstrapFile = $options['bootstrapFile'];
        $this->numWorkers = $options['numWorkers'];
        // NOTE(review): presumably provided by HandleFailureTrait.
        $this->initFailureFile();
    }
    /**
     * Run the job.
     *
     * Loops forever reading items from the System V message queue derived
     * from this job's id, accumulating them and handing them to flush()
     * whenever the batch is full, the call period has elapsed, or shutdown
     * was requested. Returns only once the shutdown flag is set.
     */
    public function run()
    {
        // NOTE(review): setupSignalHandlers() and $this->shutdown are
        // presumably provided by InterruptTrait, getSysvKey() and the
        // $typeDirect / $typeFile statics by SysvTrait.
        $this->setupSignalHandlers();
        $sysvKey = $this->getSysvKey($this->id);
        $q = msg_get_queue($sysvKey);
        $items = [];
        $lastInvoked = microtime(true);
        if (!is_null($this->bootstrapFile)) {
            require_once($this->bootstrapFile);
        }
        while (true) {
            // Fire SIGALRM after 1 second to unblock the blocking call.
            pcntl_alarm(1);
            if (msg_receive(
                $q,
                0, // accept the first message of any type
                $type,
                8192, // maximum accepted message size in bytes
                $message,
                true, // let msg_receive unserialize the message
                0, // blocking mode
                $errorcode
            )) {
                if ($type === self::$typeDirect) {
                    // The item itself travelled through the queue.
                    $items[] = $message;
                } elseif ($type === self::$typeFile) {
                    // The message is the path of a file holding the
                    // serialized item.
                    // NOTE(review): unserialize() is applied to file content;
                    // this assumes only trusted processes can write to the
                    // queue and to the referenced files — confirm.
                    $payload = file_get_contents($message);
                    if ($payload !== false) {
                        $items[] = unserialize($payload);
                    }
                    // When the read failed, file_get_contents() has already
                    // raised a warning. The item is skipped instead of
                    // appending a bogus `false` entry to the batch.
                    @unlink($message);
                }
            }
            // Run any pending signal handlers (e.g. the one that sets the
            // shutdown flag).
            pcntl_signal_dispatch();
            // It runs the job when
            // 1. Number of items reaches the batchSize.
            // 2-a. Count is >0 and the current time is larger than lastInvoked + period.
            // 2-b. Count is >0 and the shutdown flag is true.
            if ((count($items) >= $this->batchSize)
                || (count($items) > 0
                    && (microtime(true) > $lastInvoked + $this->callPeriod
                        || $this->shutdown))) {
                printf(
                    'Running the job with %d items' . PHP_EOL,
                    count($items)
                );
                $this->flush($items);
                $items = [];
                $lastInvoked = microtime(true);
            }
            gc_collect_cycles();
            // Pending items were flushed above, so it is safe to stop here.
            if ($this->shutdown) {
                return;
            }
        }
    }
    /**
     * Finish any pending activity for this job.
     *
     * Passes the items to the batch handler. When the handler returns a
     * falsy value, the items are handed to handleFailure() together with
     * the job id.
     *
     * @param array $items
     * @return bool True when the handler reported success, false when the
     *         items were passed to the failure handler.
     */
    public function flush(array $items = [])
    {
        if (! $this->callFunc($items)) {
            $this->handleFailure($this->id, $items);
            return false;
        }
        return true;
    }
    /**
     * Invoke the batch job handler with the given items.
     *
     * @access private
     * @internal
     *
     * @param array $items Passed to the handler as its single argument.
     * @return bool The value returned by the handler.
     */
    public function callFunc(array $items = [])
    {
        return call_user_func_array($this->func, [$items]);
    }
    /**
     * Returns the period in seconds from the last execution to force
     * executing the job.
     *
     * @return float
     */
    public function getCallPeriod()
    {
        return $this->callPeriod;
    }
    /**
     * Returns the batch size.
     *
     * @return int
     */
    public function getBatchSize()
    {
        return $this->batchSize;
    }
}