Merge pull request #21993 from mattwire/aclcachequery2
[civicrm-core.git] / CRM / Queue / Service.php
1 <?php
2 /*
3 +--------------------------------------------------------------------+
4 | Copyright CiviCRM LLC. All rights reserved. |
5 | |
6 | This work is published under the GNU AGPLv3 license with some |
7 | permitted exceptions and without any warranty. For full license |
8 | and copyright information, see https://civicrm.org/licensing |
9 +--------------------------------------------------------------------+
10 */
11
12 /**
13 * The queue service provides an interface for creating or locating
14 * queues. Note that this approach hides the details of data-storage:
15 * different queue-providers may store the queue content in different
16 * ways (in memory, in SQL, or in an external service).
17 *
18 * ```
19 * $queue = CRM_Queue_Service::singleton()->create(array(
20 * 'type' => 'interactive',
21 * 'name' => 'upgrade-tasks',
22 * ));
23 * $queue->createItem($myData);
24 *
25 * // Some time later...
26 * $item = $queue->claimItem();
27 * if ($item) {
28 * if (my_process($item->data)) {
29 * $queue->deleteItem($item);
30 * } else {
31 * $queue->releaseItem($item);
32 * }
33 * }
34 * ```
35 */
36 class CRM_Queue_Service {
37
38 protected static $_singleton;
39
40 /**
41 * List of fields which are shared by `$queueSpec` and `civicrm_queue`.
42 *
43 * @var string[]
44 * @readonly
45 */
46 private static $commonFields = ['name', 'type', 'runner', 'batch_limit', 'lease_time', 'retry_limit', 'retry_interval'];
47
48 /**
49 * FIXME: Singleton pattern should be removed when dependency-injection
50 * becomes available.
51 *
52 * @param bool $forceNew
53 * TRUE if a new instance must be created.
54 *
55 * @return \CRM_Queue_Service
56 */
57 public static function &singleton($forceNew = FALSE) {
58 if ($forceNew || !self::$_singleton) {
59 self::$_singleton = new CRM_Queue_Service();
60 }
61 return self::$_singleton;
62 }
63
64 /**
65 * Queues.
66 *
67 * Format is (string $queueName => CRM_Queue_Queue).
68 *
69 * @var array
70 */
71 public $queues;
72
73 /**
74 * Class constructor.
75 */
76 public function __construct() {
77 $this->queues = [];
78 }
79
80 /**
81 * Create a queue. If one already exists, then it will be reused.
82 *
83 * @param array $queueSpec
84 * Array with keys:
85 * - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory`
86 * - name: string, required, e.g. "upgrade-tasks"
87 * - reset: bool, optional; if a queue is found, then it should be
88 * flushed; default to TRUE
89 * - (additional keys depending on the queue provider).
90 * - is_persistent: bool, optional; if true, then this queue is loaded from `civicrm_queue` list
91 * - runner: string, optional; if given, then items in this queue can run
92 * automatically via `hook_civicrm_queueRun_{$runner}`
93 * - batch_limit: int, Maximum number of items in a batch.
94 * - lease_time: int, When claiming an item (or batch of items) for work, how long should the item(s) be reserved. (Seconds)
95 * - retry_limit: int, Number of permitted retries. Set to zero (0) to disable.
96 * - retry_interval: int, Number of seconds to wait before retrying a failed execution.
97 * @return CRM_Queue_Queue
98 */
99 public function create($queueSpec) {
100 if (is_object($this->queues[$queueSpec['name']] ?? NULL) && empty($queueSpec['reset'])) {
101 return $this->queues[$queueSpec['name']];
102 }
103
104 if (!empty($queueSpec['is_persistent'])) {
105 $queueSpec = $this->findCreateQueueSpec($queueSpec);
106 }
107 $queue = $this->instantiateQueueObject($queueSpec);
108 $exists = $queue->existsQueue();
109 if (!$exists) {
110 $queue->createQueue();
111 }
112 elseif (@$queueSpec['reset']) {
113 $queue->deleteQueue();
114 $queue->createQueue();
115 }
116 else {
117 $queue->loadQueue();
118 }
119 $this->queues[$queueSpec['name']] = $queue;
120 return $queue;
121 }
122
123 /**
124 * Find/create the queue-spec. Specifically:
125 *
126 * - If there is a stored queue, use its spec.
127 * - If there is no stored queue, and if we have enough information, then create queue.
128 *
129 * @param array $queueSpec
130 * @return array
131 * Updated queueSpec.
132 * @throws \CRM_Core_Exception
133 */
134 protected function findCreateQueueSpec(array $queueSpec): array {
135 $loaded = $this->findQueueSpec($queueSpec);
136 if ($loaded !== NULL) {
137 return $loaded;
138 }
139
140 if (empty($queueSpec['type'])) {
141 throw new \CRM_Core_Exception(sprintf('Failed to find or create persistent queue "%s". Missing field "%s".',
142 $queueSpec['name'], 'type'));
143 }
144
145 $dao = new CRM_Queue_DAO_Queue();
146 $dao->name = $queueSpec['name'];
147 $dao->copyValues($queueSpec);
148 $dao->insert();
149
150 return $this->findQueueSpec($queueSpec);
151 }
152
153 protected function findQueueSpec(array $queueSpec): ?array {
154 $dao = new CRM_Queue_DAO_Queue();
155 $dao->name = $queueSpec['name'];
156 if ($dao->find(TRUE)) {
157 return array_merge($queueSpec, CRM_Utils_Array::subset($dao->toArray(), static::$commonFields));
158 }
159 else {
160 return NULL;
161 }
162 }
163
164 /**
165 * Look up an existing queue.
166 *
167 * @param array $queueSpec
168 * Array with keys:
169 * - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory`
170 * - name: string, required, e.g. "upgrade-tasks"
171 * - (additional keys depending on the queue provider).
172 * - is_persistent: bool, optional; if true, then this queue is loaded from `civicrm_queue` list
173 *
174 * @return CRM_Queue_Queue
175 */
176 public function load($queueSpec) {
177 if (is_object($this->queues[$queueSpec['name']] ?? NULL)) {
178 return $this->queues[$queueSpec['name']];
179 }
180 if (!empty($queueSpec['is_persistent'])) {
181 $queueSpec = $this->findCreateQueueSpec($queueSpec);
182 }
183 $queue = $this->instantiateQueueObject($queueSpec);
184 $queue->loadQueue();
185 $this->queues[$queueSpec['name']] = $queue;
186 return $queue;
187 }
188
189 /**
190 * Convert a queue "type" name to a class name.
191 *
192 * @param string $type
193 * - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory`
194 * @return string
195 * Class-name
196 */
197 protected function getQueueClass($type) {
198 $type = preg_replace('/[^a-zA-Z0-9]/', '', $type);
199 $className = 'CRM_Queue_Queue_' . $type;
200 // FIXME: when used with class-autoloader, this may be unnecessary
201 if (!class_exists($className)) {
202 $classFile = 'CRM/Queue/Queue/' . $type . '.php';
203 require_once $classFile;
204 }
205 return $className;
206 }
207
208 /**
209 * @param array $queueSpec
210 * See create().
211 *
212 * @return CRM_Queue_Queue
213 */
214 protected function instantiateQueueObject($queueSpec) {
215 // note: you should probably never do anything else here
216 $class = new ReflectionClass($this->getQueueClass($queueSpec['type']));
217 return $class->newInstance($queueSpec);
218 }
219
220 }