3 +--------------------------------------------------------------------+
4 | Copyright CiviCRM LLC. All rights reserved. |
6 | This work is published under the GNU AGPLv3 license with some |
7 | permitted exceptions and without any warranty. For full license |
8 | and copyright information, see https://civicrm.org/licensing |
9 +--------------------------------------------------------------------+
/**
 * The queue service provides an interface for creating or locating
 * queues. Note that this approach hides the details of data-storage:
 * different queue-providers may store the queue content in different
 * ways (in memory, in SQL, or in an external service).
 *
 * ```
 * $queue = CRM_Queue_Service::singleton()->create(array(
 *   'type' => 'interactive',
 *   'name' => 'upgrade-tasks',
 * ));
 * $queue->createItem($myData);
 *
 * // Some time later...
 * $item = $queue->claimItem();
 * if (my_process($item->data)) {
 *   $queue->deleteItem($item);
 * }
 * else {
 *   $queue->releaseItem($item);
 * }
 * ```
 */
36 class CRM_Queue_Service
{
/**
 * Shared instance handed out by singleton().
 *
 * @var \CRM_Queue_Service|null
 */
protected static $_singleton;

/**
 * List of fields which are shared by `$queueSpec` and `civicrm_queue`.
 *
 * @var string[]
 */
private static $commonFields = ['name', 'type', 'runner', 'status', 'error', 'batch_limit', 'lease_time', 'retry_limit', 'retry_interval'];
/**
 * Get the shared queue-service instance.
 *
 * FIXME: Singleton pattern should be removed when dependency-injection
 * becomes available.
 *
 * @param bool $forceNew
 *   TRUE if a new instance must be created.
 *
 * @return \CRM_Queue_Service
 */
public static function &singleton($forceNew = FALSE) {
  // Build the shared instance on first use, or replace it when the caller
  // explicitly asks for a fresh one.
  if ($forceNew || !self::$_singleton) {
    self::$_singleton = new CRM_Queue_Service();
  }
  return self::$_singleton;
}
// Queue registry ($this->queues), keyed by queue name.
// Format is (string $queueName => CRM_Queue_Queue).
public function __construct() {
  // Start with an empty registry; create() and load() index it by queue name.
  // NOTE(review): the constructor body is not visible in this excerpt --
  // confirm that nothing else is initialised here.
  $this->queues = [];
}
/**
 * Create a queue. If one already exists, then it will be reused.
 *
 * @param array $queueSpec
 *   Array with keys:
 *   - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory`
 *   - name: string, required, e.g. "upgrade-tasks"
 *   - reset: bool, optional; if a queue is found, then it should be
 *     flushed; defaults to FALSE (an absent or empty value reuses the
 *     existing queue as-is)
 *   - (additional keys depending on the queue provider).
 *   - is_persistent: bool, optional; if true, then this queue is loaded from `civicrm_queue` list
 *   - runner: string, optional; if given, then items in this queue can run
 *     automatically via `hook_civicrm_queueRun_{$runner}`
 *   - status: string, required for runnable-queues; specify whether the runner is currently active
 *     ex: 'active', 'draft', 'completed'
 *   - error: string, required for runnable-queues; specify what to do with unhandled errors
 *     ex: "drop" or "abort"
 *   - batch_limit: int, Maximum number of items in a batch.
 *   - lease_time: int, When claiming an item (or batch of items) for work, how long should the item(s) be reserved. (Seconds)
 *   - retry_limit: int, Number of permitted retries. Set to zero (0) to disable.
 *   - retry_interval: int, Number of seconds to wait before retrying a failed execution.
 *
 * @return CRM_Queue_Queue
 */
public function create($queueSpec) {
  // Reuse the instance already registered under this name, unless the caller
  // asked for a reset.
  if (is_object($this->queues[$queueSpec['name']] ?? NULL) && empty($queueSpec['reset'])) {
    return $this->queues[$queueSpec['name']];
  }

  // Persistent queues take their spec from (or get registered in) `civicrm_queue`.
  if (!empty($queueSpec['is_persistent'])) {
    $queueSpec = $this->findCreateQueueSpec($queueSpec);
  }
  $this->validateQueueSpec($queueSpec);

  $queue = $this->instantiateQueueObject($queueSpec);
  $exists = $queue->existsQueue();
  if (!$exists) {
    $queue->createQueue();
  }
  elseif (!empty($queueSpec['reset'])) {
    // Flush an existing queue by dropping and re-creating it.
    // (`!empty()` replaces the former `@` suppression; an absent key is still FALSE.)
    $queue->deleteQueue();
    $queue->createQueue();
  }

  // Remember the instance so later create()/load() calls return the same object.
  $this->queues[$queueSpec['name']] = $queue;
  return $queue;
}
/**
 * Find/create the queue-spec. Specifically:
 *
 * - If there is a stored queue, use its spec.
 * - If there is no stored queue, and if we have enough information, then create queue.
 *
 * @param array $queueSpec
 * @return array
 *   The given queue-spec, merged with the stored values of the shared fields.
 * @throws \CRM_Core_Exception
 */
protected function findCreateQueueSpec(array $queueSpec): array {
  // Prefer the stored definition whenever one exists.
  $loaded = $this->findQueueSpec($queueSpec);
  if ($loaded !== NULL) {
    return $loaded;
  }

  // Nothing stored yet: only register specs that pass validation.
  $this->validateQueueSpec($queueSpec);

  $dao = new CRM_Queue_DAO_Queue();
  $dao->name = $queueSpec['name'];
  $dao->copyValues($queueSpec);
  // NOTE(review): the statement that writes the record is not visible in this
  // excerpt; insert() is assumed -- confirm against the full file.
  $dao->insert();

  // Re-read so the returned spec reflects what was actually stored.
  $created = $this->findQueueSpec($queueSpec);
  if ($created === NULL) {
    // Without this guard the declared `array` return type would raise a bare TypeError.
    throw new CRM_Core_Exception(sprintf('Failed to create queue "%s". The stored record could not be loaded.', $queueSpec['name']));
  }
  return $created;
}
/**
 * Look up the stored record for a queue and overlay it on the given spec.
 *
 * @param array $queueSpec
 *   Must contain `name`; the record is looked up by that name.
 * @return array|null
 *   The given spec with the shared fields replaced by their stored values,
 *   or NULL if no record with that name is found.
 */
protected function findQueueSpec(array $queueSpec): ?array {
  $dao = new CRM_Queue_DAO_Queue();
  $dao->name = $queueSpec['name'];
  if (!$dao->find(TRUE)) {
    return NULL;
  }

  // Stored values win over the caller's values for the shared fields.
  // `self::` (not `static::`) because $commonFields is private to this class.
  $stored = CRM_Utils_Array::subset($dao->toArray(), self::$commonFields);
  return array_merge($queueSpec, $stored);
}
/**
 * Look up an existing queue.
 *
 * @param array $queueSpec
 *   Array with keys:
 *   - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory`
 *   - name: string, required, e.g. "upgrade-tasks"
 *   - (additional keys depending on the queue provider).
 *   - is_persistent: bool, optional; if true, then this queue is loaded from `civicrm_queue` list
 *
 * @return CRM_Queue_Queue
 */
public function load($queueSpec) {
  // Return the instance already registered under this name, if any.
  if (is_object($this->queues[$queueSpec['name']] ?? NULL)) {
    return $this->queues[$queueSpec['name']];
  }

  // Persistent queues take their spec from (or get registered in) `civicrm_queue`.
  if (!empty($queueSpec['is_persistent'])) {
    $queueSpec = $this->findCreateQueueSpec($queueSpec);
  }

  $queue = $this->instantiateQueueObject($queueSpec);
  // NOTE(review): one statement between instantiation and registration is not
  // visible in this excerpt; loadQueue() is assumed -- confirm against the full file.
  $queue->loadQueue();

  // Remember the instance so later create()/load() calls return the same object.
  $this->queues[$queueSpec['name']] = $queue;
  return $queue;
}
/**
 * Convert a queue "type" name to a class name.
 *
 * @param string $type
 *   Queue type, e.g. `Sql`, `SqlParallel`, `Memory`.
 * @return string
 *   Class name.
 */
protected function getQueueClass($type) {
  // Keep only ASCII letters and digits, so the type cannot inject anything
  // else into the class name or the file path built below.
  $type = preg_replace('/[^a-zA-Z0-9]/', '', $type);
  $className = 'CRM_Queue_Queue_' . $type;
  // FIXME: when used with class-autoloader, this may be unnecessary
  if (!class_exists($className)) {
    $classFile = 'CRM/Queue/Queue/' . $type . '.php';
    require_once $classFile;
  }
  return $className;
}
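/**
 * Instantiate the queue object for a queue-spec.
 *
 * @param array $queueSpec
 *   Must contain `type`; the whole array is passed to the queue's constructor.
 *
 * @return CRM_Queue_Queue
 */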
protected function instantiateQueueObject($queueSpec) {
  // note: you should probably never do anything else here
  $className = $this->getQueueClass($queueSpec['type']);
  $class = new ReflectionClass($className);
  // The spec array is the constructor's only argument.
  return $class->newInstance($queueSpec);
}
/**
 * Assert that the queueSpec is well-formed.
 *
 * @param array $queueSpec
 * @throws \CRM_Core_Exception
 */
public function validateQueueSpec(array $queueSpec): void {
  // Helper: abort with a message that names the offending queue.
  // `?? ''` keeps a missing "name" from raising a warning while the real error is reported.
  $throw = function(string $message, ...$args) use ($queueSpec) {
    $prefix = sprintf('Failed to create queue "%s". ', $queueSpec['name'] ?? '');
    throw new CRM_Core_Exception($prefix . sprintf($message, ...$args));
  };

  if (empty($queueSpec['type'])) {
    $throw('Missing field "type".');
  }

  // The rest of the validations only apply to persistent, runnable queues.
  if (empty($queueSpec['is_persistent']) || empty($queueSpec['runner'])) {
    return;
  }

  $statuses = CRM_Queue_BAO_Queue::getStatuses();
  $status = $queueSpec['status'] ?? NULL;
  // Check for NULL first so that NULL is never used as an array offset.
  if ($status === NULL || !isset($statuses[$status])) {
    $throw('Invalid queue status "%s".', $status);
  }

  // The error mode is only checked for the "task" runner.
  $errorModes = CRM_Queue_BAO_Queue::getErrorModes();
  $errorMode = $queueSpec['error'] ?? NULL;
  if ($queueSpec['runner'] === 'task' && ($errorMode === NULL || !isset($errorModes[$errorMode]))) {
    $throw('Invalid error mode "%s".', $errorMode);
  }
}