338c46eac253919d62c41741a960cd0327985394
3 +--------------------------------------------------------------------+
4 | Copyright CiviCRM LLC. All rights reserved. |
6 | This work is published under the GNU AGPLv3 license with some |
7 | permitted exceptions and without any warranty. For full license |
8 | and copyright information, see https://civicrm.org/licensing |
9 +--------------------------------------------------------------------+
13 * The queue service provides an interface for creating or locating
14 * queues. Note that this approach hides the details of data-storage:
15 * different queue-providers may store the queue content in different
16 * ways (in memory, in SQL, or in an external service).
19 * $queue = CRM_Queue_Service::singleton()->create(array(
20 * 'type' => 'interactive',
21 * 'name' => 'upgrade-tasks',
23 * $queue->createItem($myData);
25 * // Some time later...
26 * $item = $queue->claimItem();
28 * if (my_process($item->data)) {
29 * $queue->deleteItem($item);
31 * $queue->releaseItem($item);
36 class CRM_Queue_Service
{
38 protected static $_singleton;
41 * List of fields which are shared by `$queueSpec` and `civicrm_queue`.
46 private static $commonFields = ['name', 'type', 'runner', 'status', 'error', 'batch_limit', 'lease_time', 'retry_limit', 'retry_interval'];
49 * FIXME: Singleton pattern should be removed when dependency-injection
52 * @param bool $forceNew
53 * TRUE if a new instance must be created.
55 * @return \CRM_Queue_Service
57 public static function &singleton($forceNew = FALSE) {
58 if ($forceNew ||
!self
::$_singleton) {
59 self
::$_singleton = new CRM_Queue_Service();
61 return self
::$_singleton;
67 * Format is (string $queueName => CRM_Queue_Queue).
76 public function __construct() {
81 * Create a queue. If one already exists, then it will be reused.
83 * @param array $queueSpec
85 * - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory`
86 * - name: string, required, e.g. "upgrade-tasks"
87 * - reset: bool, optional; if a queue is found, then it should be
88 * flushed; default to TRUE
89 * - (additional keys depending on the queue provider).
90 * - is_persistent: bool, optional; if true, then this queue is loaded from `civicrm_queue` list
91 * - runner: string, optional; if given, then items in this queue can run
92 * automatically via `hook_civicrm_queueRun_{$runner}`
93 * - status: string, required for runnable-queues; specify whether the runner is currently active
94 * ex: 'active', 'draft', 'completed'
95 * - error: string, required for runnable-queues; specify what to do with unhandled errors
96 * ex: "drop" or "abort"
97 * - batch_limit: int, Maximum number of items in a batch.
98 * - lease_time: int, When claiming an item (or batch of items) for work, how long should the item(s) be reserved. (Seconds)
99 * - retry_limit: int, Number of permitted retries. Set to zero (0) to disable.
100 * - retry_interval: int, Number of seconds to wait before retrying a failed execution.
101 * @return CRM_Queue_Queue
103 public function create($queueSpec) {
104 if (is_object($this->queues
[$queueSpec['name']] ??
NULL) && empty($queueSpec['reset'])) {
105 return $this->queues
[$queueSpec['name']];
108 if (!empty($queueSpec['is_persistent'])) {
109 $queueSpec = $this->findCreateQueueSpec($queueSpec);
111 $this->validateQueueSpec($queueSpec);
112 $queue = $this->instantiateQueueObject($queueSpec);
113 $exists = $queue->existsQueue();
115 $queue->createQueue();
117 elseif (@$queueSpec['reset']) {
118 $queue->deleteQueue();
119 $queue->createQueue();
124 $this->queues
[$queueSpec['name']] = $queue;
129 * Find/create the queue-spec. Specifically:
131 * - If there is a stored queue, use its spec.
132 * - If there is no stored queue, and if we have enough information, then create queue.
134 * @param array $queueSpec
137 * @throws \CRM_Core_Exception
139 protected function findCreateQueueSpec(array $queueSpec): array {
140 $loaded = $this->findQueueSpec($queueSpec);
141 if ($loaded !== NULL) {
145 if (isset($queueSpec['template'])) {
146 $base = $this->findQueueSpec(['name' => $queueSpec['template']]);
147 $reset = ['is_template' => 0];
148 $queueSpec = array_merge($base, $reset, $queueSpec);
151 $this->validateQueueSpec($queueSpec);
153 $dao = new CRM_Queue_DAO_Queue();
154 $dao->name
= $queueSpec['name'];
155 $dao->copyValues($queueSpec);
158 return $this->findQueueSpec($queueSpec);
161 protected function findQueueSpec(array $queueSpec): ?
array {
162 $dao = new CRM_Queue_DAO_Queue();
163 $dao->name
= $queueSpec['name'];
164 if ($dao->find(TRUE)) {
165 return array_merge($queueSpec, CRM_Utils_Array
::subset($dao->toArray(), static::$commonFields));
173 * Look up an existing queue.
175 * @param array $queueSpec
177 * - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory`
178 * - name: string, required, e.g. "upgrade-tasks"
179 * - (additional keys depending on the queue provider).
180 * - is_persistent: bool, optional; if true, then this queue is loaded from `civicrm_queue` list
182 * @return CRM_Queue_Queue
184 public function load($queueSpec) {
185 if (is_object($this->queues
[$queueSpec['name']] ??
NULL)) {
186 return $this->queues
[$queueSpec['name']];
188 if (!empty($queueSpec['is_persistent'])) {
189 $queueSpec = $this->findCreateQueueSpec($queueSpec);
191 $queue = $this->instantiateQueueObject($queueSpec);
193 $this->queues
[$queueSpec['name']] = $queue;
198 * Convert a queue "type" name to a class name.
200 * @param string $type
201 * - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory`
205 protected function getQueueClass($type) {
206 $type = preg_replace('/[^a-zA-Z0-9]/', '', $type);
207 $className = 'CRM_Queue_Queue_' . $type;
208 // FIXME: when used with class-autoloader, this may be unnecessary
209 if (!class_exists($className)) {
210 $classFile = 'CRM/Queue/Queue/' . $type . '.php';
211 require_once $classFile;
217 * @param array $queueSpec
220 * @return CRM_Queue_Queue
222 protected function instantiateQueueObject($queueSpec) {
223 // note: you should probably never do anything else here
224 $class = new ReflectionClass($this->getQueueClass($queueSpec['type']));
225 return $class->newInstance($queueSpec);
229 * Assert that the queueSpec is well-formed.
231 * @param array $queueSpec
232 * @throws \CRM_Core_Exception
234 public function validateQueueSpec(array $queueSpec): void
{
235 $throw = function(string $message, ...$args) use ($queueSpec) {
236 $prefix = sprintf('Failed to create queue "%s". ', $queueSpec['name']);
237 throw new CRM_Core_Exception($prefix . sprintf($message, ...$args));
240 if (empty($queueSpec['type'])) {
241 $throw('Missing field "type".');
244 // The rest of the validations only apply to persistent, runnable queues.
245 if (empty($queueSpec['is_persistent']) ||
empty($queueSpec['runner'])) {
249 $statuses = CRM_Queue_BAO_Queue
::getStatuses();
250 $status = $queueSpec['status'] ??
NULL;
251 if (!isset($statuses[$status])) {
252 $throw('Invalid queue status "%s".', $status);
255 $errorModes = CRM_Queue_BAO_Queue
::getErrorModes();
256 $errorMode = $queueSpec['error'] ??
NULL;
257 if ($queueSpec['runner'] === 'task' && !isset($errorModes[$errorMode])) {
258 $throw('Invalid error mode "%s".', $errorMode);