Commit | Line | Data |
---|---|---|
6a488035 TO |
1 | <?php |
2 | /* | |
3 | +--------------------------------------------------------------------+ | |
bc77d7c0 | 4 | | Copyright CiviCRM LLC. All rights reserved. | |
6a488035 | 5 | | | |
bc77d7c0 TO |
6 | | This work is published under the GNU AGPLv3 license with some | |
7 | | permitted exceptions and without any warranty. For full license | | |
8 | | and copyright information, see https://civicrm.org/licensing | | |
6a488035 | 9 | +--------------------------------------------------------------------+ |
d25dd0ee | 10 | */ |
6a488035 TO |
11 | |
12 | /** | |
13 | * The queue service provides an interface for creating or locating | |
14 | * queues. Note that this approach hides the details of data-storage: | |
15 | * different queue-providers may store the queue content in different | |
16 | * ways (in memory, in SQL, or in an external service). | |
17 | * | |
0b882a86 | 18 | * ``` |
6a488035 TO |
19 | * $queue = CRM_Queue_Service::singleton()->create(array( |
20 | * 'type' => 'interactive', | |
21 | * 'name' => 'upgrade-tasks', | |
22 | * )); | |
23 | * $queue->createItem($myData); | |
24 | * | |
25 | * // Some time later... | |
26 | * $item = $queue->claimItem(); | |
5e74f823 TO |
27 | * if ($item) { |
28 | * if (my_process($item->data)) { | |
16e38e82 | 29 | * $queue->deleteItem($item); |
5e74f823 | 30 | * } else { |
16e38e82 | 31 | * $queue->releaseItem($item); |
5e74f823 | 32 | * } |
6a488035 | 33 | * } |
0b882a86 | 34 | * ``` |
6a488035 TO |
35 | */ |
36 | class CRM_Queue_Service { | |
37 | ||
4523a2f5 | 38 | protected static $_singleton; |
6a488035 | 39 | |
cf97aeef TO |
40 | /** |
41 | * List of fields which are shared by `$queueSpec` and `civicrm_queue`. | |
42 | * | |
43 | * @var string[] | |
44 | * @readonly | |
45 | */ | |
46 | private static $commonFields = ['name', 'type', 'runner', 'batch_limit', 'lease_time', 'retry_limit', 'retry_interval']; | |
47 | ||
6a488035 TO |
48 | /** |
49 | * FIXME: Singleton pattern should be removed when dependency-injection | |
50 | * becomes available. | |
51 | * | |
4523a2f5 TO |
52 | * @param bool $forceNew |
53 | * TRUE if a new instance must be created. | |
77b97be7 EM |
54 | * |
55 | * @return \CRM_Queue_Service | |
6a488035 | 56 | */ |
4523a2f5 | 57 | public static function &singleton($forceNew = FALSE) { |
6a488035 TO |
58 | if ($forceNew || !self::$_singleton) { |
59 | self::$_singleton = new CRM_Queue_Service(); | |
60 | } | |
61 | return self::$_singleton; | |
62 | } | |
63 | ||
64 | /** | |
041ecc95 | 65 | * Queues. |
66 | * | |
67 | * Format is (string $queueName => CRM_Queue_Queue). | |
68 | * | |
69 | * @var array | |
6a488035 | 70 | */ |
4523a2f5 | 71 | public $queues; |
e0ef6999 EM |
72 | |
73 | /** | |
041ecc95 | 74 | * Class constructor. |
e0ef6999 | 75 | */ |
4523a2f5 | 76 | public function __construct() { |
be2fb01f | 77 | $this->queues = []; |
6a488035 TO |
78 | } |
79 | ||
80 | /** | |
4523a2f5 TO |
81 | * Create a queue. If one already exists, then it will be reused. |
82 | * | |
83 | * @param array $queueSpec | |
84 | * Array with keys: | |
c218eb56 | 85 | * - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory` |
6a488035 | 86 | * - name: string, required, e.g. "upgrade-tasks" |
4523a2f5 TO |
87 | * - reset: bool, optional; if a queue is found, then it should be |
88 | * flushed; default to TRUE | |
89 | * - (additional keys depending on the queue provider). | |
26c70956 | 90 | * - is_persistent: bool, optional; if true, then this queue is loaded from `civicrm_queue` list |
cf97aeef TO |
91 | * - runner: string, optional; if given, then items in this queue can run |
92 | * automatically via `hook_civicrm_queueRun_{$runner}` | |
c7920734 | 93 | * - batch_limit: int, Maximum number of items in a batch. |
cf97aeef | 94 | * - lease_time: int, When claiming an item (or batch of items) for work, how long should the item(s) be reserved. (Seconds) |
f0297973 | 95 | * - retry_limit: int, Number of permitted retries. Set to zero (0) to disable. |
cf97aeef | 96 | * - retry_interval: int, Number of seconds to wait before retrying a failed execution. |
6a488035 TO |
97 | * @return CRM_Queue_Queue |
98 | */ | |
4523a2f5 | 99 | public function create($queueSpec) { |
689e75e9 | 100 | if (is_object($this->queues[$queueSpec['name']] ?? NULL) && empty($queueSpec['reset'])) { |
6a488035 TO |
101 | return $this->queues[$queueSpec['name']]; |
102 | } | |
103 | ||
26c70956 TO |
104 | if (!empty($queueSpec['is_persistent'])) { |
105 | $queueSpec = $this->findCreateQueueSpec($queueSpec); | |
106 | } | |
6a488035 TO |
107 | $queue = $this->instantiateQueueObject($queueSpec); |
108 | $exists = $queue->existsQueue(); | |
109 | if (!$exists) { | |
110 | $queue->createQueue(); | |
111 | } | |
112 | elseif (@$queueSpec['reset']) { | |
113 | $queue->deleteQueue(); | |
114 | $queue->createQueue(); | |
115 | } | |
116 | else { | |
117 | $queue->loadQueue(); | |
118 | } | |
119 | $this->queues[$queueSpec['name']] = $queue; | |
120 | return $queue; | |
121 | } | |
122 | ||
26c70956 TO |
123 | /** |
124 | * Find/create the queue-spec. Specifically: | |
125 | * | |
126 | * - If there is a stored queue, use its spec. | |
127 | * - If there is no stored queue, and if we have enough information, then create queue. | |
128 | * | |
129 | * @param array $queueSpec | |
130 | * @return array | |
131 | * Updated queueSpec. | |
132 | * @throws \CRM_Core_Exception | |
133 | */ | |
134 | protected function findCreateQueueSpec(array $queueSpec): array { | |
cf97aeef TO |
135 | $loaded = $this->findQueueSpec($queueSpec); |
136 | if ($loaded !== NULL) { | |
137 | return $loaded; | |
26c70956 TO |
138 | } |
139 | ||
140 | if (empty($queueSpec['type'])) { | |
141 | throw new \CRM_Core_Exception(sprintf('Failed to find or create persistent queue "%s". Missing field "%s".', | |
142 | $queueSpec['name'], 'type')); | |
143 | } | |
cf97aeef TO |
144 | |
145 | $dao = new CRM_Queue_DAO_Queue(); | |
146 | $dao->name = $queueSpec['name']; | |
26c70956 TO |
147 | $dao->copyValues($queueSpec); |
148 | $dao->insert(); | |
149 | ||
cf97aeef TO |
150 | return $this->findQueueSpec($queueSpec); |
151 | } | |
152 | ||
153 | protected function findQueueSpec(array $queueSpec): ?array { | |
154 | $dao = new CRM_Queue_DAO_Queue(); | |
155 | $dao->name = $queueSpec['name']; | |
156 | if ($dao->find(TRUE)) { | |
157 | return array_merge($queueSpec, CRM_Utils_Array::subset($dao->toArray(), static::$commonFields)); | |
158 | } | |
159 | else { | |
160 | return NULL; | |
161 | } | |
26c70956 TO |
162 | } |
163 | ||
6a488035 | 164 | /** |
4523a2f5 TO |
165 | * Look up an existing queue. |
166 | * | |
167 | * @param array $queueSpec | |
168 | * Array with keys: | |
c218eb56 | 169 | * - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory` |
6a488035 | 170 | * - name: string, required, e.g. "upgrade-tasks" |
4523a2f5 | 171 | * - (additional keys depending on the queue provider). |
26c70956 | 172 | * - is_persistent: bool, optional; if true, then this queue is loaded from `civicrm_queue` list |
6a488035 TO |
173 | * |
174 | * @return CRM_Queue_Queue | |
175 | */ | |
4523a2f5 | 176 | public function load($queueSpec) { |
9f603ed0 | 177 | if (is_object($this->queues[$queueSpec['name']] ?? NULL)) { |
6a488035 TO |
178 | return $this->queues[$queueSpec['name']]; |
179 | } | |
26c70956 TO |
180 | if (!empty($queueSpec['is_persistent'])) { |
181 | $queueSpec = $this->findCreateQueueSpec($queueSpec); | |
182 | } | |
6a488035 TO |
183 | $queue = $this->instantiateQueueObject($queueSpec); |
184 | $queue->loadQueue(); | |
185 | $this->queues[$queueSpec['name']] = $queue; | |
186 | return $queue; | |
187 | } | |
188 | ||
189 | /** | |
fe482240 | 190 | * Convert a queue "type" name to a class name. |
6a488035 | 191 | * |
4523a2f5 | 192 | * @param string $type |
c218eb56 | 193 | * - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory` |
4523a2f5 TO |
194 | * @return string |
195 | * Class-name | |
6a488035 TO |
196 | */ |
197 | protected function getQueueClass($type) { | |
198 | $type = preg_replace('/[^a-zA-Z0-9]/', '', $type); | |
199 | $className = 'CRM_Queue_Queue_' . $type; | |
200 | // FIXME: when used with class-autoloader, this may be unnecessary | |
201 | if (!class_exists($className)) { | |
202 | $classFile = 'CRM/Queue/Queue/' . $type . '.php'; | |
203 | require_once $classFile; | |
204 | } | |
205 | return $className; | |
206 | } | |
207 | ||
208 | /** | |
4523a2f5 TO |
209 | * @param array $queueSpec |
210 | * See create(). | |
6a488035 TO |
211 | * |
212 | * @return CRM_Queue_Queue | |
213 | */ | |
214 | protected function instantiateQueueObject($queueSpec) { | |
215 | // note: you should probably never do anything else here | |
216 | $class = new ReflectionClass($this->getQueueClass($queueSpec['type'])); | |
217 | return $class->newInstance($queueSpec); | |
218 | } | |
96025800 | 219 | |
6a488035 | 220 | } |