Commit | Line | Data |
---|---|---|
7f254ad8 AE |
1 | <?php |
2 | ||
3 | /** | |
4 | * @file | |
5 | * Queue functionality. | |
6 | */ | |
7 | ||
8 | /** | |
9 | * @defgroup queue Queue operations | |
10 | * @{ | |
11 | * Queue items to allow later processing. | |
12 | * | |
13 | * The queue system allows placing items in a queue and processing them later. | |
14 | * The system tries to ensure that only one consumer can process an item. | |
15 | * | |
16 | * Before a queue can be used it needs to be created by | |
17 | * DrupalQueueInterface::createQueue(). | |
18 | * | |
19 | * Items can be added to the queue by passing an arbitrary data object to | |
20 | * DrupalQueueInterface::createItem(). | |
21 | * | |
22 | * To process an item, call DrupalQueueInterface::claimItem() and specify how | |
23 | * long you want to have a lease for working on that item. When finished | |
24 | * processing, the item needs to be deleted by calling | |
25 | * DrupalQueueInterface::deleteItem(). If the consumer dies, the item will be | |
26 | * made available again by the DrupalQueueInterface implementation once the | |
27 | * lease expires. Another consumer will then be able to receive it when calling | |
28 | * DrupalQueueInterface::claimItem(). Due to this, the processing code should | |
29 | * be aware that an item might be handed over for processing more than once. | |
30 | * | |
31 | * The $item object used by the DrupalQueueInterface can contain arbitrary | |
32 | * metadata depending on the implementation. Systems using the interface should | |
33 | * only rely on the data property which will contain the information passed to | |
34 | * DrupalQueueInterface::createItem(). The full queue item returned by | |
35 | * DrupalQueueInterface::claimItem() needs to be passed to | |
36 | * DrupalQueueInterface::deleteItem() once processing is completed. | |
37 | * | |
38 | * There are two kinds of queue backends available: reliable, which preserves | |
39 | * the order of messages and guarantees that every item will be executed at | |
40 | * least once. The non-reliable kind only does a best effort to preserve order | |
41 | * in messages and to execute them at least once but there is a small chance | |
42 | * that some items get lost. For example, some distributed back-ends like | |
43 | * Amazon SQS will be managing jobs for a large set of producers and consumers | |
44 | * where a strict FIFO ordering will likely not be preserved. Another example | |
45 | * would be an in-memory queue backend which might lose items if it crashes. | |
46 | * However, such a backend would be able to deal with significantly more writes | |
47 | * than a reliable queue and for many tasks this is more important. See | |
48 | * aggregator_cron() for an example of how to effectively utilize a | |
49 | * non-reliable queue. Another example is doing Twitter statistics -- the small | |
50 | * possibility of losing a few items is insignificant next to the power of the | |
51 | * queue being able to keep up with writes. As described in the processing | |
52 | * section, regardless of the queue being reliable or not, the processing code | |
53 | * should be aware that an item might be handed over for processing more than | |
54 | * once (because the processing code might time out before it finishes). | |
55 | */ | |
56 | ||
/**
 * Factory class for interacting with queues.
 */
class DrupalQueue {
  /**
   * Returns the queue object for a given name.
   *
   * The following variables can be set by variable_set or $conf overrides:
   * - queue_class_$name: the class to be used for the queue $name.
   * - queue_default_class: the class to use when queue_class_$name is not
   *   defined. Defaults to SystemQueue, a reliable backend using SQL.
   * - queue_default_reliable_class: the class to use when a reliable queue is
   *   requested and the class selected by the two variables above does not
   *   implement DrupalReliableQueueInterface. Defaults to SystemQueue.
   *
   * Queue objects are cached per name in a static variable, so repeated calls
   * return the same object. If the cached object is not reliable and a
   * reliable queue is requested, the cached object is replaced by an instance
   * of the reliable class; later calls for the same name (reliable or not)
   * then receive that reliable object.
   *
   * @param $name
   *   Arbitrary string. The name of the queue to work with.
   * @param $reliable
   *   TRUE if the ordering of items and guaranteeing every item executes at
   *   least once is important, FALSE if scalability is the main concern.
   *
   * @return
   *   The queue object for a given name.
   */
  public static function get($name, $reliable = FALSE) {
    static $queues = array();

    if (!isset($queues[$name])) {
      $class = variable_get('queue_class_' . $name, NULL);
      if (!$class) {
        $class = variable_get('queue_default_class', 'SystemQueue');
      }
      $queues[$name] = new $class($name);
    }

    // Check reliability on every call, not only when the cache is filled:
    // otherwise a non-reliable object cached by an earlier call would be
    // handed to a caller that explicitly asked for a reliable queue.
    if ($reliable && !($queues[$name] instanceof DrupalReliableQueueInterface)) {
      $class = variable_get('queue_default_reliable_class', 'SystemQueue');
      $queues[$name] = new $class($name);
    }

    return $queues[$name];
  }
}
98 | ||
/**
 * Interface that every queue backend implements.
 *
 * DrupalQueue::get() instantiates implementations with the queue name as the
 * only constructor argument.
 */
interface DrupalQueueInterface {

  /**
   * Add a queue item and store it directly to the queue.
   *
   * @param $data
   *   Arbitrary data to be associated with the new task in the queue.
   *
   * @return
   *   TRUE if the item was successfully created and was (best effort) added
   *   to the queue, otherwise FALSE. We don't guarantee the item was
   *   committed to disk etc, but as far as we know, the item is now in the
   *   queue.
   */
  public function createItem($data);

  /**
   * Retrieve the number of items in the queue.
   *
   * This is intended to provide a "best guess" count of the number of items in
   * the queue. Depending on the implementation and the setup, the accuracy of
   * the results of this function may vary.
   *
   * e.g. On a busy system with a large number of consumers and items, the
   * result might only be valid for a fraction of a second and not provide an
   * accurate representation.
   *
   * @return
   *   An integer estimate of the number of items in the queue.
   */
  public function numberOfItems();

  /**
   * Claim an item in the queue for processing.
   *
   * @param $lease_time
   *   How long the processing is expected to take in seconds, defaults to an
   *   hour. After this lease expires, the item will be reset and another
   *   consumer can claim the item. For idempotent tasks (which can be run
   *   multiple times without side effects), shorter lease times would result
   *   in lower latency in case a consumer fails. For tasks that should not be
   *   run more than once (non-idempotent), a larger lease time will make it
   *   more rare for a given task to run multiple times in cases of failure,
   *   at the cost of higher latency.
   *
   * @return
   *   On success we return an item object. If the queue is unable to claim an
   *   item it returns FALSE. This implies a best effort to retrieve an item
   *   and either the queue is empty or there is some other non-recoverable
   *   problem.
   */
  public function claimItem($lease_time = 3600);

  /**
   * Delete a finished item from the queue.
   *
   * @param $item
   *   The item returned by DrupalQueueInterface::claimItem().
   */
  public function deleteItem($item);

  /**
   * Release an item that the worker could not process.
   *
   * Releasing lets another worker come in and process the item before the
   * lease taken out by DrupalQueueInterface::claimItem() expires.
   *
   * @param $item
   *   The item returned by DrupalQueueInterface::claimItem().
   *
   * @return
   *   Boolean indicating whether the item was released.
   */
  public function releaseItem($item);

  /**
   * Create a queue.
   *
   * Called during installation and should be used to perform any necessary
   * initialization operations. This should not be confused with the
   * constructor for these objects, which is called every time an object is
   * instantiated to operate on a queue. This operation is only needed the
   * first time a given queue is going to be initialized (for example, to make
   * a new database table or directory to hold tasks for the queue -- it
   * depends on the queue implementation if this is necessary at all).
   */
  public function createQueue();

  /**
   * Delete a queue and every item in the queue.
   */
  public function deleteQueue();
}
185 | ||
/**
 * Marker interface for reliable queue backends.
 *
 * The interface adds no methods to DrupalQueueInterface. A class implementing
 * it promises to preserve the order of messages and to execute every item at
 * least once. DrupalQueue::get() tests for this interface when a caller
 * requests a reliable queue.
 */
interface DrupalReliableQueueInterface extends DrupalQueueInterface {
}
194 | ||
/**
 * Default queue implementation.
 *
 * Items of every queue are stored as rows of the {queue} database table and
 * told apart by the queue name. An 'expire' value of 0 marks an unclaimed
 * item; a claimed item carries the timestamp at which its lease ends.
 */
class SystemQueue implements DrupalReliableQueueInterface {
  /**
   * The name of the queue this instance is working with.
   *
   * @var string
   */
  protected $name;

  /**
   * Start working with a queue.
   *
   * @param $name
   *   Arbitrary string. The name of the queue to work with.
   */
  public function __construct($name) {
    $this->name = $name;
  }

  /**
   * Implements DrupalQueueInterface::createItem().
   *
   * @param $data
   *   Arbitrary data; it is stored in serialized form.
   *
   * @return
   *   The result of the insert query, cast to a boolean.
   */
  public function createItem($data) {
    // During a Drupal 6.x to 7.x update, drupal_get_schema() does not contain
    // the queue table yet, so we cannot rely on drupal_write_record().
    $query = db_insert('queue')
      ->fields(array(
        'name' => $this->name,
        'data' => serialize($data),
        // We cannot rely on REQUEST_TIME because many items might be created
        // by a single request which takes longer than 1 second.
        'created' => time(),
      ));
    return (bool) $query->execute();
  }

  /**
   * Implements DrupalQueueInterface::numberOfItems().
   *
   * Claimed and unclaimed items are both counted.
   *
   * @return
   *   The number of rows stored for this queue, as an integer.
   */
  public function numberOfItems() {
    // fetchField() hands back the count as returned by the database driver
    // (typically a string); cast it so callers receive the integer that
    // DrupalQueueInterface::numberOfItems() documents.
    return (int) db_query('SELECT COUNT(item_id) FROM {queue} WHERE name = :name', array(':name' => $this->name))->fetchField();
  }

  /**
   * Implements DrupalQueueInterface::claimItem().
   *
   * Only rows whose 'expire' column is 0 are considered. This class never
   * resets expired leases itself -- presumably a periodic task elsewhere
   * does so; verify before relying on lease expiry.
   *
   * @param $lease_time
   *   Lease length in seconds. Note that the default here is 30 seconds, not
   *   the hour used in the DrupalQueueInterface::claimItem() signature.
   *
   * @return
   *   An object with the properties 'item_id' and 'data' (unserialized), or
   *   FALSE if there is no unclaimed item.
   */
  public function claimItem($lease_time = 30) {
    // Claim an item by updating its expire fields. If claim is not successful
    // another thread may have claimed the item in the meantime. Therefore loop
    // until an item is successfully claimed or we are reasonably sure there
    // are no unclaimed items left.
    while (TRUE) {
      $item = db_query_range('SELECT data, item_id FROM {queue} q WHERE expire = 0 AND name = :name ORDER BY created, item_id ASC', 0, 1, array(':name' => $this->name))->fetchObject();
      if ($item) {
        // Try to update the item. Only one thread can succeed in UPDATEing the
        // same row. We cannot rely on REQUEST_TIME because items might be
        // claimed by a single consumer which runs longer than 1 second. If we
        // continue to use REQUEST_TIME instead of the current time(), we steal
        // time from the lease, and will tend to reset items before the lease
        // should really expire.
        $update = db_update('queue')
          ->fields(array(
            'expire' => time() + $lease_time,
          ))
          ->condition('item_id', $item->item_id)
          ->condition('expire', 0);
        // If there are affected rows, this update succeeded.
        if ($update->execute()) {
          $item->data = unserialize($item->data);
          return $item;
        }
      }
      else {
        // No items currently available to claim.
        return FALSE;
      }
    }
  }

  /**
   * Implements DrupalQueueInterface::releaseItem().
   *
   * @param $item
   *   The item returned by claimItem().
   *
   * @return
   *   TRUE if the update changed a row, FALSE otherwise.
   */
  public function releaseItem($item) {
    $update = db_update('queue')
      ->fields(array(
        'expire' => 0,
      ))
      ->condition('item_id', $item->item_id);
    // The update query reports a row count; the interface documents a
    // boolean, so normalize the result.
    return (bool) $update->execute();
  }

  /**
   * Implements DrupalQueueInterface::deleteItem().
   *
   * @param $item
   *   The item returned by claimItem().
   */
  public function deleteItem($item) {
    db_delete('queue')
      ->condition('item_id', $item->item_id)
      ->execute();
  }

  /**
   * Implements DrupalQueueInterface::createQueue().
   */
  public function createQueue() {
    // All tasks are stored in a single database table (which is created when
    // Drupal is first installed) so there is nothing we need to do to create
    // a new queue.
  }

  /**
   * Implements DrupalQueueInterface::deleteQueue().
   *
   * Removes every row stored under this queue's name.
   */
  public function deleteQueue() {
    db_delete('queue')
      ->condition('name', $this->name)
      ->execute();
  }
}
288 | ||
/**
 * Static queue implementation.
 *
 * This allows "undelayed" variants of processes relying on the Queue
 * interface. The queue data resides in memory. It should only be used for
 * items that will be queued and dequeued within a given page request.
 */
class MemoryQueue implements DrupalQueueInterface {
  /**
   * The queued item objects, keyed by item id.
   *
   * @var array
   */
  protected $queue;

  /**
   * The id that will be given to the next created item.
   *
   * @var int
   */
  protected $id_sequence;

  /**
   * Start working with a queue.
   *
   * @param $name
   *   Arbitrary string. The name of the queue to work with. This
   *   implementation does not store it.
   */
  public function __construct($name) {
    $this->id_sequence = 0;
    $this->queue = array();
  }

  /**
   * Implements DrupalQueueInterface::createItem().
   *
   * @param $data
   *   Arbitrary data; it is kept as-is on the item's 'data' property.
   *
   * @return
   *   Always TRUE.
   */
  public function createItem($data) {
    $id = $this->id_sequence++;

    $entry = new stdClass();
    $entry->item_id = $id;
    $entry->data = $data;
    $entry->created = time();
    // An expire value of 0 marks the item as unclaimed.
    $entry->expire = 0;

    $this->queue[$id] = $entry;
    return TRUE;
  }

  /**
   * Implements DrupalQueueInterface::numberOfItems().
   *
   * @return
   *   The number of items held, claimed or not.
   */
  public function numberOfItems() {
    return count($this->queue);
  }

  /**
   * Implements DrupalQueueInterface::claimItem().
   *
   * @param $lease_time
   *   Lease length in seconds; the default here is 30 seconds.
   *
   * @return
   *   The first item whose 'expire' property is 0, or FALSE if there is none.
   */
  public function claimItem($lease_time = 30) {
    foreach ($this->queue as $candidate) {
      if ($candidate->expire != 0) {
        // Already claimed.
        continue;
      }
      // Items are objects, so this also updates the entry held in the queue.
      $candidate->expire = time() + $lease_time;
      return $candidate;
    }
    return FALSE;
  }

  /**
   * Implements DrupalQueueInterface::deleteItem().
   *
   * @param $item
   *   The item returned by claimItem().
   */
  public function deleteItem($item) {
    unset($this->queue[$item->item_id]);
  }

  /**
   * Implements DrupalQueueInterface::releaseItem().
   *
   * @param $item
   *   The item returned by claimItem().
   *
   * @return
   *   TRUE if the item is in the queue and was claimed, FALSE otherwise.
   */
  public function releaseItem($item) {
    $id = $item->item_id;
    if (!isset($this->queue[$id]) || $this->queue[$id]->expire == 0) {
      return FALSE;
    }
    $this->queue[$id]->expire = 0;
    return TRUE;
  }

  /**
   * Implements DrupalQueueInterface::createQueue().
   */
  public function createQueue() {
    // Nothing needed here.
  }

  /**
   * Implements DrupalQueueInterface::deleteQueue().
   *
   * Drops every item and restarts the id counter at 0.
   */
  public function deleteQueue() {
    $this->id_sequence = 0;
    $this->queue = array();
  }
}
368 | ||
369 | /** | |
370 | * @} End of "defgroup queue". | |
371 | */ |