CRM_Queue_TaskRunner - Add background-friendly handler for `CRM_Queue_Task`s
authorTim Otten <totten@civicrm.org>
Wed, 2 Feb 2022 08:29:13 +0000 (00:29 -0800)
committerTim Otten <totten@civicrm.org>
Thu, 2 Jun 2022 20:31:59 +0000 (13:31 -0700)
CRM/Queue/BAO/Queue.php
CRM/Queue/Runner.php
CRM/Queue/TaskRunner.php [new file with mode: 0644]
CRM/Utils/Hook.php

index 55175e32bd9495ed442c39979ce380094dee604e..35f8be2b9d5ef9977dff4cd8de8037cae000fdac 100644 (file)
@@ -18,7 +18,7 @@
 /**
  * Track a list of known queues.
  */
-class CRM_Queue_BAO_Queue extends CRM_Queue_DAO_Queue {
+class CRM_Queue_BAO_Queue extends CRM_Queue_DAO_Queue implements \Civi\Core\HookInterface {
 
   /**
    * Get a list of valid statuses.
@@ -76,4 +76,24 @@ class CRM_Queue_BAO_Queue extends CRM_Queue_DAO_Queue {
     ];
   }
 
+  /**
+   * Queues which contain `CRM_Queue_Task` records should use the `task` runner to evaluate them.
+   *
+   * @code
+   * $q = Civi::queue('do-stuff', ['type' => 'Sql', 'runner' => 'task']);
+   * $q->createItem(new CRM_Queue_Task('my_callback_func', [1,2,3]));
+   * @endCode
+   *
+   * @param \CRM_Queue_Queue $queue
+   * @param array $items
+   * @param array $outcomes
+   * @throws \API_Exception
+   * @see CRM_Utils_Hook::queueRun()
+   */
+  public static function hook_civicrm_queueRun_task(CRM_Queue_Queue $queue, array $items, array &$outcomes) {
+    foreach ($items as $itemPos => $item) {
+      $outcomes[$itemPos] = (new \CRM_Queue_TaskRunner())->run($queue, $item);
+    }
+  }
+
 }
index 4985bda7680ea349f4c63b29629885a2697dfe61..ee69cb63106b8555832c86b748ee6a0cf8bb49d9 100644 (file)
@@ -26,9 +26,9 @@
  *   This is used by some CLI upgrades.
  *
  * This runner is not appropriate for all queues or workloads, so you might choose or create
- * a different runner. For example, `CRM_Queue_Autorunner` is geared toward background task lists.
+ * a different runner. For example, `CRM_Queue_TaskRunner` is geared toward background task lists.
  *
- * @see CRM_Queue_Autorunner
+ * @see CRM_Queue_TaskRunner
  */
 class CRM_Queue_Runner {
 
diff --git a/CRM/Queue/TaskRunner.php b/CRM/Queue/TaskRunner.php
new file mode 100644 (file)
index 0000000..002e93b
--- /dev/null
@@ -0,0 +1,132 @@
+<?php
+/*
+ +--------------------------------------------------------------------+
+ | Copyright CiviCRM LLC. All rights reserved.                        |
+ |                                                                    |
+ | This work is published under the GNU AGPLv3 license with some      |
+ | permitted exceptions and without any warranty. For full license    |
+ | and copyright information, see https://civicrm.org/licensing       |
+ +--------------------------------------------------------------------+
+ */
+
+/**
+ * `CRM_Queue_TaskRunner`  a list tasks from a queue. It is designed to supported background
+ * tasks which run automatically.
+ *
+ * This runner is not appropriate for all queues or workloads, so you might choose or create
+ * a different runner. For example, `CRM_Queue_Runner` is geared toward background task lists.
+ *
+ * @see CRM_Queue_Runner
+ */
+class CRM_Queue_TaskRunner {
+
+  /**
+   * @param \CRM_Queue_Queue $queue
+   * @param $item
+   * @return string
+   *   One of the following:
+   *    - 'ok': Task executed normally. Removed from queue.
+   *    - 'retry': Task encountered an error. Will try again later.
+   *    - 'delete': Task encountered an error. Will not try again later. Removed from queue.
+   *    - 'abort': Task encountered an error. Will not try again later. Stopped the queue.
+   * @throws \API_Exception
+   */
+  public function run(CRM_Queue_Queue $queue, $item): string {
+    $this->assertType($item->data, ['CRM_Queue_Task'], 'Cannot run. Invalid task given.');
+
+    /** @var \CRM_Queue_Task $task */
+    $task = $item->data;
+
+    /** @var string $outcome One of 'ok', 'retry', 'delete', 'abort' */
+
+    if (is_numeric($queue->getSpec('retry_limit')) && $item->run_count > 1 + $queue->getSpec('retry_limit')) {
+      \Civi::log()->debug("Skipping exhausted task: " . $task->title);
+      $outcome = $queue->getSpec('error');
+      $exception = new \API_Exception(sprintf('Skipping exhausted task after %d tries: %s', $item->run_count, print_r($task, 1)), 'queue_retry_exhausted');
+    }
+    else {
+      \Civi::log()->debug("Running task: " . $task->title);
+      try {
+        $runResult = $task->run($this->createContext($queue));
+        $outcome = $runResult ? 'ok' : $queue->getSpec('error');
+        $exception = ($outcome === 'ok') ? NULL : new \API_Exception('Queue task returned false', 'queue_false');
+      }
+      catch (\Exception $e) {
+        $outcome = $queue->getSpec('error');
+        $exception = $e;
+      }
+
+      if (in_array($outcome, ['delete', 'abort']) && $this->isRetriable($queue, $item)) {
+        $outcome = 'retry';
+      }
+    }
+
+    if ($outcome !== 'ok') {
+      \CRM_Utils_Hook::queueTaskError($queue, $item, $outcome, $exception);
+    }
+
+    if ($outcome === 'ok') {
+      $queue->deleteItem($item);
+      return $outcome;
+    }
+
+    $logDetails = [
+      'id' => $queue->getName() . '#' . $item->id,
+      'task' => CRM_Utils_Array::subset((array) $task, ['title', 'callback', 'arguments']),
+      'outcome' => $outcome,
+      'message' => $exception ? $exception->getMessage() : NULL,
+      'exception' => $exception,
+    ];
+
+    switch ($outcome) {
+      case 'retry':
+        \Civi::log('queue')->error('Task "{id}" failed and should be retried. {message}', $logDetails);
+        $queue->releaseItem($item);
+        break;
+
+      case 'delete':
+        \Civi::log('queue')->error('Task "{id}" failed and will be deleted. {message}', $logDetails);
+        $queue->deleteItem($item);
+        break;
+
+      case 'abort':
+        \Civi::log('queue')->error('Task "{id}" failed. Queue processing aborted. {message}', $logDetails);
+        $queue->setStatus('aborted');
+        $queue->releaseItem($item); /* Sysadmin might inspect, fix, and then resume. Item should be accessible. */
+        break;
+
+      default:
+        \Civi::log('queue')->critical('Unrecognized outcome for task "{id}": {outcome}', $logDetails);
+        break;
+    }
+
+    return $outcome;
+  }
+
+  /**
+   * @param \CRM_Queue_Queue $queue
+   * return CRM_Queue_TaskContext;
+   */
+  private function createContext(\CRM_Queue_Queue $queue): \CRM_Queue_TaskContext {
+    $taskCtx = new \CRM_Queue_TaskContext();
+    $taskCtx->queue = $queue;
+    $taskCtx->log = \CRM_Core_Error::createDebugLogger();
+    return $taskCtx;
+  }
+
+  private function assertType($object, array $types, string $message) {
+    foreach ($types as $type) {
+      if ($object instanceof  $type) {
+        return;
+      }
+    }
+    throw new \Exception($message);
+  }
+
+  private function isRetriable(\CRM_Queue_Queue $queue, $item): bool {
+    return property_exists($item, 'run_count')
+      && is_numeric($queue->getSpec('retry_limit'))
+      && $queue->getSpec('retry_limit') + 1 > $item->run_count;
+  }
+
+}
index 76c3cd004751765ceb8b868be137d7cc70c6bf67..d2fb744c88439184311e67f6ccecfb3c853134b7 100644 (file)
@@ -2727,6 +2727,62 @@ abstract class CRM_Utils_Hook {
     );
   }
 
+  /**
+   * Fire `hook_civicrm_queueRun_{$runner}`.
+   *
+   * This event only fires if these conditions are met:
+   *
+   * 1. The `$queue` has been persisted in `civicrm_queue`.
+   * 2. The `$queue` has a `runner` property.
+   * 3. The `$queue` has some pending tasks.
+   * 4. The system has a queue-running agent.
+   *
+   * @param \CRM_Queue_Queue $queue
+   * @param array $items
+   *   List of claimed items which we may evaluate.
+   * @param array $outcomes
+   *   The outcomes of each task. One of 'ok', 'retry', 'fail'.
+   *   Keys should match the keys in $items.
+   */
+  public static function queueRun(CRM_Queue_Queue $queue, array $items, &$outcomes) {
+    $runner = $queue->getSpec('runner');
+    if (empty($runner) || !preg_match(';^[A-Za-z0-9_]+$;', $runner)) {
+      throw new \CRM_Core_Exception("Cannot autorun queue: " . $queue->getName());
+    }
+    return self::singleton()->invoke(['queue', 'items', 'outcomes'], $queue, $items,
+      $outcomes, $exception, self::$_nullObject, self::$_nullObject,
+      'civicrm_queueRun_' . $runner
+    );
+  }
+
+  /**
+   * This is called if automatic execution of a queue-task fails.
+   *
+   * The `$outcome` may be modified. For example, you might inspect the $item and $exception -- and then
+   * decide whether to 'retry', 'delete', or 'abort'.
+   *
+   * @param \CRM_Queue_Queue $queue
+   * @param \CRM_Queue_DAO_QueueItem|\stdClass $item
+   *   The enqueued item $item.
+   *   In principle, this is the $item format determined by the queue, which includes `id` and `data`.
+   *   In practice, it is typically an instance of `CRM_Queue_DAO_QueueItem`.
+   * @param string $outcome
+   *   The outcome of the task. Legal values:
+   *   - 'retry': The task encountered a problem, and it should be retried.
+   *   - 'delete': The task encountered a non-recoverable problem, and it should be deleted.
+   *   - 'abort': The task encountered a non-recoverable problem, and the queue should be stopped.
+   *   - 'ok': The task finished normally. (You won't generally see this, but it could be useful in some customizations.)
+   *   The default outcome for task-errors is determined by the queue settings (`civicrm_queue.error`).
+   * @param \Throwable|null $exception
+   *   If the task failed, this is the cause of the failure.
+   */
+  public static function queueTaskError(CRM_Queue_Queue $queue, $item, &$outcome, ?Throwable $exception) {
+    return self::singleton()->invoke(['job', 'params'], $queue, $item,
+      $outcome, $exception, self::$_nullObject, self::$_nullObject,
+      'civicrm_queueTaskError'
+    );
+  }
+
   /**
    * This hook is called before a scheduled job is executed
    *