9 * @defgroup queue Queue operations
11 * Queue items to allow later processing.
13 * The queue system allows placing items in a queue and processing them later.
14 * The system tries to ensure that only one consumer can process an item.
16 * Before a queue can be used it needs to be created by
17 * DrupalQueueInterface::createQueue().
19 * Items can be added to the queue by passing an arbitrary data object to
20 * DrupalQueueInterface::createItem().
22 * To process an item, call DrupalQueueInterface::claimItem() and specify how
23 * long you want to have a lease for working on that item. When finished
24 * processing, the item needs to be deleted by calling
25 * DrupalQueueInterface::deleteItem(). If the consumer dies, the item will be
26 * made available again by the DrupalQueueInterface implementation once the
27 * lease expires. Another consumer will then be able to receive it when calling
28 * DrupalQueueInterface::claimItem(). Due to this, the processing code should
29 * be aware that an item might be handed over for processing more than once.
31 * The $item object used by the DrupalQueueInterface can contain arbitrary
32 * metadata depending on the implementation. Systems using the interface should
33 * only rely on the data property which will contain the information passed to
34 * DrupalQueueInterface::createItem(). The full queue item returned by
35 * DrupalQueueInterface::claimItem() needs to be passed to
36 * DrupalQueueInterface::deleteItem() once processing is completed.
38 * There are two kinds of queue backends available: reliable, which preserves
39 * the order of messages and guarantees that every item will be executed at
40 * least once. The non-reliable kind only does a best effort to preserve order
41 * in messages and to execute them at least once but there is a small chance
42 * that some items get lost. For example, some distributed back-ends like
43 * Amazon SQS will be managing jobs for a large set of producers and consumers
44 * where a strict FIFO ordering will likely not be preserved. Another example
45 * would be an in-memory queue backend which might lose items if it crashes.
46 * However, such a backend would be able to deal with significantly more writes
47 * than a reliable queue and for many tasks this is more important. See
48 * aggregator_cron() for an example of how to effectively utilize a
49 * non-reliable queue. Another example is doing Twitter statistics -- the small
50 * possibility of losing a few items is insignificant next to power of the
51 * queue being able to keep up with writes. As described in the processing
52 * section, regardless of the queue being reliable or not, the processing code
53 * should be aware that an item might be handed over for processing more than
54 * once (because the processing code might time out before it finishes).
58 * Factory class for interacting with queues.
62 * Returns the queue object for a given name.
64 * The following variables can be set by variable_set or $conf overrides:
65 * - queue_class_$name: the class to be used for the queue $name.
66 * - queue_default_class: the class to use when queue_class_$name is not
67 * defined. Defaults to SystemQueue, a reliable backend using SQL.
68 * - queue_default_reliable_class: the class to use when queue_class_$name is
69 * not defined and the queue_default_class is not reliable. Defaults to
73 * Arbitrary string. The name of the queue to work with.
75 * TRUE if the ordering of items and guaranteeing every item executes at
76 * least once is important, FALSE if scalability is the main concern.
79 * The queue object for a given name.
81 public static function get($name, $reliable = FALSE) {
83 if (!isset($queues[$name])) {
84 $class = variable_get('queue_class_' . $name, NULL);
86 $class = variable_get('queue_default_class', 'SystemQueue');
88 $object = new $class($name);
89 if ($reliable && !$object instanceof DrupalReliableQueueInterface) {
90 $class = variable_get('queue_default_reliable_class', 'SystemQueue');
91 $object = new $class($name);
93 $queues[$name] = $object;
95 return $queues[$name];
99 interface DrupalQueueInterface {
102 * Add a queue item and store it directly to the queue.
105 * Arbitrary data to be associated with the new task in the queue.
107 * TRUE if the item was successfully created and was (best effort) added
108 * to the queue, otherwise FALSE. We don't guarantee the item was
109 * committed to disk etc, but as far as we know, the item is now in the
112 public function createItem($data);
115 * Retrieve the number of items in the queue.
117 * This is intended to provide a "best guess" count of the number of items in
118 * the queue. Depending on the implementation and the setup, the accuracy of
119 * the results of this function may vary.
121 * e.g. On a busy system with a large number of consumers and items, the
122 * result might only be valid for a fraction of a second and not provide an
123 * accurate representation.
126 * An integer estimate of the number of items in the queue.
128 public function numberOfItems();
131 * Claim an item in the queue for processing.
134 * How long the processing is expected to take in seconds, defaults to an
135 * hour. After this lease expires, the item will be reset and another
136 * consumer can claim the item. For idempotent tasks (which can be run
137 * multiple times without side effects), shorter lease times would result
138 * in lower latency in case a consumer fails. For tasks that should not be
139 * run more than once (non-idempotent), a larger lease time will make it
140 * more rare for a given task to run multiple times in cases of failure,
141 * at the cost of higher latency.
143 * On success we return an item object. If the queue is unable to claim an
144 * item it returns false. This implies a best effort to retrieve an item
145 * and either the queue is empty or there is some other non-recoverable
148 public function claimItem($lease_time = 3600);
151 * Delete a finished item from the queue.
154 * The item returned by DrupalQueueInterface::claimItem().
156 public function deleteItem($item);
159 * Release an item that the worker could not process, so another
160 * worker can come in and process it before the timeout expires.
165 public function releaseItem($item);
170 * Called during installation and should be used to perform any necessary
171 * initialization operations. This should not be confused with the
172 * constructor for these objects, which is called every time an object is
173 * instantiated to operate on a queue. This operation is only needed the
174 * first time a given queue is going to be initialized (for example, to make
175 * a new database table or directory to hold tasks for the queue -- it
176 * depends on the queue implementation if this is necessary at all).
178 public function createQueue();
181 * Delete a queue and every item in the queue.
183 public function deleteQueue();
187 * Reliable queue interface.
189 * Classes implementing this interface preserve the order of messages and
190 * guarantee that every item will be executed at least once.
192 interface DrupalReliableQueueInterface extends DrupalQueueInterface {
196 * Default queue implementation.
198 class SystemQueue implements DrupalReliableQueueInterface {
200 * The name of the queue this instance is working with.
206 public function __construct($name) {
210 public function createItem($data) {
211 // During a Drupal 6.x to 7.x update, drupal_get_schema() does not contain
212 // the queue table yet, so we cannot rely on drupal_write_record().
213 $query = db_insert('queue')
215 'name' => $this->name,
216 'data' => serialize($data),
217 // We cannot rely on REQUEST_TIME because many items might be created
218 // by a single request which takes longer than 1 second.
221 return (bool) $query->execute();
224 public function numberOfItems() {
225 return db_query('SELECT COUNT(item_id) FROM {queue} WHERE name = :name', array(':name' => $this->name))->fetchField();
228 public function claimItem($lease_time = 30) {
229 // Claim an item by updating its expire fields. If claim is not successful
230 // another thread may have claimed the item in the meantime. Therefore loop
231 // until an item is successfully claimed or we are reasonably sure there
232 // are no unclaimed items left.
234 $item = db_query_range('SELECT data, item_id FROM {queue} q WHERE expire = 0 AND name = :name ORDER BY created, item_id ASC', 0, 1, array(':name' => $this->name))->fetchObject();
236 // Try to update the item. Only one thread can succeed in UPDATEing the
237 // same row. We cannot rely on REQUEST_TIME because items might be
238 // claimed by a single consumer which runs longer than 1 second. If we
239 // continue to use REQUEST_TIME instead of the current time(), we steal
240 // time from the lease, and will tend to reset items before the lease
241 // should really expire.
242 $update = db_update('queue')
244 'expire' => time() + $lease_time,
246 ->condition('item_id', $item->item_id)
247 ->condition('expire', 0);
248 // If there are affected rows, this update succeeded.
249 if ($update->execute()) {
250 $item->data = unserialize($item->data);
255 // No items currently available to claim.
261 public function releaseItem($item) {
262 $update = db_update('queue')
266 ->condition('item_id', $item->item_id);
267 return $update->execute();
270 public function deleteItem($item) {
272 ->condition('item_id', $item->item_id)
276 public function createQueue() {
277 // All tasks are stored in a single database table (which is created when
278 // Drupal is first installed) so there is nothing we need to do to create
282 public function deleteQueue() {
284 ->condition('name', $this->name)
290 * Static queue implementation.
292 * This allows "undelayed" variants of processes relying on the Queue
293 * interface. The queue data resides in memory. It should only be used for
294 * items that will be queued and dequeued within a given page request.
296 class MemoryQueue implements DrupalQueueInterface {
305 * Counter for item ids.
309 protected $id_sequence;
312 * Start working with a queue.
315 * Arbitrary string. The name of the queue to work with.
317 public function __construct($name) {
318 $this->queue = array();
319 $this->id_sequence = 0;
322 public function createItem($data) {
323 $item = new stdClass();
324 $item->item_id = $this->id_sequence++;
326 $item->created = time();
328 $this->queue[$item->item_id] = $item;
332 public function numberOfItems() {
333 return count($this->queue);
336 public function claimItem($lease_time = 30) {
337 foreach ($this->queue as $key => $item) {
338 if ($item->expire == 0) {
339 $item->expire = time() + $lease_time;
340 $this->queue[$key] = $item;
347 public function deleteItem($item) {
348 unset($this->queue[$item->item_id]);
351 public function releaseItem($item) {
352 if (isset($this->queue[$item->item_id]) && $this->queue[$item->item_id]->expire != 0) {
353 $this->queue[$item->item_id]->expire = 0;
359 public function createQueue() {
360 // Nothing needed here.
363 public function deleteQueue() {
364 $this->queue = array();
365 $this->id_sequence = 0;
370 * @} End of "defgroup queue".