copyright and version fixes
[civicrm-core.git] / CRM / Queue / Queue / Sql.php
1 <?php
2 /*
3 +--------------------------------------------------------------------+
4 | CiviCRM version 4.5 |
5 +--------------------------------------------------------------------+
6 | Copyright CiviCRM LLC (c) 2004-2014 |
7 +--------------------------------------------------------------------+
8 | This file is a part of CiviCRM. |
9 | |
10 | CiviCRM is free software; you can copy, modify, and distribute it |
11 | under the terms of the GNU Affero General Public License |
12 | Version 3, 19 November 2007 and the CiviCRM Licensing Exception. |
13 | |
14 | CiviCRM is distributed in the hope that it will be useful, but |
15 | WITHOUT ANY WARRANTY; without even the implied warranty of |
16 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. |
17 | See the GNU Affero General Public License for more details. |
18 | |
19 | You should have received a copy of the GNU Affero General Public |
20 | License and the CiviCRM Licensing Exception along |
21 | with this program; if not, contact CiviCRM LLC |
22 | at info[AT]civicrm[DOT]org. If you have questions about the |
23 | GNU Affero General Public License or the licensing of CiviCRM, |
24 | see the CiviCRM license FAQ at http://civicrm.org/licensing |
25 +--------------------------------------------------------------------+
26 */
27
28 /**
29 * A queue implementation which stores items in the CiviCRM SQL database
30 */
31 class CRM_Queue_Queue_Sql extends CRM_Queue_Queue {
32
33 /**
34 * Create a reference to queue. After constructing the queue, one should
35 * usually call createQueue (if it's a new queue) or loadQueue (if it's
36 * known to be an existing queue).
37 *
38 * @param $queueSpec, array with keys:
39 * - type: string, required, e.g. "interactive", "immediate", "stomp", "beanstalk"
40 * - name: string, required, e.g. "upgrade-tasks"
41 * - reset: bool, optional; if a queue is found, then it should be flushed; default to TRUE
42 * - (additional keys depending on the queue provider)
43 */
44 function __construct($queueSpec) {
45 parent::__construct($queueSpec);
46 }
47
48 /**
49 * Perform any registation or resource-allocation for a new queue
50 */
51 function createQueue() {
52 // nothing to do -- just start CRUDing items in the appropriate table
53 }
54
55 /**
56 * Perform any loading or pre-fetch for an existing queue.
57 */
58 function loadQueue() {
59 // nothing to do -- just start CRUDing items in the appropriate table
60 }
61
62 /**
63 * Release any resources claimed by the queue (memory, DB rows, etc)
64 */
65 function deleteQueue() {
66 return CRM_Core_DAO::singleValueQuery("
67 DELETE FROM civicrm_queue_item
68 WHERE queue_name = %1
69 ", array(
70 1 => array($this->getName(), 'String'),
71 ));
72 }
73
74 /**
75 * Check if the queue exists
76 *
77 * @return bool
78 */
79 function existsQueue() {
80 return ($this->numberOfItems() > 0);
81 }
82
83 /**
84 * Add a new item to the queue
85 *
86 * @param $data serializable PHP object or array
87 * @param $options queue-dependent options; for example, if this is a
88 * priority-queue, then $options might specify the item's priority
89 *
90 * @return bool, TRUE on success
91 */
92 function createItem($data, $options = array()) {
93 $dao = new CRM_Queue_DAO_QueueItem();
94 $dao->queue_name = $this->getName();
95 $dao->submit_time = CRM_Utils_Time::getTime('YmdHis');
96 $dao->data = serialize($data);
97 $dao->weight = CRM_Utils_Array::value('weight', $options, 0);
98 $dao->save();
99 }
100
101 /**
102 * Determine number of items remaining in the queue
103 *
104 * @return int
105 */
106 function numberOfItems() {
107 return CRM_Core_DAO::singleValueQuery("
108 SELECT count(*)
109 FROM civicrm_queue_item
110 WHERE queue_name = %1
111 ", array(
112 1 => array($this->getName(), 'String'),
113 ));
114 }
115
116 /**
117 * Get the next item
118 *
119 * @param $lease_time seconds
120 *
121 * @return object with key 'data' that matches the inputted data
122 */
123 function claimItem($lease_time = 3600) {
124 $sql = "
125 SELECT id, queue_name, submit_time, release_time, data
126 FROM civicrm_queue_item
127 WHERE queue_name = %1
128 ORDER BY weight ASC, id ASC
129 LIMIT 1
130 ";
131 $params = array(
132 1 => array($this->getName(), 'String'),
133 );
134 $dao = CRM_Core_DAO::executeQuery($sql, $params, TRUE, 'CRM_Queue_DAO_QueueItem');
135 if (is_a($dao, 'DB_Error')) {
136 // FIXME - Adding code to allow tests to pass
137 CRM_Core_Error::fatal();
138 }
139
140 if ($dao->fetch()) {
141 $nowEpoch = CRM_Utils_Time::getTimeRaw();
142 if ($dao->release_time === NULL || strtotime($dao->release_time) < $nowEpoch) {
143 CRM_Core_DAO::executeQuery("UPDATE civicrm_queue_item SET release_time = %1 WHERE id = %2", array(
144 '1' => array(date('YmdHis', $nowEpoch + $lease_time), 'String'),
145 '2' => array($dao->id, 'Integer'),
146 ));
147 // work-around: inconsistent date-formatting causes unintentional breakage
148 # $dao->submit_time = date('YmdHis', strtotime($dao->submit_time));
149 # $dao->release_time = date('YmdHis', $nowEpoch + $lease_time);
150 # $dao->save();
151 $dao->data = unserialize($dao->data);
152 return $dao;
153 }
154 else {
155 CRM_Core_Error::debug_var('not ready for release', $dao);
156 return FALSE;
157 }
158 }
159 else {
160 CRM_Core_Error::debug_var('no items found');
161 return FALSE;
162 }
163 }
164
165 /**
166 * Get the next item, even if there's an active lease
167 *
168 * @param $lease_time seconds
169 *
170 * @return object with key 'data' that matches the inputted data
171 */
172 function stealItem($lease_time = 3600) {
173 $sql = "
174 SELECT id, queue_name, submit_time, release_time, data
175 FROM civicrm_queue_item
176 WHERE queue_name = %1
177 ORDER BY weight ASC, id ASC
178 LIMIT 1
179 ";
180 $params = array(
181 1 => array($this->getName(), 'String'),
182 );
183 $dao = CRM_Core_DAO::executeQuery($sql, $params, TRUE, 'CRM_Queue_DAO_QueueItem');
184 if ($dao->fetch()) {
185 $nowEpoch = CRM_Utils_Time::getTimeRaw();
186 CRM_Core_DAO::executeQuery("UPDATE civicrm_queue_item SET release_time = %1 WHERE id = %2", array(
187 '1' => array(date('YmdHis', $nowEpoch + $lease_time), 'String'),
188 '2' => array($dao->id, 'Integer'),
189 ));
190 $dao->data = unserialize($dao->data);
191 return $dao;
192 }
193 else {
194 CRM_Core_Error::debug_var('no items found');
195 return FALSE;
196 }
197 }
198
199 /**
200 * Remove an item from the queue
201 *
202 * @param $dao object The item returned by claimItem
203 */
204 function deleteItem($dao) {
205 $dao->delete();
206 $dao->free();
207 }
208
209 /**
210 * Return an item that could not be processed
211 *
212 * @param $dao object The item returned by claimItem
213 *
214 * @return bool
215 */
216 function releaseItem($dao) {
217 $sql = "UPDATE civicrm_queue_item SET release_time = NULL WHERE id = %1";
218 $params = array(
219 1 => array($dao->id, 'Integer'),
220 );
221 CRM_Core_DAO::executeQuery($sql, $params);
222 $dao->free();
223 }
224 }
225