committing uncommitted changes on live site
[weblabels.fsf.org.git] / crm.fsf.org / 20131203 / files / modules / system / system.queue.inc
1 <?php
2
3 /**
4 * @file
5 * Queue functionality.
6 */
7
8 /**
9 * @defgroup queue Queue operations
10 * @{
11 * Queue items to allow later processing.
12 *
13 * The queue system allows placing items in a queue and processing them later.
14 * The system tries to ensure that only one consumer can process an item.
15 *
16 * Before a queue can be used it needs to be created by
17 * DrupalQueueInterface::createQueue().
18 *
19 * Items can be added to the queue by passing an arbitrary data object to
20 * DrupalQueueInterface::createItem().
21 *
22 * To process an item, call DrupalQueueInterface::claimItem() and specify how
23 * long you want to have a lease for working on that item. When finished
24 * processing, the item needs to be deleted by calling
25 * DrupalQueueInterface::deleteItem(). If the consumer dies, the item will be
26 * made available again by the DrupalQueueInterface implementation once the
27 * lease expires. Another consumer will then be able to receive it when calling
28 * DrupalQueueInterface::claimItem(). Due to this, the processing code should
29 * be aware that an item might be handed over for processing more than once.
30 *
31 * The $item object used by the DrupalQueueInterface can contain arbitrary
32 * metadata depending on the implementation. Systems using the interface should
33 * only rely on the data property which will contain the information passed to
34 * DrupalQueueInterface::createItem(). The full queue item returned by
35 * DrupalQueueInterface::claimItem() needs to be passed to
36 * DrupalQueueInterface::deleteItem() once processing is completed.
37 *
38 * There are two kinds of queue backends available: reliable, which preserves
39 * the order of messages and guarantees that every item will be executed at
40 * least once; and non-reliable, which only makes a best effort to preserve
41 * the order of messages and to execute them at least once, so there is a small
42 * chance that some items get lost. For example, some distributed back-ends like
43 * Amazon SQS will be managing jobs for a large set of producers and consumers
44 * where a strict FIFO ordering will likely not be preserved. Another example
45 * would be an in-memory queue backend which might lose items if it crashes.
46 * However, such a backend would be able to deal with significantly more writes
47 * than a reliable queue and for many tasks this is more important. See
48 * aggregator_cron() for an example of how to effectively utilize a
49 * non-reliable queue. Another example is doing Twitter statistics -- the small
50 * possibility of losing a few items is insignificant next to the power of the
51 * queue being able to keep up with writes. As described in the processing
52 * section, regardless of the queue being reliable or not, the processing code
53 * should be aware that an item might be handed over for processing more than
54 * once (because the processing code might time out before it finishes).
55 */
56
/**
 * Factory class for interacting with queues.
 */
class DrupalQueue {
  /**
   * Returns the queue object for a given name.
   *
   * The following variables can be set by variable_set or $conf overrides:
   * - queue_class_$name: the class to be used for the queue $name.
   * - queue_default_class: the class to use when queue_class_$name is not
   *   defined. Defaults to SystemQueue, a reliable backend using SQL.
   * - queue_default_reliable_class: the class to use when $reliable is TRUE
   *   but the class selected above (whether it came from queue_class_$name or
   *   from queue_default_class) does not implement
   *   DrupalReliableQueueInterface. Defaults to SystemQueue.
   *
   * Queue objects are cached in a static variable keyed by $name only. The
   * cache key does not include $reliable, so the first call for a given $name
   * decides which class is instantiated; later calls for the same $name
   * return that cached object whatever $reliable they pass.
   *
   * @param $name
   *   Arbitrary string. The name of the queue to work with.
   * @param $reliable
   *   TRUE if the ordering of items and guaranteeing every item executes at
   *   least once is important, FALSE if scalability is the main concern.
   *
   * @return
   *   The queue object for a given name.
   */
  public static function get($name, $reliable = FALSE) {
    static $queues;
    if (!isset($queues[$name])) {
      $class = variable_get('queue_class_' . $name, NULL);
      if (!$class) {
        $class = variable_get('queue_default_class', 'SystemQueue');
      }
      $object = new $class($name);
      // The caller needs reliability but the configured class cannot offer
      // it: fall back to the designated reliable class instead.
      if ($reliable && !($object instanceof DrupalReliableQueueInterface)) {
        $class = variable_get('queue_default_reliable_class', 'SystemQueue');
        $object = new $class($name);
      }
      $queues[$name] = $object;
    }
    return $queues[$name];
  }
}
98
/**
 * Interface implemented by every queue backend.
 *
 * DrupalQueue::get() is the factory that instantiates queue classes by name.
 * Backends that additionally guarantee ordering and at-least-once execution
 * implement DrupalReliableQueueInterface.
 */
interface DrupalQueueInterface {

  /**
   * Add a queue item and store it directly to the queue.
   *
   * @param $data
   *   Arbitrary data to be associated with the new task in the queue.
   *
   * @return
   *   TRUE if the item was successfully created and was (best effort) added
   *   to the queue, otherwise FALSE. We don't guarantee the item was
   *   committed to disk etc, but as far as we know, the item is now in the
   *   queue.
   */
  public function createItem($data);

  /**
   * Retrieve the number of items in the queue.
   *
   * This is intended to provide a "best guess" count of the number of items in
   * the queue. Depending on the implementation and the setup, the accuracy of
   * the results of this function may vary.
   *
   * For example, on a busy system with a large number of consumers and items,
   * the result might only be valid for a fraction of a second and not provide
   * an accurate representation.
   *
   * @return
   *   An integer estimate of the number of items in the queue.
   */
  public function numberOfItems();

  /**
   * Claim an item in the queue for processing.
   *
   * @param $lease_time
   *   How long the processing is expected to take in seconds. This interface
   *   declares a default of 3600 (one hour), but implementing classes may
   *   declare their own default; SystemQueue and MemoryQueue, for example,
   *   default to 30 seconds. After this lease expires, the item is meant to
   *   be reset so that another consumer can claim it (how and when that
   *   happens is up to the implementation). For idempotent tasks (which can
   *   be run multiple times without side effects), shorter lease times would
   *   result in lower latency in case a consumer fails. For tasks that should
   *   not be run more than once (non-idempotent), a larger lease time will
   *   make it more rare for a given task to run multiple times in cases of
   *   failure, at the cost of higher latency.
   *
   * @return
   *   On success, an item object whose 'data' property holds the data that
   *   was passed to DrupalQueueInterface::createItem(). FALSE if the queue is
   *   unable to claim an item. This implies a best effort to retrieve an item
   *   and either the queue is empty or there is some other non-recoverable
   *   problem.
   */
  public function claimItem($lease_time = 3600);

  /**
   * Delete a finished item from the queue.
   *
   * @param $item
   *   The item returned by DrupalQueueInterface::claimItem().
   */
  public function deleteItem($item);

  /**
   * Release an item that the worker could not process.
   *
   * This makes the item available again, so another worker can claim and
   * process it without waiting for its lease to expire.
   *
   * @param $item
   *   The item returned by DrupalQueueInterface::claimItem().
   *
   * @return boolean
   *   TRUE if the item was released, FALSE otherwise.
   */
  public function releaseItem($item);

  /**
   * Create a queue.
   *
   * Called during installation and should be used to perform any necessary
   * initialization operations. This should not be confused with the
   * constructor for these objects, which is called every time an object is
   * instantiated to operate on a queue. This operation is only needed the
   * first time a given queue is going to be initialized (for example, to make
   * a new database table or directory to hold tasks for the queue -- it
   * depends on the queue implementation if this is necessary at all).
   */
  public function createQueue();

  /**
   * Delete a queue and every item in the queue.
   */
  public function deleteQueue();
}
185
/**
 * Marker interface for reliable queue backends.
 *
 * A backend that implements it promises to preserve the order of messages
 * and to guarantee that every item is executed at least once. It adds no
 * methods to DrupalQueueInterface; its only role is to be detected with
 * instanceof, as DrupalQueue::get() does when a reliable queue is requested.
 */
interface DrupalReliableQueueInterface extends DrupalQueueInterface {}
194
/**
 * Default queue implementation.
 *
 * Items of every queue are stored as rows of the single {queue} table and are
 * told apart by the 'name' column. An 'expire' value of 0 marks an item as
 * unclaimed; claiming sets it to the time the lease ends.
 */
class SystemQueue implements DrupalReliableQueueInterface {
  /**
   * The name of the queue this instance is working with.
   *
   * @var string
   */
  protected $name;

  /**
   * Start working with a queue.
   *
   * @param $name
   *   Arbitrary string. The name of the queue to work with.
   */
  public function __construct($name) {
    $this->name = $name;
  }

  /**
   * Implements DrupalQueueInterface::createItem().
   *
   * The data is stored serialize()d and is unserialize()d again when the
   * item is claimed.
   */
  public function createItem($data) {
    // During a Drupal 6.x to 7.x update, drupal_get_schema() does not contain
    // the queue table yet, so we cannot rely on drupal_write_record().
    $query = db_insert('queue')
      ->fields(array(
        'name' => $this->name,
        'data' => serialize($data),
        // We cannot rely on REQUEST_TIME because many items might be created
        // by a single request which takes longer than 1 second.
        'created' => time(),
      ));
    return (bool) $query->execute();
  }

  /**
   * Implements DrupalQueueInterface::numberOfItems().
   *
   * Counts every row stored under this queue's name, claimed or not.
   *
   * @return
   *   The number of items as an integer. The COUNT() value read through
   *   fetchField() may arrive as a string depending on the database driver,
   *   so it is cast to honor the interface's integer contract.
   */
  public function numberOfItems() {
    return (int) db_query('SELECT COUNT(item_id) FROM {queue} WHERE name = :name', array(':name' => $this->name))->fetchField();
  }

  /**
   * Implements DrupalQueueInterface::claimItem().
   *
   * Claims the oldest unclaimed item (ordered by creation time, then item ID).
   * Note that the default lease here is 30 seconds, not the 3600 seconds
   * declared by DrupalQueueInterface::claimItem().
   */
  public function claimItem($lease_time = 30) {
    // Claim an item by updating its expire fields. If claim is not successful
    // another thread may have claimed the item in the meantime. Therefore loop
    // until an item is successfully claimed or we are reasonably sure there
    // are no unclaimed items left.
    while (TRUE) {
      $item = db_query_range('SELECT data, item_id FROM {queue} q WHERE expire = 0 AND name = :name ORDER BY created, item_id ASC', 0, 1, array(':name' => $this->name))->fetchObject();
      if ($item) {
        // Try to update the item. Only one thread can succeed in UPDATEing the
        // same row. We cannot rely on REQUEST_TIME because items might be
        // claimed by a single consumer which runs longer than 1 second. If we
        // continue to use REQUEST_TIME instead of the current time(), we steal
        // time from the lease, and will tend to reset items before the lease
        // should really expire.
        $update = db_update('queue')
          ->fields(array(
            'expire' => time() + $lease_time,
          ))
          ->condition('item_id', $item->item_id)
          ->condition('expire', 0);
        // If there are affected rows, this update succeeded.
        if ($update->execute()) {
          $item->data = unserialize($item->data);
          return $item;
        }
      }
      else {
        // No items currently available to claim.
        return FALSE;
      }
    }
  }

  /**
   * Implements DrupalQueueInterface::releaseItem().
   *
   * Resets the item's 'expire' column to 0 so it can be claimed again right
   * away.
   *
   * @return
   *   TRUE if the update query reported affected rows, FALSE otherwise. The
   *   interface documents a boolean, so the row count is cast accordingly.
   */
  public function releaseItem($item) {
    $update = db_update('queue')
      ->fields(array(
        'expire' => 0,
      ))
      ->condition('item_id', $item->item_id);
    return (bool) $update->execute();
  }

  /**
   * Implements DrupalQueueInterface::deleteItem().
   *
   * Removes the row identified by the item's item_id.
   */
  public function deleteItem($item) {
    db_delete('queue')
      ->condition('item_id', $item->item_id)
      ->execute();
  }

  /**
   * Implements DrupalQueueInterface::createQueue().
   */
  public function createQueue() {
    // All tasks are stored in a single database table (which is created when
    // Drupal is first installed) so there is nothing we need to do to create
    // a new queue.
  }

  /**
   * Implements DrupalQueueInterface::deleteQueue().
   *
   * Deletes every row stored under this queue's name.
   */
  public function deleteQueue() {
    db_delete('queue')
      ->condition('name', $this->name)
      ->execute();
  }
}
288
/**
 * Static queue implementation.
 *
 * This allows "undelayed" variants of processes relying on the Queue
 * interface. The queue data resides in memory. It should only be used for
 * items that will be queued and dequeued within a given page request.
 *
 * Claimed items are never returned to the queue automatically when their
 * lease runs out: an item only becomes claimable again through releaseItem().
 */
class MemoryQueue implements DrupalQueueInterface {
  /**
   * The queue data: item objects keyed by their item_id.
   *
   * @var array
   */
  protected $queue;

  /**
   * Counter for item ids.
   *
   * It is only ever incremented (deleteQueue() does not reset it), so one
   * queue object never hands out the same ID twice.
   *
   * @var int
   */
  protected $id_sequence;

  /**
   * Start working with a queue.
   *
   * @param $name
   *   Arbitrary string. The name of the queue to work with. It is not used by
   *   this implementation: every MemoryQueue object holds its own items.
   */
  public function __construct($name) {
    $this->queue = array();
    $this->id_sequence = 0;
  }

  /**
   * Implements DrupalQueueInterface::createItem().
   *
   * @return
   *   Always TRUE.
   */
  public function createItem($data) {
    $item = new stdClass();
    $item->item_id = $this->id_sequence++;
    $item->data = $data;
    $item->created = time();
    $item->expire = 0;
    $this->queue[$item->item_id] = $item;
    return TRUE;
  }

  /**
   * Implements DrupalQueueInterface::numberOfItems().
   *
   * Counts every item still held, including claimed ones.
   */
  public function numberOfItems() {
    return count($this->queue);
  }

  /**
   * Implements DrupalQueueInterface::claimItem().
   *
   * Claims the first unclaimed item in insertion order. The returned object
   * is the one stored in the queue, not a copy.
   */
  public function claimItem($lease_time = 30) {
    foreach ($this->queue as $item) {
      if ($item->expire == 0) {
        // $item is an object handle, so this also updates the stored item.
        $item->expire = time() + $lease_time;
        return $item;
      }
    }
    return FALSE;
  }

  /**
   * Implements DrupalQueueInterface::deleteItem().
   */
  public function deleteItem($item) {
    unset($this->queue[$item->item_id]);
  }

  /**
   * Implements DrupalQueueInterface::releaseItem().
   *
   * @return
   *   TRUE if the item is still queued and was claimed (and is now released),
   *   FALSE otherwise.
   */
  public function releaseItem($item) {
    if (isset($this->queue[$item->item_id]) && $this->queue[$item->item_id]->expire != 0) {
      $this->queue[$item->item_id]->expire = 0;
      return TRUE;
    }
    return FALSE;
  }

  /**
   * Implements DrupalQueueInterface::createQueue().
   */
  public function createQueue() {
    // Nothing needed here.
  }

  /**
   * Implements DrupalQueueInterface::deleteQueue().
   *
   * Removes all items. The ID counter is deliberately left untouched.
   * Resetting it would let new items reuse the IDs of deleted ones, so a
   * stale item still held by a consumer could make deleteItem() or
   * releaseItem() act on an unrelated new item.
   */
  public function deleteQueue() {
    $this->queue = array();
  }
}
368
369 /**
370 * @} End of "defgroup queue".
371 */