From 0374a8cc0dedd8f421367805a2bce804f2fb96e8 Mon Sep 17 00:00:00 2001 From: Tim Otten Date: Fri, 11 Feb 2022 01:07:25 -0800 Subject: [PATCH] SqlParallel - Implement BatchQueueInterface Before ------ Each of the `CRM_Queue_Queue_*` drivers supports a set of methods for claiming/releasing one queue-item at a time (eg `claimItem()`, `releaseItem()`, `deleteItem()`). After ----- All drivers still support the same queue-item methods. Additionally, the `SqlParallel` driver supports batch-oriented equivalents (`claimItems()`, `deleteItems()`, etc). Comments -------- I initially looked at updating all of the drivers to support queues. There were a few obstacles. Firstly, batched-claims seem semantically questionable for queues that run 1-by-1 (`Sql`, `Memory`). Secondly, there are a few extensions in contrib that extend these classes and override methods (consequently, they're looking for stable signatures). The approach here seemed to resolve those two concerns in an OOP-safe way: define an optional interface (`BatchQueueInterface`) which can be used to mark enhanced functionality on queues where it makes sense (eg `SqlParallel`). --- CRM/Queue/Queue/BatchQueueInterface.php | 57 +++++++++++++++++ CRM/Queue/Queue/SqlParallel.php | 51 +++++++-------- CRM/Queue/Queue/SqlTrait.php | 82 ++++++++++++++++++------- tests/phpunit/CRM/Queue/QueueTest.php | 50 +++++++++++++++ 4 files changed, 193 insertions(+), 47 deletions(-) create mode 100644 CRM/Queue/Queue/BatchQueueInterface.php diff --git a/CRM/Queue/Queue/BatchQueueInterface.php b/CRM/Queue/Queue/BatchQueueInterface.php new file mode 100644 index 0000000000..fd0c836a03 --- /dev/null +++ b/CRM/Queue/Queue/BatchQueueInterface.php @@ -0,0 +1,57 @@ +claimItems(1, $lease_time); + return $items[0] ?? NULL; + } + + /** + * @inheritDoc + */ + public function claimItems(int $limit, ?int $lease_time = NULL): array { $lease_time = $lease_time ?: $this->getSpec('lease_time') ?: static::DEFAULT_LEASE_TIME; + $limit = $this->getSpec('batch_limit') ? min($limit, $this->getSpec('batch_limit')) : $limit; - $result = NULL; $dao = CRM_Core_DAO::executeQuery('LOCK TABLES civicrm_queue_item WRITE;'); $sql = "SELECT id, queue_name, submit_time, release_time, run_count, data FROM civicrm_queue_item WHERE queue_name = %1 AND (release_time IS NULL OR release_time < %2) ORDER BY weight ASC, id ASC - LIMIT 1 + LIMIT %3 "; $params = [ 1 => [$this->getName(), 'String'], 2 => [CRM_Utils_Time::getTime(), 'Timestamp'], + 3 => [$limit, 'Integer'], ]; $dao = CRM_Core_DAO::executeQuery($sql, $params, TRUE, 'CRM_Queue_DAO_QueueItem'); if (is_a($dao, 'DB_Error')) { @@ -65,22 +68,22 @@ class CRM_Queue_Queue_SqlParallel extends CRM_Queue_Queue { CRM_Core_Error::fatal(); } - if ($dao->fetch()) { + $result = []; + while ($dao->fetch()) { + $result[] = (object) [ + 'id' => $dao->id, + 'data' => unserialize($dao->data), + 'queue_name' => $dao->queue_name, + 'run_count' => 1 + (int) $dao->run_count, + ]; + } + if ($result) { $nowEpoch = CRM_Utils_Time::getTimeRaw(); - $dao->run_count++; - CRM_Core_DAO::executeQuery("UPDATE civicrm_queue_item SET release_time = %1, run_count = %3 WHERE id = %2", [ - '1' => [date('YmdHis', $nowEpoch + $lease_time), 'String'], - '2' => [$dao->id, 'Integer'], - '3' => [$dao->run_count, 'Integer'], + $sql = CRM_Utils_SQL::interpolate('UPDATE civicrm_queue_item SET release_time = @RT, run_count = 1+run_count WHERE id IN (#ids)', [ + 'RT' => date('YmdHis', $nowEpoch + $lease_time), + 'ids' => CRM_Utils_Array::collect('id', $result), ]); - - // (Comment by artfulrobot Sep 2019: Not sure what the below comment means, should be removed/clarified?) - // work-around: inconsistent date-formatting causes unintentional breakage - # $dao->submit_time = date('YmdHis', strtotime($dao->submit_time)); - # $dao->release_time = date('YmdHis', $nowEpoch + $lease_time); - # $dao->save(); - $dao->data = unserialize($dao->data); - $result = $dao; + CRM_Core_DAO::executeQuery($sql); } $dao = CRM_Core_DAO::executeQuery('UNLOCK TABLES;'); diff --git a/CRM/Queue/Queue/SqlTrait.php b/CRM/Queue/Queue/SqlTrait.php index 40a9ad6002..0f68620c71 100644 --- a/CRM/Queue/Queue/SqlTrait.php +++ b/CRM/Queue/Queue/SqlTrait.php @@ -86,12 +86,23 @@ trait CRM_Queue_Queue_SqlTrait { /** * Remove an item from the queue. * - * @param CRM_Core_DAO|stdClass $dao + * @param CRM_Core_DAO|stdClass $item * The item returned by claimItem. */ - public function deleteItem($dao) { - $dao->delete(); - $dao->free(); + public function deleteItem($item) { + $this->deleteItems([$item]); + } + + public function deleteItems($items): void { + if (empty($items)) { + return; + } + $sql = CRM_Utils_SQL::interpolate('DELETE FROM civicrm_queue_item WHERE id IN (#ids) AND queue_name = @name', [ + 'ids' => CRM_Utils_Array::collect('id', $items), + 'name' => $this->getName(), + ]); + CRM_Core_DAO::executeQuery($sql); + $this->freeDAOs($items); } /** @@ -104,35 +115,60 @@ trait CRM_Queue_Queue_SqlTrait { * @return CRM_Queue_DAO_QueueItem|object|null $dao */ public function fetchItem($id) { - $dao = new CRM_Queue_DAO_QueueItem(); - $dao->id = $id; - $dao->queue_name = $this->getName(); - if (!$dao->find(TRUE)) { - return NULL; + $items = $this->fetchItems([$id]); + return $items[0] ?? NULL; + } + + public function fetchItems(array $ids): array { + $dao = CRM_Utils_SQL_Select::from('civicrm_queue_item') + ->select(['id', 'data', 'run_count']) + ->where('id IN (#ids)', ['ids' => $ids]) + ->where('queue_name = @name', ['name' => $this->getName()]) + ->execute(); + $result = []; + while ($dao->fetch()) { + $result[] = (object) [ + 'id' => $dao->id, + 'data' => unserialize($dao->data), + 'run_count' => $dao->run_count, + 'queue_name' => $this->getName(), + ]; } - $dao->data = unserialize($dao->data); - return $dao; + return $result; } /** * Return an item that could not be processed. * - * @param CRM_Core_DAO $dao + * @param CRM_Core_DAO $item * The item returned by claimItem. */ - public function releaseItem($dao) { - if (empty($this->queueSpec['retry_interval'])) { - CRM_Core_DAO::executeQuery('UPDATE civicrm_queue_item SET release_time = NULL WHERE id = %1', [ - 1 => [$dao->id, 'Integer'], - ]); + public function releaseItem($item) { + $this->releaseItems([$item]); + } + + public function releaseItems($items): void { + if (empty($items)) { + return; } - else { - CRM_Core_DAO::executeQuery('UPDATE civicrm_queue_item SET release_time = DATE_ADD(NOW(), INTERVAL %2 SECOND) WHERE id = %1', [ - 1 => [$dao->id, 'Integer'], - 2 => [$this->queueSpec['retry_interval'], 'Integer'], - ]); + $sql = empty($this->queueSpec['retry_interval']) + ? 'UPDATE civicrm_queue_item SET release_time = NULL WHERE id IN (#ids) AND queue_name = @name' + : 'UPDATE civicrm_queue_item SET release_time = DATE_ADD(NOW(), INTERVAL #retry SECOND) WHERE id IN (#ids) AND queue_name = @name'; + CRM_Core_DAO::executeQuery(CRM_Utils_SQL::interpolate($sql, [ + 'ids' => CRM_Utils_Array::collect('id', $items), + 'name' => $this->getName(), + 'retry' => $this->queueSpec['retry_interval'] ?? NULL, + ])); + $this->freeDAOs($items); + } + + protected function freeDAOs($mixed) { + $mixed = (array) $mixed; + foreach ($mixed as $item) { + if ($item instanceof CRM_Core_DAO) { + $item->free(); + } } - $dao->free(); } } diff --git a/tests/phpunit/CRM/Queue/QueueTest.php b/tests/phpunit/CRM/Queue/QueueTest.php index a5416e5ae8..43912aaeae 100644 --- a/tests/phpunit/CRM/Queue/QueueTest.php +++ b/tests/phpunit/CRM/Queue/QueueTest.php @@ -412,4 +412,54 @@ class CRM_Queue_QueueTest extends CiviUnitTestCase { $queue2->releaseItem($item); } + /** + * Grab items from a queue in batches. + * + * @dataProvider getQueueSpecs + * @param $queueSpec + */ + public function testBatchClaim($queueSpec) { + $this->queue = $this->queueService->create($queueSpec); + $this->assertTrue($this->queue instanceof CRM_Queue_Queue); + if (!($this->queue instanceof CRM_Queue_Queue_BatchQueueInterface)) { + $this->markTestSkipped("Queue class does not support batch interface: " . get_class($this->queue)); + } + + for ($i = 0; $i < 9; $i++) { + $this->queue->createItem('x' . $i); + } + $this->assertEquals(9, $this->queue->numberOfItems()); + + // We expect this driver to be fully compliant with batching. + $claimsA = $this->queue->claimItems(3); + $claimsB = $this->queue->claimItems(3); + $this->assertEquals(9, $this->queue->numberOfItems()); + + $this->assertEquals(['x0', 'x1', 'x2'], CRM_Utils_Array::collect('data', $claimsA)); + $this->assertEquals(['x3', 'x4', 'x5'], CRM_Utils_Array::collect('data', $claimsB)); + + $this->queue->deleteItems([$claimsA[0], $claimsA[1]]); /* x0, x1 */ + $this->queue->releaseItems([$claimsA[2]]); /* x2: will retry with next claimItems() */ + $this->queue->deleteItems([$claimsB[0], $claimsB[1]]); /* x3, x4 */ + /* claimsB[2]: x5: Oops, we're gonna take some time to finish this one. */ + $this->assertEquals(5, $this->queue->numberOfItems()); + + $claimsC = $this->queue->claimItems(3); + $this->assertEquals(['x2', 'x6', 'x7'], CRM_Utils_Array::collect('data', $claimsC)); + $this->queue->deleteItem($claimsC[0]); /* x2 */ + $this->queue->releaseItem($claimsC[1]); /* x6: will retry with next claimItems() */ + $this->queue->deleteItem($claimsC[2]); /* x7 */ + $this->assertEquals(3, $this->queue->numberOfItems()); + + $claimsD = $this->queue->claimItems(3); + $this->assertEquals(['x6', 'x8'], CRM_Utils_Array::collect('data', $claimsD)); + $this->queue->deleteItem($claimsD[0]); /* x6 */ + $this->queue->deleteItem($claimsD[1]); /* x8 */ + $this->assertEquals(1, $this->queue->numberOfItems()); + + // claimsB took a while to wrap-up. But it finally did! + $this->queue->deleteItem($claimsB[2]); /* x5 */ + $this->assertEquals(0, $this->queue->numberOfItems()); + } + } -- 2.25.1