Merge pull request #20093 from larssandergreen/mailings-AB-test-improvements
[civicrm-core.git] / CRM / Queue / Service.php
1 <?php
2 /*
3 +--------------------------------------------------------------------+
4 | Copyright CiviCRM LLC. All rights reserved. |
5 | |
6 | This work is published under the GNU AGPLv3 license with some |
7 | permitted exceptions and without any warranty. For full license |
8 | and copyright information, see https://civicrm.org/licensing |
9 +--------------------------------------------------------------------+
10 */
11
12 /**
13 * The queue service provides an interface for creating or locating
14 * queues. Note that this approach hides the details of data-storage:
15 * different queue-providers may store the queue content in different
16 * ways (in memory, in SQL, or in an external service).
17 *
18 * ```
19 * $queue = CRM_Queue_Service::singleton()->create(array(
20 * 'type' => 'interactive',
21 * 'name' => 'upgrade-tasks',
22 * ));
23 * $queue->createItem($myData);
24 *
25 * // Some time later...
26 * $item = $queue->claimItem();
27 * if ($item) {
28 * if (my_process($item->data)) {
29 * $queue->deleteItem($item);
30 * } else {
31 * $queue->releaseItem($item);
32 * }
33 * }
34 * ```
35 */
36 class CRM_Queue_Service {
37
38 protected static $_singleton;
39
40 /**
41 * FIXME: Singleton pattern should be removed when dependency-injection
42 * becomes available.
43 *
44 * @param bool $forceNew
45 * TRUE if a new instance must be created.
46 *
47 * @return \CRM_Queue_Service
48 */
49 public static function &singleton($forceNew = FALSE) {
50 if ($forceNew || !self::$_singleton) {
51 self::$_singleton = new CRM_Queue_Service();
52 }
53 return self::$_singleton;
54 }
55
56 /**
57 * Queues.
58 *
59 * Format is (string $queueName => CRM_Queue_Queue).
60 *
61 * @var array
62 */
63 public $queues;
64
65 /**
66 * Class constructor.
67 */
68 public function __construct() {
69 $this->queues = [];
70 }
71
72 /**
73 * Create a queue. If one already exists, then it will be reused.
74 *
75 * @param array $queueSpec
76 * Array with keys:
77 * - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory`
78 * - name: string, required, e.g. "upgrade-tasks"
79 * - reset: bool, optional; if a queue is found, then it should be
80 * flushed; default to TRUE
81 * - (additional keys depending on the queue provider).
82 * - is_persistent: bool, optional; if true, then this queue is loaded from `civicrm_queue` list
83 * - is_autorun: bool, optional; if true, then this queue will be auto-scanned
84 * by background task-runners
85 *
86 * @return CRM_Queue_Queue
87 */
88 public function create($queueSpec) {
89 if (is_object($this->queues[$queueSpec['name']] ?? NULL) && empty($queueSpec['reset'])) {
90 return $this->queues[$queueSpec['name']];
91 }
92
93 if (!empty($queueSpec['is_persistent'])) {
94 $queueSpec = $this->findCreateQueueSpec($queueSpec);
95 }
96 $queue = $this->instantiateQueueObject($queueSpec);
97 $exists = $queue->existsQueue();
98 if (!$exists) {
99 $queue->createQueue();
100 }
101 elseif (@$queueSpec['reset']) {
102 $queue->deleteQueue();
103 $queue->createQueue();
104 }
105 else {
106 $queue->loadQueue();
107 }
108 $this->queues[$queueSpec['name']] = $queue;
109 return $queue;
110 }
111
112 /**
113 * Find/create the queue-spec. Specifically:
114 *
115 * - If there is a stored queue, use its spec.
116 * - If there is no stored queue, and if we have enough information, then create queue.
117 *
118 * @param array $queueSpec
119 * @return array
120 * Updated queueSpec.
121 * @throws \CRM_Core_Exception
122 */
123 protected function findCreateQueueSpec(array $queueSpec): array {
124 $storageFields = ['type', 'is_autorun'];
125 $dao = new CRM_Queue_DAO_Queue();
126 $dao->name = $queueSpec['name'];
127 if ($dao->find(TRUE)) {
128 return array_merge($queueSpec, CRM_Utils_Array::subset($dao->toArray(), $storageFields));
129 }
130
131 if (empty($queueSpec['type'])) {
132 throw new \CRM_Core_Exception(sprintf('Failed to find or create persistent queue "%s". Missing field "%s".',
133 $queueSpec['name'], 'type'));
134 }
135 $queueSpec = array_merge(['is_autorun' => FALSE], $queueSpec);
136 $dao->copyValues($queueSpec);
137 $dao->insert();
138
139 return $queueSpec;
140 }
141
142 /**
143 * Look up an existing queue.
144 *
145 * @param array $queueSpec
146 * Array with keys:
147 * - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory`
148 * - name: string, required, e.g. "upgrade-tasks"
149 * - (additional keys depending on the queue provider).
150 * - is_persistent: bool, optional; if true, then this queue is loaded from `civicrm_queue` list
151 *
152 * @return CRM_Queue_Queue
153 */
154 public function load($queueSpec) {
155 if (is_object($this->queues[$queueSpec['name']] ?? NULL)) {
156 return $this->queues[$queueSpec['name']];
157 }
158 if (!empty($queueSpec['is_persistent'])) {
159 $queueSpec = $this->findCreateQueueSpec($queueSpec);
160 }
161 $queue = $this->instantiateQueueObject($queueSpec);
162 $queue->loadQueue();
163 $this->queues[$queueSpec['name']] = $queue;
164 return $queue;
165 }
166
167 /**
168 * Convert a queue "type" name to a class name.
169 *
170 * @param string $type
171 * - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory`
172 * @return string
173 * Class-name
174 */
175 protected function getQueueClass($type) {
176 $type = preg_replace('/[^a-zA-Z0-9]/', '', $type);
177 $className = 'CRM_Queue_Queue_' . $type;
178 // FIXME: when used with class-autoloader, this may be unnecessary
179 if (!class_exists($className)) {
180 $classFile = 'CRM/Queue/Queue/' . $type . '.php';
181 require_once $classFile;
182 }
183 return $className;
184 }
185
186 /**
187 * @param array $queueSpec
188 * See create().
189 *
190 * @return CRM_Queue_Queue
191 */
192 protected function instantiateQueueObject($queueSpec) {
193 // note: you should probably never do anything else here
194 $class = new ReflectionClass($this->getQueueClass($queueSpec['type']));
195 return $class->newInstance($queueSpec);
196 }
197
198 }