Commit | Line | Data |
---|---|---|
3eaa853f TO |
1 | <?php |
2 | /* | |
3 | +--------------------------------------------------------------------+ | |
4 | | Copyright CiviCRM LLC. All rights reserved. | | |
5 | | | | |
6 | | This work is published under the GNU AGPLv3 license with some | | |
7 | | permitted exceptions and without any warranty. For full license | | |
8 | | and copyright information, see https://civicrm.org/licensing | | |
9 | +--------------------------------------------------------------------+ | |
10 | */ | |
11 | ||
/**
 * `CRM_Queue_TaskRunner` runs a list of tasks from a queue. It is designed to support
 * background tasks which run automatically.
 *
 * This runner is not appropriate for all queues or workloads, so you might choose or create
 * a different runner. For example, `CRM_Queue_Runner` is geared toward background task lists.
 *
 * @see CRM_Queue_Runner
 */
class CRM_Queue_TaskRunner {

  /**
   * Run the task held by a queue item, then delete or release the item according to the outcome.
   *
   * @param \CRM_Queue_Queue $queue
   *   The queue that the item belongs to. Its `retry_limit` and `error` specs are consulted.
   * @param object $item
   *   The queue item. `$item->data` must be a `CRM_Queue_Task`. `$item->id` is used for
   *   logging, and `$item->run_count` (if present) is used for retry accounting.
   * @return string
   *   One of the following:
   *   - 'ok': Task executed normally. Removed from queue.
   *   - 'retry': Task encountered an error. Will try again later.
   *   - 'delete': Task encountered an error. Will not try again later. Removed from queue.
   *   - 'abort': Task encountered an error. Will not try again later. Stopped the queue.
   * @throws \Exception
   *   If `$item->data` is not a `CRM_Queue_Task`.
   */
  public function run(CRM_Queue_Queue $queue, $item): string {
    $this->assertType($item->data, ['CRM_Queue_Task'], 'Cannot run. Invalid task given.');

    /** @var \CRM_Queue_Task $task */
    $task = $item->data;

    /** @var string $outcome One of 'ok', 'retry', 'delete', 'abort' */

    if ($this->isExhausted($queue, $item)) {
      // Too many earlier attempts: do not run the task again; apply the queue's error policy.
      \Civi::log()->debug("Skipping exhausted task: " . $task->title);
      $outcome = $queue->getSpec('error');
      $exception = new \API_Exception(sprintf('Skipping exhausted task after %d tries: %s', $item->run_count, print_r($task, TRUE)), 'queue_retry_exhausted');
    }
    else {
      \Civi::log()->debug("Running task: " . $task->title);
      try {
        $runResult = $task->run($this->createContext($queue));
        // A falsy return value is treated as a failure, just like an exception.
        $outcome = $runResult ? 'ok' : $queue->getSpec('error');
        $exception = ($outcome === 'ok') ? NULL : new \API_Exception('Queue task returned false', 'queue_false');
      }
      // NOTE(review): \Error subclasses (e.g. TypeError) are not caught here and will propagate.
      // Widening to \Throwable requires queueTaskError() to accept it -- confirm before changing.
      catch (\Exception $e) {
        $outcome = $queue->getSpec('error');
        $exception = $e;
      }

      // A terminal error policy is downgraded to 'retry' while the item still has tries left.
      // Strict comparison: only the exact strings 'delete'/'abort' qualify.
      if (in_array($outcome, ['delete', 'abort'], TRUE) && $this->isRetriable($queue, $item)) {
        $outcome = 'retry';
      }
    }

    // The hook must run before the 'ok' check below: $outcome is re-tested after the call,
    // presumably because the hook may alter it (TODO confirm against CRM_Utils_Hook).
    if ($outcome !== 'ok') {
      \CRM_Utils_Hook::queueTaskError($queue, $item, $outcome, $exception);
    }

    if ($outcome === 'ok') {
      $queue->deleteItem($item);
      return $outcome;
    }

    $logDetails = [
      'id' => $queue->getName() . '#' . $item->id,
      'task' => CRM_Utils_Array::subset((array) $task, ['title', 'callback', 'arguments']),
      'outcome' => $outcome,
      'message' => $exception ? $exception->getMessage() : NULL,
      'exception' => $exception,
    ];

    switch ($outcome) {
      case 'retry':
        \Civi::log('queue')->error('Task "{id}" failed and should be retried. {message}', $logDetails);
        $queue->releaseItem($item);
        break;

      case 'delete':
        \Civi::log('queue')->error('Task "{id}" failed and will be deleted. {message}', $logDetails);
        $queue->deleteItem($item);
        break;

      case 'abort':
        \Civi::log('queue')->error('Task "{id}" failed. Queue processing aborted. {message}', $logDetails);
        $queue->setStatus('aborted');
        /* Sysadmin might inspect, fix, and then resume. Item should be accessible. */
        $queue->releaseItem($item);
        break;

      default:
        // Unknown outcome: only logged. The item is neither released nor deleted here.
        \Civi::log('queue')->critical('Unrecognized outcome for task "{id}": {outcome}', $logDetails);
        break;
    }

    return $outcome;
  }

  /**
   * Build the context object that is passed to the task when it runs.
   *
   * @param \CRM_Queue_Queue $queue
   *   The queue to expose to the task as `$taskCtx->queue`.
   * @return \CRM_Queue_TaskContext
   *   A new context carrying the queue and a debug logger.
   */
  private function createContext(\CRM_Queue_Queue $queue): \CRM_Queue_TaskContext {
    $taskCtx = new \CRM_Queue_TaskContext();
    $taskCtx->queue = $queue;
    $taskCtx->log = \CRM_Core_Error::createDebugLogger();
    return $taskCtx;
  }

  /**
   * Assert that a value is an instance of at least one of the listed types.
   *
   * @param mixed $object
   *   The value to check.
   * @param string[] $types
   *   Class or interface names; matching any one of them is sufficient.
   * @param string $message
   *   The exception message to use if no type matches.
   * @throws \Exception
   *   If `$object` matches none of `$types`.
   */
  private function assertType($object, array $types, string $message) {
    foreach ($types as $type) {
      if ($object instanceof $type) {
        return;
      }
    }
    throw new \Exception($message);
  }

  /**
   * Determine whether a failed item may still be retried.
   *
   * An item is retriable only if it tracks `run_count`, the queue defines a numeric
   * `retry_limit`, and `run_count` is still below `retry_limit + 1`.
   *
   * @param \CRM_Queue_Queue $queue
   * @param object $item
   * @return bool
   */
  private function isRetriable(\CRM_Queue_Queue $queue, $item): bool {
    return property_exists($item, 'run_count')
      && is_numeric($queue->getSpec('retry_limit'))
      && $queue->getSpec('retry_limit') + 1 > $item->run_count;
  }

  /**
   * Determine whether an item has already used up all of its tries and should not run again.
   *
   * Counterpart of isRetriable(): the item is exhausted once `run_count` exceeds
   * `retry_limit + 1`. Items without a `run_count` property, and queues without a numeric
   * `retry_limit`, are never considered exhausted.
   *
   * @param \CRM_Queue_Queue $queue
   * @param object $item
   * @return bool
   */
  private function isExhausted(\CRM_Queue_Queue $queue, $item): bool {
    return property_exists($item, 'run_count')
      && is_numeric($queue->getSpec('retry_limit'))
      && $item->run_count > 1 + $queue->getSpec('retry_limit');
  }

}