From c038970446c3827c28294629e31b61eeac224fc5 Mon Sep 17 00:00:00 2001 From: Tim Otten Date: Thu, 10 Feb 2022 19:32:02 -0800 Subject: [PATCH] CRM_Queue_Queue_* - Track and report `run_count` for each item Before: Whenever you `claimItem()` from the queue, it marks the item `release_time`. After: Whenever you `claimItem()` from the queue, it marks _both_ the `release_time` and the `run_count`. Comments: * This is the basis for enforcing a `retry_limit` policy. * This doesn't require any extra queries or joins - it fits into the existing update query. --- CRM/Queue/Queue/Memory.php | 16 ++++++++++++++++ CRM/Queue/Queue/Sql.php | 9 ++++++--- CRM/Queue/Queue/SqlParallel.php | 13 +++++++++---- tests/phpunit/CRM/Queue/QueueTest.php | 10 ++++++++++ 4 files changed, 41 insertions(+), 7 deletions(-) diff --git a/CRM/Queue/Queue/Memory.php b/CRM/Queue/Queue/Memory.php index db0c490bfe..abe8ee1236 100644 --- a/CRM/Queue/Queue/Memory.php +++ b/CRM/Queue/Queue/Memory.php @@ -26,6 +26,14 @@ class CRM_Queue_Queue_Memory extends CRM_Queue_Queue { */ public $releaseTimes; + /** + * Number of times each queue item has been attempted. + * + * @var array + * array(queueItemId => int $count), + */ + protected $runCounts; + public $nextQueueItemId = 1; /** @@ -52,6 +60,7 @@ class CRM_Queue_Queue_Memory extends CRM_Queue_Queue { public function createQueue() { $this->items = []; $this->releaseTimes = []; + $this->runCounts = []; } /** @@ -68,6 +77,7 @@ class CRM_Queue_Queue_Memory extends CRM_Queue_Queue { public function deleteQueue() { $this->items = NULL; $this->releaseTimes = NULL; + $this->runCounts = NULL; } /** @@ -92,6 +102,7 @@ class CRM_Queue_Queue_Memory extends CRM_Queue_Queue { $id = $this->nextQueueItemId++; // force copy, no unintendedsharing effects from pointers $this->items[$id] = serialize($data); + $this->runCounts[$id] = 0; } /** @@ -118,10 +129,12 @@ class CRM_Queue_Queue_Memory extends CRM_Queue_Queue { $nowEpoch = CRM_Utils_Time::getTimeRaw(); if (empty($this->releaseTimes[$id]) || $this->releaseTimes[$id] < $nowEpoch) { $this->releaseTimes[$id] = $nowEpoch + $leaseTime; + $this->runCounts[$id]++; $item = new stdClass(); $item->id = $id; $item->data = unserialize($data); + $item->run_count = $this->runCounts[$id]; return $item; } else { @@ -147,10 +160,12 @@ class CRM_Queue_Queue_Memory extends CRM_Queue_Queue { foreach ($this->items as $id => $data) { $nowEpoch = CRM_Utils_Time::getTimeRaw(); $this->releaseTimes[$id] = $nowEpoch + $leaseTime; + $this->runCounts[$id]++; $item = new stdClass(); $item->id = $id; $item->data = unserialize($data); + $item->run_count = $this->runCounts[$id]; return $item; } // nothing in queue @@ -166,6 +181,7 @@ class CRM_Queue_Queue_Memory extends CRM_Queue_Queue { public function deleteItem($item) { unset($this->items[$item->id]); unset($this->releaseTimes[$item->id]); + unset($this->runCounts[$item->id]); } /** diff --git a/CRM/Queue/Queue/Sql.php b/CRM/Queue/Queue/Sql.php index 1cee3894c3..ff924af533 100644 --- a/CRM/Queue/Queue/Sql.php +++ b/CRM/Queue/Queue/Sql.php @@ -49,7 +49,7 @@ class CRM_Queue_Queue_Sql extends CRM_Queue_Queue { $dao = CRM_Core_DAO::executeQuery('LOCK TABLES civicrm_queue_item WRITE;'); $sql = " SELECT first_in_queue.* FROM ( - SELECT id, queue_name, submit_time, release_time, data + SELECT id, queue_name, submit_time, release_time, run_count, data FROM civicrm_queue_item WHERE queue_name = %1 ORDER BY weight ASC, id ASC @@ -69,9 +69,11 @@ class CRM_Queue_Queue_Sql extends CRM_Queue_Queue { if ($dao->fetch()) { $nowEpoch = CRM_Utils_Time::getTimeRaw(); - CRM_Core_DAO::executeQuery("UPDATE civicrm_queue_item SET release_time = %1 WHERE id = %2", [ + $dao->run_count++; + CRM_Core_DAO::executeQuery("UPDATE civicrm_queue_item SET release_time = %1, run_count = %3 WHERE id = %2", [ '1' => [date('YmdHis', $nowEpoch + $lease_time), 'String'], '2' => [$dao->id, 'Integer'], + '3' => [$dao->run_count, 'Integer'], ]); // (Comment by artfulrobot Sep 2019: Not sure what the below comment means, should be removed/clarified?) // work-around: inconsistent date-formatting causes unintentional breakage @@ -98,7 +100,7 @@ class CRM_Queue_Queue_Sql extends CRM_Queue_Queue { */ public function stealItem($lease_time = 3600) { $sql = " - SELECT id, queue_name, submit_time, release_time, data + SELECT id, queue_name, submit_time, release_time, run_count, data FROM civicrm_queue_item WHERE queue_name = %1 ORDER BY weight ASC, id ASC @@ -110,6 +112,7 @@ class CRM_Queue_Queue_Sql extends CRM_Queue_Queue { $dao = CRM_Core_DAO::executeQuery($sql, $params, TRUE, 'CRM_Queue_DAO_QueueItem'); if ($dao->fetch()) { $nowEpoch = CRM_Utils_Time::getTimeRaw(); + $dao->run_count++; CRM_Core_DAO::executeQuery("UPDATE civicrm_queue_item SET release_time = %1 WHERE id = %2", [ '1' => [date('YmdHis', $nowEpoch + $lease_time), 'String'], '2' => [$dao->id, 'Integer'], diff --git a/CRM/Queue/Queue/SqlParallel.php b/CRM/Queue/Queue/SqlParallel.php index bf48d45e93..8ca07e07dd 100644 --- a/CRM/Queue/Queue/SqlParallel.php +++ b/CRM/Queue/Queue/SqlParallel.php @@ -47,7 +47,7 @@ class CRM_Queue_Queue_SqlParallel extends CRM_Queue_Queue { $result = NULL; $dao = CRM_Core_DAO::executeQuery('LOCK TABLES civicrm_queue_item WRITE;'); - $sql = "SELECT id, queue_name, submit_time, release_time, data + $sql = "SELECT id, queue_name, submit_time, release_time, run_count, data FROM civicrm_queue_item WHERE queue_name = %1 AND (release_time IS NULL OR release_time < %2) @@ -66,10 +66,13 @@ class CRM_Queue_Queue_SqlParallel extends CRM_Queue_Queue { if ($dao->fetch()) { $nowEpoch = CRM_Utils_Time::getTimeRaw(); - CRM_Core_DAO::executeQuery("UPDATE civicrm_queue_item SET release_time = %1 WHERE id = %2", [ + $dao->run_count++; + CRM_Core_DAO::executeQuery("UPDATE civicrm_queue_item SET release_time = %1, run_count = %3 WHERE id = %2", [ '1' => [date('YmdHis', $nowEpoch + $lease_time), 'String'], '2' => [$dao->id, 'Integer'], + '3' => [$dao->run_count, 'Integer'], ]); + // (Comment by artfulrobot Sep 2019: Not sure what the below comment means, should be removed/clarified?) // work-around: inconsistent date-formatting causes unintentional breakage # $dao->submit_time = date('YmdHis', strtotime($dao->submit_time)); @@ -95,7 +98,7 @@ class CRM_Queue_Queue_SqlParallel extends CRM_Queue_Queue { */ public function stealItem($lease_time = 3600) { $sql = " - SELECT id, queue_name, submit_time, release_time, data + SELECT id, queue_name, submit_time, release_time, run_count, data FROM civicrm_queue_item WHERE queue_name = %1 ORDER BY weight ASC, id ASC @@ -107,9 +110,11 @@ class CRM_Queue_Queue_SqlParallel extends CRM_Queue_Queue { $dao = CRM_Core_DAO::executeQuery($sql, $params, TRUE, 'CRM_Queue_DAO_QueueItem'); if ($dao->fetch()) { $nowEpoch = CRM_Utils_Time::getTimeRaw(); - CRM_Core_DAO::executeQuery("UPDATE civicrm_queue_item SET release_time = %1 WHERE id = %2", [ + $dao->run_count++; + CRM_Core_DAO::executeQuery("UPDATE civicrm_queue_item SET release_time = %1, run_count = %3 WHERE id = %2", [ '1' => [date('YmdHis', $nowEpoch + $lease_time), 'String'], '2' => [$dao->id, 'Integer'], + '3' => [$dao->run_count, 'Integer'], ]); $dao->data = unserialize($dao->data); return $dao; diff --git a/tests/phpunit/CRM/Queue/QueueTest.php b/tests/phpunit/CRM/Queue/QueueTest.php index 1b20bb7e51..a5416e5ae8 100644 --- a/tests/phpunit/CRM/Queue/QueueTest.php +++ b/tests/phpunit/CRM/Queue/QueueTest.php @@ -134,11 +134,13 @@ class CRM_Queue_QueueTest extends CiviUnitTestCase { $this->assertEquals(3, $this->queue->numberOfItems()); $item = $this->queue->claimItem(); $this->assertEquals('a', $item->data['test-key']); + $this->assertEquals(1, $item->run_count); $this->queue->deleteItem($item); $this->assertEquals(2, $this->queue->numberOfItems()); $item = $this->queue->claimItem(); $this->assertEquals('b', $item->data['test-key']); + $this->assertEquals(1, $item->run_count); $this->queue->deleteItem($item); $this->queue->createItem([ @@ -148,11 +150,13 @@ class CRM_Queue_QueueTest extends CiviUnitTestCase { $this->assertEquals(2, $this->queue->numberOfItems()); $item = $this->queue->claimItem(); $this->assertEquals('c', $item->data['test-key']); + $this->assertEquals(1, $item->run_count); $this->queue->deleteItem($item); $this->assertEquals(1, $this->queue->numberOfItems()); $item = $this->queue->claimItem(); $this->assertEquals('d', $item->data['test-key']); + $this->assertEquals(1, $item->run_count); $this->queue->deleteItem($item); $this->assertEquals(0, $this->queue->numberOfItems()); @@ -174,12 +178,14 @@ class CRM_Queue_QueueTest extends CiviUnitTestCase { $item = $this->queue->claimItem(); $this->assertEquals('a', $item->data['test-key']); + $this->assertEquals(1, $item->run_count); $this->assertEquals(1, $this->queue->numberOfItems()); $this->queue->releaseItem($item); $this->assertEquals(1, $this->queue->numberOfItems()); $item = $this->queue->claimItem(); $this->assertEquals('a', $item->data['test-key']); + $this->assertEquals(2, $item->run_count); $this->queue->deleteItem($item); $this->assertEquals(0, $this->queue->numberOfItems()); @@ -203,6 +209,7 @@ class CRM_Queue_QueueTest extends CiviUnitTestCase { $item = $this->queue->claimItem(); $this->assertEquals('a', $item->data['test-key']); + $this->assertEquals(1, $item->run_count); $this->assertEquals(1, $this->queue->numberOfItems()); // forget to release @@ -215,6 +222,7 @@ class CRM_Queue_QueueTest extends CiviUnitTestCase { CRM_Utils_Time::setTime('2012-04-01 2:00:03'); $item3 = $this->queue->claimItem(); $this->assertEquals('a', $item3->data['test-key']); + $this->assertEquals(2, $item3->run_count); $this->assertEquals(1, $this->queue->numberOfItems()); $this->queue->deleteItem($item3); @@ -238,6 +246,7 @@ class CRM_Queue_QueueTest extends CiviUnitTestCase { $item = $this->queue->claimItem(); $this->assertEquals('a', $item->data['test-key']); + $this->assertEquals(1, $item->run_count); $this->assertEquals(1, $this->queue->numberOfItems()); // forget to release @@ -249,6 +258,7 @@ class CRM_Queue_QueueTest extends CiviUnitTestCase { // but stealItem works $item3 = $this->queue->stealItem(); $this->assertEquals('a', $item3->data['test-key']); + $this->assertEquals(2, $item3->run_count); $this->assertEquals(1, $this->queue->numberOfItems()); $this->queue->deleteItem($item3); -- 2.25.1