*
* @return bool TRUE if table now exists
*/
- static function findCreateTable() {
+ public static function findCreateTable() {
$checkTableSql = "show tables like 'civicrm_queue_item'";
$foundName = CRM_Core_DAO::singleValueQuery($checkTableSql);
if ($foundName == 'civicrm_queue_item') {
return ($foundName == 'civicrm_queue_item');
}
}
-
*/
/**
- * To ensure that PHP errors or unhandled exceptions are reported in JSON format,
- * wrap this around your code. For example:
+ * To ensure that PHP errors or unhandled exceptions are reported in JSON
+ * format, wrap this around your code. For example:
*
* @code
* $errorContainer = new CRM_Queue_ErrorPolicy();
* will be necessary to get reuse from the other parts of this class.
*/
class CRM_Queue_ErrorPolicy {
- var $active;
+ public $active;
/**
- * @param null $level
+ * @param null|int $level
+ * PHP error level to capture (e.g. E_PARSE|E_USER_ERROR).
*/
- function __construct($level = NULL) {
+ public function __construct($level = NULL) {
register_shutdown_function(array($this, 'onShutdown'));
if ($level === NULL) {
$level = E_ERROR | E_PARSE | E_CORE_ERROR | E_COMPILE_ERROR | E_USER_ERROR | E_RECOVERABLE_ERROR;
$this->level = $level;
}
- function activate() {
+ /**
+ * Enable the error policy.
+ */
+ public function activate() {
$this->active = TRUE;
$this->backup = array();
foreach (array(
$this->errorScope = CRM_Core_TemporaryErrorScope::useException();
}
- function deactivate() {
+ /**
+ * Disable the error policy.
+ */
+ public function deactivate() {
$this->errorScope = NULL;
restore_error_handler();
foreach (array(
}
/**
- * @param $callable
+ * Execute the callable. Activate and deactivate the error policy
+ * automatically.
+ *
+ * @param callable|array|string $callable
+ * A callback function.
*
* @return mixed
*/
- function call($callable) {
+ public function call($callable) {
$this->activate();
try {
$result = $callable();
*
* @see set_error_handler
*/
- function onError($errno, $errstr, $errfile, $errline) {
+ public function onError($errno, $errstr, $errfile, $errline) {
if (!(error_reporting() & $errno)) {
return TRUE;
}
* @see register_shutdown_function
* @see error_get_last
*/
- function onShutdown() {
+ public function onShutdown() {
if (!$this->active) {
return;
}
/**
* Print a fatal error
*
- * @param $error
+ * @param array $error
+ * The PHP error (with "type", "message", etc).
*/
- function reportError($error) {
+ public function reportError($error) {
$response = array(
'is_error' => 1,
'is_continue' => 0,
/**
* Print an unhandled exception
*
- * @param $e
+ * @param Exception $e
+ * The unhandled exception.
*/
- function reportException(Exception $e) {
+ public function reportException(Exception $e) {
CRM_Core_Error::debug_var('CRM_Queue_ErrorPolicy_reportException', CRM_Core_Error::formatTextException($e));
$response = array(
CRM_Utils_JSON::output($response);
}
}
-
class CRM_Queue_Menu {
/**
- * @param $path
- * @param $menuPath
+ * @param string $path
+ * The path for which we are trying to locate the route.
+ * @param array $menuPath
+ * The route.
*/
- static function alter($path, &$menuPath) {
+ public static function alter($path, &$menuPath) {
switch ($path) {
case 'civicrm/queue/runner':
case 'civicrm/upgrade/queue/runner':
}
}
}
-
/**
* Run the next task and return status information
*
- * @return array(is_error => bool, is_continue => bool, numberOfItems => int, exception => htmlString)
+ * Outputs JSON: array(
+ * is_error => bool,
+ * is_continue => bool,
+ * numberOfItems => int,
+ * exception => htmlString
+ * )
*/
- static function runNext() {
+ public static function runNext() {
$errorPolicy = new CRM_Queue_ErrorPolicy();
- $errorPolicy->call(
- function () {
- global $activeQueueRunner;
- $qrid = CRM_Utils_Request::retrieve('qrid', 'String', CRM_Core_DAO::$_nullObject, TRUE, NULL, 'POST');
- $activeQueueRunner = CRM_Queue_Runner::instance($qrid);
- if (!is_object($activeQueueRunner)) {
- throw new Exception('Queue runner must be configured before execution.');
+ $errorPolicy->call(function () {
+ global $activeQueueRunner;
+ $qrid = CRM_Utils_Request::retrieve('qrid', 'String', CRM_Core_DAO::$_nullObject, TRUE, NULL, 'POST');
+ $activeQueueRunner = CRM_Queue_Runner::instance($qrid);
+ if (!is_object($activeQueueRunner)) {
+ throw new Exception('Queue runner must be configured before execution.');
}
- $result = $activeQueueRunner->runNext(TRUE);
- CRM_Queue_Page_AJAX::_return('runNext', $result);
- }
- );
+ $result = $activeQueueRunner->runNext(TRUE);
+ CRM_Queue_Page_AJAX::_return('runNext', $result);
+ });
}
/**
* Run the next task and return status information
*
- * @return array(is_error => bool, is_continue => bool, numberOfItems => int, exception => htmlString)
+ * Outputs JSON: array(
+ * is_error => bool,
+ * is_continue => bool,
+ * numberOfItems => int,
+ * exception => htmlString
+ * )
*/
- static function skipNext() {
+ public static function skipNext() {
$errorPolicy = new CRM_Queue_ErrorPolicy();
- $errorPolicy->call(
- function () {
- global $activeQueueRunner;
- $qrid = CRM_Utils_Request::retrieve('qrid', 'String', CRM_Core_DAO::$_nullObject, TRUE, NULL, 'POST');
- $activeQueueRunner = CRM_Queue_Runner::instance($qrid);
- if (!is_object($activeQueueRunner)) {
- throw new Exception('Queue runner must be configured before execution.');
- }
- $result = $activeQueueRunner->skipNext(TRUE);
- CRM_Queue_Page_AJAX::_return('skipNext', $result);
+ $errorPolicy->call(function () {
+ global $activeQueueRunner;
+ $qrid = CRM_Utils_Request::retrieve('qrid', 'String', CRM_Core_DAO::$_nullObject, TRUE, NULL, 'POST');
+ $activeQueueRunner = CRM_Queue_Runner::instance($qrid);
+ if (!is_object($activeQueueRunner)) {
+ throw new Exception('Queue runner must be configured before execution.');
}
- );
+ $result = $activeQueueRunner->skipNext(TRUE);
+ CRM_Queue_Page_AJAX::_return('skipNext', $result);
+ });
}
/**
* Run the next task and return status information
*
- * @return array(is_error => bool, is_continue => bool, numberOfItems => int, exception => htmlString)
+ * Outputs JSON: array(
+ * is_error => bool,
+ * is_continue => bool,
+ * numberOfItems => int,
+ * exception => htmlString
+ * )
*/
- static function onEnd() {
+ public static function onEnd() {
$errorPolicy = new CRM_Queue_ErrorPolicy();
- $errorPolicy->call(
- function () {
- global $activeQueueRunner;
- $qrid = CRM_Utils_Request::retrieve('qrid', 'String', CRM_Core_DAO::$_nullObject, TRUE, NULL, 'POST');
- $activeQueueRunner = CRM_Queue_Runner::instance($qrid);
- if (!is_object($activeQueueRunner)) {
- throw new Exception('Queue runner must be configured before execution. - onEnd');
+ $errorPolicy->call(function () {
+ global $activeQueueRunner;
+ $qrid = CRM_Utils_Request::retrieve('qrid', 'String', CRM_Core_DAO::$_nullObject, TRUE, NULL, 'POST');
+ $activeQueueRunner = CRM_Queue_Runner::instance($qrid);
+ if (!is_object($activeQueueRunner)) {
+ throw new Exception('Queue runner must be configured before execution. - onEnd');
}
- $result = $activeQueueRunner->handleEnd(FALSE);
- CRM_Queue_Page_AJAX::_return('onEnd', $result);
- }
- );
+ $result = $activeQueueRunner->handleEnd(FALSE);
+ CRM_Queue_Page_AJAX::_return('onEnd', $result);
+ });
}
/**
* Performing any view-layer filtering on result and send to client.
*/
- static function _return($op, $result) {
- if ($result['is_error']) {
+ public static function _return($op, $result) {
+ if ($result['is_error']) {
if (is_object($result['exception'])) {
CRM_Core_Error::debug_var("CRM_Queue_Page_AJAX_{$op}_error", CRM_Core_Error::formatTextException($result['exception']));
$config = CRM_Core_Config::singleton();
if ($config->backtrace || CRM_Core_Config::isUpgradeMode()) {
$result['exception'] = CRM_Core_Error::formatHtmlException($result['exception']);
- }
+ }
else {
$result['exception'] = $result['exception']->getMessage();
}
- } else {
+ }
+ else {
CRM_Core_Error::debug_var("CRM_Queue_Page_AJAX_{$op}_error", $result);
}
}
- CRM_Utils_JSON::output($result);
- }
+ CRM_Utils_JSON::output($result);
}
-
+}
*
* POST Param 'qrid': string, usually the name of the queue
*/
- function run() {
+ public function run() {
$qrid = CRM_Utils_Request::retrieve('qrid', 'String', $this, TRUE);
$runner = CRM_Queue_Runner::instance($qrid);
- // dpm(array( 'action' => 'CRM_Queue_Page_Runner::run()', 'session' => $_SESSION, 'runner' => $runner, 'qrid' => $qrid ));
if (!is_object($runner)) {
CRM_Core_Error::fatal('Queue runner must be configured before execution.');
}
CRM_Utils_System::setTitle($runner->title);
$this->assign('queueRunnerData', array(
- 'qrid' => $runner->qrid,
- 'runNextAjax' => CRM_Utils_System::url($runner->pathPrefix . '/ajax/runNext', NULL, FALSE, NULL, FALSE ),
- 'skipNextAjax' => CRM_Utils_System::url($runner->pathPrefix . '/ajax/skipNext', NULL, FALSE, NULL, FALSE ),
- 'onEndAjax' => CRM_Utils_System::url($runner->pathPrefix . '/ajax/onEnd', NULL, FALSE, NULL, FALSE ),
- 'completed' => 0,
- 'numberOfItems' => $runner->queue->numberOfItems(),
- 'buttons' => $runner->buttons,
- ));
+ 'qrid' => $runner->qrid,
+ 'runNextAjax' => CRM_Utils_System::url($runner->pathPrefix . '/ajax/runNext', NULL, FALSE, NULL, FALSE),
+ 'skipNextAjax' => CRM_Utils_System::url($runner->pathPrefix . '/ajax/skipNext', NULL, FALSE, NULL, FALSE),
+ 'onEndAjax' => CRM_Utils_System::url($runner->pathPrefix . '/ajax/onEnd', NULL, FALSE, NULL, FALSE),
+ 'completed' => 0,
+ 'numberOfItems' => $runner->queue->numberOfItems(),
+ 'buttons' => $runner->buttons,
+ ));
if ($runner->isMinimal) {
// Render page header
* usually call createQueue (if it's a new queue) or loadQueue (if it's
* known to be an existing queue).
*
- * @param $queueSpec, array with keys:
- * - type: string, required, e.g. "interactive", "immediate", "stomp", "beanstalk"
+ * @param array $queueSpec
+ * Array with keys:
+ * - type: string, required, e.g. "interactive", "immediate", "stomp",
+ * "beanstalk"
* - name: string, required, e.g. "upgrade-tasks"
- * - reset: bool, optional; if a queue is found, then it should be flushed; default to TRUE
- * - (additional keys depending on the queue provider)
- */ function __construct($queueSpec) {
+ * - reset: bool, optional; if a queue is found, then it should be
+ * flushed; default to TRUE
+ * - (additional keys depending on the queue provider).
+ */
+ public function __construct($queueSpec) {
$this->_name = $queueSpec['name'];
}
*
* @return string
*/
- function getName() {
+ public function getName() {
return $this->_name;
}
/**
* Perform any registation or resource-allocation for a new queue
*/
- abstract function createQueue();
+ public abstract function createQueue();
/**
* Perform any loading or pre-fetch for an existing queue.
*/
- abstract function loadQueue();
+ public abstract function loadQueue();
/**
* Release any resources claimed by the queue (memory, DB rows, etc)
*/
- abstract function deleteQueue();
+ public abstract function deleteQueue();
/**
* Check if the queue exists
*
* @return bool
*/
- abstract function existsQueue();
+ public abstract function existsQueue();
/**
* Add a new item to the queue
*
- * @param $data serializable PHP object or array
- * @param array|\queue $options queue-dependent options; for example, if this is a
- * priority-queue, then $options might specify the item's priority
- *
- * @return bool, TRUE on success
+ * @param mixed $data
+ * Serializable PHP object or array.
+ * @param array $options
+ * Queue-dependent options; for example, if this is a
+ * priority-queue, then $options might specify the item's priority.
*/
- abstract function createItem($data, $options = array());
+ public abstract function createItem($data, $options = array());
/**
* Determine number of items remaining in the queue
*
* @return int
*/
- abstract function numberOfItems();
+ public abstract function numberOfItems();
/**
* Get the next item
*
- * @param int|\seconds $lease_time seconds
+ * @param int $lease_time
+ * Seconds.
*
* @return object with key 'data' that matches the inputted data
*/
- abstract function claimItem($lease_time = 3600);
+ public abstract function claimItem($lease_time = 3600);
/**
* Get the next item, even if there's an active lease
*
- * @param int|\seconds $lease_time seconds
+ * @param int $lease_time
+ * Seconds.
*
* @return object with key 'data' that matches the inputted data
*/
- abstract function stealItem($lease_time = 3600);
+ public abstract function stealItem($lease_time = 3600);
/**
* Remove an item from the queue
*
- * @param $item The item returned by claimItem
+ * @param object $item
+ * The item returned by claimItem.
*/
- abstract function deleteItem($item);
+ public abstract function deleteItem($item);
/**
* Return an item that could not be processed
*
- * @param $item The item returned by claimItem
- *
- * @return bool
+ * @param object $item
+ * The item returned by claimItem.
*/
- abstract function releaseItem($item);
+ public abstract function releaseItem($item);
}
-
class CRM_Queue_Queue_Memory extends CRM_Queue_Queue {
/**
- * @var array(queueItemId => queueItemData)
+ * @var array
+ * array(queueItemId => queueItemData)
*/
- var $items;
+ public $items;
/**
- * @var array(
- queueItemId => releaseTime), expressed in seconds since epoch
+ * @var array
+ * array(queueItemId => releaseTime), expressed in seconds since epoch.
*/
- var $releaseTimes;
+ public $releaseTimes;
- var $nextQueueItemId = 1;
+ public $nextQueueItemId = 1;
/**
* Create a reference to queue. After constructing the queue, one should
* usually call createQueue (if it's a new queue) or loadQueue (if it's
* known to be an existing queue).
*
- * @param $queueSpec, array with keys:
- * - type: string, required, e.g. "interactive", "immediate", "stomp", "beanstalk"
+ * @param array $queueSpec
+ * Array with keys:
+ * - type: string, required, e.g. "interactive", "immediate", "stomp",
+ * "beanstalk"
* - name: string, required, e.g. "upgrade-tasks"
- * - reset: bool, optional; if a queue is found, then it should be flushed; default to TRUE
- * - (additional keys depending on the queue provider)
+ * - reset: bool, optional; if a queue is found, then it should be
+ * flushed; default to TRUE
+ * - (additional keys depending on the queue provider).
*/
- function __construct($queueSpec) {
+ public function __construct($queueSpec) {
parent::__construct($queueSpec);
}
/**
* Perform any registation or resource-allocation for a new queue
*/
- function createQueue() {
+ public function createQueue() {
$this->items = array();
$this->releaseTimes = array();
}
/**
* Perform any loading or pre-fetch for an existing queue.
*/
- function loadQueue() {
+ public function loadQueue() {
// $this->createQueue();
throw new Exception('Unsupported: CRM_Queue_Queue_Memory::loadQueue');
}
/**
* Release any resources claimed by the queue (memory, DB rows, etc)
*/
- function deleteQueue() {
+ public function deleteQueue() {
$this->items = NULL;
$this->releaseTimes = NULL;
}
*
* @return bool
*/
- function existsQueue() {
+ public function existsQueue() {
return is_array($this->items);
}
/**
* Add a new item to the queue
*
- * @param $data serializable PHP object or array
- * @param array|\queue $options queue-dependent options; for example, if this is a
- * priority-queue, then $options might specify the item's priority
- *
- * @return bool, TRUE on success
+ * @param mixed $data
+ * Serializable PHP object or array.
+ * @param array $options
+ * Queue-dependent options; for example, if this is a
+ * priority-queue, then $options might specify the item's priority.
*/
- function createItem($data, $options = array()) {
+ public function createItem($data, $options = array()) {
$id = $this->nextQueueItemId++;
// force copy, no unintendedsharing effects from pointers
$this->items[$id] = serialize($data);
*
* @return int
*/
- function numberOfItems() {
+ public function numberOfItems() {
return count($this->items);
}
/**
* Get and remove the next item
*
- * @param int|\seconds $leaseTime seconds
+ * @param int $leaseTime
+ * Seconds.
*
- * @return object with key 'data' that matches the inputted data
+ * @return object
+ * Includes key 'data' that matches the inputted data.
*/
- function claimItem($leaseTime = 3600) {
+ public function claimItem($leaseTime = 3600) {
// foreach hits the items in order -- but we short-circuit after the first
foreach ($this->items as $id => $data) {
$nowEpoch = CRM_Utils_Time::getTimeRaw();
/**
* Get the next item
*
- * @param int|\seconds $leaseTime seconds
+ * @param int $leaseTime
+ * Seconds.
*
- * @return object with key 'data' that matches the inputted data
+ * @return object
+ * With key 'data' that matches the inputted data.
*/
- function stealItem($leaseTime = 3600) {
+ public function stealItem($leaseTime = 3600) {
// foreach hits the items in order -- but we short-circuit after the first
foreach ($this->items as $id => $data) {
$nowEpoch = CRM_Utils_Time::getTimeRaw();
/**
* Remove an item from the queue
*
- * @param $item object The item returned by claimItem
+ * @param object $item
+ * The item returned by claimItem.
*/
- function deleteItem($item) {
+ public function deleteItem($item) {
unset($this->items[$item->id]);
unset($this->releaseTimes[$item->id]);
}
/**
* Return an item that could not be processed
*
- * @param CRM_Core_DAO $item The item returned by claimItem
- *
- * @return bool
+ * @param CRM_Core_DAO $item
+ * The item returned by claimItem.
*/
- function releaseItem($item) {
+ public function releaseItem($item) {
unset($this->releaseTimes[$item->id]);
}
}
-
* usually call createQueue (if it's a new queue) or loadQueue (if it's
* known to be an existing queue).
*
- * @param $queueSpec, array with keys:
- * - type: string, required, e.g. "interactive", "immediate", "stomp", "beanstalk"
+ * @param array $queueSpec
+ * Array with keys:
+ * - type: string, required, e.g. "interactive", "immediate", "stomp",
+ * "beanstalk"
* - name: string, required, e.g. "upgrade-tasks"
- * - reset: bool, optional; if a queue is found, then it should be flushed; default to TRUE
- * - (additional keys depending on the queue provider)
+ * - reset: bool, optional; if a queue is found, then it should be
+ * flushed; default to TRUE
+ * - (additional keys depending on the queue provider).
*/
- function __construct($queueSpec) {
+ public function __construct($queueSpec) {
parent::__construct($queueSpec);
}
/**
* Perform any registation or resource-allocation for a new queue
*/
- function createQueue() {
+ public function createQueue() {
// nothing to do -- just start CRUDing items in the appropriate table
}
/**
* Perform any loading or pre-fetch for an existing queue.
*/
- function loadQueue() {
+ public function loadQueue() {
// nothing to do -- just start CRUDing items in the appropriate table
}
/**
* Release any resources claimed by the queue (memory, DB rows, etc)
*/
- function deleteQueue() {
+ public function deleteQueue() {
return CRM_Core_DAO::singleValueQuery("
DELETE FROM civicrm_queue_item
WHERE queue_name = %1
", array(
- 1 => array($this->getName(), 'String'),
- ));
+ 1 => array($this->getName(), 'String'),
+ ));
}
/**
*
* @return bool
*/
- function existsQueue() {
+ public function existsQueue() {
return ($this->numberOfItems() > 0);
}
/**
* Add a new item to the queue
*
- * @param $data serializable PHP object or array
- * @param array|\queue $options queue-dependent options; for example, if this is a
- * priority-queue, then $options might specify the item's priority
- *
- * @return bool, TRUE on success
+ * @param mixed $data
+ * Serializable PHP object or array.
+ * @param array $options
+ * Queue-dependent options; for example, if this is a
+ * priority-queue, then $options might specify the item's priority.
*/
- function createItem($data, $options = array()) {
- $dao = new CRM_Queue_DAO_QueueItem();
- $dao->queue_name = $this->getName();
+ public function createItem($data, $options = array()) {
+ $dao = new CRM_Queue_DAO_QueueItem();
+ $dao->queue_name = $this->getName();
$dao->submit_time = CRM_Utils_Time::getTime('YmdHis');
- $dao->data = serialize($data);
- $dao->weight = CRM_Utils_Array::value('weight', $options, 0);
+ $dao->data = serialize($data);
+ $dao->weight = CRM_Utils_Array::value('weight', $options, 0);
$dao->save();
}
*
* @return int
*/
- function numberOfItems() {
+ public function numberOfItems() {
return CRM_Core_DAO::singleValueQuery("
SELECT count(*)
FROM civicrm_queue_item
WHERE queue_name = %1
", array(
- 1 => array($this->getName(), 'String'),
- ));
+ 1 => array($this->getName(), 'String'),
+ ));
}
/**
* Get the next item
*
- * @param int|\seconds $lease_time seconds
+ * @param int $lease_time
+ * Seconds.
*
- * @return object with key 'data' that matches the inputted data
+ * @return object
+ * With key 'data' that matches the inputted data.
*/
- function claimItem($lease_time = 3600) {
+ public function claimItem($lease_time = 3600) {
$sql = "
SELECT id, queue_name, submit_time, release_time, data
FROM civicrm_queue_item
$nowEpoch = CRM_Utils_Time::getTimeRaw();
if ($dao->release_time === NULL || strtotime($dao->release_time) < $nowEpoch) {
CRM_Core_DAO::executeQuery("UPDATE civicrm_queue_item SET release_time = %1 WHERE id = %2", array(
- '1' => array(date('YmdHis', $nowEpoch + $lease_time), 'String'),
- '2' => array($dao->id, 'Integer'),
- ));
+ '1' => array(date('YmdHis', $nowEpoch + $lease_time), 'String'),
+ '2' => array($dao->id, 'Integer'),
+ ));
// work-around: inconsistent date-formatting causes unintentional breakage
# $dao->submit_time = date('YmdHis', strtotime($dao->submit_time));
# $dao->release_time = date('YmdHis', $nowEpoch + $lease_time);
/**
* Get the next item, even if there's an active lease
*
- * @param int|\seconds $lease_time seconds
+ * @param int $lease_time
+ * Seconds.
*
- * @return object with key 'data' that matches the inputted data
+ * @return object
+ * With key 'data' that matches the inputted data.
*/
- function stealItem($lease_time = 3600) {
+ public function stealItem($lease_time = 3600) {
$sql = "
SELECT id, queue_name, submit_time, release_time, data
FROM civicrm_queue_item
if ($dao->fetch()) {
$nowEpoch = CRM_Utils_Time::getTimeRaw();
CRM_Core_DAO::executeQuery("UPDATE civicrm_queue_item SET release_time = %1 WHERE id = %2", array(
- '1' => array(date('YmdHis', $nowEpoch + $lease_time), 'String'),
- '2' => array($dao->id, 'Integer'),
- ));
+ '1' => array(date('YmdHis', $nowEpoch + $lease_time), 'String'),
+ '2' => array($dao->id, 'Integer'),
+ ));
$dao->data = unserialize($dao->data);
return $dao;
}
/**
* Remove an item from the queue
*
- * @param CRM_Core_DAO $dao The item returned by claimItem
+ * @param CRM_Core_DAO $dao
+ * The item returned by claimItem.
*/
- function deleteItem($dao) {
+ public function deleteItem($dao) {
$dao->delete();
$dao->free();
}
/**
* Return an item that could not be processed
*
- * @param CRM_Core_DAO $dao The item returned by claimItem
- *
- * @return bool
+ * @param CRM_Core_DAO $dao
+ * The item returned by claimItem.
*/
- function releaseItem($dao) {
+ public function releaseItem($dao) {
$sql = "UPDATE civicrm_queue_item SET release_time = NULL WHERE id = %1";
$params = array(
1 => array($dao->id, 'Integer'),
$dao->free();
}
}
-
class CRM_Queue_Runner {
/**
- * The failed task should be discarded, and queue processing should continue
+ * The failed task should be discarded, and queue processing should continue.
*/
- CONST ERROR_CONTINUE = 1;
+ const ERROR_CONTINUE = 1;
/**
- * The failed task should be kept in the queue, and queue processing should abort
+ * The failed task should be kept in the queue, and queue processing should
+ * abort.
*/
- CONST ERROR_ABORT = 2;
+ const ERROR_ABORT = 2;
/**
* @var string
*/
- var $title;
+ public $title;
/**
* @var CRM_Queue_Queue
*/
- var $queue;
- var $errorMode;
- var $isMinimal;
- var $onEnd;
- var $onEndUrl;
- var $pathPrefix;
+ public $queue;
+ public $errorMode;
+ public $isMinimal;
+ public $onEnd;
+ public $onEndUrl;
+ public $pathPrefix;
// queue-runner id; used for persistence
- var $qrid;
+ public $qrid;
/**
* @var array whether to display buttons, eg ('retry' => TRUE, 'skip' => FALSE)
*/
- var $buttons;
+ public $buttons;
/**
* @var CRM_Queue_TaskContext
*/
- var $taskCtx;
+ public $taskCtx;
/**
- * Locate a previously-created instance of the queue-runner
+ * Locate a previously-created instance of the queue-runner.
*
- * @param $qrid string, the queue-runner ID
+ * @param string $qrid
+ * The queue-runner ID.
*
- * @return CRM_Queue_Runner or NULL
+ * @return CRM_Queue_Runner|NULL
*/
- static function instance($qrid) {
+ public static function instance($qrid) {
if (!empty($_SESSION['queueRunners'][$qrid])) {
return unserialize($_SESSION['queueRunners'][$qrid]);
}
* FIXME: parameter validation
* FIXME: document signature of onEnd callback
*
- * @param $runnerSpec array with keys:
+ * @param array $runnerSpec
+ * Array with keys:
* - queue: CRM_Queue_Queue
- * - errorMode: int, ERROR_CONTINUE or ERROR_ABORT
- * - onEnd: mixed, a callback to update the UI after running; should be both callable and serializable
- * - onEndUrl: string, the URL to which one redirects
- * - pathPrefix: string, prepended to URLs for the web-runner; default: 'civicrm/queue'
+ * - errorMode: int, ERROR_CONTINUE or ERROR_ABORT.
+ * - onEnd: mixed, a callback to update the UI after running; should be
+ * both callable and serializable.
+ * - onEndUrl: string, the URL to which one redirects.
+ * - pathPrefix: string, prepended to URLs for the web-runner;
+ * default: 'civicrm/queue'.
*/
public function __construct($runnerSpec) {
- $this->title = CRM_Utils_Array::value('title', $runnerSpec, ts('Queue Runner'));
- $this->queue = $runnerSpec['queue'];
- $this->errorMode = CRM_Utils_Array::value('errorMode', $runnerSpec, self::ERROR_ABORT);
- $this->isMinimal = CRM_Utils_Array::value('isMinimal', $runnerSpec, FALSE);
- $this->onEnd = CRM_Utils_Array::value('onEnd', $runnerSpec, NULL);
- $this->onEndUrl = CRM_Utils_Array::value('onEndUrl', $runnerSpec, NULL);
+ $this->title = CRM_Utils_Array::value('title', $runnerSpec, ts('Queue Runner'));
+ $this->queue = $runnerSpec['queue'];
+ $this->errorMode = CRM_Utils_Array::value('errorMode', $runnerSpec, self::ERROR_ABORT);
+ $this->isMinimal = CRM_Utils_Array::value('isMinimal', $runnerSpec, FALSE);
+ $this->onEnd = CRM_Utils_Array::value('onEnd', $runnerSpec, NULL);
+ $this->onEndUrl = CRM_Utils_Array::value('onEndUrl', $runnerSpec, NULL);
$this->pathPrefix = CRM_Utils_Array::value('pathPrefix', $runnerSpec, 'civicrm/queue');
- $this->buttons = CRM_Utils_Array::value('buttons', $runnerSpec, array('retry' => TRUE,'skip' => TRUE));
+ $this->buttons = CRM_Utils_Array::value('buttons', $runnerSpec, array('retry' => TRUE, 'skip' => TRUE));
// perhaps this value should be randomized?
$this->qrid = $this->queue->getName();
}
/**
* @return array
*/
- function __sleep() {
+ public function __sleep() {
// exclude taskCtx
- return array('title', 'queue', 'errorMode', 'isMinimal', 'onEnd', 'onEndUrl', 'pathPrefix', 'qrid', 'buttons');
+ return array(
+ 'title',
+ 'queue',
+ 'errorMode',
+ 'isMinimal',
+ 'onEnd',
+ 'onEndUrl',
+ 'pathPrefix',
+ 'qrid',
+ 'buttons',
+ );
}
/**
$_SESSION['queueRunners'][$this->qrid] = serialize($this);
$url = CRM_Utils_System::url($this->pathPrefix . '/runner', 'reset=1&qrid=' . urlencode($this->qrid));
CRM_Utils_System::redirect($url);
- // TODO: evaluate items incrementally via AJAX polling, cleanup session, call
+ // TODO: evaluate items incrementally via AJAX polling, cleanup session
}
/**
*
* If the runner has an onEndUrl, then this function will not return
*
- * @return mixed, TRUE if all tasks complete normally; otherwise, an array describing the failed task
+ * @return mixed
+ * TRUE if all tasks complete normally; otherwise, an array describing the
+ * failed task
*/
public function runAll() {
$taskResult = $this->formatTaskResult(TRUE);
/**
* Take the next item from the queue and attempt to run it.
*
- * Individual tasks may also throw exceptions -- caller should watch for exceptions
+ * Individual tasks may also throw exceptions -- caller should watch for
+ * exceptions.
*
- * @param $useSteal bool, whether to steal active locks
+ * @param bool $useSteal
+ * Whether to steal active locks.
*
- * @return array(is_error => bool, is_continue => bool, numberOfItems => int, 'last_task_title' => $, 'exception' => $)
+ * @return array
+ * - is_error => bool,
+ * - is_continue => bool,
+ * - numberOfItems => int,
+ * - 'last_task_title' => $,
+ * - 'exception' => $
*/
public function runNext($useSteal = FALSE) {
if ($useSteal) {
$isOK = $item->data->run($this->getTaskContext());
if (!$isOK) {
$exception = new Exception('Task returned false');
- }
- }
- catch(Exception$e) {
+ }
+ } catch (Exception$e) {
$isOK = FALSE;
$exception = $e;
- }
+ }
if ($isOK) {
$this->queue->deleteItem($item);
/**
* Take the next item from the queue and attempt to run it.
*
- * Individual tasks may also throw exceptions -- caller should watch for exceptions
+ * Individual tasks may also throw exceptions -- caller should watch for
+ * exceptions.
*
- * @param $useSteal bool, whether to steal active locks
+ * @param bool $useSteal
+ * Whether to steal active locks.
*
- * @return array(is_error => bool, is_continue => bool, numberOfItems => int)
+ * @return array
+ * - is_error => bool,
+ * - is_continue => bool,
+ * - numberOfItems => int)
*/
public function skipNext($useSteal = FALSE) {
if ($useSteal) {
}
/**
- * @param $item
+ * Release an item in keeping with the error mode.
+ *
+ * @param object $item
+ * The item previously produced by Queue::claimItem.
*/
protected function releaseErrorItem($item) {
switch ($this->errorMode) {
}
/**
- * @return array(is_error => bool, is_continue => bool, numberOfItems => int, redirect_url => string)
+ * @return array
+ * - is_error => bool,
+ * - is_continue => bool,
+ * - numberOfItems => int,
+ * - redirect_url => string
*/
public function handleEnd() {
if (is_callable($this->onEnd)) {
}
/**
- * @param $isOK
- * @param null $exception
+ * Format a result record which describes whether the task completed.
*
- * @return array(is_error => bool, is_continue => bool, numberOfItems => int)
+ * @param bool $isOK
+ * TRUE if the task completed successfully.
+ * @param Exception|NULL $exception
+ * If applicable, an unhandled exception that arose during execution.
+ *
+ * @return array
+ * (is_error => bool, is_continue => bool, numberOfItems => int)
*/
- function formatTaskResult($isOK, $exception = NULL) {
+ public function formatTaskResult($isOK, $exception = NULL) {
$numberOfItems = $this->queue->numberOfItems();
$result = array();
return $this->taskCtx;
}
}
-
*/
class CRM_Queue_Service {
- static $_singleton;
+ protected static $_singleton;
/**
* FIXME: Singleton pattern should be removed when dependency-injection
* becomes available.
*
- * @param $forceNew bool
+ * @param bool $forceNew
+ * TRUE if a new instance must be created.
*
* @return \CRM_Queue_Service
*/
- static function &singleton($forceNew = FALSE) {
+ public static function &singleton($forceNew = FALSE) {
if ($forceNew || !self::$_singleton) {
self::$_singleton = new CRM_Queue_Service();
}
}
/**
- * @var array(queueName => CRM_Queue_Queue)
+ * @var array (string $queueName => CRM_Queue_Queue)
*/
- var $queues;
+ public $queues;
/**
*
*/
- function __construct() {
+ public function __construct() {
$this->queues = array();
}
/**
- * @param $queueSpec, array with keys:
- * - type: string, required, e.g. "interactive", "immediate", "stomp", "beanstalk"
+ * Create a queue. If one already exists, then it will be reused.
+ *
+ * @param array $queueSpec
+ * Array with keys:
+ * - type: string, required, e.g. "interactive", "immediate", "stomp",
+ * "beanstalk"
* - name: string, required, e.g. "upgrade-tasks"
- * - reset: bool, optional; if a queue is found, then it should be flushed; default to TRUE
- * - (additional keys depending on the queue provider)
+ * - reset: bool, optional; if a queue is found, then it should be
+ * flushed; default to TRUE
+ * - (additional keys depending on the queue provider).
*
* @return CRM_Queue_Queue
*/
- function create($queueSpec) {
+ public function create($queueSpec) {
if (@is_object($this->queues[$queueSpec['name']]) && empty($queueSpec['reset'])) {
return $this->queues[$queueSpec['name']];
}
}
/**
- * @param $queueSpec, array with keys:
- * - type: string, required, e.g. "interactive", "immediate", "stomp", "beanstalk"
+ * Look up an existing queue.
+ *
+ * @param array $queueSpec
+ * Array with keys:
+ * - type: string, required, e.g. "interactive", "immediate", "stomp",
+ * "beanstalk"
* - name: string, required, e.g. "upgrade-tasks"
- * - (additional keys depending on the queue provider)
+ * - (additional keys depending on the queue provider).
*
* @return CRM_Queue_Queue
*/
- function load($queueSpec) {
+ public function load($queueSpec) {
if (is_object($this->queues[$queueSpec['name']])) {
return $this->queues[$queueSpec['name']];
}
/**
* Convert a queue "type" name to a class name
*
- * @param $type string, e.g. "interactive", "immediate", "stomp", "beanstalk"
+ * @param string $type
+ * E.g. "interactive", "immediate", "stomp", "beanstalk".
*
- * @return string, class-name
+ * @return string
+ * Class-name
*/
protected function getQueueClass($type) {
$type = preg_replace('/[^a-zA-Z0-9]/', '', $type);
}
/**
- * @param $queueSpec array, see create()
+ * @param array $queueSpec
+ * See create().
*
* @return CRM_Queue_Queue
*/
return $class->newInstance($queueSpec);
}
}
-
class CRM_Queue_Task {
/** Task was performed successfully */
- CONST TASK_SUCCESS = 1;
+ const TASK_SUCCESS = 1;
/** Task failed and should not be retried */
- CONST TASK_FAIL = 2;
+ const TASK_FAIL = 2;
/**
* @var mixed, serializable
*/
- var $callback;
+ public $callback;
/**
* @var array, serializable
*/
- var $arguments;
+ public $arguments;
/**
* @var string, NULL-able
*/
- var $title;
+ public $title;
/**
- * @param $callback mixed, serializable, a callable PHP item; must accept at least one argument (CRM_Queue_TaskContext)
- * @param $arguments array, serializable, extra arguments to pass to the callback (in order)
- * @param $title string, a printable string which describes this task
+ * @param mixed $callback
+ * Serializable, a callable PHP item; must accept at least one argument
+ * (CRM_Queue_TaskContext).
+ * @param array $arguments
+ * Serializable, extra arguments to pass to the callback (in order).
+ * @param string $title
+ * A printable string which describes this task.
*/
- function __construct($callback, $arguments, $title = NULL) {
- $this->callback = $callback;
+ public function __construct($callback, $arguments, $title = NULL) {
+ $this->callback = $callback;
$this->arguments = $arguments;
- $this->title = $title;
+ $this->title = $title;
}
/**
* Perform the task
*
- * @param $taskCtx array with keys:
- * - log: object 'Log'
+ * @param array $taskCtx
+ * Array with keys:
+ * - log: object 'Log'
*
* @throws Exception
* @return bool, TRUE if task completes successfully
*/
- function run($taskCtx) {
+ public function run($taskCtx) {
$args = $this->arguments;
array_unshift($args, $taskCtx);
}
}
}
-
/**
* @var CRM_Queue_Queue
*/
- var $queue;
+ public $queue;
/**
* @var Log
*/
- var $log;
+ public $log;
}
-