3 +--------------------------------------------------------------------+
4 | CiviCRM version 4.7 |
5 +--------------------------------------------------------------------+
6 | Copyright CiviCRM LLC (c) 2004-2016 |
7 +--------------------------------------------------------------------+
8 | This file is a part of CiviCRM. |
10 | CiviCRM is free software; you can copy, modify, and distribute it |
11 | under the terms of the GNU Affero General Public License |
12 | Version 3, 19 November 2007 and the CiviCRM Licensing Exception. |
14 | CiviCRM is distributed in the hope that it will be useful, but |
15 | WITHOUT ANY WARRANTY; without even the implied warranty of |
16 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. |
17 | See the GNU Affero General Public License for more details. |
19 | You should have received a copy of the GNU Affero General Public |
20 | License and the CiviCRM Licensing Exception along |
21 | with this program; if not, contact CiviCRM LLC |
22 | at info[AT]civicrm[DOT]org. If you have questions about the |
23 | GNU Affero General Public License or the licensing of CiviCRM, |
24 | see the CiviCRM license FAQ at http://civicrm.org/licensing |
25 +--------------------------------------------------------------------+
29 * The queue runner is a helper which runs all jobs in a queue.
31 * The queue runner is most useful for one-off queues (such as an upgrade);
32 * if the intention is to develop a dedicated, long-running worker thread,
33 * then one should consider writing a new queue consumer.
35 class CRM_Queue_Runner
{
38 * The failed task should be discarded, and queue processing should continue.
40 const ERROR_CONTINUE
= 1;
43 * The failed task should be kept in the queue, and queue processing should
46 const ERROR_ABORT
= 2;
54 * @var CRM_Queue_Queue
62 // queue-runner id; used for persistence
66 * @var array whether to display buttons, eg ('retry' => TRUE, 'skip' => FALSE)
71 * @var CRM_Queue_TaskContext
76 * Locate a previously-created instance of the queue-runner.
79 * The queue-runner ID.
81 * @return CRM_Queue_Runner|NULL
83 public static function instance($qrid) {
84 if (!empty($_SESSION['queueRunners'][$qrid])) {
85 return unserialize($_SESSION['queueRunners'][$qrid]);
94 * FIXME: parameter validation
95 * FIXME: document signature of onEnd callback
97 * @param array $runnerSpec
99 * - queue: CRM_Queue_Queue
100 * - errorMode: int, ERROR_CONTINUE or ERROR_ABORT.
101 * - onEnd: mixed, a callback to update the UI after running; should be
102 * both callable and serializable.
103 * - onEndUrl: string, the URL to which one redirects.
104 * - pathPrefix: string, prepended to URLs for the web-runner;
105 * default: 'civicrm/queue'.
107 public function __construct($runnerSpec) {
108 $this->title
= CRM_Utils_Array
::value('title', $runnerSpec, ts('Queue Runner'));
109 $this->queue
= $runnerSpec['queue'];
110 $this->errorMode
= CRM_Utils_Array
::value('errorMode', $runnerSpec, self
::ERROR_ABORT
);
111 $this->isMinimal
= CRM_Utils_Array
::value('isMinimal', $runnerSpec, FALSE);
112 $this->onEnd
= CRM_Utils_Array
::value('onEnd', $runnerSpec, NULL);
113 $this->onEndUrl
= CRM_Utils_Array
::value('onEndUrl', $runnerSpec, NULL);
114 $this->pathPrefix
= CRM_Utils_Array
::value('pathPrefix', $runnerSpec, 'civicrm/queue');
115 $this->buttons
= CRM_Utils_Array
::value('buttons', $runnerSpec, array('retry' => TRUE, 'skip' => TRUE));
116 // perhaps this value should be randomized?
117 $this->qrid
= $this->queue
->getName();
123 public function __sleep() {
139 * Redirect to the web-based queue-runner and evaluate all tasks in a queue.
141 public function runAllViaWeb() {
142 $_SESSION['queueRunners'][$this->qrid
] = serialize($this);
143 $url = CRM_Utils_System
::url($this->pathPrefix
. '/runner', 'reset=1&qrid=' . urlencode($this->qrid
));
144 CRM_Utils_System
::redirect($url);
145 // TODO: evaluate items incrementally via AJAX polling, cleanup session
149 * Immediately run all tasks in a queue (until either reaching the end
150 * of the queue or encountering an error)
152 * If the runner has an onEndUrl, then this function will not return
155 * TRUE if all tasks complete normally; otherwise, an array describing the
158 public function runAll() {
159 $taskResult = $this->formatTaskResult(TRUE);
160 while ($taskResult['is_continue']) {
161 // setRaiseException should't be necessary here, but there's a bug
162 // somewhere which causes this setting to be lost. Observed while
163 // upgrading 4.0=>4.2. This preference really shouldn't be a global
164 // setting -- it should be more of a contextual/stack-based setting.
165 // This should be appropriate because queue-runners are not used with
166 // basic web pages -- they're used with CLI/REST/AJAX.
167 $errorScope = CRM_Core_TemporaryErrorScope
::useException();
168 $taskResult = $this->runNext();
172 if ($taskResult['numberOfItems'] == 0) {
173 $result = $this->handleEnd();
174 if (!empty($result['redirect_url'])) {
175 CRM_Utils_System
::redirect($result['redirect_url']);
185 * Take the next item from the queue and attempt to run it.
187 * Individual tasks may also throw exceptions -- caller should watch for
190 * @param bool $useSteal
191 * Whether to steal active locks.
194 * - is_error => bool,
195 * - is_continue => bool,
196 * - numberOfItems => int,
197 * - 'last_task_title' => $,
200 public function runNext($useSteal = FALSE) {
202 $item = $this->queue
->stealItem();
205 $item = $this->queue
->claimItem();
209 $this->lastTaskTitle
= $item->data
->title
;
213 CRM_Core_Error
::debug_log_message("Running task: " . $this->lastTaskTitle
);
214 $isOK = $item->data
->run($this->getTaskContext());
216 $exception = new Exception('Task returned false');
219 catch (Exception
$e) {
225 $this->queue
->deleteItem($item);
228 $this->releaseErrorItem($item);
231 return $this->formatTaskResult($isOK, $exception);
234 return $this->formatTaskResult(FALSE, new Exception('Failed to claim next task'));
239 * Take the next item from the queue and attempt to run it.
241 * Individual tasks may also throw exceptions -- caller should watch for
244 * @param bool $useSteal
245 * Whether to steal active locks.
248 * - is_error => bool,
249 * - is_continue => bool,
250 * - numberOfItems => int)
252 public function skipNext($useSteal = FALSE) {
254 $item = $this->queue
->stealItem();
257 $item = $this->queue
->claimItem();
261 $this->lastTaskTitle
= $item->data
->title
;
262 $this->queue
->deleteItem($item);
263 return $this->formatTaskResult(TRUE);
266 return $this->formatTaskResult(FALSE, new Exception('Failed to claim next task'));
271 * Release an item in keeping with the error mode.
273 * @param object $item
274 * The item previously produced by Queue::claimItem.
276 protected function releaseErrorItem($item) {
277 switch ($this->errorMode
) {
278 case self
::ERROR_CONTINUE
:
279 $this->queue
->deleteItem($item);
280 case self
::ERROR_ABORT
:
282 $this->queue
->releaseItem($item);
288 * - is_error => bool,
289 * - is_continue => bool,
290 * - numberOfItems => int,
291 * - redirect_url => string
293 public function handleEnd() {
294 if (is_callable($this->onEnd
)) {
295 call_user_func($this->onEnd
, $this->getTaskContext());
297 // Don't remove queueRunner until onEnd succeeds
298 if (!empty($_SESSION['queueRunners'][$this->qrid
])) {
299 unset($_SESSION['queueRunners'][$this->qrid
]);
302 // Fallback; web UI does redirect in Javascript
304 $result['is_error'] = 0;
305 $result['numberOfItems'] = 0;
306 $result['is_continue'] = 0;
307 if (!empty($this->onEndUrl
)) {
308 $result['redirect_url'] = $this->onEndUrl
;
314 * Format a result record which describes whether the task completed.
317 * TRUE if the task completed successfully.
318 * @param Exception|NULL $exception
319 * If applicable, an unhandled exception that arose during execution.
322 * (is_error => bool, is_continue => bool, numberOfItems => int)
324 public function formatTaskResult($isOK, $exception = NULL) {
325 $numberOfItems = $this->queue
->numberOfItems();
328 $result['is_error'] = $isOK ?
0 : 1;
329 $result['exception'] = $exception;
330 $result['last_task_title'] = isset($this->lastTaskTitle
) ?
$this->lastTaskTitle
: '';
331 $result['numberOfItems'] = $this->queue
->numberOfItems();
332 if ($result['numberOfItems'] <= 0) {
334 $result['is_continue'] = 0;
337 // more tasks remain, and this task succeeded
338 $result['is_continue'] = 1;
340 elseif ($this->errorMode
== CRM_Queue_Runner
::ERROR_CONTINUE
) {
341 // more tasks remain, and we can disregard this error
342 $result['is_continue'] = 1;
345 // more tasks remain, but we can't disregard the error
346 $result['is_continue'] = 0;
353 * @return CRM_Queue_TaskContext
355 protected function getTaskContext() {
356 if (!is_object($this->taskCtx
)) {
357 $this->taskCtx
= new CRM_Queue_TaskContext();
358 $this->taskCtx
->queue
= $this->queue
;
359 // $this->taskCtx->log = CRM_Core_Config::getLog();
360 $this->taskCtx
->log
= CRM_Core_Error
::createDebugLogger();
362 return $this->taskCtx
;