Commit | Line | Data |
---|---|---|
13812d8e TO |
1 | <?php |
2 | ||
3 | /* | |
4 | +--------------------------------------------------------------------+ | |
5 | | Copyright CiviCRM LLC. All rights reserved. | | |
6 | | | | |
7 | | This work is published under the GNU AGPLv3 license with some | | |
8 | | permitted exceptions and without any warranty. For full license | | |
9 | | and copyright information, see https://civicrm.org/licensing | | |
10 | +--------------------------------------------------------------------+ | |
11 | */ | |
12 | ||
13 | /** | |
14 | * Trait defines methods that are commonly used to implement a SQL-backed queue. | |
15 | */ | |
16 | trait CRM_Queue_Queue_SqlTrait { | |
17 | ||
18 | /** | |
19 | * Perform any registation or resource-allocation for a new queue | |
20 | */ | |
21 | public function createQueue() { | |
22 | // nothing to do -- just start CRUDing items in the appropriate table | |
23 | } | |
24 | ||
25 | /** | |
26 | * Perform any loading or pre-fetch for an existing queue. | |
27 | */ | |
28 | public function loadQueue() { | |
29 | // nothing to do -- just start CRUDing items in the appropriate table | |
30 | } | |
31 | ||
32 | /** | |
33 | * Release any resources claimed by the queue (memory, DB rows, etc) | |
34 | */ | |
35 | public function deleteQueue() { | |
36 | return CRM_Core_DAO::singleValueQuery(" | |
37 | DELETE FROM civicrm_queue_item | |
38 | WHERE queue_name = %1 | |
39 | ", [ | |
40 | 1 => [$this->getName(), 'String'], | |
41 | ]); | |
42 | } | |
43 | ||
44 | /** | |
45 | * Check if the queue exists. | |
46 | * | |
47 | * @return bool | |
48 | */ | |
49 | public function existsQueue() { | |
50 | return ($this->numberOfItems() > 0); | |
51 | } | |
52 | ||
53 | /** | |
54 | * Determine number of items remaining in the queue. | |
55 | * | |
56 | * @return int | |
57 | */ | |
58 | public function numberOfItems() { | |
59 | return CRM_Core_DAO::singleValueQuery(" | |
60 | SELECT count(*) | |
61 | FROM civicrm_queue_item | |
62 | WHERE queue_name = %1 | |
63 | ", [ | |
64 | 1 => [$this->getName(), 'String'], | |
65 | ]); | |
66 | } | |
67 | ||
0c610bd9 TO |
68 | /** |
69 | * Add a new item to the queue. | |
70 | * | |
71 | * @param mixed $data | |
72 | * Serializable PHP object or array. | |
73 | * @param array $options | |
74 | * Queue-dependent options; for example, if this is a | |
75 | * priority-queue, then $options might specify the item's priority. | |
76 | */ | |
77 | public function createItem($data, $options = []) { | |
78 | $dao = new CRM_Queue_DAO_QueueItem(); | |
79 | $dao->queue_name = $this->getName(); | |
80 | $dao->submit_time = CRM_Utils_Time::getTime('YmdHis'); | |
81 | $dao->data = serialize($data); | |
82 | $dao->weight = CRM_Utils_Array::value('weight', $options, 0); | |
83 | $dao->save(); | |
84 | } | |
85 | ||
13812d8e TO |
86 | /** |
87 | * Remove an item from the queue. | |
88 | * | |
0374a8cc | 89 | * @param CRM_Core_DAO|stdClass $item |
13812d8e TO |
90 | * The item returned by claimItem. |
91 | */ | |
0374a8cc TO |
92 | public function deleteItem($item) { |
93 | $this->deleteItems([$item]); | |
94 | } | |
95 | ||
96 | public function deleteItems($items): void { | |
97 | if (empty($items)) { | |
98 | return; | |
99 | } | |
100 | $sql = CRM_Utils_SQL::interpolate('DELETE FROM civicrm_queue_item WHERE id IN (#ids) AND queue_name = @name', [ | |
101 | 'ids' => CRM_Utils_Array::collect('id', $items), | |
102 | 'name' => $this->getName(), | |
103 | ]); | |
104 | CRM_Core_DAO::executeQuery($sql); | |
105 | $this->freeDAOs($items); | |
13812d8e TO |
106 | } |
107 | ||
f50d4351 TO |
108 | /** |
109 | * Get the full data for an item. | |
110 | * | |
111 | * This is a passive peek - it does not claim/steal/release anything. | |
112 | * | |
113 | * @param int|string $id | |
114 | * The unique ID of the task within the queue. | |
115 | * @return CRM_Queue_DAO_QueueItem|object|null $dao | |
116 | */ | |
117 | public function fetchItem($id) { | |
0374a8cc TO |
118 | $items = $this->fetchItems([$id]); |
119 | return $items[0] ?? NULL; | |
120 | } | |
121 | ||
122 | public function fetchItems(array $ids): array { | |
123 | $dao = CRM_Utils_SQL_Select::from('civicrm_queue_item') | |
124 | ->select(['id', 'data', 'run_count']) | |
125 | ->where('id IN (#ids)', ['ids' => $ids]) | |
126 | ->where('queue_name = @name', ['name' => $this->getName()]) | |
127 | ->execute(); | |
128 | $result = []; | |
129 | while ($dao->fetch()) { | |
130 | $result[] = (object) [ | |
131 | 'id' => $dao->id, | |
132 | 'data' => unserialize($dao->data), | |
133 | 'run_count' => $dao->run_count, | |
134 | 'queue_name' => $this->getName(), | |
135 | ]; | |
f50d4351 | 136 | } |
0374a8cc | 137 | return $result; |
f50d4351 TO |
138 | } |
139 | ||
13812d8e TO |
140 | /** |
141 | * Return an item that could not be processed. | |
142 | * | |
0374a8cc | 143 | * @param CRM_Core_DAO $item |
13812d8e TO |
144 | * The item returned by claimItem. |
145 | */ | |
0374a8cc TO |
146 | public function releaseItem($item) { |
147 | $this->releaseItems([$item]); | |
148 | } | |
149 | ||
150 | public function releaseItems($items): void { | |
151 | if (empty($items)) { | |
152 | return; | |
38132ac9 | 153 | } |
0374a8cc TO |
154 | $sql = empty($this->queueSpec['retry_interval']) |
155 | ? 'UPDATE civicrm_queue_item SET release_time = NULL WHERE id IN (#ids) AND queue_name = @name' | |
156 | : 'UPDATE civicrm_queue_item SET release_time = DATE_ADD(NOW(), INTERVAL #retry SECOND) WHERE id IN (#ids) AND queue_name = @name'; | |
157 | CRM_Core_DAO::executeQuery(CRM_Utils_SQL::interpolate($sql, [ | |
158 | 'ids' => CRM_Utils_Array::collect('id', $items), | |
159 | 'name' => $this->getName(), | |
160 | 'retry' => $this->queueSpec['retry_interval'] ?? NULL, | |
161 | ])); | |
162 | $this->freeDAOs($items); | |
163 | } | |
164 | ||
165 | protected function freeDAOs($mixed) { | |
166 | $mixed = (array) $mixed; | |
167 | foreach ($mixed as $item) { | |
168 | if ($item instanceof CRM_Core_DAO) { | |
169 | $item->free(); | |
170 | } | |
38132ac9 | 171 | } |
13812d8e TO |
172 | } |
173 | ||
174 | } |