/**
* Track a list of known queues.
*/
-class CRM_Queue_BAO_Queue extends CRM_Queue_DAO_Queue {
+class CRM_Queue_BAO_Queue extends CRM_Queue_DAO_Queue implements \Civi\Core\HookInterface {
/**
* Get a list of valid statuses.
];
}
+ /**
+ * Queues which contain `CRM_Queue_Task` records should use the `task` runner to evaluate them.
+ *
+ * @code
+ * $q = Civi::queue('do-stuff', ['type' => 'Sql', 'runner' => 'task']);
+ * $q->createItem(new CRM_Queue_Task('my_callback_func', [1,2,3]));
+ * @endCode
+ *
+ * @param \CRM_Queue_Queue $queue
+ * @param array $items
+ * @param array $outcomes
+ * @throws \API_Exception
+ * @see CRM_Utils_Hook::queueRun()
+ */
+ public static function hook_civicrm_queueRun_task(CRM_Queue_Queue $queue, array $items, array &$outcomes) {
+ foreach ($items as $itemPos => $item) {
+ $outcomes[$itemPos] = (new \CRM_Queue_TaskRunner())->run($queue, $item);
+ }
+ }
+
}
--- /dev/null
+<?php
+/*
+ +--------------------------------------------------------------------+
+ | Copyright CiviCRM LLC. All rights reserved. |
+ | |
+ | This work is published under the GNU AGPLv3 license with some |
+ | permitted exceptions and without any warranty. For full license |
+ | and copyright information, see https://civicrm.org/licensing |
+ +--------------------------------------------------------------------+
+ */
+
+/**
+ * `CRM_Queue_TaskRunner` a list tasks from a queue. It is designed to supported background
+ * tasks which run automatically.
+ *
+ * This runner is not appropriate for all queues or workloads, so you might choose or create
+ * a different runner. For example, `CRM_Queue_Runner` is geared toward background task lists.
+ *
+ * @see CRM_Queue_Runner
+ */
+class CRM_Queue_TaskRunner {
+
+ /**
+ * @param \CRM_Queue_Queue $queue
+ * @param $item
+ * @return string
+ * One of the following:
+ * - 'ok': Task executed normally. Removed from queue.
+ * - 'retry': Task encountered an error. Will try again later.
+ * - 'delete': Task encountered an error. Will not try again later. Removed from queue.
+ * - 'abort': Task encountered an error. Will not try again later. Stopped the queue.
+ * @throws \API_Exception
+ */
+ public function run(CRM_Queue_Queue $queue, $item): string {
+ $this->assertType($item->data, ['CRM_Queue_Task'], 'Cannot run. Invalid task given.');
+
+ /** @var \CRM_Queue_Task $task */
+ $task = $item->data;
+
+ /** @var string $outcome One of 'ok', 'retry', 'delete', 'abort' */
+
+ if (is_numeric($queue->getSpec('retry_limit')) && $item->run_count > 1 + $queue->getSpec('retry_limit')) {
+ \Civi::log()->debug("Skipping exhausted task: " . $task->title);
+ $outcome = $queue->getSpec('error');
+ $exception = new \API_Exception(sprintf('Skipping exhausted task after %d tries: %s', $item->run_count, print_r($task, 1)), 'queue_retry_exhausted');
+ }
+ else {
+ \Civi::log()->debug("Running task: " . $task->title);
+ try {
+ $runResult = $task->run($this->createContext($queue));
+ $outcome = $runResult ? 'ok' : $queue->getSpec('error');
+ $exception = ($outcome === 'ok') ? NULL : new \API_Exception('Queue task returned false', 'queue_false');
+ }
+ catch (\Exception $e) {
+ $outcome = $queue->getSpec('error');
+ $exception = $e;
+ }
+
+ if (in_array($outcome, ['delete', 'abort']) && $this->isRetriable($queue, $item)) {
+ $outcome = 'retry';
+ }
+ }
+
+ if ($outcome !== 'ok') {
+ \CRM_Utils_Hook::queueTaskError($queue, $item, $outcome, $exception);
+ }
+
+ if ($outcome === 'ok') {
+ $queue->deleteItem($item);
+ return $outcome;
+ }
+
+ $logDetails = [
+ 'id' => $queue->getName() . '#' . $item->id,
+ 'task' => CRM_Utils_Array::subset((array) $task, ['title', 'callback', 'arguments']),
+ 'outcome' => $outcome,
+ 'message' => $exception ? $exception->getMessage() : NULL,
+ 'exception' => $exception,
+ ];
+
+ switch ($outcome) {
+ case 'retry':
+ \Civi::log('queue')->error('Task "{id}" failed and should be retried. {message}', $logDetails);
+ $queue->releaseItem($item);
+ break;
+
+ case 'delete':
+ \Civi::log('queue')->error('Task "{id}" failed and will be deleted. {message}', $logDetails);
+ $queue->deleteItem($item);
+ break;
+
+ case 'abort':
+ \Civi::log('queue')->error('Task "{id}" failed. Queue processing aborted. {message}', $logDetails);
+ $queue->setStatus('aborted');
+ $queue->releaseItem($item); /* Sysadmin might inspect, fix, and then resume. Item should be accessible. */
+ break;
+
+ default:
+ \Civi::log('queue')->critical('Unrecognized outcome for task "{id}": {outcome}', $logDetails);
+ break;
+ }
+
+ return $outcome;
+ }
+
+ /**
+ * @param \CRM_Queue_Queue $queue
+ * return CRM_Queue_TaskContext;
+ */
+ private function createContext(\CRM_Queue_Queue $queue): \CRM_Queue_TaskContext {
+ $taskCtx = new \CRM_Queue_TaskContext();
+ $taskCtx->queue = $queue;
+ $taskCtx->log = \CRM_Core_Error::createDebugLogger();
+ return $taskCtx;
+ }
+
+ private function assertType($object, array $types, string $message) {
+ foreach ($types as $type) {
+ if ($object instanceof $type) {
+ return;
+ }
+ }
+ throw new \Exception($message);
+ }
+
+ private function isRetriable(\CRM_Queue_Queue $queue, $item): bool {
+ return property_exists($item, 'run_count')
+ && is_numeric($queue->getSpec('retry_limit'))
+ && $queue->getSpec('retry_limit') + 1 > $item->run_count;
+ }
+
+}
);
}
+ /**
+ * Fire `hook_civicrm_queueRun_{$runner}`.
+ *
+ * This event only fires if these conditions are met:
+ *
+ * 1. The `$queue` has been persisted in `civicrm_queue`.
+ * 2. The `$queue` has a `runner` property.
+ * 3. The `$queue` has some pending tasks.
+ * 4. The system has a queue-running agent.
+ *
+ * @param \CRM_Queue_Queue $queue
+ * @param array $items
+ * List of claimed items which we may evaluate.
+ * @param array $outcomes
+ * The outcomes of each task. One of 'ok', 'retry', 'fail'.
+ * Keys should match the keys in $items.
+ */
+ public static function queueRun(CRM_Queue_Queue $queue, array $items, &$outcomes) {
+ $runner = $queue->getSpec('runner');
+ if (empty($runner) || !preg_match(';^[A-Za-z0-9_]+$;', $runner)) {
+ throw new \CRM_Core_Exception("Cannot autorun queue: " . $queue->getName());
+ }
+ return self::singleton()->invoke(['queue', 'items', 'outcomes'], $queue, $items,
+ $outcomes, $exception, self::$_nullObject, self::$_nullObject,
+ 'civicrm_queueRun_' . $runner
+ );
+ }
+
+ /**
+ * This is called if automatic execution of a queue-task fails.
+ *
+ * The `$outcome` may be modified. For example, you might inspect the $item and $exception -- and then
+ * decide whether to 'retry', 'delete', or 'abort'.
+ *
+ * @param \CRM_Queue_Queue $queue
+ * @param \CRM_Queue_DAO_QueueItem|\stdClass $item
+ * The enqueued item $item.
+ * In principle, this is the $item format determined by the queue, which includes `id` and `data`.
+ * In practice, it is typically an instance of `CRM_Queue_DAO_QueueItem`.
+ * @param string $outcome
+ * The outcome of the task. Legal values:
+ * - 'retry': The task encountered a problem, and it should be retried.
+ * - 'delete': The task encountered a non-recoverable problem, and it should be deleted.
+ * - 'abort': The task encountered a non-recoverable problem, and the queue should be stopped.
+ * - 'ok': The task finished normally. (You won't generally see this, but it could be useful in some customizations.)
+ * The default outcome for task-errors is determined by the queue settings (`civicrm_queue.error`).
+ * @param \Throwable|null $exception
+ * If the task failed, this is the cause of the failure.
+ */
+ public static function queueTaskError(CRM_Queue_Queue $queue, $item, &$outcome, ?Throwable $exception) {
+ return self::singleton()->invoke(['job', 'params'], $queue, $item,
+ $outcome, $exception, self::$_nullObject, self::$_nullObject,
+ 'civicrm_queueTaskError'
+ );
+ }
+
/**
* This hook is called before a scheduled job is executed
*