Merge pull request #22483 from eileenmcnaughton/token1
[civicrm-core.git] / CRM / Queue / Queue / SqlParallel.php
1 <?php
2 /*
3 +--------------------------------------------------------------------+
4 | Copyright CiviCRM LLC. All rights reserved. |
5 | |
6 | This work is published under the GNU AGPLv3 license with some |
7 | permitted exceptions and without any warranty. For full license |
8 | and copyright information, see https://civicrm.org/licensing |
9 +--------------------------------------------------------------------+
10 */
11
/**
 * A queue implementation which stores items in the CiviCRM SQL database.
 *
 * Items are rows in the `civicrm_queue_item` table. Handing out an item does
 * not delete it; instead the row's `release_time` is set to the moment the
 * lease expires.
 */
class CRM_Queue_Queue_SqlParallel extends CRM_Queue_Queue {

  use CRM_Queue_Queue_SqlTrait;

  /**
   * Create a reference to queue. After constructing the queue, one should
   * usually call createQueue (if it's a new queue) or loadQueue (if it's
   * known to be an existing queue).
   *
   * @param array $queueSpec
   *   Array with keys:
   *   - type: string, required, e.g. "interactive", "immediate", "stomp",
   *     "beanstalk"
   *   - name: string, required, e.g. "upgrade-tasks"
   *   - reset: bool, optional; if a queue is found, then it should be
   *     flushed; default to TRUE
   *   - (additional keys depending on the queue provider).
   */
  public function __construct($queueSpec) {
    parent::__construct($queueSpec);
  }

  /**
   * Get the next item.
   *
   * Selects the first item (lowest weight, then lowest id) in this queue
   * whose lease is absent or has expired, and leases it for $lease_time
   * seconds. The select and the update run while holding a write lock on
   * civicrm_queue_item.
   *
   * @param int $lease_time
   *   Seconds.
   *
   * @return object|null
   *   With key 'data' that matches the inputted data, or NULL if no item is
   *   currently available.
   */
  public function claimItem($lease_time = 3600) {
    $result = NULL;

    CRM_Core_DAO::executeQuery('LOCK TABLES civicrm_queue_item WRITE;');
    try {
      $sql = "SELECT id, queue_name, submit_time, release_time, data
        FROM civicrm_queue_item
        WHERE queue_name = %1
              AND (release_time IS NULL OR release_time < %2)
        ORDER BY weight ASC, id ASC
        LIMIT 1
      ";
      $params = [
        1 => [$this->getName(), 'String'],
        2 => [CRM_Utils_Time::getTime(), 'Timestamp'],
      ];
      $dao = CRM_Core_DAO::executeQuery($sql, $params, TRUE, 'CRM_Queue_DAO_QueueItem');
      if (is_a($dao, 'DB_Error')) {
        // FIXME - Adding code to allow tests to pass
        CRM_Core_Error::fatal();
      }

      if ($dao->fetch()) {
        $nowEpoch = CRM_Utils_Time::getTimeRaw();
        // Mark the item as leased until now + $lease_time.
        CRM_Core_DAO::executeQuery("UPDATE civicrm_queue_item SET release_time = %1 WHERE id = %2", [
          1 => [date('YmdHis', $nowEpoch + $lease_time), 'String'],
          2 => [$dao->id, 'Integer'],
        ]);
        $dao->data = unserialize($dao->data);
        $result = $dao;
      }
    }
    finally {
      // Always release the table lock, even if a query above threw;
      // otherwise this connection would keep civicrm_queue_item locked.
      CRM_Core_DAO::executeQuery('UNLOCK TABLES;');
    }

    return $result;
  }

  /**
   * Get the next item, even if there's an active lease
   *
   * Unlike claimItem(), the query does not filter on release_time and no
   * table lock is taken. The selected item's lease is (re)set to
   * $lease_time seconds from now.
   *
   * @param int $lease_time
   *   Seconds.
   *
   * @return object|null
   *   With key 'data' that matches the inputted data, or NULL if the queue
   *   has no items.
   */
  public function stealItem($lease_time = 3600) {
    $sql = "
      SELECT id, queue_name, submit_time, release_time, data
      FROM civicrm_queue_item
      WHERE queue_name = %1
      ORDER BY weight ASC, id ASC
      LIMIT 1
    ";
    $params = [
      1 => [$this->getName(), 'String'],
    ];
    $dao = CRM_Core_DAO::executeQuery($sql, $params, TRUE, 'CRM_Queue_DAO_QueueItem');
    if ($dao->fetch()) {
      $nowEpoch = CRM_Utils_Time::getTimeRaw();
      // Overwrite whatever lease exists with a fresh one.
      CRM_Core_DAO::executeQuery("UPDATE civicrm_queue_item SET release_time = %1 WHERE id = %2", [
        1 => [date('YmdHis', $nowEpoch + $lease_time), 'String'],
        2 => [$dao->id, 'Integer'],
      ]);
      $dao->data = unserialize($dao->data);
      return $dao;
    }

    return NULL;
  }

}