3 +--------------------------------------------------------------------+
4 | Copyright CiviCRM LLC. All rights reserved. |
6 | This work is published under the GNU AGPLv3 license with some |
7 | permitted exceptions and without any warranty. For full license |
8 | and copyright information, see https://civicrm.org/licensing |
9 +--------------------------------------------------------------------+
13 * A queue implementation which stores items in the CiviCRM SQL database
15 class CRM_Queue_Queue_Memory
extends CRM_Queue_Queue
{
19 * array(queueItemId => queueItemData)
25 * array(queueItemId => releaseTime), expressed in seconds since epoch.
29 public $nextQueueItemId = 1;
32 * Create a reference to queue. After constructing the queue, one should
33 * usually call createQueue (if it's a new queue) or loadQueue (if it's
34 * known to be an existing queue).
36 * @param array $queueSpec
38 * - type: string, required, e.g. "interactive", "immediate", "stomp",
40 * - name: string, required, e.g. "upgrade-tasks"
41 * - reset: bool, optional; if a queue is found, then it should be
42 * flushed; default to TRUE
43 * - (additional keys depending on the queue provider).
45 public function __construct($queueSpec) {
46 parent
::__construct($queueSpec);
50 * Perform any registation or resource-allocation for a new queue
52 public function createQueue() {
54 $this->releaseTimes
= [];
58 * Perform any loading or pre-fetch for an existing queue.
60 public function loadQueue() {
61 // $this->createQueue();
62 throw new Exception('Unsupported: CRM_Queue_Queue_Memory::loadQueue');
66 * Release any resources claimed by the queue (memory, DB rows, etc)
68 public function deleteQueue() {
70 $this->releaseTimes
= NULL;
74 * Check if the queue exists.
78 public function existsQueue() {
79 return is_array($this->items
);
83 * Add a new item to the queue.
86 * Serializable PHP object or array.
87 * @param array $options
88 * Queue-dependent options; for example, if this is a
89 * priority-queue, then $options might specify the item's priority.
91 public function createItem($data, $options = []) {
92 $id = $this->nextQueueItemId++
;
93 // force copy, no unintendedsharing effects from pointers
94 $this->items
[$id] = serialize($data);
98 * Determine number of items remaining in the queue.
102 public function numberOfItems() {
103 return count($this->items
);
107 * Get and remove the next item.
109 * @param int $leaseTime
113 * Includes key 'data' that matches the inputted data.
115 public function claimItem($leaseTime = 3600) {
116 // foreach hits the items in order -- but we short-circuit after the first
117 foreach ($this->items
as $id => $data) {
118 $nowEpoch = CRM_Utils_Time
::getTimeRaw();
119 if (empty($this->releaseTimes
[$id]) ||
$this->releaseTimes
[$id] < $nowEpoch) {
120 $this->releaseTimes
[$id] = $nowEpoch +
$leaseTime;
122 $item = new stdClass();
124 $item->data
= unserialize($data);
128 // item in queue is reserved
139 * @param int $leaseTime
143 * With key 'data' that matches the inputted data.
145 public function stealItem($leaseTime = 3600) {
146 // foreach hits the items in order -- but we short-circuit after the first
147 foreach ($this->items
as $id => $data) {
148 $nowEpoch = CRM_Utils_Time
::getTimeRaw();
149 $this->releaseTimes
[$id] = $nowEpoch +
$leaseTime;
151 $item = new stdClass();
153 $item->data
= unserialize($data);
161 * Remove an item from the queue.
163 * @param object $item
164 * The item returned by claimItem.
166 public function deleteItem($item) {
167 unset($this->items
[$item->id
]);
168 unset($this->releaseTimes
[$item->id
]);
172 * Return an item that could not be processed.
174 * @param CRM_Core_DAO $item
175 * The item returned by claimItem.
177 public function releaseItem($item) {
178 unset($this->releaseTimes
[$item->id
]);