338c46eac253919d62c41741a960cd0327985394
[civicrm-core.git] / CRM / Queue / Service.php
1 <?php
2 /*
3 +--------------------------------------------------------------------+
4 | Copyright CiviCRM LLC. All rights reserved. |
5 | |
6 | This work is published under the GNU AGPLv3 license with some |
7 | permitted exceptions and without any warranty. For full license |
8 | and copyright information, see https://civicrm.org/licensing |
9 +--------------------------------------------------------------------+
10 */
11
/**
 * The queue service provides an interface for creating or locating
 * queues. Note that this approach hides the details of data-storage:
 * different queue-providers may store the queue content in different
 * ways (in memory, in SQL, or in an external service).
 *
 * ```
 * $queue = CRM_Queue_Service::singleton()->create([
 *   'type' => 'Sql',
 *   'name' => 'upgrade-tasks',
 * ]);
 * $queue->createItem($myData);
 *
 * // Some time later...
 * $item = $queue->claimItem();
 * if ($item) {
 *   if (my_process($item->data)) {
 *     $queue->deleteItem($item);
 *   }
 *   else {
 *     $queue->releaseItem($item);
 *   }
 * }
 * ```
 */
class CRM_Queue_Service {

  /**
   * Shared instance handed out by singleton().
   *
   * @var \CRM_Queue_Service|null
   */
  protected static $_singleton;

  /**
   * List of fields which are shared by `$queueSpec` and `civicrm_queue`.
   *
   * @var string[]
   * @readonly
   */
  private static $commonFields = ['name', 'type', 'runner', 'status', 'error', 'batch_limit', 'lease_time', 'retry_limit', 'retry_interval'];

  /**
   * FIXME: Singleton pattern should be removed when dependency-injection
   * becomes available.
   *
   * @param bool $forceNew
   *   TRUE if a new instance must be created.
   *
   * @return \CRM_Queue_Service
   */
  public static function &singleton($forceNew = FALSE) {
    if ($forceNew || !self::$_singleton) {
      self::$_singleton = new CRM_Queue_Service();
    }
    return self::$_singleton;
  }

  /**
   * Queues which have already been created or loaded by this service.
   *
   * Format is (string $queueName => CRM_Queue_Queue).
   *
   * @var array
   */
  public $queues;

  /**
   * Class constructor. Starts with an empty queue cache.
   */
  public function __construct() {
    $this->queues = [];
  }

  /**
   * Create a queue. If one already exists, then it will be reused.
   *
   * @param array $queueSpec
   *   Array with keys:
   *   - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory`
   *   - name: string, required, e.g. "upgrade-tasks"
   *   - reset: bool, optional; if a queue is found, then it should be
   *     flushed; defaults to FALSE (an existing queue is reused as-is)
   *   - (additional keys depending on the queue provider).
   *   - is_persistent: bool, optional; if true, then this queue is loaded from `civicrm_queue` list
   *   - runner: string, optional; if given, then items in this queue can run
   *     automatically via `hook_civicrm_queueRun_{$runner}`
   *   - status: string, required for runnable-queues; specify whether the runner is currently active
   *     ex: 'active', 'draft', 'completed'
   *   - error: string, required for runnable-queues; specify what to do with unhandled errors
   *     ex: "drop" or "abort"
   *   - batch_limit: int, Maximum number of items in a batch.
   *   - lease_time: int, When claiming an item (or batch of items) for work, how long should the item(s) be reserved. (Seconds)
   *   - retry_limit: int, Number of permitted retries. Set to zero (0) to disable.
   *   - retry_interval: int, Number of seconds to wait before retrying a failed execution.
   * @return CRM_Queue_Queue
   * @throws \CRM_Core_Exception
   *   If the queueSpec is not well-formed.
   */
  public function create($queueSpec) {
    // A queue object that was already built is returned directly, unless the caller asked for a reset.
    $cached = $this->queues[$queueSpec['name']] ?? NULL;
    if (is_object($cached) && empty($queueSpec['reset'])) {
      return $cached;
    }

    if (!empty($queueSpec['is_persistent'])) {
      $queueSpec = $this->findCreateQueueSpec($queueSpec);
    }
    $this->validateQueueSpec($queueSpec);
    $queue = $this->instantiateQueueObject($queueSpec);
    if (!$queue->existsQueue()) {
      $queue->createQueue();
    }
    elseif (!empty($queueSpec['reset'])) {
      // Flush by dropping and re-creating the backing queue.
      $queue->deleteQueue();
      $queue->createQueue();
    }
    else {
      $queue->loadQueue();
    }
    $this->queues[$queueSpec['name']] = $queue;
    return $queue;
  }

  /**
   * Find/create the queue-spec. Specifically:
   *
   * - If there is a stored queue, use its spec.
   * - If there is no stored queue, and if we have enough information, then create queue.
   *   When `$queueSpec['template']` is given, the stored spec of that template
   *   queue supplies defaults; values in `$queueSpec` take precedence.
   *
   * @param array $queueSpec
   * @return array
   *   Updated queueSpec.
   * @throws \CRM_Core_Exception
   *   If the named template does not exist, if the spec is not well-formed,
   *   or if the newly inserted record cannot be read back.
   */
  protected function findCreateQueueSpec(array $queueSpec): array {
    $loaded = $this->findQueueSpec($queueSpec);
    if ($loaded !== NULL) {
      return $loaded;
    }

    if (isset($queueSpec['template'])) {
      $base = $this->findQueueSpec(['name' => $queueSpec['template']]);
      if ($base === NULL) {
        // Without this guard, array_merge() would receive NULL (TypeError on PHP 8).
        throw new CRM_Core_Exception(sprintf(
          'Failed to create queue "%s". Unknown queue template "%s".',
          $queueSpec['name'],
          $queueSpec['template']
        ));
      }
      // Order matters: template values first, then force is_template off, then the caller's overrides.
      $queueSpec = array_merge($base, ['is_template' => 0], $queueSpec);
    }

    $this->validateQueueSpec($queueSpec);

    $dao = new CRM_Queue_DAO_Queue();
    $dao->name = $queueSpec['name'];
    $dao->copyValues($queueSpec);
    $dao->insert();

    // Re-read so that the returned spec reflects what was actually stored.
    $created = $this->findQueueSpec($queueSpec);
    if ($created === NULL) {
      throw new CRM_Core_Exception(sprintf(
        'Failed to create queue "%s". The stored record could not be found after insert.',
        $queueSpec['name']
      ));
    }
    return $created;
  }

  /**
   * Look up the stored record for a queue (by name) and overlay its common
   * fields onto the given spec.
   *
   * @param array $queueSpec
   *   Must include `name`. Other keys are passed through to the result.
   * @return array|null
   *   The merged spec (stored values win for the common fields), or NULL if
   *   no record with that name was found.
   */
  protected function findQueueSpec(array $queueSpec): ?array {
    $dao = new CRM_Queue_DAO_Queue();
    $dao->name = $queueSpec['name'];
    if (!$dao->find(TRUE)) {
      return NULL;
    }
    // $commonFields is private, so it must be addressed via self:: (not late static binding).
    $stored = CRM_Utils_Array::subset($dao->toArray(), self::$commonFields);
    return array_merge($queueSpec, $stored);
  }

  /**
   * Look up an existing queue.
   *
   * Note: for persistent queues the spec is resolved with
   * findCreateQueueSpec(), so a missing `civicrm_queue` record is created
   * from the given spec rather than reported as an error.
   *
   * @param array $queueSpec
   *   Array with keys:
   *   - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory`
   *   - name: string, required, e.g. "upgrade-tasks"
   *   - (additional keys depending on the queue provider).
   *   - is_persistent: bool, optional; if true, then this queue is loaded from `civicrm_queue` list
   *
   * @return CRM_Queue_Queue
   * @throws \CRM_Core_Exception
   */
  public function load($queueSpec) {
    $cached = $this->queues[$queueSpec['name']] ?? NULL;
    if (is_object($cached)) {
      return $cached;
    }
    if (!empty($queueSpec['is_persistent'])) {
      $queueSpec = $this->findCreateQueueSpec($queueSpec);
    }
    $queue = $this->instantiateQueueObject($queueSpec);
    $queue->loadQueue();
    $this->queues[$queueSpec['name']] = $queue;
    return $queue;
  }

  /**
   * Convert a queue "type" name to a class name.
   *
   * @param string $type
   *   - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory`
   * @return string
   *   Class-name
   * @throws \CRM_Core_Exception
   *   If the type is empty once non-alphanumeric characters are stripped.
   */
  protected function getQueueClass($type) {
    // Only alphanumerics survive, so the type cannot smuggle path separators into the require below.
    $type = (string) preg_replace('/[^a-zA-Z0-9]/', '', (string) $type);
    if ($type === '') {
      // Otherwise we would try to require "CRM/Queue/Queue/.php", which is a fatal error.
      throw new CRM_Core_Exception('Failed to resolve queue class. Missing or invalid queue type.');
    }
    $className = 'CRM_Queue_Queue_' . $type;
    // FIXME: when used with class-autoloader, this may be unnecessary
    if (!class_exists($className)) {
      $classFile = 'CRM/Queue/Queue/' . $type . '.php';
      require_once $classFile;
    }
    return $className;
  }

  /**
   * Build the queue object for a spec, using the class derived from its `type`.
   *
   * @param array $queueSpec
   *   See create().
   *
   * @return CRM_Queue_Queue
   */
  protected function instantiateQueueObject($queueSpec) {
    // note: you should probably never do anything else here
    $class = new ReflectionClass($this->getQueueClass($queueSpec['type']));
    return $class->newInstance($queueSpec);
  }

  /**
   * Assert that the queueSpec is well-formed.
   *
   * Every spec needs a `type`. Persistent queues with a `runner` must also
   * have a known `status`, and (for the `task` runner) a known `error` mode.
   *
   * @param array $queueSpec
   * @throws \CRM_Core_Exception
   */
  public function validateQueueSpec(array $queueSpec): void {
    $throw = static function(string $message, ...$args) use ($queueSpec) {
      // Tolerate a missing name so that the real validation error is what gets reported.
      $prefix = sprintf('Failed to create queue "%s". ', $queueSpec['name'] ?? '');
      throw new CRM_Core_Exception($prefix . sprintf($message, ...$args));
    };

    if (empty($queueSpec['type'])) {
      $throw('Missing field "type".');
    }

    // The rest of the validations only apply to persistent, runnable queues.
    if (empty($queueSpec['is_persistent']) || empty($queueSpec['runner'])) {
      return;
    }

    $statuses = CRM_Queue_BAO_Queue::getStatuses();
    $status = $queueSpec['status'] ?? NULL;
    if ($status === NULL || !isset($statuses[$status])) {
      $throw('Invalid queue status "%s".', $status);
    }

    $errorModes = CRM_Queue_BAO_Queue::getErrorModes();
    $errorMode = $queueSpec['error'] ?? NULL;
    if ($queueSpec['runner'] === 'task' && ($errorMode === NULL || !isset($errorModes[$errorMode]))) {
      $throw('Invalid error mode "%s".', $errorMode);
    }
  }

}