3 +--------------------------------------------------------------------+
4 | CiviCRM version 4.5 |
5 +--------------------------------------------------------------------+
6 | Copyright CiviCRM LLC (c) 2004-2014 |
7 +--------------------------------------------------------------------+
8 | This file is a part of CiviCRM. |
10 | CiviCRM is free software; you can copy, modify, and distribute it |
11 | under the terms of the GNU Affero General Public License |
12 | Version 3, 19 November 2007 and the CiviCRM Licensing Exception. |
14 | CiviCRM is distributed in the hope that it will be useful, but |
15 | WITHOUT ANY WARRANTY; without even the implied warranty of |
16 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. |
17 | See the GNU Affero General Public License for more details. |
19 | You should have received a copy of the GNU Affero General Public |
20 | License and the CiviCRM Licensing Exception along |
21 | with this program; if not, contact CiviCRM LLC |
22 | at info[AT]civicrm[DOT]org. If you have questions about the |
23 | GNU Affero General Public License or the licensing of CiviCRM, |
24 | see the CiviCRM license FAQ at http://civicrm.org/licensing |
25 +--------------------------------------------------------------------+
29 * A queue implementation which stores items in the CiviCRM SQL database
31 class CRM_Queue_Queue_Sql
extends CRM_Queue_Queue
{
34 * Create a reference to queue. After constructing the queue, one should
35 * usually call createQueue (if it's a new queue) or loadQueue (if it's
36 * known to be an existing queue).
38 * @param $queueSpec, array with keys:
39 * - type: string, required, e.g. "interactive", "immediate", "stomp", "beanstalk"
40 * - name: string, required, e.g. "upgrade-tasks"
41 * - reset: bool, optional; if a queue is found, then it should be flushed; default to TRUE
42 * - (additional keys depending on the queue provider)
44 function __construct($queueSpec) {
45 parent
::__construct($queueSpec);
49 * Perform any registation or resource-allocation for a new queue
51 function createQueue() {
52 // nothing to do -- just start CRUDing items in the appropriate table
56 * Perform any loading or pre-fetch for an existing queue.
58 function loadQueue() {
59 // nothing to do -- just start CRUDing items in the appropriate table
63 * Release any resources claimed by the queue (memory, DB rows, etc)
65 function deleteQueue() {
66 return CRM_Core_DAO
::singleValueQuery("
67 DELETE FROM civicrm_queue_item
70 1 => array($this->getName(), 'String'),
75 * Check if the queue exists
79 function existsQueue() {
80 return ($this->numberOfItems() > 0);
84 * Add a new item to the queue
86 * @param $data serializable PHP object or array
87 * @param array|\queue $options queue-dependent options; for example, if this is a
88 * priority-queue, then $options might specify the item's priority
90 * @return bool, TRUE on success
92 function createItem($data, $options = array()) {
93 $dao = new CRM_Queue_DAO_QueueItem();
94 $dao->queue_name
= $this->getName();
95 $dao->submit_time
= CRM_Utils_Time
::getTime('YmdHis');
96 $dao->data
= serialize($data);
97 $dao->weight
= CRM_Utils_Array
::value('weight', $options, 0);
102 * Determine number of items remaining in the queue
106 function numberOfItems() {
107 return CRM_Core_DAO
::singleValueQuery("
109 FROM civicrm_queue_item
110 WHERE queue_name = %1
112 1 => array($this->getName(), 'String'),
119 * @param int|\seconds $lease_time seconds
121 * @return object with key 'data' that matches the inputted data
123 function claimItem($lease_time = 3600) {
125 SELECT id, queue_name, submit_time, release_time, data
126 FROM civicrm_queue_item
127 WHERE queue_name = %1
128 ORDER BY weight ASC, id ASC
132 1 => array($this->getName(), 'String'),
134 $dao = CRM_Core_DAO
::executeQuery($sql, $params, TRUE, 'CRM_Queue_DAO_QueueItem');
135 if (is_a($dao, 'DB_Error')) {
136 // FIXME - Adding code to allow tests to pass
137 CRM_Core_Error
::fatal();
141 $nowEpoch = CRM_Utils_Time
::getTimeRaw();
142 if ($dao->release_time
=== NULL ||
strtotime($dao->release_time
) < $nowEpoch) {
143 CRM_Core_DAO
::executeQuery("UPDATE civicrm_queue_item SET release_time = %1 WHERE id = %2", array(
144 '1' => array(date('YmdHis', $nowEpoch +
$lease_time), 'String'),
145 '2' => array($dao->id
, 'Integer'),
147 // work-around: inconsistent date-formatting causes unintentional breakage
148 # $dao->submit_time = date('YmdHis', strtotime($dao->submit_time));
149 # $dao->release_time = date('YmdHis', $nowEpoch + $lease_time);
151 $dao->data
= unserialize($dao->data
);
155 CRM_Core_Error
::debug_var('not ready for release', $dao);
160 CRM_Core_Error
::debug_var('no items found');
166 * Get the next item, even if there's an active lease
168 * @param int|\seconds $lease_time seconds
170 * @return object with key 'data' that matches the inputted data
172 function stealItem($lease_time = 3600) {
174 SELECT id, queue_name, submit_time, release_time, data
175 FROM civicrm_queue_item
176 WHERE queue_name = %1
177 ORDER BY weight ASC, id ASC
181 1 => array($this->getName(), 'String'),
183 $dao = CRM_Core_DAO
::executeQuery($sql, $params, TRUE, 'CRM_Queue_DAO_QueueItem');
185 $nowEpoch = CRM_Utils_Time
::getTimeRaw();
186 CRM_Core_DAO
::executeQuery("UPDATE civicrm_queue_item SET release_time = %1 WHERE id = %2", array(
187 '1' => array(date('YmdHis', $nowEpoch +
$lease_time), 'String'),
188 '2' => array($dao->id
, 'Integer'),
190 $dao->data
= unserialize($dao->data
);
194 CRM_Core_Error
::debug_var('no items found');
200 * Remove an item from the queue
202 * @param $dao object The item returned by claimItem
204 function deleteItem($dao) {
210 * Return an item that could not be processed
212 * @param $dao object The item returned by claimItem
216 function releaseItem($dao) {
217 $sql = "UPDATE civicrm_queue_item SET release_time = NULL WHERE id = %1";
219 1 => array($dao->id
, 'Integer'),
221 CRM_Core_DAO
::executeQuery($sql, $params);