3 +--------------------------------------------------------------------+
4 | Copyright CiviCRM LLC. All rights reserved. |
6 | This work is published under the GNU AGPLv3 license with some |
7 | permitted exceptions and without any warranty. For full license |
8 | and copyright information, see https://civicrm.org/licensing |
9 +--------------------------------------------------------------------+
13 * The queue runner is a helper which runs all jobs in a queue.
15 * The queue runner is most useful for one-off queues (such as an upgrade);
16 * if the intention is to develop a dedicated, long-running worker thread,
17 * then one should consider writing a new queue consumer.
19 class CRM_Queue_Runner
{
22 * The failed task should be discarded, and queue processing should continue.
24 const ERROR_CONTINUE
= 1;
27 * The failed task should be kept in the queue, and queue processing should
30 const ERROR_ABORT
= 2;
38 * @var CRM_Queue_Queue
47 * queue-runner id; used for persistence
54 * Whether to display buttons, eg ('retry' => TRUE, 'skip' => FALSE)
59 * @var CRM_Queue_TaskContext
64 * Locate a previously-created instance of the queue-runner.
67 * The queue-runner ID.
69 * @return CRM_Queue_Runner|NULL
71 public static function instance($qrid) {
72 if (!empty($_SESSION['queueRunners'][$qrid])) {
73 return unserialize($_SESSION['queueRunners'][$qrid]);
82 * FIXME: parameter validation
83 * FIXME: document signature of onEnd callback
85 * @param array $runnerSpec
87 * - queue: CRM_Queue_Queue
88 * - errorMode: int, ERROR_CONTINUE or ERROR_ABORT.
89 * - onEnd: mixed, a callback to update the UI after running; should be
90 * both callable and serializable.
91 * - onEndUrl: string, the URL to which one redirects.
92 * - pathPrefix: string, prepended to URLs for the web-runner;
93 * default: 'civicrm/queue'.
95 public function __construct($runnerSpec) {
96 $this->title
= CRM_Utils_Array
::value('title', $runnerSpec, ts('Queue Runner'));
97 $this->queue
= $runnerSpec['queue'];
98 $this->errorMode
= CRM_Utils_Array
::value('errorMode', $runnerSpec, self
::ERROR_ABORT
);
99 $this->isMinimal
= CRM_Utils_Array
::value('isMinimal', $runnerSpec, FALSE);
100 $this->onEnd
= $runnerSpec['onEnd'] ??
NULL;
101 $this->onEndUrl
= $runnerSpec['onEndUrl'] ??
NULL;
102 $this->pathPrefix
= CRM_Utils_Array
::value('pathPrefix', $runnerSpec, 'civicrm/queue');
103 $this->buttons
= CRM_Utils_Array
::value('buttons', $runnerSpec, ['retry' => TRUE, 'skip' => TRUE]);
104 // perhaps this value should be randomized?
105 $this->qrid
= $this->queue
->getName();
111 public function __sleep() {
127 * Redirect to the web-based queue-runner and evaluate all tasks in a queue.
129 public function runAllViaWeb() {
130 $_SESSION['queueRunners'][$this->qrid
] = serialize($this);
131 $url = CRM_Utils_System
::url($this->pathPrefix
. '/runner', 'reset=1&qrid=' . urlencode($this->qrid
));
132 CRM_Utils_System
::redirect($url);
133 // TODO: evaluate items incrementally via AJAX polling, cleanup session
137 * Immediately run all tasks in a queue (until either reaching the end
138 * of the queue or encountering an error)
140 * If the runner has an onEndUrl, then this function will not return
143 * TRUE if all tasks complete normally; otherwise, an array describing the
146 public function runAll() {
147 $taskResult = $this->formatTaskResult(TRUE);
148 while ($taskResult['is_continue']) {
149 // setRaiseException should't be necessary here, but there's a bug
150 // somewhere which causes this setting to be lost. Observed while
151 // upgrading 4.0=>4.2. This preference really shouldn't be a global
152 // setting -- it should be more of a contextual/stack-based setting.
153 // This should be appropriate because queue-runners are not used with
154 // basic web pages -- they're used with CLI/REST/AJAX.
155 $errorScope = CRM_Core_TemporaryErrorScope
::useException();
156 $taskResult = $this->runNext();
160 if ($taskResult['numberOfItems'] == 0) {
161 $result = $this->handleEnd();
162 if (!empty($result['redirect_url'])) {
163 CRM_Utils_System
::redirect($result['redirect_url']);
173 * Take the next item from the queue and attempt to run it.
175 * Individual tasks may also throw exceptions -- caller should watch for
178 * @param bool $useSteal
179 * Whether to steal active locks.
182 * - is_error => bool,
183 * - is_continue => bool,
184 * - numberOfItems => int,
185 * - 'last_task_title' => $,
188 public function runNext($useSteal = FALSE) {
190 $item = $this->queue
->stealItem();
193 $item = $this->queue
->claimItem();
197 $this->lastTaskTitle
= $item->data
->title
;
201 CRM_Core_Error
::debug_log_message("Running task: " . $this->lastTaskTitle
);
202 $isOK = $item->data
->run($this->getTaskContext());
204 $exception = new Exception('Task returned false');
207 catch (Exception
$e) {
213 $this->queue
->deleteItem($item);
216 $this->releaseErrorItem($item);
219 return $this->formatTaskResult($isOK, $exception);
222 return $this->formatTaskResult(FALSE, new Exception('Failed to claim next task'));
227 * Take the next item from the queue and attempt to run it.
229 * Individual tasks may also throw exceptions -- caller should watch for
232 * @param bool $useSteal
233 * Whether to steal active locks.
236 * - is_error => bool,
237 * - is_continue => bool,
238 * - numberOfItems => int)
240 public function skipNext($useSteal = FALSE) {
242 $item = $this->queue
->stealItem();
245 $item = $this->queue
->claimItem();
249 $this->lastTaskTitle
= $item->data
->title
;
250 $this->queue
->deleteItem($item);
251 return $this->formatTaskResult(TRUE);
254 return $this->formatTaskResult(FALSE, new Exception('Failed to claim next task'));
259 * Release an item in keeping with the error mode.
261 * @param object $item
262 * The item previously produced by Queue::claimItem.
264 protected function releaseErrorItem($item) {
265 switch ($this->errorMode
) {
266 case self
::ERROR_CONTINUE
:
267 $this->queue
->deleteItem($item);
268 case self
::ERROR_ABORT
:
270 $this->queue
->releaseItem($item);
276 * - is_error => bool,
277 * - is_continue => bool,
278 * - numberOfItems => int,
279 * - redirect_url => string
281 public function handleEnd() {
282 if (is_callable($this->onEnd
)) {
283 call_user_func($this->onEnd
, $this->getTaskContext());
285 // Don't remove queueRunner until onEnd succeeds
286 if (!empty($_SESSION['queueRunners'][$this->qrid
])) {
287 unset($_SESSION['queueRunners'][$this->qrid
]);
290 // Fallback; web UI does redirect in Javascript
292 $result['is_error'] = 0;
293 $result['numberOfItems'] = 0;
294 $result['is_continue'] = 0;
295 if (!empty($this->onEndUrl
)) {
296 $result['redirect_url'] = $this->onEndUrl
;
302 * Format a result record which describes whether the task completed.
305 * TRUE if the task completed successfully.
306 * @param Exception|NULL $exception
307 * If applicable, an unhandled exception that arose during execution.
310 * (is_error => bool, is_continue => bool, numberOfItems => int)
312 public function formatTaskResult($isOK, $exception = NULL) {
313 $numberOfItems = $this->queue
->numberOfItems();
316 $result['is_error'] = $isOK ?
0 : 1;
317 $result['exception'] = $exception;
318 $result['last_task_title'] = $this->lastTaskTitle ??
'';
319 $result['numberOfItems'] = $this->queue
->numberOfItems();
320 if ($result['numberOfItems'] <= 0) {
322 $result['is_continue'] = 0;
325 // more tasks remain, and this task succeeded
326 $result['is_continue'] = 1;
328 elseif ($this->errorMode
== CRM_Queue_Runner
::ERROR_CONTINUE
) {
329 // more tasks remain, and we can disregard this error
330 $result['is_continue'] = 1;
333 // more tasks remain, but we can't disregard the error
334 $result['is_continue'] = 0;
341 * @return CRM_Queue_TaskContext
343 protected function getTaskContext() {
344 if (!is_object($this->taskCtx
)) {
345 $this->taskCtx
= new CRM_Queue_TaskContext();
346 $this->taskCtx
->queue
= $this->queue
;
347 // $this->taskCtx->log = CRM_Core_Config::getLog();
348 $this->taskCtx
->log
= CRM_Core_Error
::createDebugLogger();
350 return $this->taskCtx
;