array($this->getName(), 'String'), )); } /** * Check if the queue exists. * * @return bool */ public function existsQueue() { return ($this->numberOfItems() > 0); } /** * Add a new item to the queue. * * @param mixed $data * Serializable PHP object or array. * @param array $options * Queue-dependent options; for example, if this is a * priority-queue, then $options might specify the item's priority. */ public function createItem($data, $options = array()) { $dao = new CRM_Queue_DAO_QueueItem(); $dao->queue_name = $this->getName(); $dao->submit_time = CRM_Utils_Time::getTime('YmdHis'); $dao->data = serialize($data); $dao->weight = CRM_Utils_Array::value('weight', $options, 0); $dao->save(); } /** * Determine number of items remaining in the queue. * * @return int */ public function numberOfItems() { return CRM_Core_DAO::singleValueQuery(" SELECT count(*) FROM civicrm_queue_item WHERE queue_name = %1 ", array( 1 => array($this->getName(), 'String'), )); } /** * Get the next item. * * @param int $lease_time * Seconds. * * @return object * With key 'data' that matches the inputted data. */ public function claimItem($lease_time = 3600) { $sql = " SELECT id, queue_name, submit_time, release_time, data FROM civicrm_queue_item WHERE queue_name = %1 ORDER BY weight ASC, id ASC LIMIT 1 "; $params = array( 1 => array($this->getName(), 'String'), ); $dao = CRM_Core_DAO::executeQuery($sql, $params, TRUE, 'CRM_Queue_DAO_QueueItem'); if (is_a($dao, 'DB_Error')) { // FIXME - Adding code to allow tests to pass CRM_Core_Error::fatal(); } if ($dao->fetch()) { $nowEpoch = CRM_Utils_Time::getTimeRaw(); if ($dao->release_time === NULL || strtotime($dao->release_time) < $nowEpoch) { CRM_Core_DAO::executeQuery("UPDATE civicrm_queue_item SET release_time = %1 WHERE id = %2", array( '1' => array(date('YmdHis', $nowEpoch + $lease_time), 'String'), '2' => array($dao->id, 'Integer'), )); // work-around: inconsistent date-formatting causes unintentional breakage # $dao->submit_time = date('YmdHis', strtotime($dao->submit_time)); # $dao->release_time = date('YmdHis', $nowEpoch + $lease_time); # $dao->save(); $dao->data = unserialize($dao->data); return $dao; } } } /** * Get the next item, even if there's an active lease * * @param int $lease_time * Seconds. * * @return object * With key 'data' that matches the inputted data. */ public function stealItem($lease_time = 3600) { $sql = " SELECT id, queue_name, submit_time, release_time, data FROM civicrm_queue_item WHERE queue_name = %1 ORDER BY weight ASC, id ASC LIMIT 1 "; $params = array( 1 => array($this->getName(), 'String'), ); $dao = CRM_Core_DAO::executeQuery($sql, $params, TRUE, 'CRM_Queue_DAO_QueueItem'); if ($dao->fetch()) { $nowEpoch = CRM_Utils_Time::getTimeRaw(); CRM_Core_DAO::executeQuery("UPDATE civicrm_queue_item SET release_time = %1 WHERE id = %2", array( '1' => array(date('YmdHis', $nowEpoch + $lease_time), 'String'), '2' => array($dao->id, 'Integer'), )); $dao->data = unserialize($dao->data); return $dao; } } /** * Remove an item from the queue. * * @param CRM_Core_DAO $dao * The item returned by claimItem. */ public function deleteItem($dao) { $dao->delete(); $dao->free(); } /** * Return an item that could not be processed. * * @param CRM_Core_DAO $dao * The item returned by claimItem. */ public function releaseItem($dao) { $sql = "UPDATE civicrm_queue_item SET release_time = NULL WHERE id = %1"; $params = array( 1 => array($dao->id, 'Integer'), ); CRM_Core_DAO::executeQuery($sql, $params); $dao->free(); } }