Merge pull request #4696 from colemanw/CRM-15669
[civicrm-core.git] / CRM / Queue / Runner.php
1 <?php
2 /*
3 +--------------------------------------------------------------------+
4 | CiviCRM version 4.6 |
5 +--------------------------------------------------------------------+
6 | Copyright CiviCRM LLC (c) 2004-2014 |
7 +--------------------------------------------------------------------+
8 | This file is a part of CiviCRM. |
9 | |
10 | CiviCRM is free software; you can copy, modify, and distribute it |
11 | under the terms of the GNU Affero General Public License |
12 | Version 3, 19 November 2007 and the CiviCRM Licensing Exception. |
13 | |
14 | CiviCRM is distributed in the hope that it will be useful, but |
15 | WITHOUT ANY WARRANTY; without even the implied warranty of |
16 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. |
17 | See the GNU Affero General Public License for more details. |
18 | |
19 | You should have received a copy of the GNU Affero General Public |
20 | License and the CiviCRM Licensing Exception along |
21 | with this program; if not, contact CiviCRM LLC |
22 | at info[AT]civicrm[DOT]org. If you have questions about the |
23 | GNU Affero General Public License or the licensing of CiviCRM, |
24 | see the CiviCRM license FAQ at http://civicrm.org/licensing |
25 +--------------------------------------------------------------------+
26 */
27
28 /**
29 * The queue runner is a helper which runs all jobs in a queue.
30 *
31 * The queue runner is most useful for one-off queues (such as an upgrade);
32 * if the intention is to develop a dedicated, long-running worker thread,
33 * then one should consider writing a new queue consumer.
34 */
class CRM_Queue_Runner {

  /**
   * The failed task should be discarded, and queue processing should continue.
   */
  const ERROR_CONTINUE = 1;

  /**
   * The failed task should be kept in the queue, and queue processing should
   * abort.
   */
  const ERROR_ABORT = 2;

  /**
   * @var string
   */
  public $title;

  /**
   * @var CRM_Queue_Queue
   */
  public $queue;

  /**
   * @var int one of self::ERROR_CONTINUE or self::ERROR_ABORT
   */
  public $errorMode;

  /**
   * @var bool taken from the 'isMinimal' key of the runner spec
   */
  public $isMinimal;

  /**
   * @var mixed callback invoked by handleEnd(); receives the task context
   */
  public $onEnd;

  /**
   * @var string|NULL URL reported as 'redirect_url' by handleEnd()
   */
  public $onEndUrl;

  /**
   * @var string path prepended to '/runner' when building the web-runner URL
   */
  public $pathPrefix;

  /**
   * @var string queue-runner id; used for persistence
   */
  public $qrid;

  /**
   * @var array whether to display buttons, eg ('retry' => TRUE, 'skip' => FALSE)
   */
  public $buttons;

  /**
   * @var CRM_Queue_TaskContext
   */
  public $taskCtx;

  /**
   * Title of the task most recently claimed by runNext() or skipNext().
   *
   * Declared explicitly so that it is not created as a dynamic property.
   * It is intentionally not listed in __sleep(), so it is not persisted.
   *
   * @var string|NULL
   */
  public $lastTaskTitle;

  /**
   * Locate a previously-created instance of the queue-runner.
   *
   * @param string $qrid
   *   The queue-runner ID.
   *
   * @return CRM_Queue_Runner|NULL
   */
  public static function instance($qrid) {
    if (empty($_SESSION['queueRunners'][$qrid])) {
      return NULL;
    }

    // NOTE(review): this unserializes data stored in the session by
    // runAllViaWeb(); it must never be fed data that a user can control.
    $runner = unserialize($_SESSION['queueRunners'][$qrid]);

    // Honor the documented contract: anything that is not a runner (eg FALSE
    // from a corrupted payload) is reported as "not found".
    return ($runner instanceof self) ? $runner : NULL;
  }

  /**
   *
   * FIXME: parameter validation
   * FIXME: document signature of onEnd callback
   *
   * @param array $runnerSpec
   *   Array with keys:
   *   - queue: CRM_Queue_Queue
   *   - title: string, default: ts('Queue Runner').
   *   - errorMode: int, ERROR_CONTINUE or ERROR_ABORT; default: ERROR_ABORT.
   *   - isMinimal: bool, default: FALSE.
   *   - onEnd: mixed, a callback to update the UI after running; should be
   *     both callable and serializable.
   *   - onEndUrl: string, the URL to which one redirects.
   *   - pathPrefix: string, prepended to URLs for the web-runner;
   *     default: 'civicrm/queue'.
   *   - buttons: array, default: array('retry' => TRUE, 'skip' => TRUE).
   */
  public function __construct($runnerSpec) {
    $this->title = CRM_Utils_Array::value('title', $runnerSpec, ts('Queue Runner'));
    $this->queue = $runnerSpec['queue'];
    $this->errorMode = CRM_Utils_Array::value('errorMode', $runnerSpec, self::ERROR_ABORT);
    $this->isMinimal = CRM_Utils_Array::value('isMinimal', $runnerSpec, FALSE);
    $this->onEnd = CRM_Utils_Array::value('onEnd', $runnerSpec, NULL);
    $this->onEndUrl = CRM_Utils_Array::value('onEndUrl', $runnerSpec, NULL);
    $this->pathPrefix = CRM_Utils_Array::value('pathPrefix', $runnerSpec, 'civicrm/queue');
    $this->buttons = CRM_Utils_Array::value('buttons', $runnerSpec, array('retry' => TRUE, 'skip' => TRUE));
    // perhaps this value should be randomized?
    $this->qrid = $this->queue->getName();
  }

  /**
   * List the properties to include when the runner is serialized.
   *
   * @return array
   */
  public function __sleep() {
    // exclude taskCtx and lastTaskTitle
    return array(
      'title',
      'queue',
      'errorMode',
      'isMinimal',
      'onEnd',
      'onEndUrl',
      'pathPrefix',
      'qrid',
      'buttons',
    );
  }

  /**
   * Redirect to the web-based queue-runner and evaluate all tasks in a queue.
   */
  public function runAllViaWeb() {
    // Persist the runner so that instance() can restore it by qrid.
    $_SESSION['queueRunners'][$this->qrid] = serialize($this);
    $url = CRM_Utils_System::url($this->pathPrefix . '/runner', 'reset=1&qrid=' . urlencode($this->qrid));
    CRM_Utils_System::redirect($url);
    // TODO: evaluate items incrementally via AJAX polling, cleanup session
  }

  /**
   * Immediately run all tasks in a queue (until either reaching the end
   * of the queue or encountering an error)
   *
   * If the runner has an onEndUrl, then this function will not return
   *
   * @return mixed
   *   TRUE if all tasks complete normally; otherwise, an array describing the
   *   failed task
   */
  public function runAll() {
    $taskResult = $this->formatTaskResult(TRUE);
    while ($taskResult['is_continue']) {
      // setRaiseException shouldn't be necessary here, but there's a bug
      // somewhere which causes this setting to be lost. Observed while
      // upgrading 4.0=>4.2. This preference really shouldn't be a global
      // setting -- it should be more of a contextual/stack-based setting.
      // This should be appropriate because queue-runners are not used with
      // basic web pages -- they're used with CLI/REST/AJAX.
      $errorScope = CRM_Core_TemporaryErrorScope::useException();
      $taskResult = $this->runNext();
      // Drop the reference to the temporary error scope after each task.
      $errorScope = NULL;
    }

    if ($taskResult['numberOfItems'] == 0) {
      $result = $this->handleEnd();
      if (!empty($result['redirect_url'])) {
        CRM_Utils_System::redirect($result['redirect_url']);
      }
      return TRUE;
    }
    else {
      return $taskResult;
    }
  }

  /**
   * Take the next item from the queue and attempt to run it.
   *
   * Individual tasks may also throw exceptions -- caller should watch for
   * exceptions.
   *
   * @param bool $useSteal
   *   Whether to steal active locks.
   *
   * @return array
   *   - is_error => bool,
   *   - is_continue => bool,
   *   - numberOfItems => int,
   *   - last_task_title => string,
   *   - exception => Exception|NULL
   */
  public function runNext($useSteal = FALSE) {
    if ($useSteal) {
      $item = $this->queue->stealItem();
    }
    else {
      $item = $this->queue->claimItem();
    }

    if (!$item) {
      return $this->formatTaskResult(FALSE, new Exception('Failed to claim next task'));
    }

    $this->lastTaskTitle = $item->data->title;

    $exception = NULL;
    try {
      $isOK = $item->data->run($this->getTaskContext());
      if (!$isOK) {
        $exception = new Exception('Task returned false');
      }
    }
    catch (Exception $e) {
      $isOK = FALSE;
      $exception = $e;
    }

    if ($isOK) {
      $this->queue->deleteItem($item);
    }
    else {
      $this->releaseErrorItem($item);
    }

    return $this->formatTaskResult($isOK, $exception);
  }

  /**
   * Take the next item from the queue and discard it without running it.
   *
   * @param bool $useSteal
   *   Whether to steal active locks.
   *
   * @return array
   *   - is_error => bool,
   *   - is_continue => bool,
   *   - numberOfItems => int,
   *   - last_task_title => string,
   *   - exception => Exception|NULL
   */
  public function skipNext($useSteal = FALSE) {
    if ($useSteal) {
      $item = $this->queue->stealItem();
    }
    else {
      $item = $this->queue->claimItem();
    }

    if (!$item) {
      return $this->formatTaskResult(FALSE, new Exception('Failed to claim next task'));
    }

    $this->lastTaskTitle = $item->data->title;
    $this->queue->deleteItem($item);
    return $this->formatTaskResult(TRUE);
  }

  /**
   * Release an item in keeping with the error mode.
   *
   * With ERROR_CONTINUE the failed item is deleted; with ERROR_ABORT (or any
   * unrecognized mode) the item is released back to the queue.
   *
   * @param object $item
   *   The item previously produced by Queue::claimItem.
   */
  protected function releaseErrorItem($item) {
    switch ($this->errorMode) {
      case self::ERROR_CONTINUE:
        $this->queue->deleteItem($item);
        // Without this break the deleted item would also be passed to
        // releaseItem() below.
        break;

      case self::ERROR_ABORT:
      default:
        $this->queue->releaseItem($item);
        break;
    }
  }

  /**
   * Invoke the onEnd callback (if callable), forget the persisted runner,
   * and build the final result record.
   *
   * @return array
   *   - is_error => bool,
   *   - is_continue => bool,
   *   - numberOfItems => int,
   *   - redirect_url => string (only present when onEndUrl is set)
   */
  public function handleEnd() {
    if (is_callable($this->onEnd)) {
      call_user_func($this->onEnd, $this->getTaskContext());
    }
    // Don't remove queueRunner until onEnd succeeds
    if (!empty($_SESSION['queueRunners'][$this->qrid])) {
      unset($_SESSION['queueRunners'][$this->qrid]);
    }

    // Fallback; web UI does redirect in Javascript
    $result = array();
    $result['is_error'] = 0;
    $result['numberOfItems'] = 0;
    $result['is_continue'] = 0;
    if (!empty($this->onEndUrl)) {
      $result['redirect_url'] = $this->onEndUrl;
    }
    return $result;
  }

  /**
   * Format a result record which describes whether the task completed.
   *
   * @param bool $isOK
   *   TRUE if the task completed successfully.
   * @param Exception|NULL $exception
   *   If applicable, an unhandled exception that arose during execution.
   *
   * @return array
   *   - is_error => int (0 or 1),
   *   - exception => Exception|NULL,
   *   - last_task_title => string,
   *   - numberOfItems => int,
   *   - is_continue => int (0 or 1)
   */
  public function formatTaskResult($isOK, $exception = NULL) {
    // Ask the queue only once and reuse the answer below.
    $numberOfItems = $this->queue->numberOfItems();

    $result = array();
    $result['is_error'] = $isOK ? 0 : 1;
    $result['exception'] = $exception;
    $result['last_task_title'] = isset($this->lastTaskTitle) ? $this->lastTaskTitle : '';
    $result['numberOfItems'] = $numberOfItems;
    if ($numberOfItems <= 0) {
      // nothing to do
      $result['is_continue'] = 0;
    }
    elseif ($isOK) {
      // more tasks remain, and this task succeeded
      $result['is_continue'] = 1;
    }
    elseif ($this->errorMode == self::ERROR_CONTINUE) {
      // more tasks remain, and we can disregard this error
      $result['is_continue'] = 1;
    }
    else {
      // more tasks remain, but we can't disregard the error
      $result['is_continue'] = 0;
    }

    return $result;
  }

  /**
   * Lazily build the context object which is passed to tasks and to the
   * onEnd callback.
   *
   * @return CRM_Queue_TaskContext
   */
  protected function getTaskContext() {
    if (!is_object($this->taskCtx)) {
      $this->taskCtx = new CRM_Queue_TaskContext();
      $this->taskCtx->queue = $this->queue;
      // $this->taskCtx->log = CRM_Core_Config::getLog();
      $this->taskCtx->log = CRM_Core_Error::createDebugLogger();
    }
    return $this->taskCtx;
  }

}