queueItemData) */
  public $items;

  /**
   * @var array( queueItemId => releaseTime), expressed in seconds since epoch
   */
  public $releaseTimes;

  /**
   * @var int id handed to the next item created; incremented on every createItem() call
   */
  public $nextQueueItemId = 1;

  /**
   * Create a reference to queue. After constructing the queue, one should
   * usually call createQueue (if it's a new queue) or loadQueue (if it's
   * known to be an existing queue).
   *
   * @param $queueSpec, array with keys:
   *   - type: string, required, e.g. "interactive", "immediate", "stomp", "beanstalk"
   *   - name: string, required, e.g. "upgrade-tasks"
   *   - reset: bool, optional; if a queue is found, then it should be flushed; default to TRUE
   *   - (additional keys depending on the queue provider)
   */
  public function __construct($queueSpec) {
    parent::__construct($queueSpec);
  }

  /**
   * Perform any registration or resource-allocation for a new queue.
   *
   * Starts the queue with no items and no leases. The id counter is not
   * reset, so ids keep increasing across re-creation.
   */
  public function createQueue() {
    $this->items = array();
    $this->releaseTimes = array();
  }

  /**
   * Perform any loading or pre-fetch for an existing queue.
   *
   * Not supported by this implementation: items are held only in this
   * object's properties, so this method always throws.
   *
   * @throws Exception always
   */
  public function loadQueue() {
    throw new Exception('Unsupported: CRM_Queue_Queue_Memory::loadQueue');
  }

  /**
   * Release any resources claimed by the queue (memory, DB rows, etc)
   *
   * Both stores are set to NULL (rather than emptied) so that existsQueue()
   * reports FALSE afterwards.
   */
  public function deleteQueue() {
    $this->items = NULL;
    $this->releaseTimes = NULL;
  }

  /**
   * Check if the queue exists
   *
   * @return bool TRUE once createQueue() has run and until deleteQueue() is called
   */
  public function existsQueue() {
    return is_array($this->items);
  }

  /**
   * Add a new item to the queue
   *
   * @param mixed $data serializable PHP object or array
   * @param array $options queue-dependent options; for example, if this is a
   *   priority-queue, then $options might specify the item's priority.
   *   This implementation does not read $options.
   *
   * @return bool, TRUE on success
   */
  public function createItem($data, $options = array()) {
    $id = $this->nextQueueItemId++;
    // Store a serialized copy so later changes to the caller's object cannot
    // leak into the queued item.
    $this->items[$id] = serialize($data);
    // Honour the documented contract; previously nothing was returned.
    return TRUE;
  }

  /**
   * Determine number of items remaining in the queue
   *
   * Leased items still count; only deleted items are gone.
   *
   * @return int 0 when the queue has not been created or has been deleted
   */
  public function numberOfItems() {
    // $items is NULL before createQueue() and after deleteQueue(); count(NULL)
    // warns on PHP 7.2+ and throws a TypeError on PHP 8.
    return is_array($this->items) ? count($this->items) : 0;
  }

  /**
   * Get the next item and place a lease on it. The item stays in the queue
   * until it is passed to deleteItem().
   *
   * @param int $leaseTime length of the lease, in seconds
   *
   * @return
object with keys 'id' and 'data', where 'data' matches the inputted data;
   *   FALSE when the queue is empty or missing, or when its first item is
   *   still leased
   */
  public function claimItem($leaseTime = 3600) {
    if (empty($this->items)) {
      // nothing in queue (also covers a queue that was never created or was
      // deleted, where $items is NULL)
      return FALSE;
    }

    // Only the head of the queue is ever a candidate; later items are not
    // considered while the head is leased.
    $data = reset($this->items);
    $id = key($this->items);

    $nowEpoch = CRM_Utils_Time::getTimeRaw();
    if (!empty($this->releaseTimes[$id]) && $this->releaseTimes[$id] >= $nowEpoch) {
      // item in queue is reserved
      return FALSE;
    }

    $this->releaseTimes[$id] = $nowEpoch + $leaseTime;

    $item = new stdClass();
    $item->id = $id;
    $item->data = unserialize($data);
    return $item;
  }

  /**
   * Get the next item, even if it is currently leased to someone else.
   *
   * Any existing lease on the item is overwritten with a fresh one.
   *
   * @param int $leaseTime length of the new lease, in seconds
   *
   * @return object with keys 'id' and 'data', where 'data' matches the
   *   inputted data; FALSE when the queue is empty or missing
   */
  public function stealItem($leaseTime = 3600) {
    if (empty($this->items)) {
      // nothing in queue
      return FALSE;
    }

    // Take the head of the queue without checking its release time.
    $data = reset($this->items);
    $id = key($this->items);

    $this->releaseTimes[$id] = CRM_Utils_Time::getTimeRaw() + $leaseTime;

    $item = new stdClass();
    $item->id = $id;
    $item->data = unserialize($data);
    return $item;
  }

  /**
   * Remove an item from the queue
   *
   * @param object $item The item returned by claimItem
   */
  public function deleteItem($item) {
    // Drop the payload and its lease together so no stale lease is left behind.
    unset($this->items[$item->id], $this->releaseTimes[$item->id]);
  }

  /**
   * Return an item that could not be processed
   *
   * Only the lease is cleared; the item itself stays queued and can be
   * claimed again immediately.
   *
   * @param object $item The item returned by claimItem
   *
   * @return bool TRUE once the lease has been cleared
   */
  public function releaseItem($item) {
    unset($this->releaseTimes[$item->id]);
    // Honour the documented bool return; previously nothing was returned.
    return TRUE;
  }

}