Merge pull request #23283 from eileenmcnaughton/import_saved_map
[civicrm-core.git] / CRM / Queue / Queue / SqlTrait.php
CommitLineData
13812d8e
TO
1<?php
2
3/*
4 +--------------------------------------------------------------------+
5 | Copyright CiviCRM LLC. All rights reserved. |
6 | |
7 | This work is published under the GNU AGPLv3 license with some |
8 | permitted exceptions and without any warranty. For full license |
9 | and copyright information, see https://civicrm.org/licensing |
10 +--------------------------------------------------------------------+
11 */
12
13/**
14 * Trait defines methods that are commonly used to implement a SQL-backed queue.
15 */
16trait CRM_Queue_Queue_SqlTrait {
17
/**
 * Perform any registration or resource-allocation for a new queue.
 *
 * This implementation needs no setup step.
 */
public function createQueue() {
  // Deliberately empty -- items are simply created, read, updated and
  // deleted in the appropriate table.
}
24
/**
 * Perform any loading or pre-fetch for an existing queue.
 *
 * This implementation has nothing to load up front.
 */
public function loadQueue() {
  // Deliberately empty -- items are simply created, read, updated and
  // deleted in the appropriate table.
}
31
/**
 * Release any resources claimed by the queue (memory, DB rows, etc).
 *
 * Issues a DELETE against civicrm_queue_item for every row whose
 * queue_name equals this queue's name.
 *
 * @return mixed
 *   Whatever CRM_Core_DAO::singleValueQuery() yields for the DELETE.
 */
public function deleteQueue() {
  $sql = "
    DELETE FROM civicrm_queue_item
    WHERE queue_name = %1
  ";
  $params = [
    1 => [$this->getName(), 'String'],
  ];
  return CRM_Core_DAO::singleValueQuery($sql, $params);
}
43
/**
 * Check if the queue exists.
 *
 * The queue is treated as existing when it holds at least one item.
 *
 * @return bool
 *   TRUE when numberOfItems() reports more than zero items.
 */
public function existsQueue() {
  $remaining = $this->numberOfItems();
  return $remaining > 0;
}
52
/**
 * Determine number of items remaining in the queue.
 *
 * @return int
 *   Count of civicrm_queue_item rows whose queue_name equals this
 *   queue's name.
 */
public function numberOfItems() {
  // singleValueQuery() passes the column value through as-is (presumably a
  // numeric string, or NULL when there is no row), so cast it to honour the
  // documented integer return type.
  return (int) CRM_Core_DAO::singleValueQuery("
    SELECT count(*)
    FROM civicrm_queue_item
    WHERE queue_name = %1
  ", [
    1 => [$this->getName(), 'String'],
  ]);
}
67
0c610bd9
TO
/**
 * Add a new item to the queue.
 *
 * @param mixed $data
 *   Serializable PHP object or array.
 * @param array $options
 *   Queue-dependent options; for example, if this is a
 *   priority-queue, then $options might specify the item's priority.
 *   The only option read here is 'weight', which defaults to 0.
 */
public function createItem($data, $options = []) {
  $dao = new CRM_Queue_DAO_QueueItem();
  $dao->queue_name = $this->getName();
  $dao->submit_time = CRM_Utils_Time::getTime('YmdHis');
  // The payload is stored in serialized form; fetchItems() unserializes it.
  $dao->data = serialize($data);
  // A missing -- or explicitly NULL -- weight falls back to 0.
  $dao->weight = $options['weight'] ?? 0;
  $dao->save();
}
85
13812d8e
TO
/**
 * Remove an item from the queue.
 *
 * Delegates to deleteItems() with a one-element list.
 *
 * @param CRM_Core_DAO|stdClass $item
 *   The item returned by claimItem.
 */
public function deleteItem($item) {
  $batch = [$item];
  $this->deleteItems($batch);
}
95
/**
 * Remove a batch of items from the queue.
 *
 * Rows are matched on both their ID and this queue's name. Afterwards the
 * items are handed to freeDAOs().
 *
 * @param array $items
 *   The items to delete; the 'id' of each entry is gathered with
 *   CRM_Utils_Array::collect(). An empty list is a no-op.
 */
public function deleteItems($items): void {
  if (empty($items)) {
    // Nothing to delete, so no query is issued.
    return;
  }
  $ids = CRM_Utils_Array::collect('id', $items);
  $queueName = $this->getName();
  $sql = CRM_Utils_SQL::interpolate(
    'DELETE FROM civicrm_queue_item WHERE id IN (#ids) AND queue_name = @name',
    ['ids' => $ids, 'name' => $queueName]
  );
  CRM_Core_DAO::executeQuery($sql);
  $this->freeDAOs($items);
}
107
f50d4351
TO
/**
 * Get the full data for an item.
 *
 * This is a passive peek - it does not claim/steal/release anything.
 *
 * @param int|string $id
 *   The unique ID of the task within the queue.
 * @return CRM_Queue_DAO_QueueItem|object|null
 *   The first record that fetchItems() yields for this ID, or NULL when
 *   there is none.
 */
public function fetchItem($id) {
  $matches = $this->fetchItems([$id]);
  return isset($matches[0]) ? $matches[0] : NULL;
}
121
/**
 * Get the full data for a list of items.
 *
 * Only a SELECT is issued; rows are matched on both their ID and this
 * queue's name.
 *
 * @param array $ids
 *   IDs of the items to look up.
 * @return array
 *   List of objects with the properties 'id', 'data' (unserialized),
 *   'run_count' and 'queue_name'. Empty when $ids is empty or nothing
 *   matched.
 */
public function fetchItems(array $ids): array {
  if (empty($ids)) {
    // With no IDs the "id IN (#ids)" clause would be left without values;
    // skip the query, as deleteItems() and releaseItems() do for empty input.
    return [];
  }
  $dao = CRM_Utils_SQL_Select::from('civicrm_queue_item')
    ->select(['id', 'data', 'run_count'])
    ->where('id IN (#ids)', ['ids' => $ids])
    ->where('queue_name = @name', ['name' => $this->getName()])
    ->execute();
  $result = [];
  while ($dao->fetch()) {
    $result[] = (object) [
      'id' => $dao->id,
      // Stored in serialized form by createItem().
      'data' => unserialize($dao->data),
      'run_count' => $dao->run_count,
      'queue_name' => $this->getName(),
    ];
  }
  return $result;
}
139
13812d8e
TO
/**
 * Return an item that could not be processed.
 *
 * Delegates to releaseItems() with a one-element list.
 *
 * @param CRM_Core_DAO|stdClass $item
 *   The item returned by claimItem.
 */
public function releaseItem($item) {
  $batch = [$item];
  $this->releaseItems($batch);
}
149
/**
 * Return a batch of items that could not be processed.
 *
 * When the queue spec has a non-empty 'retry_interval', release_time is set
 * that many seconds after NOW(); otherwise release_time is reset to NULL.
 * Afterwards the items are handed to freeDAOs().
 *
 * @param array $items
 *   The items to release; the 'id' of each entry is gathered with
 *   CRM_Utils_Array::collect(). An empty list is a no-op.
 */
public function releaseItems($items): void {
  if (empty($items)) {
    // Nothing to release, so no query is issued.
    return;
  }
  if (empty($this->queueSpec['retry_interval'])) {
    $sql = 'UPDATE civicrm_queue_item SET release_time = NULL WHERE id IN (#ids) AND queue_name = @name';
  }
  else {
    $sql = 'UPDATE civicrm_queue_item SET release_time = DATE_ADD(NOW(), INTERVAL #retry SECOND) WHERE id IN (#ids) AND queue_name = @name';
  }
  // 'retry' is always supplied; only the second statement references it.
  $params = [
    'ids' => CRM_Utils_Array::collect('id', $items),
    'name' => $this->getName(),
    'retry' => $this->queueSpec['retry_interval'] ?? NULL,
  ];
  CRM_Core_DAO::executeQuery(CRM_Utils_SQL::interpolate($sql, $params));
  $this->freeDAOs($items);
}
164
/**
 * Free every DAO found among the given items.
 *
 * @param mixed $mixed
 *   A single item or an array of items. Entries that are CRM_Core_DAO
 *   instances have free() called on them; anything else is ignored.
 */
protected function freeDAOs($mixed) {
  // A bare (array) cast would turn a lone object into an array of its
  // properties, so the object itself would never be freed. Wrap non-array
  // input in a list instead.
  $items = is_array($mixed) ? $mixed : [$mixed];
  foreach ($items as $item) {
    if ($item instanceof CRM_Core_DAO) {
      $item->free();
    }
  }
}
173
174}