3 +--------------------------------------------------------------------+
4 | Copyright CiviCRM LLC. All rights reserved. |
6 | This work is published under the GNU AGPLv3 license with some |
7 | permitted exceptions and without any warranty. For full license |
8 | and copyright information, see https://civicrm.org/licensing |
9 +--------------------------------------------------------------------+
13 * A queue implementation which stores items in the CiviCRM SQL database
15 class CRM_Queue_Queue_SqlParallel
extends CRM_Queue_Queue
{
17 use CRM_Queue_Queue_SqlTrait
;
20 * Create a reference to a queue. After constructing the queue, one should
21 * usually call createQueue (if it's a new queue) or loadQueue (if it's
22 * known to be an existing queue).
24 * @param array $queueSpec
26 * - type: string, required, e.g. "interactive", "immediate", "stomp",
28 * - name: string, required, e.g. "upgrade-tasks"
29 * - reset: bool, optional; if a queue is found, then it should be
30 * flushed; defaults to TRUE
31 * - (additional keys depending on the queue provider).
33 public function __construct($queueSpec) {
34 parent
::__construct($queueSpec);
40 * @param int $lease_time
44 * With key 'data' that matches the data originally submitted to the queue.
46 public function claimItem($lease_time = 3600) {
49 $dao = CRM_Core_DAO
::executeQuery('LOCK TABLES civicrm_queue_item WRITE;');
50 $sql = "SELECT id, queue_name, submit_time, release_time, data
51 FROM civicrm_queue_item
53 AND (release_time IS NULL OR release_time < %2)
54 ORDER BY weight ASC, id ASC
58 1 => [$this->getName(), 'String'],
59 2 => [CRM_Utils_Time
::getTime(), 'Timestamp'],
61 $dao = CRM_Core_DAO
::executeQuery($sql, $params, TRUE, 'CRM_Queue_DAO_QueueItem');
62 if (is_a($dao, 'DB_Error')) {
63 // FIXME - Adding code to allow tests to pass
64 CRM_Core_Error
::fatal();
68 $nowEpoch = CRM_Utils_Time
::getTimeRaw();
69 CRM_Core_DAO
::executeQuery("UPDATE civicrm_queue_item SET release_time = %1 WHERE id = %2", [
70 '1' => [date('YmdHis', $nowEpoch +
$lease_time), 'String'],
71 '2' => [$dao->id
, 'Integer'],
73 // (Comment by artfulrobot Sep 2019: Not sure what the below comment means, should be removed/clarified?)
74 // work-around: inconsistent date-formatting causes unintentional breakage
75 # $dao->submit_time = date('YmdHis', strtotime($dao->submit_time));
76 # $dao->release_time = date('YmdHis', $nowEpoch + $lease_time);
78 $dao->data
= unserialize($dao->data
);
82 $dao = CRM_Core_DAO
::executeQuery('UNLOCK TABLES;');
88 * Get the next item, even if there's an active lease
90 * @param int $lease_time
94 * With key 'data' that matches the data originally submitted to the queue.
96 public function stealItem($lease_time = 3600) {
98 SELECT id, queue_name, submit_time, release_time, data
99 FROM civicrm_queue_item
100 WHERE queue_name = %1
101 ORDER BY weight ASC, id ASC
105 1 => [$this->getName(), 'String'],
107 $dao = CRM_Core_DAO
::executeQuery($sql, $params, TRUE, 'CRM_Queue_DAO_QueueItem');
109 $nowEpoch = CRM_Utils_Time
::getTimeRaw();
110 CRM_Core_DAO
::executeQuery("UPDATE civicrm_queue_item SET release_time = %1 WHERE id = %2", [
111 '1' => [date('YmdHis', $nowEpoch +
$lease_time), 'String'],
112 '2' => [$dao->id
, 'Integer'],
114 $dao->data
= unserialize($dao->data
);