Merge pull request #23283 from eileenmcnaughton/import_saved_map
[civicrm-core.git] / CRM / Queue / Service.php
1 <?php
2 /*
3 +--------------------------------------------------------------------+
4 | Copyright CiviCRM LLC. All rights reserved. |
5 | |
6 | This work is published under the GNU AGPLv3 license with some |
7 | permitted exceptions and without any warranty. For full license |
8 | and copyright information, see https://civicrm.org/licensing |
9 +--------------------------------------------------------------------+
10 */
11
/**
 * The queue service provides an interface for creating or locating
 * queues. Note that this approach hides the details of data-storage:
 * different queue-providers may store the queue content in different
 * ways (in memory, in SQL, or in an external service).
 *
 * ```
 * $queue = CRM_Queue_Service::singleton()->create([
 *   'type' => 'Sql',
 *   'name' => 'upgrade-tasks',
 * ]);
 * $queue->createItem($myData);
 *
 * // Some time later...
 * $item = $queue->claimItem();
 * if ($item) {
 *   if (my_process($item->data)) {
 *     $queue->deleteItem($item);
 *   }
 *   else {
 *     $queue->releaseItem($item);
 *   }
 * }
 * ```
 */
class CRM_Queue_Service {

  /**
   * Shared instance handed out by singleton().
   *
   * @var \CRM_Queue_Service|null
   */
  protected static $_singleton;

  /**
   * List of fields which are shared by `$queueSpec` and `civicrm_queue`.
   *
   * @var string[]
   * @readonly
   */
  private static $commonFields = ['name', 'type', 'runner', 'status', 'error', 'batch_limit', 'lease_time', 'retry_limit', 'retry_interval'];

  /**
   * Get the shared service instance.
   *
   * FIXME: Singleton pattern should be removed when dependency-injection
   * becomes available.
   *
   * @param bool $forceNew
   *   TRUE if a new instance must be created (it replaces the shared one).
   *
   * @return \CRM_Queue_Service
   */
  public static function &singleton($forceNew = FALSE) {
    if ($forceNew || !self::$_singleton) {
      self::$_singleton = new CRM_Queue_Service();
    }
    return self::$_singleton;
  }

  /**
   * Queues which have already been created/loaded through this service.
   *
   * Format is (string $queueName => CRM_Queue_Queue).
   *
   * @var array
   */
  public $queues;

  /**
   * Class constructor. Starts with an empty queue cache.
   */
  public function __construct() {
    $this->queues = [];
  }

  /**
   * Create a queue. If one already exists, then it will be reused.
   *
   * @param array $queueSpec
   *   Array with keys:
   *   - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory`
   *   - name: string, required, e.g. "upgrade-tasks"
   *   - reset: bool, optional; if a queue is found, then it should be
   *     flushed (deleted and re-created); default: FALSE
   *   - (additional keys depending on the queue provider).
   *   - is_persistent: bool, optional; if true, then this queue is loaded from `civicrm_queue` list
   *   - runner: string, optional; if given, then items in this queue can run
   *     automatically via `hook_civicrm_queueRun_{$runner}`
   *   - status: string, required for runnable-queues; specify whether the runner is currently active
   *     ex: 'active', 'draft', 'completed'
   *   - error: string, required for runnable-queues; specify what to do with unhandled errors
   *     ex: "drop" or "abort"
   *   - batch_limit: int, Maximum number of items in a batch.
   *   - lease_time: int, When claiming an item (or batch of items) for work, how long should the item(s) be reserved. (Seconds)
   *   - retry_limit: int, Number of permitted retries. Set to zero (0) to disable.
   *   - retry_interval: int, Number of seconds to wait before retrying a failed execution.
   *
   * @return CRM_Queue_Queue
   * @throws \CRM_Core_Exception
   *   If the spec fails validateQueueSpec().
   */
  public function create($queueSpec) {
    $reset = !empty($queueSpec['reset']);

    // Reuse the cached object unless the caller explicitly asked for a reset.
    if (is_object($this->queues[$queueSpec['name']] ?? NULL) && !$reset) {
      return $this->queues[$queueSpec['name']];
    }

    if (!empty($queueSpec['is_persistent'])) {
      // For persistent queues, the stored record is the authority on common fields.
      $queueSpec = $this->findCreateQueueSpec($queueSpec);
    }
    $this->validateQueueSpec($queueSpec);
    $queue = $this->instantiateQueueObject($queueSpec);
    if (!$queue->existsQueue()) {
      $queue->createQueue();
    }
    elseif (!empty($queueSpec['reset'])) {
      // Re-read the flag: for a persistent queue, $queueSpec was replaced above.
      $queue->deleteQueue();
      $queue->createQueue();
    }
    else {
      $queue->loadQueue();
    }
    $this->queues[$queueSpec['name']] = $queue;
    return $queue;
  }

  /**
   * Find/create the queue-spec. Specifically:
   *
   * - If there is a stored queue, use its spec.
   * - If there is no stored queue, and if we have enough information, then create queue.
   *
   * @param array $queueSpec
   * @return array
   *   Updated queueSpec.
   * @throws \CRM_Core_Exception
   *   If the spec is invalid, or if the record cannot be read back after insert.
   */
  protected function findCreateQueueSpec(array $queueSpec): array {
    $loaded = $this->findQueueSpec($queueSpec);
    if ($loaded !== NULL) {
      return $loaded;
    }

    // Nothing stored yet: only persist a spec which passes validation.
    $this->validateQueueSpec($queueSpec);

    $dao = new CRM_Queue_DAO_Queue();
    $dao->name = $queueSpec['name'];
    $dao->copyValues($queueSpec);
    $dao->insert();

    // Read the record back so the returned spec reflects what was stored.
    $created = $this->findQueueSpec($queueSpec);
    if ($created === NULL) {
      // Report through the documented exception type instead of a TypeError
      // on the `array` return type.
      throw new CRM_Core_Exception(sprintf('Failed to create queue "%s". The stored record could not be found after insert.', $queueSpec['name']));
    }
    return $created;
  }

  /**
   * Look up the stored spec for a queue by name.
   *
   * @param array $queueSpec
   *   Requested spec; `name` identifies the stored record.
   * @return array|null
   *   The requested spec, overlaid with the stored values of the common
   *   fields (as selected by CRM_Utils_Array::subset()). NULL if no record
   *   with that name is found.
   */
  protected function findQueueSpec(array $queueSpec): ?array {
    $dao = new CRM_Queue_DAO_Queue();
    $dao->name = $queueSpec['name'];
    if (!$dao->find(TRUE)) {
      return NULL;
    }
    // `self::` (not `static::`): $commonFields is private to this class.
    return array_merge($queueSpec, CRM_Utils_Array::subset($dao->toArray(), self::$commonFields));
  }

  /**
   * Look up an existing queue.
   *
   * @param array $queueSpec
   *   Array with keys:
   *   - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory`
   *   - name: string, required, e.g. "upgrade-tasks"
   *   - (additional keys depending on the queue provider).
   *   - is_persistent: bool, optional; if true, then this queue is loaded from `civicrm_queue` list
   *
   * @return CRM_Queue_Queue
   */
  public function load($queueSpec) {
    if (is_object($this->queues[$queueSpec['name']] ?? NULL)) {
      return $this->queues[$queueSpec['name']];
    }
    if (!empty($queueSpec['is_persistent'])) {
      $queueSpec = $this->findCreateQueueSpec($queueSpec);
    }
    $queue = $this->instantiateQueueObject($queueSpec);
    $queue->loadQueue();
    $this->queues[$queueSpec['name']] = $queue;
    return $queue;
  }

  /**
   * Convert a queue "type" name to a class name.
   *
   * @param string $type
   *   Queue type, e.g. `Sql`, `SqlParallel`, `Memory`. Any character other
   *   than ASCII letters and digits is stripped before use.
   * @return string
   *   Class-name
   */
  protected function getQueueClass($type) {
    $type = preg_replace('/[^a-zA-Z0-9]/', '', $type);
    $className = 'CRM_Queue_Queue_' . $type;
    // FIXME: when used with class-autoloader, this may be unnecessary
    if (!class_exists($className)) {
      $classFile = 'CRM/Queue/Queue/' . $type . '.php';
      require_once $classFile;
    }
    return $className;
  }

  /**
   * Build the queue object for a spec, using the class matching its `type`.
   *
   * @param array $queueSpec
   *   See create().
   *
   * @return CRM_Queue_Queue
   */
  protected function instantiateQueueObject($queueSpec) {
    // note: you should probably never do anything else here
    $class = new ReflectionClass($this->getQueueClass($queueSpec['type']));
    return $class->newInstance($queueSpec);
  }

  /**
   * Assert that the queueSpec is well-formed.
   *
   * Every spec needs a `type`. Specs which are both persistent and have a
   * runner must also have a known `status`; those with the `task` runner
   * must also have a known `error` mode.
   *
   * @param array $queueSpec
   * @throws \CRM_Core_Exception
   */
  public function validateQueueSpec(array $queueSpec): void {
    // A missing name must not raise a warning while building the error message.
    $name = $queueSpec['name'] ?? '';
    $throw = function(string $message, ...$args) use ($name) {
      $prefix = sprintf('Failed to create queue "%s". ', $name);
      throw new CRM_Core_Exception($prefix . sprintf($message, ...$args));
    };

    if (empty($queueSpec['type'])) {
      $throw('Missing field "type".');
    }

    // The rest of the validations only apply to persistent, runnable queues.
    if (empty($queueSpec['is_persistent']) || empty($queueSpec['runner'])) {
      return;
    }

    $statuses = CRM_Queue_BAO_Queue::getStatuses();
    $status = $queueSpec['status'] ?? NULL;
    if (!isset($statuses[$status])) {
      $throw('Invalid queue status "%s".', $status);
    }

    $errorModes = CRM_Queue_BAO_Queue::getErrorModes();
    $errorMode = $queueSpec['error'] ?? NULL;
    if ($queueSpec['runner'] === 'task' && !isset($errorModes[$errorMode])) {
      $throw('Invalid error mode "%s".', $errorMode);
    }
  }

}