CRM_Queue_Queue_* - Add fetchItem($id)
[civicrm-core.git] / CRM / Queue / Queue.php
1 <?php
2 /*
3 +--------------------------------------------------------------------+
4 | Copyright CiviCRM LLC. All rights reserved. |
5 | |
6 | This work is published under the GNU AGPLv3 license with some |
7 | permitted exceptions and without any warranty. For full license |
8 | and copyright information, see https://civicrm.org/licensing |
9 +--------------------------------------------------------------------+
10 */
11
12 /**
13 * A queue is an object (usually backed by some persistent data store)
14 * which stores a list of tasks or messages for use by other processes.
15 *
16 * This would ideally be an interface, but it's handy to specify the
17 * "function __construct()" and the "$name" handling
18 *
19 * Note: This interface closely parallels the DrupalQueueInterface.
20 */
21 abstract class CRM_Queue_Queue {
22
23 /**
24 * @var string
25 */
26 private $_name;
27
28 /**
29 * @var array{name: string, type: string, runner: string, batch_limit: int, lease_time: ?int, retry_limit: int, retry_interval: ?int}
30 * @see \CRM_Queue_Service::create()
31 */
32 protected $queueSpec;
33
34 /**
35 * Create a reference to queue. After constructing the queue, one should
36 * usually call createQueue (if it's a new queue) or loadQueue (if it's
37 * known to be an existing queue).
38 *
39 * @param array{name: string, type: string, runner: string, batch_limit: int, lease_time: ?int, retry_limit: int, retry_interval: ?int} $queueSpec
40 * Ex: ['name' => 'my-import', 'type' => 'SqlParallel']
41 * The full definition of queueSpec is defined in CRM_Queue_Service.
42 * @see \CRM_Queue_Service::create()
43 */
44 public function __construct($queueSpec) {
45 $this->_name = $queueSpec['name'];
46 $this->queueSpec = $queueSpec;
47 unset($this->queueSpec['status']);
48 // Status may be meaningfully + independently toggled (eg when using type=SqlParallel,error=abort).
49 // Retaining a copy of 'status' in here would be misleading.
50 }
51
52 /**
53 * Determine whether this queue is currently active.
54 *
55 * @return bool
56 * TRUE if runners should continue claiming new tasks from this queue
57 * @throws \CRM_Core_Exception
58 */
59 public function isActive(): bool {
60 $status = CRM_Core_DAO::getFieldValue('CRM_Queue_DAO_Queue', $this->_name, 'status', 'name', TRUE);
61 // Note: In the future, we may want to incorporate other data (like maintenance-mode or upgrade-status) in deciding active queues.
62 return ($status === 'active');
63 }
64
65 /**
66 * Change the status of the queue.
67 *
68 * @param string $status
69 * Ex: 'active', 'draft', 'aborted'
70 */
71 public function setStatus(string $status): void {
72 CRM_Core_DAO::executeQuery('UPDATE civicrm_queue SET status = %1 WHERE name = %2', [
73 1 => ['aborted', 'String'],
74 2 => [$this->getName(), 'String'],
75 ]);
76 }
77
78 /**
79 * Determine the string name of this queue.
80 *
81 * @return string
82 */
83 public function getName() {
84 return $this->_name;
85 }
86
87 /**
88 * Get a property from the queueSpec.
89 *
90 * @param string $field
91 * @return mixed|null
92 */
93 public function getSpec(string $field) {
94 return $this->queueSpec[$field] ?? NULL;
95 }
96
97 /**
98 * Perform any registation or resource-allocation for a new queue
99 */
100 abstract public function createQueue();
101
102 /**
103 * Perform any loading or pre-fetch for an existing queue.
104 */
105 abstract public function loadQueue();
106
107 /**
108 * Release any resources claimed by the queue (memory, DB rows, etc)
109 */
110 abstract public function deleteQueue();
111
112 /**
113 * Check if the queue exists.
114 *
115 * @return bool
116 */
117 abstract public function existsQueue();
118
119 /**
120 * Add a new item to the queue.
121 *
122 * @param mixed $data
123 * Serializable PHP object or array.
124 * @param array $options
125 * Queue-dependent options; for example, if this is a
126 * priority-queue, then $options might specify the item's priority.
127 */
128 abstract public function createItem($data, $options = []);
129
130 /**
131 * Determine number of items remaining in the queue.
132 *
133 * @return int
134 */
135 abstract public function numberOfItems();
136
137 /**
138 * Get the next item.
139 *
140 * @param int $lease_time
141 * Seconds.
142 *
143 * @return object
144 * with key 'data' that matches the inputted data
145 */
146 abstract public function claimItem($lease_time = 3600);
147
148 /**
149 * Get the next item, even if there's an active lease
150 *
151 * @param int $lease_time
152 * Seconds.
153 *
154 * @return object
155 * with key 'data' that matches the inputted data
156 */
157 abstract public function stealItem($lease_time = 3600);
158
159 /**
160 * Remove an item from the queue.
161 *
162 * @param object $item
163 * The item returned by claimItem.
164 */
165 abstract public function deleteItem($item);
166
167 /**
168 * Get the full data for an item.
169 *
170 * This is a passive peek - it does not claim/steal/release anything.
171 *
172 * @param int|string $id
173 * The unique ID of the task within the queue.
174 * @return CRM_Queue_DAO_QueueItem|object|null $dao
175 */
176 abstract public function fetchItem($id);
177
178 /**
179 * Return an item that could not be processed.
180 *
181 * @param object $item
182 * The item returned by claimItem.
183 */
184 abstract public function releaseItem($item);
185
186 }