Commit | Line | Data |
---|---|---|
6a488035 TO |
1 | <?php |
2 | /* | |
3 | +--------------------------------------------------------------------+ | |
bc77d7c0 | 4 | | Copyright CiviCRM LLC. All rights reserved. | |
6a488035 | 5 | | | |
bc77d7c0 TO |
6 | | This work is published under the GNU AGPLv3 license with some | |
7 | | permitted exceptions and without any warranty. For full license | | |
8 | | and copyright information, see https://civicrm.org/licensing | | |
6a488035 | 9 | +--------------------------------------------------------------------+ |
d25dd0ee | 10 | */ |
6a488035 TO |
11 | |
12 | /** | |
13 | * The queue service provides an interface for creating or locating | |
14 | * queues. Note that this approach hides the details of data-storage: | |
15 | * different queue-providers may store the queue content in different | |
16 | * ways (in memory, in SQL, or in an external service). | |
17 | * | |
0b882a86 | 18 | * ``` |
6a488035 TO |
19 | * $queue = CRM_Queue_Service::singleton()->create(array( |
20 | * 'type' => 'interactive', | |
21 | * 'name' => 'upgrade-tasks', | |
22 | * )); | |
23 | * $queue->createItem($myData); | |
24 | * | |
25 | * // Some time later... | |
26 | * $item = $queue->claimItem(); | |
5e74f823 TO |
27 | * if ($item) { |
28 | * if (my_process($item->data)) { | |
16e38e82 | 29 | * $queue->deleteItem($item); |
5e74f823 | 30 | * } else { |
16e38e82 | 31 | * $queue->releaseItem($item); |
5e74f823 | 32 | * } |
6a488035 | 33 | * } |
0b882a86 | 34 | * ``` |
6a488035 TO |
35 | */ |
36 | class CRM_Queue_Service { | |
37 | ||
4523a2f5 | 38 | protected static $_singleton; |
6a488035 | 39 | |
cf97aeef TO |
40 | /** |
41 | * List of fields which are shared by `$queueSpec` and `civicrm_queue`. | |
42 | * | |
43 | * @var string[] | |
44 | * @readonly | |
45 | */ | |
86ffee74 | 46 | private static $commonFields = ['name', 'type', 'runner', 'status', 'error', 'batch_limit', 'lease_time', 'retry_limit', 'retry_interval']; |
cf97aeef | 47 | |
6a488035 TO |
48 | /** |
49 | * FIXME: Singleton pattern should be removed when dependency-injection | |
50 | * becomes available. | |
51 | * | |
4523a2f5 TO |
52 | * @param bool $forceNew |
53 | * TRUE if a new instance must be created. | |
77b97be7 EM |
54 | * |
55 | * @return \CRM_Queue_Service | |
6a488035 | 56 | */ |
4523a2f5 | 57 | public static function &singleton($forceNew = FALSE) { |
6a488035 TO |
58 | if ($forceNew || !self::$_singleton) { |
59 | self::$_singleton = new CRM_Queue_Service(); | |
60 | } | |
61 | return self::$_singleton; | |
62 | } | |
63 | ||
64 | /** | |
041ecc95 | 65 | * Queues. |
66 | * | |
67 | * Format is (string $queueName => CRM_Queue_Queue). | |
68 | * | |
69 | * @var array | |
6a488035 | 70 | */ |
4523a2f5 | 71 | public $queues; |
e0ef6999 EM |
72 | |
73 | /** | |
041ecc95 | 74 | * Class constructor. |
e0ef6999 | 75 | */ |
4523a2f5 | 76 | public function __construct() { |
be2fb01f | 77 | $this->queues = []; |
6a488035 TO |
78 | } |
79 | ||
80 | /** | |
4523a2f5 TO |
81 | * Create a queue. If one already exists, then it will be reused. |
82 | * | |
83 | * @param array $queueSpec | |
84 | * Array with keys: | |
c218eb56 | 85 | * - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory` |
6a488035 | 86 | * - name: string, required, e.g. "upgrade-tasks" |
4523a2f5 TO |
87 | * - reset: bool, optional; if a queue is found, then it should be |
88 | * flushed; default to TRUE | |
89 | * - (additional keys depending on the queue provider). | |
26c70956 | 90 | * - is_persistent: bool, optional; if true, then this queue is loaded from `civicrm_queue` list |
cf97aeef TO |
91 | * - runner: string, optional; if given, then items in this queue can run |
92 | * automatically via `hook_civicrm_queueRun_{$runner}` | |
86ffee74 TO |
93 | * - status: string, required for runnable-queues; specify whether the runner is currently active |
94 | * ex: 'active', 'draft', 'completed' | |
95 | * - error: string, required for runnable-queues; specify what to do with unhandled errors | |
96 | * ex: "drop" or "abort" | |
c7920734 | 97 | * - batch_limit: int, Maximum number of items in a batch. |
cf97aeef | 98 | * - lease_time: int, When claiming an item (or batch of items) for work, how long should the item(s) be reserved. (Seconds) |
f0297973 | 99 | * - retry_limit: int, Number of permitted retries. Set to zero (0) to disable. |
cf97aeef | 100 | * - retry_interval: int, Number of seconds to wait before retrying a failed execution. |
6a488035 TO |
101 | * @return CRM_Queue_Queue |
102 | */ | |
4523a2f5 | 103 | public function create($queueSpec) { |
689e75e9 | 104 | if (is_object($this->queues[$queueSpec['name']] ?? NULL) && empty($queueSpec['reset'])) { |
6a488035 TO |
105 | return $this->queues[$queueSpec['name']]; |
106 | } | |
107 | ||
26c70956 TO |
108 | if (!empty($queueSpec['is_persistent'])) { |
109 | $queueSpec = $this->findCreateQueueSpec($queueSpec); | |
110 | } | |
86ffee74 | 111 | $this->validateQueueSpec($queueSpec); |
6a488035 TO |
112 | $queue = $this->instantiateQueueObject($queueSpec); |
113 | $exists = $queue->existsQueue(); | |
114 | if (!$exists) { | |
115 | $queue->createQueue(); | |
116 | } | |
117 | elseif (@$queueSpec['reset']) { | |
118 | $queue->deleteQueue(); | |
119 | $queue->createQueue(); | |
120 | } | |
121 | else { | |
122 | $queue->loadQueue(); | |
123 | } | |
124 | $this->queues[$queueSpec['name']] = $queue; | |
125 | return $queue; | |
126 | } | |
127 | ||
26c70956 TO |
128 | /** |
129 | * Find/create the queue-spec. Specifically: | |
130 | * | |
131 | * - If there is a stored queue, use its spec. | |
132 | * - If there is no stored queue, and if we have enough information, then create queue. | |
133 | * | |
134 | * @param array $queueSpec | |
135 | * @return array | |
136 | * Updated queueSpec. | |
137 | * @throws \CRM_Core_Exception | |
138 | */ | |
139 | protected function findCreateQueueSpec(array $queueSpec): array { | |
cf97aeef TO |
140 | $loaded = $this->findQueueSpec($queueSpec); |
141 | if ($loaded !== NULL) { | |
142 | return $loaded; | |
26c70956 TO |
143 | } |
144 | ||
86ffee74 | 145 | $this->validateQueueSpec($queueSpec); |
cf97aeef TO |
146 | |
147 | $dao = new CRM_Queue_DAO_Queue(); | |
148 | $dao->name = $queueSpec['name']; | |
26c70956 TO |
149 | $dao->copyValues($queueSpec); |
150 | $dao->insert(); | |
151 | ||
cf97aeef TO |
152 | return $this->findQueueSpec($queueSpec); |
153 | } | |
154 | ||
155 | protected function findQueueSpec(array $queueSpec): ?array { | |
156 | $dao = new CRM_Queue_DAO_Queue(); | |
157 | $dao->name = $queueSpec['name']; | |
158 | if ($dao->find(TRUE)) { | |
159 | return array_merge($queueSpec, CRM_Utils_Array::subset($dao->toArray(), static::$commonFields)); | |
160 | } | |
161 | else { | |
162 | return NULL; | |
163 | } | |
26c70956 TO |
164 | } |
165 | ||
6a488035 | 166 | /** |
4523a2f5 TO |
167 | * Look up an existing queue. |
168 | * | |
169 | * @param array $queueSpec | |
170 | * Array with keys: | |
c218eb56 | 171 | * - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory` |
6a488035 | 172 | * - name: string, required, e.g. "upgrade-tasks" |
4523a2f5 | 173 | * - (additional keys depending on the queue provider). |
26c70956 | 174 | * - is_persistent: bool, optional; if true, then this queue is loaded from `civicrm_queue` list |
6a488035 TO |
175 | * |
176 | * @return CRM_Queue_Queue | |
177 | */ | |
4523a2f5 | 178 | public function load($queueSpec) { |
9f603ed0 | 179 | if (is_object($this->queues[$queueSpec['name']] ?? NULL)) { |
6a488035 TO |
180 | return $this->queues[$queueSpec['name']]; |
181 | } | |
26c70956 TO |
182 | if (!empty($queueSpec['is_persistent'])) { |
183 | $queueSpec = $this->findCreateQueueSpec($queueSpec); | |
184 | } | |
6a488035 TO |
185 | $queue = $this->instantiateQueueObject($queueSpec); |
186 | $queue->loadQueue(); | |
187 | $this->queues[$queueSpec['name']] = $queue; | |
188 | return $queue; | |
189 | } | |
190 | ||
191 | /** | |
fe482240 | 192 | * Convert a queue "type" name to a class name. |
6a488035 | 193 | * |
4523a2f5 | 194 | * @param string $type |
c218eb56 | 195 | * - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory` |
4523a2f5 TO |
196 | * @return string |
197 | * Class-name | |
6a488035 TO |
198 | */ |
199 | protected function getQueueClass($type) { | |
200 | $type = preg_replace('/[^a-zA-Z0-9]/', '', $type); | |
201 | $className = 'CRM_Queue_Queue_' . $type; | |
202 | // FIXME: when used with class-autoloader, this may be unnecessary | |
203 | if (!class_exists($className)) { | |
204 | $classFile = 'CRM/Queue/Queue/' . $type . '.php'; | |
205 | require_once $classFile; | |
206 | } | |
207 | return $className; | |
208 | } | |
209 | ||
210 | /** | |
4523a2f5 TO |
211 | * @param array $queueSpec |
212 | * See create(). | |
6a488035 TO |
213 | * |
214 | * @return CRM_Queue_Queue | |
215 | */ | |
216 | protected function instantiateQueueObject($queueSpec) { | |
217 | // note: you should probably never do anything else here | |
218 | $class = new ReflectionClass($this->getQueueClass($queueSpec['type'])); | |
219 | return $class->newInstance($queueSpec); | |
220 | } | |
96025800 | 221 | |
86ffee74 TO |
222 | /** |
223 | * Assert that the queueSpec is well-formed. | |
224 | * | |
225 | * @param array $queueSpec | |
226 | * @throws \CRM_Core_Exception | |
227 | */ | |
228 | public function validateQueueSpec(array $queueSpec): void { | |
229 | $throw = function(string $message, ...$args) use ($queueSpec) { | |
230 | $prefix = sprintf('Failed to create queue "%s". ', $queueSpec['name']); | |
231 | throw new CRM_Core_Exception($prefix . sprintf($message, ...$args)); | |
232 | }; | |
233 | ||
234 | if (empty($queueSpec['type'])) { | |
235 | $throw('Missing field "type".'); | |
236 | } | |
237 | ||
238 | // The rest of the validations only apply to persistent, runnable queues. | |
239 | if (empty($queueSpec['is_persistent']) || empty($queueSpec['runner'])) { | |
240 | return; | |
241 | } | |
242 | ||
243 | $statuses = CRM_Queue_BAO_Queue::getStatuses(); | |
244 | $status = $queueSpec['status'] ?? NULL; | |
245 | if (!isset($statuses[$status])) { | |
246 | $throw('Invalid queue status "%s".', $status); | |
247 | } | |
248 | ||
249 | $errorModes = CRM_Queue_BAO_Queue::getErrorModes(); | |
250 | $errorMode = $queueSpec['error'] ?? NULL; | |
251 | if ($queueSpec['runner'] === 'task' && !isset($errorModes[$errorMode])) { | |
252 | $throw('Invalid error mode "%s".', $errorMode); | |
253 | } | |
254 | } | |
255 | ||
6a488035 | 256 | } |