[$this->getName(), 'String'],
    ]);
  }

  /**
   * Check if the queue exists.
   *
   * A queue is treated as existing when it currently holds at least one item.
   *
   * @return bool
   */
  public function existsQueue() {
    return ($this->numberOfItems() > 0);
  }

  /**
   * Add a new item to the queue.
   *
   * @param mixed $data
   *   Serializable PHP object or array.
   * @param array $options
   *   Queue-dependent options; for example, if this is a
   *   priority-queue, then $options might specify the item's priority.
   *   This implementation reads the 'weight' key (default 0); items are
   *   handed out in ascending weight order, then ascending id.
   */
  public function createItem($data, $options = []) {
    $dao = new CRM_Queue_DAO_QueueItem();
    $dao->queue_name = $this->getName();
    $dao->submit_time = CRM_Utils_Time::getTime('YmdHis');
    $dao->data = serialize($data);
    $dao->weight = CRM_Utils_Array::value('weight', $options, 0);
    $dao->save();
  }

  /**
   * Determine number of items remaining in the queue.
   *
   * Counts every row for this queue, whether or not it is currently leased.
   *
   * @return int
   */
  public function numberOfItems() {
    // Cast so the result really is the documented int rather than whatever
    // scalar the database layer hands back for count(*).
    return (int) CRM_Core_DAO::singleValueQuery("
      SELECT count(*)
      FROM civicrm_queue_item
      WHERE queue_name = %1
    ", [
      1 => [$this->getName(), 'String'],
    ]);
  }

  /**
   * Get the next item.
   *
   * Picks the first item (by weight, then id) that is either unleased or
   * whose lease has expired, and leases it for $lease_time seconds. The
   * select and the lease update run under a table write-lock so that two
   * workers cannot claim the same item.
   *
   * @param int $lease_time
   *   Seconds.
   *
   * @return object|null
   *   With key 'data' that matches the inputted data; NULL if no item is
   *   available.
   */
  public function claimItem($lease_time = 3600) {
    $result = NULL;
    $nowEpoch = CRM_Utils_Time::getTimeRaw();

    CRM_Core_DAO::executeQuery('LOCK TABLES civicrm_queue_item WRITE;');
    try {
      // An item is claimable when it has never been leased, or when its
      // lease has run out (e.g. the previous worker died without releasing).
      $sql = "SELECT id, queue_name, submit_time, release_time, data
        FROM civicrm_queue_item
        WHERE queue_name = %1
              AND (release_time IS NULL OR release_time < %2)
        ORDER BY weight ASC, id ASC
        LIMIT 1
      ";
      $params = [
        1 => [$this->getName(), 'String'],
        // Same 'YmdHis' string format that is used to write release_time below.
        2 => [date('YmdHis', $nowEpoch), 'String'],
      ];
      $dao = CRM_Core_DAO::executeQuery($sql, $params, TRUE, 'CRM_Queue_DAO_QueueItem');
      if (is_a($dao, 'DB_Error')) {
        // FIXME - Adding code to allow tests to pass
        CRM_Core_Error::fatal();
      }

      if ($dao->fetch()) {
        CRM_Core_DAO::executeQuery("UPDATE civicrm_queue_item SET release_time = %1 WHERE id = %2", [
          '1' => [date('YmdHis', $nowEpoch + $lease_time), 'String'],
          '2' => [$dao->id, 'Integer'],
        ]);
        // The lease is written with a direct UPDATE rather than $dao->save(),
        // so the returned object still carries the release_time that was
        // selected, not the new lease expiry.
        $dao->data = unserialize($dao->data);
        $result = $dao;
      }
    }
    finally {
      // Always release the table lock, even if one of the queries above threw;
      // otherwise the lock would linger on this connection.
      CRM_Core_DAO::executeQuery('UNLOCK TABLES;');
    }

    return $result;
  }

  /**
   * Get the next item, even if there's an active lease
   *
   * Unlike claimItem(), this ignores release_time when selecting and does
   * not take a table lock.
   *
   * @param int $lease_time
   *   Seconds.
   *
   * @return object|null
   *   With key 'data' that matches the inputted data; NULL if the queue is
   *   empty.
   */
  public function stealItem($lease_time = 3600) {
    $sql = "
      SELECT id, queue_name, submit_time, release_time, data
      FROM civicrm_queue_item
      WHERE queue_name = %1
      ORDER BY weight ASC, id ASC
      LIMIT 1
    ";
    $params = [
      1 => [$this->getName(), 'String'],
    ];
    $dao = CRM_Core_DAO::executeQuery($sql, $params, TRUE, 'CRM_Queue_DAO_QueueItem');
    if ($dao->fetch()) {
      $nowEpoch = CRM_Utils_Time::getTimeRaw();
      CRM_Core_DAO::executeQuery("UPDATE civicrm_queue_item SET release_time = %1 WHERE id = %2", [
        '1' => [date('YmdHis', $nowEpoch + $lease_time), 'String'],
        '2' => [$dao->id, 'Integer'],
      ]);
      $dao->data = unserialize($dao->data);
      return $dao;
    }
    return NULL;
  }

  /**
   * Remove an item from the queue.
   *
   * @param CRM_Core_DAO $dao
   *   The item returned by claimItem.
   */
  public function deleteItem($dao) {
    $dao->delete();
    $dao->free();
  }

  /**
   * Return an item that could not be processed.
   *
   * Clears the item's release_time so that it can be claimed again.
   *
   * @param CRM_Core_DAO $dao
   *   The item returned by claimItem.
   */
  public function releaseItem($dao) {
    $sql = "UPDATE civicrm_queue_item SET release_time = NULL WHERE id = %1";
    $params = [
      1 => [$dao->id, 'Integer'],
    ];
    CRM_Core_DAO::executeQuery($sql, $params);
    $dao->free();
  }

}