0f069b268197094ca55b915a4364ea06f7c47f8c
[civicrm-core.git] / CRM / Queue / Runner.php
1 <?php
2 /*
3 +--------------------------------------------------------------------+
4 | CiviCRM version 4.5 |
5 +--------------------------------------------------------------------+
6 | Copyright CiviCRM LLC (c) 2004-2014 |
7 +--------------------------------------------------------------------+
8 | This file is a part of CiviCRM. |
9 | |
10 | CiviCRM is free software; you can copy, modify, and distribute it |
11 | under the terms of the GNU Affero General Public License |
12 | Version 3, 19 November 2007 and the CiviCRM Licensing Exception. |
13 | |
14 | CiviCRM is distributed in the hope that it will be useful, but |
15 | WITHOUT ANY WARRANTY; without even the implied warranty of |
16 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. |
17 | See the GNU Affero General Public License for more details. |
18 | |
19 | You should have received a copy of the GNU Affero General Public |
20 | License and the CiviCRM Licensing Exception along |
21 | with this program; if not, contact CiviCRM LLC |
22 | at info[AT]civicrm[DOT]org. If you have questions about the |
23 | GNU Affero General Public License or the licensing of CiviCRM, |
24 | see the CiviCRM license FAQ at http://civicrm.org/licensing |
25 +--------------------------------------------------------------------+
26 */
27
28 /**
29 * The queue runner is a helper which runs all jobs in a queue.
30 *
31 * The queue runner is most useful for one-off queues (such as an upgrade);
32 * if the intention is to develop a dedicated, long-running worker thread,
33 * then one should consider writing a new queue consumer.
34 */
35 class CRM_Queue_Runner {
36
37 /**
38 * The failed task should be discarded, and queue processing should continue
39 */
40 CONST ERROR_CONTINUE = 1;
41
42 /**
43 * The failed task should be kept in the queue, and queue processing should abort
44 */
45 CONST ERROR_ABORT = 2;
46
47 /**
48 * @var string
49 */
50 var $title;
51
52 /**
53 * @var CRM_Queue_Queue
54 */
55 var $queue;
56 var $errorMode;
57 var $isMinimal;
58 var $onEnd;
59 var $onEndUrl;
60 var $pathPrefix;
61 // queue-runner id; used for persistence
62 var $qrid;
63
64 /**
65 * @var array whether to display buttons, eg ('retry' => TRUE, 'skip' => FALSE)
66 */
67 var $buttons;
68
69 /**
70 * @var CRM_Queue_TaskContext
71 */
72 var $taskCtx;
73
74 /**
75 * Locate a previously-created instance of the queue-runner
76 *
77 * @param $qrid string, the queue-runner ID
78 *
79 * @return CRM_Queue_Runner or NULL
80 */
81 static function instance($qrid) {
82 if (!empty($_SESSION['queueRunners'][$qrid])) {
83 return unserialize($_SESSION['queueRunners'][$qrid]);
84 }
85 else {
86 return NULL;
87 }
88 }
89
90 /**
91 *
92 * FIXME: parameter validation
93 * FIXME: document signature of onEnd callback
94 *
95 * @param $runnerSpec array with keys:
96 * - queue: CRM_Queue_Queue
97 * - errorMode: int, ERROR_CONTINUE or ERROR_ABORT
98 * - onEnd: mixed, a callback to update the UI after running; should be both callable and serializable
99 * - onEndUrl: string, the URL to which one redirects
100 * - pathPrefix: string, prepended to URLs for the web-runner; default: 'civicrm/queue'
101 */
102 public function __construct($runnerSpec) {
103 $this->title = CRM_Utils_Array::value('title', $runnerSpec, ts('Queue Runner'));
104 $this->queue = $runnerSpec['queue'];
105 $this->errorMode = CRM_Utils_Array::value('errorMode', $runnerSpec, self::ERROR_ABORT);
106 $this->isMinimal = CRM_Utils_Array::value('isMinimal', $runnerSpec, FALSE);
107 $this->onEnd = CRM_Utils_Array::value('onEnd', $runnerSpec, NULL);
108 $this->onEndUrl = CRM_Utils_Array::value('onEndUrl', $runnerSpec, NULL);
109 $this->pathPrefix = CRM_Utils_Array::value('pathPrefix', $runnerSpec, 'civicrm/queue');
110 $this->buttons = CRM_Utils_Array::value('buttons', $runnerSpec, array('retry' => TRUE,'skip' => TRUE));
111 // perhaps this value should be randomized?
112 $this->qrid = $this->queue->getName();
113 }
114
115 /**
116 * @return array
117 */
118 function __sleep() {
119 // exclude taskCtx
120 return array('title', 'queue', 'errorMode', 'isMinimal', 'onEnd', 'onEndUrl', 'pathPrefix', 'qrid', 'buttons');
121 }
122
123 /**
124 * Redirect to the web-based queue-runner and evaluate all tasks in a queue.
125 */
126 public function runAllViaWeb() {
127 $_SESSION['queueRunners'][$this->qrid] = serialize($this);
128 $url = CRM_Utils_System::url($this->pathPrefix . '/runner', 'reset=1&qrid=' . urlencode($this->qrid));
129 CRM_Utils_System::redirect($url);
130 // TODO: evaluate items incrementally via AJAX polling, cleanup session, call
131 }
132
133 /**
134 * Immediately run all tasks in a queue (until either reaching the end
135 * of the queue or encountering an error)
136 *
137 * If the runner has an onEndUrl, then this function will not return
138 *
139 * @return mixed, TRUE if all tasks complete normally; otherwise, an array describing the failed task
140 */
141 public function runAll() {
142 $taskResult = $this->formatTaskResult(TRUE);
143 while ($taskResult['is_continue']) {
144 // setRaiseException should't be necessary here, but there's a bug
145 // somewhere which causes this setting to be lost. Observed while
146 // upgrading 4.0=>4.2. This preference really shouldn't be a global
147 // setting -- it should be more of a contextual/stack-based setting.
148 // This should be appropriate because queue-runners are not used with
149 // basic web pages -- they're used with CLI/REST/AJAX.
150 $errorScope = CRM_Core_TemporaryErrorScope::useException();
151 $taskResult = $this->runNext();
152 $errorScope = NULL;
153 }
154
155 if ($taskResult['numberOfItems'] == 0) {
156 $result = $this->handleEnd();
157 if (!empty($result['redirect_url'])) {
158 CRM_Utils_System::redirect($result['redirect_url']);
159 }
160 return TRUE;
161 }
162 else {
163 return $taskResult;
164 }
165 }
166
167 /**
168 * Take the next item from the queue and attempt to run it.
169 *
170 * Individual tasks may also throw exceptions -- caller should watch for exceptions
171 *
172 * @param $useSteal bool, whether to steal active locks
173 *
174 * @return array(is_error => bool, is_continue => bool, numberOfItems => int, 'last_task_title' => $, 'exception' => $)
175 */
176 public function runNext($useSteal = FALSE) {
177 if ($useSteal) {
178 $item = $this->queue->stealItem();
179 }
180 else {
181 $item = $this->queue->claimItem();
182 }
183
184 if ($item) {
185 $this->lastTaskTitle = $item->data->title;
186
187 $exception = NULL;
188 try {
189 $isOK = $item->data->run($this->getTaskContext());
190 if (!$isOK) {
191 $exception = new Exception('Task returned false');
192 }
193 }
194 catch(Exception$e) {
195 $isOK = FALSE;
196 $exception = $e;
197 }
198
199 if ($isOK) {
200 $this->queue->deleteItem($item);
201 }
202 else {
203 $this->releaseErrorItem($item);
204 }
205
206 return $this->formatTaskResult($isOK, $exception);
207 }
208 else {
209 return $this->formatTaskResult(FALSE, new Exception('Failed to claim next task'));
210 }
211 }
212
213 /**
214 * Take the next item from the queue and attempt to run it.
215 *
216 * Individual tasks may also throw exceptions -- caller should watch for exceptions
217 *
218 * @param $useSteal bool, whether to steal active locks
219 *
220 * @return array(is_error => bool, is_continue => bool, numberOfItems => int)
221 */
222 public function skipNext($useSteal = FALSE) {
223 if ($useSteal) {
224 $item = $this->queue->stealItem();
225 }
226 else {
227 $item = $this->queue->claimItem();
228 }
229
230 if ($item) {
231 $this->lastTaskTitle = $item->data->title;
232 $this->queue->deleteItem($item);
233 return $this->formatTaskResult(TRUE);
234 }
235 else {
236 return $this->formatTaskResult(FALSE, new Exception('Failed to claim next task'));
237 }
238 }
239
240 /**
241 * @param $item
242 */
243 protected function releaseErrorItem($item) {
244 switch ($this->errorMode) {
245 case self::ERROR_CONTINUE:
246 $this->queue->deleteItem($item);
247 case self::ERROR_ABORT:
248 default:
249 $this->queue->releaseItem($item);
250 }
251 }
252
253 /**
254 *
255 * @return array(is_error => bool, is_continue => bool, numberOfItems => int, redirect_url => string)
256 */
257 public function handleEnd() {
258 if (is_callable($this->onEnd)) {
259 call_user_func($this->onEnd, $this->getTaskContext());
260 }
261 // Don't remove queueRunner until onEnd succeeds
262 if (!empty($_SESSION['queueRunners'][$this->qrid])) {
263 unset($_SESSION['queueRunners'][$this->qrid]);
264 }
265
266 // Fallback; web UI does redirect in Javascript
267 $result = array();
268 $result['is_error'] = 0;
269 $result['numberOfItems'] = 0;
270 $result['is_continue'] = 0;
271 if (!empty($this->onEndUrl)) {
272 $result['redirect_url'] = $this->onEndUrl;
273 }
274 return $result;
275 }
276
277 /**
278 *
279 * @param $isOK
280 * @param null $exception
281 *
282 * @return array(is_error => bool, is_continue => bool, numberOfItems => int)
283 */
284 function formatTaskResult($isOK, $exception = NULL) {
285 $numberOfItems = $this->queue->numberOfItems();
286
287 $result = array();
288 $result['is_error'] = $isOK ? 0 : 1;
289 $result['exception'] = $exception;
290 $result['last_task_title'] = isset($this->lastTaskTitle) ? $this->lastTaskTitle : '';
291 $result['numberOfItems'] = $this->queue->numberOfItems();
292 if ($result['numberOfItems'] <= 0) {
293 // nothing to do
294 $result['is_continue'] = 0;
295 }
296 elseif ($isOK) {
297 // more tasks remain, and this task succeeded
298 $result['is_continue'] = 1;
299 }
300 elseif ($this->errorMode == CRM_Queue_Runner::ERROR_CONTINUE) {
301 // more tasks remain, and we can disregard this error
302 $result['is_continue'] = 1;
303 }
304 else {
305 // more tasks remain, but we can't disregard the error
306 $result['is_continue'] = 0;
307 }
308
309 return $result;
310 }
311
312 /**
313 * @return CRM_Queue_TaskContext
314 */
315 protected function getTaskContext() {
316 if (!is_object($this->taskCtx)) {
317 $this->taskCtx = new CRM_Queue_TaskContext();
318 $this->taskCtx->queue = $this->queue;
319 // $this->taskCtx->log = CRM_Core_Config::getLog();
320 $this->taskCtx->log = CRM_Core_Error::createDebugLogger();
321 }
322 return $this->taskCtx;
323 }
324 }
325