--- /dev/null
+ +--------------------------------------------------------------------+
+ | Copyright CiviCRM LLC. All rights reserved. |
+ | |
+ | This work is published under the GNU AGPLv3 license with some |
+ | permitted exceptions and without any warranty. For full license |
+ | and copyright information, see https://civicrm.org/licensing |
+ +--------------------------------------------------------------------+
+ */
+ * Variation on CRM_Queue_Queue which can claim/release/delete items in batches.
+ */
+interface CRM_Queue_Queue_BatchQueueInterface {
+ /**
+ * Get a batch of queue items.
+ *
+ * @param int $limit
+ * Maximum number of records to claim
+ * @param int|null $lease_time
+ * Hold a lease on the claimed item for $X seconds.
+ * If NULL, inherit a default.
+ * @return object
+ * with key 'data' that matches the inputted data
+ */
+ public function claimItems(int $limit, ?int $lease_time = NULL): array;
+ /**
+ * Remove items from the queue.
+ *
+ * @param array $items
+ * The item returned by claimItem.
+ */
+ public function deleteItems(array $items): void;
+ /**
+ * Get the full data for multiple items.
+ *
+ * This is a passive peek - it does not claim/steal/release anything.
+ *
+ * @param array $ids
+ * The unique IDs of the tasks within the queue.
+ * @return array
+ */
+ public function fetchItems(array $ids): array;
+ /**
+ * Return an item that could not be processed.
+ *
+ * @param array $items
+ * The items returned by claimItem.
+ */
+ public function releaseItems(array $items): void;
* A queue implementation which stores items in the CiviCRM SQL database
-class CRM_Queue_Queue_SqlParallel extends CRM_Queue_Queue {
+class CRM_Queue_Queue_SqlParallel extends CRM_Queue_Queue implements CRM_Queue_Queue_BatchQueueInterface {
use CRM_Queue_Queue_SqlTrait;
- * Get the next item.
- *
- * @param int|null $lease_time
- * Hold a lease on the claimed item for $X seconds.
- * If NULL, inherit a queue default (`$queueSpec['lease_time']`) or system default (`DEFAULT_LEASE_TIME`).
- * @return object
- * With key 'data' that matches the inputted data.
+ * @inheritDoc
public function claimItem($lease_time = NULL) {
+ $items = $this->claimItems(1, $lease_time);
+ return $items[0] ?? NULL;
+ }
+ /**
+ * @inheritDoc
+ */
+ public function claimItems(int $limit, ?int $lease_time = NULL): array {
$lease_time = $lease_time ?: $this->getSpec('lease_time') ?: static::DEFAULT_LEASE_TIME;
+ $limit = $this->getSpec('batch_limit') ? min($limit, $this->getSpec('batch_limit')) : $limit;
- $result = NULL;
$dao = CRM_Core_DAO::executeQuery('LOCK TABLES civicrm_queue_item WRITE;');
$sql = "SELECT id, queue_name, submit_time, release_time, run_count, data
FROM civicrm_queue_item
WHERE queue_name = %1
AND (release_time IS NULL OR release_time < %2)
ORDER BY weight ASC, id ASC
+ LIMIT %3
$params = [
1 => [$this->getName(), 'String'],
2 => [CRM_Utils_Time::getTime(), 'Timestamp'],
+ 3 => [$limit, 'Integer'],
$dao = CRM_Core_DAO::executeQuery($sql, $params, TRUE, 'CRM_Queue_DAO_QueueItem');
if (is_a($dao, 'DB_Error')) {
- if ($dao->fetch()) {
+ $result = [];
+ while ($dao->fetch()) {
+ $result[] = (object) [
+ 'id' => $dao->id,
+ 'data' => unserialize($dao->data),
+ 'queue_name' => $dao->queue_name,
+ 'run_count' => 1 + (int) $dao->run_count,
+ ];
+ }
+ if ($result) {
$nowEpoch = CRM_Utils_Time::getTimeRaw();
- $dao->run_count++;
- CRM_Core_DAO::executeQuery("UPDATE civicrm_queue_item SET release_time = %1, run_count = %3 WHERE id = %2", [
- '1' => [date('YmdHis', $nowEpoch + $lease_time), 'String'],
- '2' => [$dao->id, 'Integer'],
- '3' => [$dao->run_count, 'Integer'],
+ $sql = CRM_Utils_SQL::interpolate('UPDATE civicrm_queue_item SET release_time = @RT, run_count = 1+run_count WHERE id IN (#ids)', [
+ 'RT' => date('YmdHis', $nowEpoch + $lease_time),
+ 'ids' => CRM_Utils_Array::collect('id', $result),
- // (Comment by artfulrobot Sep 2019: Not sure what the below comment means, should be removed/clarified?)
- // work-around: inconsistent date-formatting causes unintentional breakage
- # $dao->submit_time = date('YmdHis', strtotime($dao->submit_time));
- # $dao->release_time = date('YmdHis', $nowEpoch + $lease_time);
- # $dao->save();
- $dao->data = unserialize($dao->data);
- $result = $dao;
+ CRM_Core_DAO::executeQuery($sql);
$dao = CRM_Core_DAO::executeQuery('UNLOCK TABLES;');
* Remove an item from the queue.
- * @param CRM_Core_DAO|stdClass $dao
+ * @param CRM_Core_DAO|stdClass $item
* The item returned by claimItem.
- public function deleteItem($dao) {
- $dao->delete();
- $dao->free();
+ public function deleteItem($item) {
+ $this->deleteItems([$item]);
+ }
+ public function deleteItems($items): void {
+ if (empty($items)) {
+ return;
+ }
+ $sql = CRM_Utils_SQL::interpolate('DELETE FROM civicrm_queue_item WHERE id IN (#ids) AND queue_name = @name', [
+ 'ids' => CRM_Utils_Array::collect('id', $items),
+ 'name' => $this->getName(),
+ ]);
+ CRM_Core_DAO::executeQuery($sql);
+ $this->freeDAOs($items);
* @return CRM_Queue_DAO_QueueItem|object|null $dao
public function fetchItem($id) {
- $dao = new CRM_Queue_DAO_QueueItem();
- $dao->id = $id;
- $dao->queue_name = $this->getName();
- if (!$dao->find(TRUE)) {
- return NULL;
+ $items = $this->fetchItems([$id]);
+ return $items[0] ?? NULL;
+ }
+ public function fetchItems(array $ids): array {
+ $dao = CRM_Utils_SQL_Select::from('civicrm_queue_item')
+ ->select(['id', 'data', 'run_count'])
+ ->where('id IN (#ids)', ['ids' => $ids])
+ ->where('queue_name = @name', ['name' => $this->getName()])
+ ->execute();
+ $result = [];
+ while ($dao->fetch()) {
+ $result[] = (object) [
+ 'id' => $dao->id,
+ 'data' => unserialize($dao->data),
+ 'run_count' => $dao->run_count,
+ 'queue_name' => $this->getName(),
+ ];
- $dao->data = unserialize($dao->data);
- return $dao;
+ return $result;
* Return an item that could not be processed.
- * @param CRM_Core_DAO $dao
+ * @param CRM_Core_DAO $item
* The item returned by claimItem.
- public function releaseItem($dao) {
- if (empty($this->queueSpec['retry_interval'])) {
- CRM_Core_DAO::executeQuery('UPDATE civicrm_queue_item SET release_time = NULL WHERE id = %1', [
- 1 => [$dao->id, 'Integer'],
- ]);
+ public function releaseItem($item) {
+ $this->releaseItems([$item]);
+ }
+ public function releaseItems($items): void {
+ if (empty($items)) {
+ return;
- else {
- CRM_Core_DAO::executeQuery('UPDATE civicrm_queue_item SET release_time = DATE_ADD(NOW(), INTERVAL %2 SECOND) WHERE id = %1', [
- 1 => [$dao->id, 'Integer'],
- 2 => [$this->queueSpec['retry_interval'], 'Integer'],
- ]);
+ $sql = empty($this->queueSpec['retry_interval'])
+ ? 'UPDATE civicrm_queue_item SET release_time = NULL WHERE id IN (#ids) AND queue_name = @name'
+ : 'UPDATE civicrm_queue_item SET release_time = DATE_ADD(NOW(), INTERVAL #retry SECOND) WHERE id IN (#ids) AND queue_name = @name';
+ CRM_Core_DAO::executeQuery(CRM_Utils_SQL::interpolate($sql, [
+ 'ids' => CRM_Utils_Array::collect('id', $items),
+ 'name' => $this->getName(),
+ 'retry' => $this->queueSpec['retry_interval'] ?? NULL,
+ ]));
+ $this->freeDAOs($items);
+ }
+ protected function freeDAOs($mixed) {
+ $mixed = (array) $mixed;
+ foreach ($mixed as $item) {
+ if ($item instanceof CRM_Core_DAO) {
+ $item->free();
+ }
- $dao->free();
+ /**
+ * Grab items from a queue in batches.
+ *
+ * @dataProvider getQueueSpecs
+ * @param $queueSpec
+ */
+ public function testBatchClaim($queueSpec) {
+ $this->queue = $this->queueService->create($queueSpec);
+ $this->assertTrue($this->queue instanceof CRM_Queue_Queue);
+ if (!($this->queue instanceof CRM_Queue_Queue_BatchQueueInterface)) {
+ $this->markTestSkipped("Queue class does not support batch interface: " . get_class($this->queue));
+ }
+ for ($i = 0; $i < 9; $i++) {
+ $this->queue->createItem('x' . $i);
+ }
+ $this->assertEquals(9, $this->queue->numberOfItems());
+ // We expect this driver to be fully compliant with batching.
+ $claimsA = $this->queue->claimItems(3);
+ $claimsB = $this->queue->claimItems(3);
+ $this->assertEquals(9, $this->queue->numberOfItems());
+ $this->assertEquals(['x0', 'x1', 'x2'], CRM_Utils_Array::collect('data', $claimsA));
+ $this->assertEquals(['x3', 'x4', 'x5'], CRM_Utils_Array::collect('data', $claimsB));
+ $this->queue->deleteItems([$claimsA[0], $claimsA[1]]); /* x0, x1 */
+ $this->queue->releaseItems([$claimsA[2]]); /* x2: will retry with next claimItems() */
+ $this->queue->deleteItems([$claimsB[0], $claimsB[1]]); /* x3, x4 */
+ /* claimsB[2]: x5: Oops, we're gonna take some time to finish this one. */
+ $this->assertEquals(5, $this->queue->numberOfItems());
+ $claimsC = $this->queue->claimItems(3);
+ $this->assertEquals(['x2', 'x6', 'x7'], CRM_Utils_Array::collect('data', $claimsC));
+ $this->queue->deleteItem($claimsC[0]); /* x2 */
+ $this->queue->releaseItem($claimsC[1]); /* x6: will retry with next claimItems() */
+ $this->queue->deleteItem($claimsC[2]); /* x7 */
+ $this->assertEquals(3, $this->queue->numberOfItems());
+ $claimsD = $this->queue->claimItems(3);
+ $this->assertEquals(['x6', 'x8'], CRM_Utils_Array::collect('data', $claimsD));
+ $this->queue->deleteItem($claimsD[0]); /* x6 */
+ $this->queue->deleteItem($claimsD[1]); /* x8 */
+ $this->assertEquals(1, $this->queue->numberOfItems());
+ // claimsB took a while to wrap-up. But it finally did!
+ $this->queue->deleteItem($claimsB[2]); /* x5 */
+ $this->assertEquals(0, $this->queue->numberOfItems());
+ }