<?php
/*
 +--------------------------------------------------------------------+
 | Copyright CiviCRM LLC. All rights reserved.                        |
 |                                                                    |
 | This work is published under the GNU AGPLv3 license with some      |
 | permitted exceptions and without any warranty. For full license    |
 | and copyright information, see https://civicrm.org/licensing       |
 +--------------------------------------------------------------------+
 */

/**
 * The queue service provides an interface for creating or locating
 * queues. Note that this approach hides the details of data-storage:
 * different queue-providers may store the queue content in different
 * ways (in memory, in SQL, or in an external service).
 *
 * @code
 * $queue = CRM_Queue_Service::singleton()->create(array(
 *   'type' => 'interactive',
 *   'name' => 'upgrade-tasks',
 * ));
 * $queue->createItem($myData);
 *
 * // Some time later...
 * $item = $queue->claimItem();
 * if ($item) {
 *   if (my_process($item->data)) {
 *     $queue->deleteItem($item);
 *   }
 *   else {
 *     $queue->releaseItem($item);
 *   }
 * }
 * @endcode
 */
class CRM_Queue_Service {

  /**
   * Shared instance handed out by singleton().
   *
   * @var \CRM_Queue_Service|null
   */
  protected static $_singleton;

  /**
   * List of fields which are shared by `$queueSpec` and `civicrm_queue`.
   *
   * @var string[]
   */
  private static $commonFields = ['name', 'type', 'runner', 'batch_limit', 'lease_time', 'retry_limit', 'retry_interval'];

  /**
   * Get the shared queue service, creating it on first use.
   *
   * FIXME: Singleton pattern should be removed when dependency-injection
   * becomes available.
   *
   * @param bool $forceNew
   *   TRUE if a new instance must be created.
   *
   * @return \CRM_Queue_Service
   */
  public static function &singleton($forceNew = FALSE) {
    if ($forceNew || !self::$_singleton) {
      self::$_singleton = new CRM_Queue_Service();
    }
    return self::$_singleton;
  }

  /**
   * Queue objects already created or loaded through this service.
   *
   * Format is (string $queueName => CRM_Queue_Queue).
   *
   * NOTE(review): the declaration itself was not visible in the reviewed
   * copy; `public` is the widest (most backward-compatible) choice - confirm.
   *
   * @var array
   */
  public $queues;

  /**
   * Start with an empty queue registry.
   */
  public function __construct() {
    $this->queues = [];
  }

  /**
   * Create a queue. If one already exists, then it will be reused.
   *
   * @param array $queueSpec
   *   Array with keys:
   *   - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory`
   *   - name: string, required, e.g. "upgrade-tasks"
   *   - reset: bool, optional; if a queue is found, then it should be
   *     flushed; defaults to FALSE (an omitted/empty value reuses the queue)
   *   - (additional keys depending on the queue provider).
   *   - is_persistent: bool, optional; if true, then this queue is loaded from `civicrm_queue` list
   *   - runner: string, optional; if given, then items in this queue can run
   *     automatically via `hook_civicrm_queueRun_{$runner}`
   *   - batch_limit: int, Maximum number of items in a batch.
   *   - lease_time: int, When claiming an item (or batch of items) for work, how long should the item(s) be reserved. (Seconds)
   *   - retry_limit: int, Number of permitted retries. Set to zero (0) to disable.
   *   - retry_interval: int, Number of seconds to wait before retrying a failed execution.
   *
   * @return CRM_Queue_Queue
   */
  public function create($queueSpec) {
    // An already-registered queue is reused unless the caller asked for a reset.
    if (is_object($this->queues[$queueSpec['name']] ?? NULL) && empty($queueSpec['reset'])) {
      return $this->queues[$queueSpec['name']];
    }

    if (!empty($queueSpec['is_persistent'])) {
      $queueSpec = $this->findCreateQueueSpec($queueSpec);
    }
    $queue = $this->instantiateQueueObject($queueSpec);
    $exists = $queue->existsQueue();
    if (!$exists) {
      $queue->createQueue();
    }
    elseif (!empty($queueSpec['reset'])) {
      // Flush by dropping and re-creating the backing queue.
      $queue->deleteQueue();
      $queue->createQueue();
    }
    else {
      // NOTE(review): this branch was missing from the reviewed copy and has
      // been restored as a plain load of the existing queue - confirm.
      $queue->loadQueue();
    }
    $this->queues[$queueSpec['name']] = $queue;
    return $queue;
  }

  /**
   * Find/create the queue-spec. Specifically:
   *
   * - If there is a stored queue, use its spec.
   * - If there is no stored queue, and if we have enough information, then create queue.
   *
   * @param array $queueSpec
   *   Requested spec; `name` is used for the lookup and `type` is required
   *   when a new record has to be written.
   *
   * @return array
   *   The requested spec merged with the stored common fields.
   *
   * @throws \CRM_Core_Exception
   *   If no stored queue exists and `type` is missing.
   */
  protected function findCreateQueueSpec(array $queueSpec): array {
    $loaded = $this->findQueueSpec($queueSpec);
    if ($loaded !== NULL) {
      return $loaded;
    }

    if (empty($queueSpec['type'])) {
      throw new \CRM_Core_Exception(sprintf('Failed to find or create persistent queue "%s". Missing field "%s".',
        $queueSpec['name'], 'type'));
    }

    $dao = new CRM_Queue_DAO_Queue();
    $dao->name = $queueSpec['name'];
    $dao->copyValues($queueSpec);
    // NOTE(review): the write call was missing from the reviewed copy;
    // restored as insert() so the re-read below can find the record - confirm.
    $dao->insert();

    // Re-read so the caller gets the spec exactly as stored.
    return $this->findQueueSpec($queueSpec);
  }

  /**
   * Look up the stored record for a queue name.
   *
   * @param array $queueSpec
   *   Requested spec; only `name` is used for the lookup.
   *
   * @return array|null
   *   The requested spec overlaid with the stored common fields, or NULL if
   *   no record with that name was found.
   */
  protected function findQueueSpec(array $queueSpec): ?array {
    $dao = new CRM_Queue_DAO_Queue();
    $dao->name = $queueSpec['name'];
    if ($dao->find(TRUE)) {
      return array_merge($queueSpec, CRM_Utils_Array::subset($dao->toArray(), static::$commonFields));
    }
    else {
      return NULL;
    }
  }

  /**
   * Look up an existing queue.
   *
   * Note that for `is_persistent` specs this goes through
   * findCreateQueueSpec(), i.e. the stored spec is used when present and a
   * record is written when it is absent and `type` is given.
   *
   * @param array $queueSpec
   *   Array with keys:
   *   - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory`
   *   - name: string, required, e.g. "upgrade-tasks"
   *   - (additional keys depending on the queue provider).
   *   - is_persistent: bool, optional; if true, then this queue is loaded from `civicrm_queue` list
   *
   * @return CRM_Queue_Queue
   */
  public function load($queueSpec) {
    if (is_object($this->queues[$queueSpec['name']] ?? NULL)) {
      return $this->queues[$queueSpec['name']];
    }
    if (!empty($queueSpec['is_persistent'])) {
      $queueSpec = $this->findCreateQueueSpec($queueSpec);
    }
    $queue = $this->instantiateQueueObject($queueSpec);
    // NOTE(review): this call was missing from the reviewed copy; restored so
    // the new object attaches to the existing queue - confirm.
    $queue->loadQueue();
    $this->queues[$queueSpec['name']] = $queue;
    return $queue;
  }

  /**
   * Convert a queue "type" name to a class name.
   *
   * @param string $type
   *   - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory`
   *
   * @return string
   *   Class name, `CRM_Queue_Queue_{$type}` with non-alphanumerics removed
   *   from the type.
   */
  protected function getQueueClass($type) {
    // Only alphanumerics may reach the class name and the include path below.
    $type = preg_replace('/[^a-zA-Z0-9]/', '', $type);
    $className = 'CRM_Queue_Queue_' . $type;
    // FIXME: when used with class-autoloader, this may be unnecessary
    if (!class_exists($className)) {
      $classFile = 'CRM/Queue/Queue/' . $type . '.php';
      require_once $classFile;
    }
    return $className;
  }

  /**
   * Build the queue object for a spec.
   *
   * @param array $queueSpec
   *   Queue spec; `type` selects the class and the whole spec is passed to
   *   its constructor.
   *
   * @return CRM_Queue_Queue
   */
  protected function instantiateQueueObject($queueSpec) {
    // note: you should probably never do anything else here
    $class = new ReflectionClass($this->getQueueClass($queueSpec['type']));
    return $class->newInstance($queueSpec);
  }

}