| 1 | <?php |
| 2 | /* |
| 3 | +--------------------------------------------------------------------+ |
| 4 | | Copyright CiviCRM LLC. All rights reserved. | |
| 5 | | | |
| 6 | | This work is published under the GNU AGPLv3 license with some | |
| 7 | | permitted exceptions and without any warranty. For full license | |
| 8 | | and copyright information, see https://civicrm.org/licensing | |
| 9 | +--------------------------------------------------------------------+ |
| 10 | */ |
| 11 | |
| 12 | /** |
| 13 | * `CRM_Queue_TaskRunner` a list tasks from a queue. It is designed to supported background |
| 14 | * tasks which run automatically. |
| 15 | * |
| 16 | * This runner is not appropriate for all queues or workloads, so you might choose or create |
| 17 | * a different runner. For example, `CRM_Queue_Runner` is geared toward background task lists. |
| 18 | * |
| 19 | * @see CRM_Queue_Runner |
| 20 | */ |
| 21 | class CRM_Queue_TaskRunner { |
| 22 | |
| 23 | /** |
| 24 | * @param \CRM_Queue_Queue $queue |
| 25 | * @param $item |
| 26 | * @return string |
| 27 | * One of the following: |
| 28 | * - 'ok': Task executed normally. Removed from queue. |
| 29 | * - 'retry': Task encountered an error. Will try again later. |
| 30 | * - 'delete': Task encountered an error. Will not try again later. Removed from queue. |
| 31 | * - 'abort': Task encountered an error. Will not try again later. Stopped the queue. |
| 32 | * @throws \API_Exception |
| 33 | */ |
| 34 | public function run(CRM_Queue_Queue $queue, $item): string { |
| 35 | $this->assertType($item->data, ['CRM_Queue_Task'], 'Cannot run. Invalid task given.'); |
| 36 | |
| 37 | /** @var \CRM_Queue_Task $task */ |
| 38 | $task = $item->data; |
| 39 | |
| 40 | /** @var string $outcome One of 'ok', 'retry', 'delete', 'abort' */ |
| 41 | |
| 42 | if (is_numeric($queue->getSpec('retry_limit')) && $item->run_count > 1 + $queue->getSpec('retry_limit')) { |
| 43 | \Civi::log()->debug("Skipping exhausted task: " . $task->title); |
| 44 | $outcome = $queue->getSpec('error'); |
| 45 | $exception = new \API_Exception(sprintf('Skipping exhausted task after %d tries: %s', $item->run_count, print_r($task, 1)), 'queue_retry_exhausted'); |
| 46 | } |
| 47 | else { |
| 48 | \Civi::log()->debug("Running task: " . $task->title); |
| 49 | try { |
| 50 | $runResult = $task->run($this->createContext($queue)); |
| 51 | $outcome = $runResult ? 'ok' : $queue->getSpec('error'); |
| 52 | $exception = ($outcome === 'ok') ? NULL : new \API_Exception('Queue task returned false', 'queue_false'); |
| 53 | } |
| 54 | catch (\Exception $e) { |
| 55 | $outcome = $queue->getSpec('error'); |
| 56 | $exception = $e; |
| 57 | } |
| 58 | |
| 59 | if (in_array($outcome, ['delete', 'abort']) && $this->isRetriable($queue, $item)) { |
| 60 | $outcome = 'retry'; |
| 61 | } |
| 62 | } |
| 63 | |
| 64 | if ($outcome !== 'ok') { |
| 65 | \CRM_Utils_Hook::queueTaskError($queue, $item, $outcome, $exception); |
| 66 | } |
| 67 | |
| 68 | if ($outcome === 'ok') { |
| 69 | $queue->deleteItem($item); |
| 70 | return $outcome; |
| 71 | } |
| 72 | |
| 73 | $logDetails = [ |
| 74 | 'id' => $queue->getName() . '#' . $item->id, |
| 75 | 'task' => CRM_Utils_Array::subset((array) $task, ['title', 'callback', 'arguments']), |
| 76 | 'outcome' => $outcome, |
| 77 | 'message' => $exception ? $exception->getMessage() : NULL, |
| 78 | 'exception' => $exception, |
| 79 | ]; |
| 80 | |
| 81 | switch ($outcome) { |
| 82 | case 'retry': |
| 83 | \Civi::log('queue')->error('Task "{id}" failed and should be retried. {message}', $logDetails); |
| 84 | $queue->releaseItem($item); |
| 85 | break; |
| 86 | |
| 87 | case 'delete': |
| 88 | \Civi::log('queue')->error('Task "{id}" failed and will be deleted. {message}', $logDetails); |
| 89 | $queue->deleteItem($item); |
| 90 | break; |
| 91 | |
| 92 | case 'abort': |
| 93 | \Civi::log('queue')->error('Task "{id}" failed. Queue processing aborted. {message}', $logDetails); |
| 94 | $queue->setStatus('aborted'); |
| 95 | $queue->releaseItem($item); /* Sysadmin might inspect, fix, and then resume. Item should be accessible. */ |
| 96 | break; |
| 97 | |
| 98 | default: |
| 99 | \Civi::log('queue')->critical('Unrecognized outcome for task "{id}": {outcome}', $logDetails); |
| 100 | break; |
| 101 | } |
| 102 | |
| 103 | return $outcome; |
| 104 | } |
| 105 | |
| 106 | /** |
| 107 | * @param \CRM_Queue_Queue $queue |
| 108 | * return CRM_Queue_TaskContext; |
| 109 | */ |
| 110 | private function createContext(\CRM_Queue_Queue $queue): \CRM_Queue_TaskContext { |
| 111 | $taskCtx = new \CRM_Queue_TaskContext(); |
| 112 | $taskCtx->queue = $queue; |
| 113 | $taskCtx->log = \CRM_Core_Error::createDebugLogger(); |
| 114 | return $taskCtx; |
| 115 | } |
| 116 | |
| 117 | private function assertType($object, array $types, string $message) { |
| 118 | foreach ($types as $type) { |
| 119 | if ($object instanceof $type) { |
| 120 | return; |
| 121 | } |
| 122 | } |
| 123 | throw new \Exception($message); |
| 124 | } |
| 125 | |
| 126 | private function isRetriable(\CRM_Queue_Queue $queue, $item): bool { |
| 127 | return property_exists($item, 'run_count') |
| 128 | && is_numeric($queue->getSpec('retry_limit')) |
| 129 | && $queue->getSpec('retry_limit') + 1 > $item->run_count; |
| 130 | } |
| 131 | |
| 132 | } |