3 +--------------------------------------------------------------------+
4 | Copyright CiviCRM LLC. All rights reserved. |
6 | This work is published under the GNU AGPLv3 license with some |
7 | permitted exceptions and without any warranty. For full license |
8 | and copyright information, see https://civicrm.org/licensing |
9 +--------------------------------------------------------------------+
13 * The queue service provides an interface for creating or locating
14 * queues. Note that this approach hides the details of data-storage:
15 * different queue-providers may store the queue content in different
16 * ways (in memory, in SQL, or in an external service).
 * $queue = CRM_Queue_Service::singleton()->create(array(
 *   'type' => 'interactive',
 *   'name' => 'upgrade-tasks',
 * ));
 * $queue->createItem($myData);
 *
 * // Some time later...
 * $item = $queue->claimItem();
 * if (my_process($item->data)) {
 *   $queue->deleteItem($item);
 * }
 * else {
 *   $queue->releaseItem($item);
 * }
36 class CRM_Queue_Service
{
38 protected static $_singleton;
/**
 * Fetch the shared queue-service instance, building it on first use.
 *
 * FIXME: Singleton pattern should be removed when dependency-injection
 * becomes available.
 *
 * @param bool $forceNew
 *   TRUE if a new instance must be created.
 *
 * @return \CRM_Queue_Service
 */
public static function &singleton($forceNew = FALSE) {
  // Hand back the cached instance unless the caller demands a fresh one
  // or nothing has been cached yet.
  if (!$forceNew && self::$_singleton) {
    return self::$_singleton;
  }

  self::$_singleton = new self();
  return self::$_singleton;
}
59 * Format is (string $queueName => CRM_Queue_Queue).
68 public function __construct() {
/**
 * Create a queue. If one already exists, then it will be reused.
 *
 * @param array $queueSpec
 *   Array with keys:
 *   - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory`
 *   - name: string, required, e.g. "upgrade-tasks"
 *   - reset: bool, optional; if a queue is found, then it should be
 *     flushed; defaults to FALSE (an omitted value leaves the existing
 *     queue content untouched)
 *   - (additional keys depending on the queue provider).
 *   - is_persistent: bool, optional; if true, then this queue is loaded from `civicrm_queue` list
 *   - is_autorun: bool, optional; if true, then this queue will be auto-scanned
 *     by background task-runners
 *
 * @return CRM_Queue_Queue
 */
public function create($queueSpec) {
  // A queue object already held by this service is reused as-is, unless the
  // caller explicitly asked for a reset.
  $known = $this->queues[$queueSpec['name']] ?? NULL;
  if (is_object($known) && empty($queueSpec['reset'])) {
    return $known;
  }

  // Persistent queues take their stored settings from the queue list.
  if (!empty($queueSpec['is_persistent'])) {
    $queueSpec = $this->findCreateQueueSpec($queueSpec);
  }

  $queue = $this->instantiateQueueObject($queueSpec);
  if (!$queue->existsQueue()) {
    $queue->createQueue();
  }
  elseif (!empty($queueSpec['reset'])) {
    // Flush by dropping and re-creating the backing queue.
    $queue->deleteQueue();
    $queue->createQueue();
  }

  $this->queues[$queueSpec['name']] = $queue;
  return $queue;
}
/**
 * Find/create the queue-spec. Specifically:
 *
 * - If there is a stored queue, use its spec.
 * - If there is no stored queue, and if we have enough information, then create queue.
 *
 * @param array $queueSpec
 *   The spec requested by the caller; must include `name`.
 *
 * @return array
 *   The requested spec, with `type` and `is_autorun` taken from the stored
 *   record when one exists.
 *
 * @throws \CRM_Core_Exception
 *   When no stored record exists and the spec has no `type`.
 */
protected function findCreateQueueSpec(array $queueSpec): array {
  $record = new CRM_Queue_DAO_Queue();
  $record->name = $queueSpec['name'];

  // Stored record found: its storage fields win over the caller's values.
  if ($record->find(TRUE)) {
    $stored = CRM_Utils_Array::subset($record->toArray(), ['type', 'is_autorun']);
    return array_merge($queueSpec, $stored);
  }

  // Nothing stored: a new record can only be written when the type is known.
  if (empty($queueSpec['type'])) {
    $message = sprintf('Failed to find or create persistent queue "%s". Missing field "%s".',
      $queueSpec['name'], 'type');
    throw new \CRM_Core_Exception($message);
  }

  $queueSpec = array_merge(['is_autorun' => FALSE], $queueSpec);
  $record->copyValues($queueSpec);
  $record->insert();

  return $queueSpec;
}
/**
 * Look up an existing queue.
 *
 * @param array $queueSpec
 *   Array with keys:
 *   - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory`
 *   - name: string, required, e.g. "upgrade-tasks"
 *   - (additional keys depending on the queue provider).
 *   - is_persistent: bool, optional; if true, then this queue is loaded from `civicrm_queue` list
 *
 * @return CRM_Queue_Queue
 */
public function load($queueSpec) {
  // Serve from this service's in-memory registry when possible.
  $known = $this->queues[$queueSpec['name']] ?? NULL;
  if (is_object($known)) {
    return $known;
  }

  // Persistent queues take their stored settings from the queue list.
  if (!empty($queueSpec['is_persistent'])) {
    $queueSpec = $this->findCreateQueueSpec($queueSpec);
  }

  $queue = $this->instantiateQueueObject($queueSpec);
  $queue->loadQueue();

  $this->queues[$queueSpec['name']] = $queue;
  return $queue;
}
/**
 * Convert a queue "type" name to a class name.
 *
 * @param string $type
 *   - type: string, required, e.g. `Sql`, `SqlParallel`, `Memory`
 *
 * @return string
 *   Name of the queue class, `CRM_Queue_Queue_<type>`.
 */
protected function getQueueClass($type) {
  // Keep only ASCII letters and digits so the value is safe to splice into
  // both a class name and a file path.
  $safeType = preg_replace('/[^a-zA-Z0-9]/', '', $type);
  $className = 'CRM_Queue_Queue_' . $safeType;

  if (class_exists($className)) {
    return $className;
  }

  // FIXME: when used with class-autoloader, this may be unnecessary
  require_once 'CRM/Queue/Queue/' . $safeType . '.php';
  return $className;
}
/**
 * Build the queue object described by a queue-spec.
 *
 * @param array $queueSpec
 *   Queue-spec; `type` selects the class, and the whole array is passed to
 *   its constructor.
 *
 * @return CRM_Queue_Queue
 */
protected function instantiateQueueObject($queueSpec) {
  // note: you should probably never do anything else here
  $className = $this->getQueueClass($queueSpec['type']);
  $reflector = new ReflectionClass($className);
  return $reflector->newInstance($queueSpec);
}