array($this->getName(), 'String'),
    ));
  }

  /**
   * Check if the queue exists
   *
   * The check is based purely on the item count, so a queue that currently
   * holds no items is reported as non-existent.
   *
   * @return bool
   */
  function existsQueue() {
    return ($this->numberOfItems() > 0);
  }

  /**
   * Add a new item to the queue
   *
   * @param $data serializable PHP object or array
   * @param $options queue-dependent options; the only key read here is
   *   'weight' (defaults to 0). Items with a lower weight are claimed first.
   *
   * @return bool, TRUE on success
   */
  function createItem($data, $options = array()) {
    $dao = new CRM_Queue_DAO_QueueItem();
    $dao->queue_name = $this->getName();
    $dao->submit_time = CRM_Utils_Time::getTime('YmdHis');
    $dao->data = serialize($data);
    $dao->weight = CRM_Utils_Array::value('weight', $options, 0);
    $dao->save();
    // NOTE(review): assumes save() signals failure by raising an error rather
    // than by its return value -- confirm against the DAO layer.
    return TRUE;
  }

  /**
   * Determine number of items remaining in the queue
   *
   * Counts every row for this queue, including items that are currently
   * leased (the query does not look at release_time).
   *
   * @return int
   */
  function numberOfItems() {
    // Cast so that callers really receive an int, as documented.
    return (int) CRM_Core_DAO::singleValueQuery("
      SELECT count(*)
      FROM civicrm_queue_item
      WHERE queue_name = %1
    ", array(
      1 => array($this->getName(), 'String'),
    ));
  }

  /**
   * Get the next item
   *
   * Only the item at the head of the queue is considered. If that item is
   * still under an unexpired lease, nothing is claimed.
   *
   * NOTE(review): the SELECT and the lease UPDATE are separate statements, so
   * two concurrent callers could presumably claim the same item -- confirm
   * whether callers serialize access.
   *
   * @param $lease_time seconds
   *
   * @return object with key 'data' that matches the inputted data, or FALSE
   *   if the queue is empty or the head item is still leased
   */
  function claimItem($lease_time = 3600) {
    $dao = $this->fetchFirstItem();
    if ($dao === NULL) {
      // debug_var() needs a value argument; use a local variable in case the
      // parameter is declared by-reference.
      $queueName = $this->getName();
      CRM_Core_Error::debug_var('no items found', $queueName);
      return FALSE;
    }

    $nowEpoch = CRM_Utils_Time::getTimeRaw();
    if ($dao->release_time !== NULL && strtotime($dao->release_time) >= $nowEpoch) {
      CRM_Core_Error::debug_var('not ready for release', $dao);
      return FALSE;
    }

    return $this->leaseItem($dao, $nowEpoch + $lease_time);
  }

  /**
   * Get the next item, even if there's an active lease
   *
   * @param $lease_time seconds
   *
   * @return object with key 'data' that matches the inputted data, or FALSE
   *   if the queue is empty
   */
  function stealItem($lease_time = 3600) {
    $dao = $this->fetchFirstItem();
    if ($dao === NULL) {
      // debug_var() needs a value argument; use a local variable in case the
      // parameter is declared by-reference.
      $queueName = $this->getName();
      CRM_Core_Error::debug_var('no items found', $queueName);
      return FALSE;
    }

    $nowEpoch = CRM_Utils_Time::getTimeRaw();
    return $this->leaseItem($dao, $nowEpoch + $lease_time);
  }

  /**
   * Load the item at the head of the queue (lowest weight, then lowest id)
   * without modifying it.
   *
   * @return object|NULL the fetched queue-item DAO, or NULL if the queue has no items
   */
  protected function fetchFirstItem() {
    $sql = "
      SELECT id, queue_name, submit_time, release_time, data
      FROM civicrm_queue_item
      WHERE queue_name = %1
      ORDER BY weight ASC, id ASC
      LIMIT 1
    ";
    $params = array(
      1 => array($this->getName(), 'String'),
    );
    $dao = CRM_Core_DAO::executeQuery($sql, $params, TRUE, 'CRM_Queue_DAO_QueueItem');
    if (is_a($dao, 'DB_Error')) {
      // FIXME - Adding code to allow tests to pass
      CRM_Core_Error::fatal();
    }
    return $dao->fetch() ? $dao : NULL;
  }

  /**
   * Record a lease on an item and decode its payload.
   *
   * The release time is written with a direct UPDATE rather than
   * $dao->save(): saving the DAO ran into breakage caused by inconsistent
   * date-formatting. As a result, $dao->release_time still holds the value
   * that was read from the database.
   *
   * @param $dao object the item loaded by fetchFirstItem()
   * @param $releaseEpoch int Unix timestamp at which the lease expires
   *
   * @return object the same $dao, with 'data' unserialized
   */
  protected function leaseItem($dao, $releaseEpoch) {
    CRM_Core_DAO::executeQuery("UPDATE civicrm_queue_item SET release_time = %1 WHERE id = %2", array(
      1 => array(date('YmdHis', $releaseEpoch), 'String'),
      2 => array($dao->id, 'Integer'),
    ));
    $dao->data = unserialize($dao->data);
    return $dao;
  }

  /**
   * Remove an item from the queue
   *
   * @param $dao object The item returned by claimItem
   */
  function deleteItem($dao) {
    $dao->delete();
    $dao->free();
  }

  /**
   * Return an item that could not be processed
   *
   * Clears the item's release_time so that it can be claimed again
   * immediately, then frees the DAO.
   *
   * @param $dao object The item returned by claimItem
   *
   * @return bool
   */
  function releaseItem($dao) {
    $sql = "UPDATE civicrm_queue_item SET release_time = NULL WHERE id = %1";
    $params = array(
      1 => array($dao->id, 'Integer'),
    );
    CRM_Core_DAO::executeQuery($sql, $params);
    $dao->free();
    return TRUE;
  }
}