Commit | Line | Data |
---|---|---|
6a488035 TO |
1 | <?php |
2 | /* | |
3 | +--------------------------------------------------------------------+ | |
06b69b18 | 4 | | CiviCRM version 4.5 | |
6a488035 | 5 | +--------------------------------------------------------------------+ |
06b69b18 | 6 | | Copyright CiviCRM LLC (c) 2004-2014 | |
6a488035 TO |
7 | +--------------------------------------------------------------------+ |
8 | | This file is a part of CiviCRM. | | |
9 | | | | |
10 | | CiviCRM is free software; you can copy, modify, and distribute it | | |
11 | | under the terms of the GNU Affero General Public License | | |
12 | | Version 3, 19 November 2007 and the CiviCRM Licensing Exception. | | |
13 | | | | |
14 | | CiviCRM is distributed in the hope that it will be useful, but | | |
15 | | WITHOUT ANY WARRANTY; without even the implied warranty of | | |
16 | | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. | | |
17 | | See the GNU Affero General Public License for more details. | | |
18 | | | | |
19 | | You should have received a copy of the GNU Affero General Public | | |
20 | | License and the CiviCRM Licensing Exception along | | |
21 | | with this program; if not, contact CiviCRM LLC | | |
22 | | at info[AT]civicrm[DOT]org. If you have questions about the | | |
23 | | GNU Affero General Public License or the licensing of CiviCRM, | | |
24 | | see the CiviCRM license FAQ at http://civicrm.org/licensing | | |
25 | +--------------------------------------------------------------------+ | |
26 | */ | |
27 | ||
28 | /** | |
29 | * The queue runner is a helper which runs all jobs in a queue. | |
30 | * | |
31 | * The queue runner is most useful for one-off queues (such as an upgrade); | |
32 | * if the intention is to develop a dedicated, long-running worker thread, | |
33 | * then one should consider writing a new queue consumer. | |
34 | */ | |
class CRM_Queue_Runner {

  /**
   * The failed task should be discarded, and queue processing should continue
   */
  const ERROR_CONTINUE = 1;

  /**
   * The failed task should be kept in the queue, and queue processing should abort
   */
  const ERROR_ABORT = 2;

  /**
   * @var string display title; defaults to ts('Queue Runner')
   */
  public $title;

  /**
   * @var CRM_Queue_Queue the queue whose tasks are executed
   */
  public $queue;

  /**
   * @var int one of self::ERROR_CONTINUE or self::ERROR_ABORT (default: ERROR_ABORT)
   */
  public $errorMode;

  /**
   * @var bool copied from the 'isMinimal' key of the runner spec (default: FALSE)
   */
  public $isMinimal;

  /**
   * @var mixed callback invoked by handleEnd(); must be callable and serializable
   */
  public $onEnd;

  /**
   * @var string|NULL URL reported as 'redirect_url' by handleEnd()
   */
  public $onEndUrl;

  /**
   * @var string prepended to URLs for the web-runner; default: 'civicrm/queue'
   */
  public $pathPrefix;

  /**
   * @var string queue-runner id; used for persistence (currently the queue name)
   */
  public $qrid;

  /**
   * @var array whether to display buttons, eg ('retry' => TRUE, 'skip' => FALSE)
   */
  public $buttons;

  /**
   * @var CRM_Queue_TaskContext lazily created by getTaskContext(); never serialized
   */
  public $taskCtx;

  /**
   * @var string|NULL title of the task most recently claimed by runNext()/skipNext().
   *
   * Deliberately left out of __sleep() so the serialized form of the runner
   * does not change.
   */
  public $lastTaskTitle;

  /**
   * Locate a previously-created instance of the queue-runner
   *
   * The runner is looked up in $_SESSION['queueRunners'], where
   * runAllViaWeb() stores it in serialized form.
   *
   * @param $qrid string, the queue-runner ID
   *
   * @return CRM_Queue_Runner or NULL
   */
  public static function instance($qrid) {
    if (empty($_SESSION['queueRunners'][$qrid])) {
      return NULL;
    }
    return unserialize($_SESSION['queueRunners'][$qrid]);
  }

  /**
   * Build a runner from a specification array.
   *
   * FIXME: parameter validation
   * FIXME: document signature of onEnd callback
   *
   * @param $runnerSpec array with keys:
   *   - title: string, display title; default: ts('Queue Runner')
   *   - queue: CRM_Queue_Queue (required)
   *   - errorMode: int, ERROR_CONTINUE or ERROR_ABORT; default: ERROR_ABORT
   *   - isMinimal: bool; default: FALSE
   *   - onEnd: mixed, a callback to update the UI after running; should be both callable and serializable
   *   - onEndUrl: string, the URL to which one redirects
   *   - pathPrefix: string, prepended to URLs for the web-runner; default: 'civicrm/queue'
   *   - buttons: array, eg ('retry' => TRUE, 'skip' => TRUE)
   */
  public function __construct($runnerSpec) {
    $this->title = CRM_Utils_Array::value('title', $runnerSpec, ts('Queue Runner'));
    $this->queue = $runnerSpec['queue'];
    $this->errorMode = CRM_Utils_Array::value('errorMode', $runnerSpec, self::ERROR_ABORT);
    $this->isMinimal = CRM_Utils_Array::value('isMinimal', $runnerSpec, FALSE);
    $this->onEnd = CRM_Utils_Array::value('onEnd', $runnerSpec, NULL);
    $this->onEndUrl = CRM_Utils_Array::value('onEndUrl', $runnerSpec, NULL);
    $this->pathPrefix = CRM_Utils_Array::value('pathPrefix', $runnerSpec, 'civicrm/queue');
    $this->buttons = CRM_Utils_Array::value('buttons', $runnerSpec, array('retry' => TRUE, 'skip' => TRUE));
    // perhaps this value should be randomized?
    $this->qrid = $this->queue->getName();
  }

  /**
   * List the properties to include when the runner is serialized.
   *
   * @return array property names; taskCtx and lastTaskTitle are excluded
   */
  public function __sleep() {
    // exclude taskCtx
    return array('title', 'queue', 'errorMode', 'isMinimal', 'onEnd', 'onEndUrl', 'pathPrefix', 'qrid', 'buttons');
  }

  /**
   * Redirect to the web-based queue-runner and evaluate all tasks in a queue.
   *
   * The runner is serialized into the session so that the web-runner page
   * can restore it via instance().
   */
  public function runAllViaWeb() {
    $_SESSION['queueRunners'][$this->qrid] = serialize($this);
    $url = CRM_Utils_System::url($this->pathPrefix . '/runner', 'reset=1&qrid=' . urlencode($this->qrid));
    CRM_Utils_System::redirect($url);
    // TODO: evaluate items incrementally via AJAX polling, cleanup session, call
  }

  /**
   * Immediately run all tasks in a queue (until either reaching the end
   * of the queue or encountering an error)
   *
   * If the runner has an onEndUrl, then this function will not return
   *
   * @return mixed, TRUE if all tasks complete normally; otherwise, an array describing the failed task
   */
  public function runAll() {
    $taskResult = $this->formatTaskResult(TRUE);
    // NOTE(review): with ERROR_CONTINUE, a queue that still has items but
    // repeatedly fails to hand one out keeps is_continue set, so this loop
    // retries without pause -- confirm whether that is intended.
    while ($taskResult['is_continue']) {
      // setRaiseException shouldn't be necessary here, but there's a bug
      // somewhere which causes this setting to be lost. Observed while
      // upgrading 4.0=>4.2. This preference really shouldn't be a global
      // setting -- it should be more of a contextual/stack-based setting.
      // This should be appropriate because queue-runners are not used with
      // basic web pages -- they're used with CLI/REST/AJAX.
      $errorScope = CRM_Core_TemporaryErrorScope::useException();
      $taskResult = $this->runNext();
      // Drop the scope object as soon as the task has finished.
      $errorScope = NULL;
    }

    if ($taskResult['numberOfItems'] == 0) {
      $result = $this->handleEnd();
      if (!empty($result['redirect_url'])) {
        CRM_Utils_System::redirect($result['redirect_url']);
      }
      return TRUE;
    }

    return $taskResult;
  }

  /**
   * Take the next item from the queue and attempt to run it.
   *
   * A task that returns a false value or throws an Exception is treated as
   * failed and handed to releaseErrorItem(); a successful task is deleted
   * from the queue.
   *
   * @param $useSteal bool, whether to steal active locks
   *
   * @return array(is_error => bool, is_continue => bool, numberOfItems => int, 'last_task_title' => $, 'exception' => $)
   */
  public function runNext($useSteal = FALSE) {
    $item = $this->claimNextItem($useSteal);
    if (!$item) {
      return $this->formatTaskResult(FALSE, new Exception('Failed to claim next task'));
    }

    $this->lastTaskTitle = $item->data->title;

    $exception = NULL;
    try {
      $isOK = $item->data->run($this->getTaskContext());
      if (!$isOK) {
        $exception = new Exception('Task returned false');
      }
    }
    catch (Exception $e) {
      $isOK = FALSE;
      $exception = $e;
    }

    if ($isOK) {
      $this->queue->deleteItem($item);
    }
    else {
      $this->releaseErrorItem($item);
    }

    return $this->formatTaskResult($isOK, $exception);
  }

  /**
   * Take the next item from the queue and discard it without running it.
   *
   * @param $useSteal bool, whether to steal active locks
   *
   * @return array(is_error => bool, is_continue => bool, numberOfItems => int, 'last_task_title' => $, 'exception' => $)
   */
  public function skipNext($useSteal = FALSE) {
    $item = $this->claimNextItem($useSteal);
    if (!$item) {
      return $this->formatTaskResult(FALSE, new Exception('Failed to claim next task'));
    }

    $this->lastTaskTitle = $item->data->title;
    $this->queue->deleteItem($item);
    return $this->formatTaskResult(TRUE);
  }

  /**
   * Obtain the next queue item, either by claiming it or by stealing it.
   *
   * @param $useSteal bool, TRUE to call stealItem(), FALSE to call claimItem()
   *
   * @return mixed whatever the queue returns; callers treat a false value as "nothing claimed"
   */
  protected function claimNextItem($useSteal) {
    return $useSteal ? $this->queue->stealItem() : $this->queue->claimItem();
  }

  /**
   * Dispose of an item whose task failed, according to the error mode.
   *
   * ERROR_CONTINUE deletes the item; ERROR_ABORT (and any unrecognized mode)
   * releases it back to the queue.
   *
   * @param $item object the claimed queue item
   */
  protected function releaseErrorItem($item) {
    switch ($this->errorMode) {
      case self::ERROR_CONTINUE:
        $this->queue->deleteItem($item);
        // Without this break the deleted item would also be released below.
        break;

      case self::ERROR_ABORT:
      default:
        $this->queue->releaseItem($item);
        break;
    }
  }

  /**
   * Finish processing: invoke the onEnd callback (if callable), drop the
   * persisted runner from the session, and report the final status.
   *
   * @return array(is_error => bool, is_continue => bool, numberOfItems => int, redirect_url => string)
   */
  public function handleEnd() {
    if (is_callable($this->onEnd)) {
      call_user_func($this->onEnd, $this->getTaskContext());
    }
    // Don't remove queueRunner until onEnd succeeds
    if (!empty($_SESSION['queueRunners'][$this->qrid])) {
      unset($_SESSION['queueRunners'][$this->qrid]);
    }

    // Fallback; web UI does redirect in Javascript
    $result = array(
      'is_error' => 0,
      'numberOfItems' => 0,
      'is_continue' => 0,
    );
    if (!empty($this->onEndUrl)) {
      $result['redirect_url'] = $this->onEndUrl;
    }
    return $result;
  }

  /**
   * Describe the outcome of the latest task together with the queue state.
   *
   * is_continue is 1 only when items remain AND either the task succeeded
   * or the error mode is ERROR_CONTINUE.
   *
   * @param $isOK bool whether the latest task succeeded
   * @param $exception Exception|NULL the failure, if any
   *
   * @return array(is_error => bool, exception => $, last_task_title => string, numberOfItems => int, is_continue => bool)
   */
  public function formatTaskResult($isOK, $exception = NULL) {
    // Ask the queue once and reuse the answer.
    $numberOfItems = $this->queue->numberOfItems();

    $result = array();
    $result['is_error'] = $isOK ? 0 : 1;
    $result['exception'] = $exception;
    $result['last_task_title'] = isset($this->lastTaskTitle) ? $this->lastTaskTitle : '';
    $result['numberOfItems'] = $numberOfItems;
    if ($numberOfItems <= 0) {
      // nothing to do
      $result['is_continue'] = 0;
    }
    elseif ($isOK) {
      // more tasks remain, and this task succeeded
      $result['is_continue'] = 1;
    }
    elseif ($this->errorMode == self::ERROR_CONTINUE) {
      // more tasks remain, and we can disregard this error
      $result['is_continue'] = 1;
    }
    else {
      // more tasks remain, but we can't disregard the error
      $result['is_continue'] = 0;
    }

    return $result;
  }

  /**
   * Return the task context shared by all tasks of this runner, creating it
   * on first use.
   *
   * @return CRM_Queue_TaskContext
   */
  protected function getTaskContext() {
    if (!is_object($this->taskCtx)) {
      $this->taskCtx = new CRM_Queue_TaskContext();
      $this->taskCtx->queue = $this->queue;
      // $this->taskCtx->log = CRM_Core_Config::getLog();
      $this->taskCtx->log = CRM_Core_Error::createDebugLogger();
    }
    return $this->taskCtx;
  }
}
313 |