3 +--------------------------------------------------------------------+
4 | Copyright CiviCRM LLC. All rights reserved. |
6 | This work is published under the GNU AGPLv3 license with some |
7 | permitted exceptions and without any warranty. For full license |
8 | and copyright information, see https://civicrm.org/licensing |
9 +--------------------------------------------------------------------+
13 * A queue is an object (usually backed by some persistent data store)
14 * which stores a list of tasks or messages for use by other processes.
16 * This would ideally be an interface, but it's handy to specify the
17 * "function __construct()" and the "$name" handling
19 * Note: This interface closely parallels the DrupalQueueInterface.
21 abstract class CRM_Queue_Queue
{
23 const DEFAULT_LEASE_TIME
= 3600;
31 * @var array{name: string, type: string, runner: string, batch_limit: int, lease_time: ?int, retry_limit: int, retry_interval: ?int}
32 * @see \CRM_Queue_Service::create()
37 * Create a reference to queue. After constructing the queue, one should
38 * usually call createQueue (if it's a new queue) or loadQueue (if it's
39 * known to be an existing queue).
41 * @param array{name: string, type: string, runner: string, batch_limit: int, lease_time: ?int, retry_limit: int, retry_interval: ?int} $queueSpec
42 * Ex: ['name' => 'my-import', 'type' => 'SqlParallel']
43 * The full definition of queueSpec is defined in CRM_Queue_Service.
44 * @see \CRM_Queue_Service::create()
46 public function __construct($queueSpec) {
47 $this->_name
= $queueSpec['name'];
48 $this->queueSpec
= $queueSpec;
49 unset($this->queueSpec
['status']);
50 // Status may be meaningfully + independently toggled (eg when using type=SqlParallel,error=abort).
51 // Retaining a copy of 'status' in here would be misleading.
55 * Determine whether this queue is currently active.
58 * TRUE if runners should continue claiming new tasks from this queue
59 * @throws \CRM_Core_Exception
61 public function isActive(): bool {
62 $status = CRM_Core_DAO
::getFieldValue('CRM_Queue_DAO_Queue', $this->_name
, 'status', 'name', TRUE);
63 // Note: In the future, we may want to incorporate other data (like maintenance-mode or upgrade-status) in deciding active queues.
64 return ($status === 'active');
68 * Change the status of the queue.
70 * @param string $status
71 * Ex: 'active', 'draft', 'aborted'
73 public function setStatus(string $status): void
{
74 $result = CRM_Core_DAO
::executeQuery('UPDATE civicrm_queue SET status = %1 WHERE name = %2', [
75 1 => [$status, 'String'],
76 2 => [$this->getName(), 'String'],
78 // If multiple workers try to setStatus('completed') at roughly the same time, only one will fire an event.
79 if ($result->affectedRows() > 0) {
80 CRM_Utils_Hook
::queueStatus($this, $status);
85 * Determine the string name of this queue.
89 public function getName() {
94 * Get a property from the queueSpec.
96 * @param string $field
99 public function getSpec(string $field) {
100 return $this->queueSpec
[$field] ??
NULL;
104 * Perform any registation or resource-allocation for a new queue
106 abstract public function createQueue();
109 * Perform any loading or pre-fetch for an existing queue.
111 abstract public function loadQueue();
114 * Release any resources claimed by the queue (memory, DB rows, etc)
116 abstract public function deleteQueue();
119 * Check if the queue exists.
123 abstract public function existsQueue();
126 * Add a new item to the queue.
129 * Serializable PHP object or array.
130 * @param array $options
131 * Queue-dependent options; for example, if this is a
132 * priority-queue, then $options might specify the item's priority.
134 abstract public function createItem($data, $options = []);
137 * Determine number of items remaining in the queue.
141 abstract public function numberOfItems();
146 * @param int|null $lease_time
147 * Hold a lease on the claimed item for $X seconds.
148 * If NULL, inherit a default.
150 * with key 'data' that matches the inputted data
152 abstract public function claimItem($lease_time = NULL);
155 * Get the next item, even if there's an active lease
157 * @param int $lease_time
161 * with key 'data' that matches the inputted data
163 abstract public function stealItem($lease_time = NULL);
166 * Remove an item from the queue.
168 * @param object $item
169 * The item returned by claimItem.
171 abstract public function deleteItem($item);
174 * Get the full data for an item.
176 * This is a passive peek - it does not claim/steal/release anything.
178 * @param int|string $id
179 * The unique ID of the task within the queue.
180 * @return CRM_Queue_DAO_QueueItem|object|null $dao
182 abstract public function fetchItem($id);
185 * Return an item that could not be processed.
187 * @param object $item
188 * The item returned by claimItem.
190 abstract public function releaseItem($item);