Merge pull request #23645 from jmcclelland/profile-require-tag
[civicrm-core.git] / CRM / Queue / Service.php
CommitLineData
6a488035
TO
1<?php
/*
 +--------------------------------------------------------------------+
 | Copyright CiviCRM LLC. All rights reserved.                        |
 |                                                                    |
 | This work is published under the GNU AGPLv3 license with some      |
 | permitted exceptions and without any warranty. For full license    |
 | and copyright information, see https://civicrm.org/licensing       |
 +--------------------------------------------------------------------+
 */
6a488035
TO
11
/**
 * The queue service provides an interface for creating or locating
 * queues. Note that this approach hides the details of data-storage:
 * different queue-providers may store the queue content in different
 * ways (in memory, in SQL, or in an external service).
 *
 * ```
 * $queue = CRM_Queue_Service::singleton()->create(array(
 *   'type' => 'interactive',
 *   'name' => 'upgrade-tasks',
 * ));
 * $queue->createItem($myData);
 *
 * // Some time later...
 * $item = $queue->claimItem();
 * if ($item) {
 *   if (my_process($item->data)) {
 *     $queue->deleteItem($item);
 *   } else {
 *     $queue->releaseItem($item);
 *   }
 * }
 * ```
 */
class CRM_Queue_Service {

  /**
   * Shared instance returned by singleton().
   *
   * @var CRM_Queue_Service|null
   */
  protected static $_singleton;

  /**
   * List of fields which are shared by `$queueSpec` and `civicrm_queue`.
   *
   * @var string[]
   * @readonly
   */
  private static $commonFields = ['name', 'type', 'runner', 'status', 'error', 'batch_limit', 'lease_time', 'retry_limit', 'retry_interval'];

  /**
   * FIXME: Singleton pattern should be removed when dependency-injection
   * becomes available.
   *
   * @param bool $forceNew
   *   TRUE if a new instance must be created.
   *
   * @return \CRM_Queue_Service
   */
  public static function &singleton($forceNew = FALSE) {
    if ($forceNew || !self::$_singleton) {
      self::$_singleton = new CRM_Queue_Service();
    }
    return self::$_singleton;
  }

  /**
   * Queues.
   *
   * Format is (string $queueName => CRM_Queue_Queue).
   *
   * @var array
   */
  public $queues;

  /**
   * Class constructor.
   *
   * Starts with an empty list of known queues.
   */
  public function __construct() {
    $this->queues = [];
  }

  /**
   * Create a queue. If one already exists, then it will be reused.
   *
   * @param array $queueSpec
   *   Array with keys:
   *   - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory`
   *   - name: string, required, e.g. "upgrade-tasks"
   *   - reset: bool, optional; if a queue is found, then it should be
   *     flushed; defaults to FALSE (an existing queue is reused/loaded)
   *   - (additional keys depending on the queue provider).
   *   - is_persistent: bool, optional; if true, then this queue is loaded from `civicrm_queue` list
   *   - runner: string, optional; if given, then items in this queue can run
   *     automatically via `hook_civicrm_queueRun_{$runner}`
   *   - status: string, required for runnable-queues; specify whether the runner is currently active
   *     ex: 'active', 'draft', 'completed'
   *   - error: string, required for runnable-queues; specify what to do with unhandled errors
   *     ex: "drop" or "abort"
   *   - batch_limit: int, Maximum number of items in a batch.
   *   - lease_time: int, When claiming an item (or batch of items) for work, how long should the item(s) be reserved. (Seconds)
   *   - retry_limit: int, Number of permitted retries. Set to zero (0) to disable.
   *   - retry_interval: int, Number of seconds to wait before retrying a failed execution.
   * @return CRM_Queue_Queue
   * @throws \CRM_Core_Exception
   *   If the spec fails validateQueueSpec().
   */
  public function create($queueSpec) {
    $reset = !empty($queueSpec['reset']);

    // An already-instantiated queue is only reused when no reset was requested.
    if (is_object($this->queues[$queueSpec['name']] ?? NULL) && !$reset) {
      return $this->queues[$queueSpec['name']];
    }

    if (!empty($queueSpec['is_persistent'])) {
      $queueSpec = $this->findCreateQueueSpec($queueSpec);
    }
    $this->validateQueueSpec($queueSpec);
    $queue = $this->instantiateQueueObject($queueSpec);
    $exists = $queue->existsQueue();
    if (!$exists) {
      $queue->createQueue();
    }
    elseif ($reset) {
      // Flush by dropping and re-creating the backing queue.
      $queue->deleteQueue();
      $queue->createQueue();
    }
    else {
      $queue->loadQueue();
    }
    $this->queues[$queueSpec['name']] = $queue;
    return $queue;
  }

  /**
   * Find/create the queue-spec. Specifically:
   *
   * - If there is a stored queue, use its spec.
   * - If there is no stored queue, and if we have enough information, then create queue.
   *
   * @param array $queueSpec
   * @return array
   *   Updated queueSpec.
   * @throws \CRM_Core_Exception
   *   If the spec is invalid, or if the record cannot be read back after
   *   it has been inserted.
   */
  protected function findCreateQueueSpec(array $queueSpec): array {
    $loaded = $this->findQueueSpec($queueSpec);
    if ($loaded !== NULL) {
      return $loaded;
    }

    $this->validateQueueSpec($queueSpec);

    $dao = new CRM_Queue_DAO_Queue();
    $dao->name = $queueSpec['name'];
    $dao->copyValues($queueSpec);
    $dao->insert();

    // Re-read so that the returned spec reflects what is actually stored.
    $created = $this->findQueueSpec($queueSpec);
    if ($created === NULL) {
      // Without this guard, the NULL would violate the declared `array`
      // return type and surface as an opaque TypeError.
      throw new CRM_Core_Exception(sprintf('Failed to create queue "%s". The stored record could not be found after insert.', $queueSpec['name']));
    }
    return $created;
  }

  /**
   * Look up the stored record for a queue (matched by name).
   *
   * @param array $queueSpec
   *   Must include `name`.
   * @return array|null
   *   If a record is found: the given $queueSpec, with the common fields
   *   (see $commonFields) overridden by the stored values.
   *   If no record is found: NULL.
   */
  protected function findQueueSpec(array $queueSpec): ?array {
    $dao = new CRM_Queue_DAO_Queue();
    $dao->name = $queueSpec['name'];
    if ($dao->find(TRUE)) {
      // The property is private, so it must be addressed via self:: (not
      // static::) to stay accessible regardless of the called class.
      return array_merge($queueSpec, CRM_Utils_Array::subset($dao->toArray(), self::$commonFields));
    }
    else {
      return NULL;
    }
  }

  /**
   * Look up an existing queue.
   *
   * Note: For persistent queues, the spec is resolved with
   * findCreateQueueSpec(), so a missing `civicrm_queue` record will be
   * inserted if the given spec is valid.
   *
   * @param array $queueSpec
   *   Array with keys:
   *   - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory`
   *   - name: string, required, e.g. "upgrade-tasks"
   *   - (additional keys depending on the queue provider).
   *   - is_persistent: bool, optional; if true, then this queue is loaded from `civicrm_queue` list
   *
   * @return CRM_Queue_Queue
   */
  public function load($queueSpec) {
    if (is_object($this->queues[$queueSpec['name']] ?? NULL)) {
      return $this->queues[$queueSpec['name']];
    }
    if (!empty($queueSpec['is_persistent'])) {
      $queueSpec = $this->findCreateQueueSpec($queueSpec);
    }
    $queue = $this->instantiateQueueObject($queueSpec);
    $queue->loadQueue();
    $this->queues[$queueSpec['name']] = $queue;
    return $queue;
  }

  /**
   * Convert a queue "type" name to a class name.
   *
   * Any character other than an ASCII letter or digit is stripped from
   * the type before it is used in the class name and file path.
   *
   * @param string $type
   *   - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory`
   * @return string
   *   Class-name
   */
  protected function getQueueClass($type) {
    $type = preg_replace('/[^a-zA-Z0-9]/', '', $type);
    $className = 'CRM_Queue_Queue_' . $type;
    // FIXME: when used with class-autoloader, this may be unnecessary
    if (!class_exists($className)) {
      $classFile = 'CRM/Queue/Queue/' . $type . '.php';
      require_once $classFile;
    }
    return $className;
  }

  /**
   * Instantiate the queue class which corresponds to `$queueSpec['type']`.
   *
   * The full spec is passed to the constructor as its only argument.
   *
   * @param array $queueSpec
   *   See create().
   *
   * @return CRM_Queue_Queue
   */
  protected function instantiateQueueObject($queueSpec) {
    // note: you should probably never do anything else here
    $class = new ReflectionClass($this->getQueueClass($queueSpec['type']));
    return $class->newInstance($queueSpec);
  }

  /**
   * Assert that the queueSpec is well-formed.
   *
   * All specs must have a `type`. Specs which are both persistent and
   * runnable must also have a recognized `status`; and, if the runner is
   * `task`, a recognized `error` mode.
   *
   * @param array $queueSpec
   * @throws \CRM_Core_Exception
   */
  public function validateQueueSpec(array $queueSpec): void {
    $throw = function(string $message, ...$args) use ($queueSpec) {
      // The name may itself be missing from a malformed spec; don't let that
      // raise an unrelated warning while reporting the real problem.
      $prefix = sprintf('Failed to create queue "%s". ', $queueSpec['name'] ?? '');
      throw new CRM_Core_Exception($prefix . sprintf($message, ...$args));
    };

    if (empty($queueSpec['type'])) {
      $throw('Missing field "type".');
    }

    // The rest of the validations only apply to persistent, runnable queues.
    if (empty($queueSpec['is_persistent']) || empty($queueSpec['runner'])) {
      return;
    }

    $statuses = CRM_Queue_BAO_Queue::getStatuses();
    $status = $queueSpec['status'] ?? NULL;
    if (!isset($statuses[$status])) {
      $throw('Invalid queue status "%s".', $status);
    }

    $errorModes = CRM_Queue_BAO_Queue::getErrorModes();
    $errorMode = $queueSpec['error'] ?? NULL;
    if ($queueSpec['runner'] === 'task' && !isset($errorModes[$errorMode])) {
      $throw('Invalid error mode "%s".', $errorMode);
    }
  }

}