4 +--------------------------------------------------------------------+
5 | Copyright CiviCRM LLC. All rights reserved. |
7 | This work is published under the GNU AGPLv3 license with some |
8 | permitted exceptions and without any warranty. For full license |
9 | and copyright information, see https://civicrm.org/licensing |
10 +--------------------------------------------------------------------+
14 * Trait defines methods that are commonly used to implement a SQL-backed queue.
16 trait CRM_Queue_Queue_SqlTrait
{
19 * Perform any registation or resource-allocation for a new queue
21 public function createQueue() {
22 // nothing to do -- just start CRUDing items in the appropriate table
26 * Perform any loading or pre-fetch for an existing queue.
28 public function loadQueue() {
29 // nothing to do -- just start CRUDing items in the appropriate table
33 * Release any resources claimed by the queue (memory, DB rows, etc)
35 public function deleteQueue() {
36 return CRM_Core_DAO
::singleValueQuery("
37 DELETE FROM civicrm_queue_item
40 1 => [$this->getName(), 'String'],
45 * Check if the queue exists.
49 public function existsQueue() {
50 return ($this->numberOfItems() > 0);
54 * Determine number of items remaining in the queue.
58 public function numberOfItems() {
59 return CRM_Core_DAO
::singleValueQuery("
61 FROM civicrm_queue_item
64 1 => [$this->getName(), 'String'],
69 * Add a new item to the queue.
72 * Serializable PHP object or array.
73 * @param array $options
74 * Queue-dependent options; for example, if this is a
75 * priority-queue, then $options might specify the item's priority.
77 public function createItem($data, $options = []) {
78 $dao = new CRM_Queue_DAO_QueueItem();
79 $dao->queue_name
= $this->getName();
80 $dao->submit_time
= CRM_Utils_Time
::getTime('YmdHis');
81 $dao->data
= serialize($data);
82 $dao->weight
= CRM_Utils_Array
::value('weight', $options, 0);
87 * Remove an item from the queue.
89 * @param CRM_Core_DAO|stdClass $item
90 * The item returned by claimItem.
92 public function deleteItem($item) {
93 $this->deleteItems([$item]);
96 public function deleteItems($items): void
{
100 $sql = CRM_Utils_SQL
::interpolate('DELETE FROM civicrm_queue_item WHERE id IN (#ids) AND queue_name = @name', [
101 'ids' => CRM_Utils_Array
::collect('id', $items),
102 'name' => $this->getName(),
104 CRM_Core_DAO
::executeQuery($sql);
105 $this->freeDAOs($items);
109 * Get the full data for an item.
111 * This is a passive peek - it does not claim/steal/release anything.
113 * @param int|string $id
114 * The unique ID of the task within the queue.
115 * @return CRM_Queue_DAO_QueueItem|object|null $dao
117 public function fetchItem($id) {
118 $items = $this->fetchItems([$id]);
119 return $items[0] ??
NULL;
122 public function fetchItems(array $ids): array {
123 $dao = CRM_Utils_SQL_Select
::from('civicrm_queue_item')
124 ->select(['id', 'data', 'run_count'])
125 ->where('id IN (#ids)', ['ids' => $ids])
126 ->where('queue_name = @name', ['name' => $this->getName()])
129 while ($dao->fetch()) {
130 $result[] = (object) [
132 'data' => unserialize($dao->data
),
133 'run_count' => $dao->run_count
,
134 'queue_name' => $this->getName(),
141 * Return an item that could not be processed.
143 * @param CRM_Core_DAO $item
144 * The item returned by claimItem.
146 public function releaseItem($item) {
147 $this->releaseItems([$item]);
150 public function releaseItems($items): void
{
154 $sql = empty($this->queueSpec
['retry_interval'])
155 ?
'UPDATE civicrm_queue_item SET release_time = NULL WHERE id IN (#ids) AND queue_name = @name'
156 : 'UPDATE civicrm_queue_item SET release_time = DATE_ADD(NOW(), INTERVAL #retry SECOND) WHERE id IN (#ids) AND queue_name = @name';
157 CRM_Core_DAO
::executeQuery(CRM_Utils_SQL
::interpolate($sql, [
158 'ids' => CRM_Utils_Array
::collect('id', $items),
159 'name' => $this->getName(),
160 'retry' => $this->queueSpec
['retry_interval'] ??
NULL,
162 $this->freeDAOs($items);
165 protected function freeDAOs($mixed) {
166 $mixed = (array) $mixed;
167 foreach ($mixed as $item) {
168 if ($item instanceof CRM_Core_DAO
) {