Merge remote-tracking branch 'upstream/4.3' into 4.3-master-2013-08-28-20-20-34
[civicrm-core.git] / CRM / Queue / Runner.php
1 <?php
2 /*
3 +--------------------------------------------------------------------+
4 | CiviCRM version 4.4 |
5 +--------------------------------------------------------------------+
6 | Copyright CiviCRM LLC (c) 2004-2013 |
7 +--------------------------------------------------------------------+
8 | This file is a part of CiviCRM. |
9 | |
10 | CiviCRM is free software; you can copy, modify, and distribute it |
11 | under the terms of the GNU Affero General Public License |
12 | Version 3, 19 November 2007 and the CiviCRM Licensing Exception. |
13 | |
14 | CiviCRM is distributed in the hope that it will be useful, but |
15 | WITHOUT ANY WARRANTY; without even the implied warranty of |
16 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. |
17 | See the GNU Affero General Public License for more details. |
18 | |
19 | You should have received a copy of the GNU Affero General Public |
20 | License and the CiviCRM Licensing Exception along |
21 | with this program; if not, contact CiviCRM LLC |
22 | at info[AT]civicrm[DOT]org. If you have questions about the |
23 | GNU Affero General Public License or the licensing of CiviCRM, |
24 | see the CiviCRM license FAQ at http://civicrm.org/licensing |
25 +--------------------------------------------------------------------+
26 */
27
28 /**
29 * The queue runner is a helper which runs all jobs in a queue.
30 *
31 * The queue runner is most useful for one-off queues (such as an upgrade);
32 * if the intention is to develop a dedicated, long-running worker thread,
33 * then one should consider writing a new queue consumer.
34 */
class CRM_Queue_Runner {

  /**
   * The failed task should be discarded, and queue processing should continue
   */
  const ERROR_CONTINUE = 1;

  /**
   * The failed task should be kept in the queue, and queue processing should abort
   */
  const ERROR_ABORT = 2;

  /**
   * @var string title of this runner; defaults to ts('Queue Runner')
   */
  public $title;

  /**
   * @var CRM_Queue_Queue the queue whose items are claimed and run
   */
  public $queue;

  /**
   * @var int ERROR_CONTINUE or ERROR_ABORT; defaults to ERROR_ABORT
   */
  public $errorMode;

  /**
   * @var bool the 'isMinimal' option from the runner spec (stored only; not read in this class)
   */
  public $isMinimal;

  /**
   * @var mixed callback invoked by handleEnd(); should be both callable and serializable
   */
  public $onEnd;

  /**
   * @var string|NULL URL reported as 'redirect_url' by handleEnd()
   */
  public $onEndUrl;

  /**
   * @var string prepended to URLs for the web-runner; default: 'civicrm/queue'
   */
  public $pathPrefix;

  /**
   * @var string queue-runner id; used as the session key for persistence
   */
  public $qrid;

  /**
   * @var array whether to display buttons, eg ('retry' => TRUE, 'skip' => FALSE)
   */
  public $buttons;

  /**
   * @var CRM_Queue_TaskContext created lazily by getTaskContext(); never serialized
   */
  public $taskCtx;

  /**
   * @var string|NULL title of the most recently claimed task; reported as
   *   'last_task_title' by formatTaskResult(); never serialized
   */
  public $lastTaskTitle;

  /**
   * Locate a previously-created instance of the queue-runner
   *
   * The runner is restored from the serialized copy which runAllViaWeb()
   * stored in $_SESSION['queueRunners'].
   *
   * @param $qrid string, the queue-runner ID
   *
   * @return CRM_Queue_Runner or NULL
   */
  public static function instance($qrid) {
    if (empty($_SESSION['queueRunners'][$qrid])) {
      return NULL;
    }
    return unserialize($_SESSION['queueRunners'][$qrid]);
  }

  /**
   *
   * FIXME: parameter validation
   * FIXME: document signature of onEnd callback
   *
   * @param $runnerSpec array with keys:
   *   - title: string, default: ts('Queue Runner')
   *   - queue: CRM_Queue_Queue (required)
   *   - errorMode: int, ERROR_CONTINUE or ERROR_ABORT; default: ERROR_ABORT
   *   - isMinimal: bool, default: FALSE
   *   - onEnd: mixed, a callback to update the UI after running; should be both callable and serializable
   *   - onEndUrl: string, the URL to which one redirects
   *   - pathPrefix: string, prepended to URLs for the web-runner; default: 'civicrm/queue'
   *   - buttons: array, eg ('retry' => TRUE, 'skip' => TRUE); default: both TRUE
   */
  public function __construct($runnerSpec) {
    $this->title = CRM_Utils_Array::value('title', $runnerSpec, ts('Queue Runner'));
    $this->queue = $runnerSpec['queue'];
    $this->errorMode = CRM_Utils_Array::value('errorMode', $runnerSpec, self::ERROR_ABORT);
    $this->isMinimal = CRM_Utils_Array::value('isMinimal', $runnerSpec, FALSE);
    $this->onEnd = CRM_Utils_Array::value('onEnd', $runnerSpec, NULL);
    $this->onEndUrl = CRM_Utils_Array::value('onEndUrl', $runnerSpec, NULL);
    $this->pathPrefix = CRM_Utils_Array::value('pathPrefix', $runnerSpec, 'civicrm/queue');
    $this->buttons = CRM_Utils_Array::value('buttons', $runnerSpec, array('retry' => TRUE, 'skip' => TRUE));
    // perhaps this value should be randomized?
    $this->qrid = $this->queue->getName();
  }

  /**
   * List the properties to serialize.
   *
   * taskCtx and lastTaskTitle are deliberately left out.
   *
   * @return array of property names
   */
  public function __sleep() {
    return array('title', 'queue', 'errorMode', 'isMinimal', 'onEnd', 'onEndUrl', 'pathPrefix', 'qrid', 'buttons');
  }

  /**
   * Redirect to the web-based queue-runner and evaluate all tasks in a queue.
   *
   * The runner is serialized into the session (keyed by qrid) so that
   * instance() can restore it on the following request.
   */
  public function runAllViaWeb() {
    $_SESSION['queueRunners'][$this->qrid] = serialize($this);
    $url = CRM_Utils_System::url($this->pathPrefix . '/runner', 'reset=1&qrid=' . urlencode($this->qrid));
    CRM_Utils_System::redirect($url);
    // TODO: evaluate items incrementally via AJAX polling, cleanup session, call
  }

  /**
   * Immediately run all tasks in a queue (until either reaching the end
   * of the queue or encountering an error)
   *
   * If the runner has an onEndUrl, then this function will not return
   *
   * @return mixed, TRUE if all tasks complete normally; otherwise, an array describing the failed task
   */
  public function runAll() {
    $taskResult = $this->formatTaskResult(TRUE);
    while ($taskResult['is_continue']) {
      // setRaiseException shouldn't be necessary here, but there's a bug
      // somewhere which causes this setting to be lost. Observed while
      // upgrading 4.0=>4.2. This preference really shouldn't be a global
      // setting -- it should be more of a contextual/stack-based setting.
      // This should be appropriate because queue-runners are not used with
      // basic web pages -- they're used with CLI/REST/AJAX.
      $errorScope = CRM_Core_TemporaryErrorScope::useException();
      $taskResult = $this->runNext();
      $errorScope = NULL;
    }

    if ($taskResult['numberOfItems'] == 0) {
      $result = $this->handleEnd();
      if (!empty($result['redirect_url'])) {
        CRM_Utils_System::redirect($result['redirect_url']);
      }
      return TRUE;
    }

    return $taskResult;
  }

  /**
   * Take the next item from the queue and attempt to run it.
   *
   * A task fails if it returns a false value or throws an Exception. A
   * successful item is deleted; a failed item is handed to
   * releaseErrorItem().
   *
   * @param $useSteal bool, whether to steal active locks
   *
   * @return array(is_error => bool, is_continue => bool, numberOfItems => int, 'last_task_title' => $, 'exception' => $)
   */
  public function runNext($useSteal = FALSE) {
    $item = $this->claimNextItem($useSteal);
    if (!$item) {
      return $this->formatTaskResult(FALSE, new Exception('Failed to claim next task'));
    }

    $this->lastTaskTitle = $item->data->title;

    $exception = NULL;
    try {
      $isOK = $item->data->run($this->getTaskContext());
      if (!$isOK) {
        $exception = new Exception('Task returned false');
      }
    }
    catch (Exception $e) {
      $isOK = FALSE;
      $exception = $e;
    }

    if ($isOK) {
      $this->queue->deleteItem($item);
    }
    else {
      $this->releaseErrorItem($item);
    }

    return $this->formatTaskResult($isOK, $exception);
  }

  /**
   * Take the next item from the queue and discard it without running it.
   *
   * @param $useSteal bool, whether to steal active locks
   *
   * @return array(is_error => bool, is_continue => bool, numberOfItems => int, 'last_task_title' => $, 'exception' => $)
   */
  public function skipNext($useSteal = FALSE) {
    $item = $this->claimNextItem($useSteal);
    if (!$item) {
      return $this->formatTaskResult(FALSE, new Exception('Failed to claim next task'));
    }

    $this->lastTaskTitle = $item->data->title;
    $this->queue->deleteItem($item);
    return $this->formatTaskResult(TRUE);
  }

  /**
   * Obtain the next queue item, either by claiming it or by stealing it.
   *
   * @param $useSteal bool, TRUE to call stealItem(); FALSE to call claimItem()
   *
   * @return mixed, the item returned by the queue (a false value means nothing was obtained)
   */
  protected function claimNextItem($useSteal) {
    if ($useSteal) {
      return $this->queue->stealItem();
    }
    return $this->queue->claimItem();
  }

  /**
   * Dispose of an item whose task failed, according to the error mode.
   *
   * ERROR_CONTINUE deletes the item; ERROR_ABORT (and any unrecognized
   * mode) releases it back to the queue.
   *
   * @param $item object, the item which was claimed from the queue
   */
  protected function releaseErrorItem($item) {
    switch ($this->errorMode) {
      case self::ERROR_CONTINUE:
        $this->queue->deleteItem($item);
        // Without this break the deleted item would also be released.
        break;

      case self::ERROR_ABORT:
      default:
        $this->queue->releaseItem($item);
        break;
    }
  }

  /**
   * Finish the run: invoke the onEnd callback (if callable) and drop the
   * persisted runner from the session.
   *
   * @return array(is_error => bool, is_continue => bool, numberOfItems => int, redirect_url => string)
   */
  public function handleEnd() {
    if (is_callable($this->onEnd)) {
      call_user_func($this->onEnd, $this->getTaskContext());
    }
    // Don't remove queueRunner until onEnd succeeds
    if (!empty($_SESSION['queueRunners'][$this->qrid])) {
      unset($_SESSION['queueRunners'][$this->qrid]);
    }

    // Fallback; web UI does redirect in Javascript
    $result = array();
    $result['is_error'] = 0;
    $result['numberOfItems'] = 0;
    $result['is_continue'] = 0;
    if (!empty($this->onEndUrl)) {
      $result['redirect_url'] = $this->onEndUrl;
    }
    return $result;
  }

  /**
   * Describe the outcome of a task and decide whether processing may continue.
   *
   * @param $isOK bool, whether the task succeeded
   * @param $exception Exception|NULL, the failure (if any)
   *
   * @return array(is_error => bool, exception => $, last_task_title => string, numberOfItems => int, is_continue => bool)
   */
  public function formatTaskResult($isOK, $exception = NULL) {
    $result = array();
    $result['is_error'] = $isOK ? 0 : 1;
    $result['exception'] = $exception;
    $result['last_task_title'] = isset($this->lastTaskTitle) ? $this->lastTaskTitle : '';
    // Ask the queue only once for the remaining item count.
    $result['numberOfItems'] = $this->queue->numberOfItems();
    if ($result['numberOfItems'] <= 0) {
      // nothing to do
      $result['is_continue'] = 0;
    }
    elseif ($isOK) {
      // more tasks remain, and this task succeeded
      $result['is_continue'] = 1;
    }
    elseif ($this->errorMode == self::ERROR_CONTINUE) {
      // more tasks remain, and we can disregard this error
      $result['is_continue'] = 1;
    }
    else {
      // more tasks remain, but we can't disregard the error
      $result['is_continue'] = 0;
    }

    return $result;
  }

  /**
   * Get the context object passed to tasks and to the onEnd callback.
   *
   * The context is created on first use and then reused.
   *
   * @return CRM_Queue_TaskContext
   */
  protected function getTaskContext() {
    if (!is_object($this->taskCtx)) {
      $this->taskCtx = new CRM_Queue_TaskContext();
      $this->taskCtx->queue = $this->queue;
      // $this->taskCtx->log = CRM_Core_Config::getLog();
      $this->taskCtx->log = CRM_Core_Error::createDebugLogger();
    }
    return $this->taskCtx;
  }
}
313