Civi\Api4\Queue - Add APIs for claiming and running enqueued tasks
authorTim Otten <totten@civicrm.org>
Wed, 2 Feb 2022 08:41:03 +0000 (00:41 -0800)
committerTim Otten <totten@civicrm.org>
Thu, 2 Jun 2022 20:31:59 +0000 (13:31 -0700)
Civi/Api4/Action/Queue/ClaimItems.php [new file with mode: 0644]
Civi/Api4/Action/Queue/RunItems.php [new file with mode: 0644]
Civi/Api4/Queue.php
tests/phpunit/api/v4/Entity/QueueTest.php [new file with mode: 0644]

diff --git a/Civi/Api4/Action/Queue/ClaimItems.php b/Civi/Api4/Action/Queue/ClaimItems.php
new file mode 100644 (file)
index 0000000..8d05495
--- /dev/null
@@ -0,0 +1,90 @@
+<?php
+
+/*
+ +--------------------------------------------------------------------+
+ | Copyright CiviCRM LLC. All rights reserved.                        |
+ |                                                                    |
+ | This work is published under the GNU AGPLv3 license with some      |
+ | permitted exceptions and without any warranty. For full license    |
+ | and copyright information, see https://civicrm.org/licensing       |
+ +--------------------------------------------------------------------+
+ */
+
+namespace Civi\Api4\Action\Queue;
+
+use Civi\Api4\Generic\Traits\SelectParamTrait;
+
+/**
+ * Claim an item from the queue.  Returns zero or one items.
+ *
+ * @method ?string setQueue
+ * @method $this setQueue(?string $queue)
+ */
+class ClaimItems extends \Civi\Api4\Generic\AbstractAction {
+
+  use SelectParamTrait;
+
+  /**
+   * Name of the target queue.
+   *
+   * @var string|null
+   */
+  protected $queue;
+
+  public function _run(\Civi\Api4\Generic\Result $result) {
+    $this->select = empty($this->select) ? ['id', 'data', 'queue'] : $this->select;
+    $queue = $this->queue();
+    if (!$queue->isActive()) {
+      return;
+    }
+
+    $isBatch = $queue instanceof \CRM_Queue_Queue_BatchQueueInterface;
+    $limit = $queue->getSpec('batch_limit') ?: 1;
+    if ($limit > 1 && !$isBatch) {
+      throw new \API_Exception(sprintf('Queue "%s" (%s) does not support batching.', $queue->getName(), get_class($queue)));
+      // Note 1: Simply looping over `claimItem()` is unlikley to help the consumer b/c
+      // drivers like Sql+Memory are linear+blocking.
+      // Note 2: The default is batch_limit=1. So someone has specifically chosen an invalid configuration...
+    }
+    $items = $isBatch ? $queue->claimItems($limit) : [$queue->claimItem()];
+
+    foreach ($items as $item) {
+      if ($item) {
+        $result[] = $this->convertItemToStub($item);
+      }
+    }
+  }
+
+  /**
+   * @param \CRM_Queue_DAO_QueueItem|\stdClass $item
+   * @return array
+   */
+  protected function convertItemToStub(object $item): array {
+    $array = [];
+    foreach ($this->select as $field) {
+      switch ($field) {
+        case 'id':
+          $array['id'] = $item->id;
+          break;
+
+        case 'data':
+          $array['data'] = (array) $item->data;
+          break;
+
+        case 'queue':
+          $array['queue'] = $this->queue;
+          break;
+
+      }
+    }
+    return $array;
+  }
+
+  protected function queue(): \CRM_Queue_Queue {
+    if (empty($this->queue)) {
+      throw new \API_Exception('Missing required parameter: $queue');
+    }
+    return \Civi::queue($this->queue);
+  }
+
+}
diff --git a/Civi/Api4/Action/Queue/RunItems.php b/Civi/Api4/Action/Queue/RunItems.php
new file mode 100644 (file)
index 0000000..b65ccf7
--- /dev/null
@@ -0,0 +1,119 @@
+<?php
+
+namespace Civi\Api4\Action\Queue;
+
+/**
+ * Run an enqueued item (task).
+ *
+ * You must either:
+ *
+ * - (a) Give the target queue-item specifically (`setItem()`). Useful if you called `claimItem()` separately.
+ * - (b) Give the name of the queue from which to find an item (`setQueue()`).
+ *
+ * Note: If you use `setItem()`, the inputted will be validated (refetched) to ensure authenticity of all details.
+ *
+ * Returns 0 or 1 records which indicate the outcome of running the chosen task.
+ *
+ * ```php
+ * $todo = Civi\Api4\Queue::claimItem()->setQueue($item)->setLeaseTime(600)->execute()->single();
+ * $result = Civi\Api4\Queue::runItem()->setItem($todo)->execute()->single();
+ * assert(in_array($result['outcome'], ['ok', 'retry', 'fail']))
+ *
+ * $result = Civi\Api4\Queue::runItem()->setQueue('foo')->execute()->first();
+ * assert(in_array($result['outcome'], ['ok', 'retry', 'fail']))
+ * ```
+ *
+ * Valid outcomes are:
+ * - 'ok': Task executed normally. Removed from queue.
+ * - 'retry': Task encountered an error. Will try again later.
+ * - 'fail': Task encountered an error. Will not try again later. Removed from queue.
+ *
+ * @method $this setItem(?array $item)
+ * @method ?array getItem()
+ * @method ?string setQueue
+ * @method $this setQueue(?string $queue)
+ */
+class RunItems extends \Civi\Api4\Generic\AbstractAction {
+
+  /**
+   * Previously claimed item - which should now be released.
+   *
+   * @var array|null
+   *   Fields: {id: scalar, queue: string}
+   */
+  protected $items;
+
+  /**
+   * Name of the target queue.
+   *
+   * @var string|null
+   */
+  protected $queue;
+
+  public function _run(\Civi\Api4\Generic\Result $result) {
+    if (!empty($this->items)) {
+      $this->validateItemStubs();
+      $queue = \Civi::queue($this->items[0]['queue']);
+      $ids = \CRM_Utils_Array::collect('id', $this->items);
+      if (count($ids) > 1 && !($queue instanceof \CRM_Queue_Queue_BatchQueueInterface)) {
+        throw new \API_Exception("runItems: Error: Running multiple items requires BatchQueueInterface");
+      }
+      if (count($ids) > 1) {
+        $items = $queue->fetchItems($ids);
+      }
+      else {
+        $items = [$queue->fetchItem($ids[0])];
+      }
+    }
+    elseif (!empty($this->queue)) {
+      $queue = \Civi::queue($this->queue);
+      if (!$queue->isActive()) {
+        return;
+      }
+      $items = $queue instanceof \CRM_Queue_Queue_BatchQueueInterface
+        ? $queue->claimItems($queue->getSpec('batch_limit') ?: 1)
+        : [$queue->claimItem()];
+    }
+    else {
+      throw new \API_Exception("runItems: Requires either 'queue' or 'item'.");
+    }
+
+    if (empty($items)) {
+      return;
+    }
+
+    $outcomes = [];
+    \CRM_Utils_Hook::queueRun($queue, $items, $outcomes);
+    if (empty($outcomes)) {
+      throw new \API_Exception(sprintf('Failed to run queue items (name=%s, runner=%s, itemCount=%d, outcomeCount=%d)',
+        $queue->getName(), $queue->getSpec('runner'), count($items), count($outcomes)));
+    }
+    foreach ($items as $itemPos => $item) {
+      $result[] = ['outcome' => $outcomes[$itemPos], 'item' => $this->createItemStub($item)];
+    }
+  }
+
+  private function validateItemStubs(): void {
+    $queueNames = [];
+    if (!isset($this->items[0])) {
+      throw new \API_Exception("Queue items must be given as numeric array.");
+    }
+    foreach ($this->items as $item) {
+      if (empty($item['queue'])) {
+        throw new \API_Exception("Queue item requires property 'queue'.");
+      }
+      if (empty($item['id'])) {
+        throw new \API_Exception("Queue item requires property 'id'.");
+      }
+      $queueNames[$item['queue']] = 1;
+    }
+    if (count($queueNames) > 1) {
+      throw new \API_Exception("Queue items cannot be mixed. Found queues: " . implode(', ', array_keys($queueNames)));
+    }
+  }
+
+  private function createItemStub($item): array {
+    return ['id' => $item->id, 'queue' => $item->queue_name];
+  }
+
+}
index e3ae1acbf3524611d9c75f2815b9bd3ab2670e7e..86173f6d3c64e5f7ddb5856d1d66164ced38c2e6 100644 (file)
@@ -10,6 +10,9 @@
  */
 namespace Civi\Api4;
 
+use Civi\Api4\Action\Queue\ClaimItems;
+use Civi\Api4\Action\Queue\RunItems;
+
 /**
  * Track a list of durable/scannable queues.
  *
@@ -31,7 +34,34 @@ class Queue extends \Civi\Api4\Generic\DAOEntity {
     return [
       'meta' => ['access CiviCRM'],
       'default' => ['administer queues'],
+      'runItem' => [\CRM_Core_Permission::ALWAYS_DENY_PERMISSION],
     ];
   }
 
+  /**
+   * Claim an item from the queue. Returns zero or one items.
+   *
+   * Note: This is appropriate for persistent, auto-run queues.
+   *
+   * @param bool $checkPermissions
+   * @return \Civi\Api4\Action\Queue\ClaimItems
+   */
+  public static function claimItems($checkPermissions = TRUE) {
+    return (new ClaimItems(static::getEntityName(), __FUNCTION__))
+      ->setCheckPermissions($checkPermissions);
+  }
+
+  /**
+   * Run an item from the queue.
+   *
+   * Note: This is appropriate for persistent, auto-run queues.
+   *
+   * @param bool $checkPermissions
+   * @return \Civi\Api4\Action\Queue\RunItems
+   */
+  public static function runItems($checkPermissions = TRUE) {
+    return (new RunItems(static::getEntityName(), __FUNCTION__))
+      ->setCheckPermissions($checkPermissions);
+  }
+
 }
diff --git a/tests/phpunit/api/v4/Entity/QueueTest.php b/tests/phpunit/api/v4/Entity/QueueTest.php
new file mode 100644 (file)
index 0000000..1e0f705
--- /dev/null
@@ -0,0 +1,409 @@
+<?php
+
+/*
+ +--------------------------------------------------------------------+
+ | Copyright CiviCRM LLC. All rights reserved.                        |
+ |                                                                    |
+ | This work is published under the GNU AGPLv3 license with some      |
+ | permitted exceptions and without any warranty. For full license    |
+ | and copyright information, see https://civicrm.org/licensing       |
+ +--------------------------------------------------------------------+
+ */
+
+/**
+ *
+ * @package CRM
+ * @copyright CiviCRM LLC https://civicrm.org/licensing
+ */
+
+namespace api\v4\Entity;
+
+use api\v4\Api4TestBase;
+use Civi\Api4\Queue;
+use Civi\Core\Event\GenericHookEvent;
+
+/**
+ * @group headless
+ * @group queue
+ */
+class QueueTest extends Api4TestBase {
+
+  protected function setUp(): void {
+    \Civi::$statics[__CLASS__] = [
+      'doSomethingResult' => TRUE,
+      'doSomethingLog' => [],
+      'onHookQueueRunLog' => [],
+    ];
+    parent::setUp();
+  }
+
+  /**
+   * Setup a queue with a line of back-to-back tasks.
+   *
+   * The first task runs normally. The second task fails at first, but it is retried, and then
+   * succeeds.
+   *
+   * @throws \API_Exception
+   * @throws \Civi\API\Exception\UnauthorizedException
+   */
+  public function testBasicLinearPolling() {
+    $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_linear';
+    $queue = \Civi::queue($queueName, [
+      'type' => 'Sql',
+      'runner' => 'task',
+      'error' => 'delete',
+      'retry_limit' => 2,
+      'retry_interval' => 4,
+    ]);
+    $this->assertEquals(0, $queue->numberOfItems());
+
+    \Civi::queue($queueName)->createItem(new \CRM_Queue_Task(
+      [QueueTest::class, 'doSomething'],
+      ['first']
+    ));
+    \Civi::queue($queueName)->createItem(new \CRM_Queue_Task(
+      [QueueTest::class, 'doSomething'],
+      ['second']
+    ));
+
+    // Get item #1. Run it. Finish it.
+    $first = Queue::claimItems()->setQueue($queueName)->execute()->single();
+    $this->assertCallback('doSomething', ['first'], $first);
+    $this->assertEquals(0, count(Queue::claimItems()->setQueue($queueName)->execute()), 'Linear queue should not return more items while first item is pending.');
+    $firstResult = Queue::runItems(0)->setItems([$first])->execute()->single();
+    $this->assertEquals('ok', $firstResult['outcome']);
+    $this->assertEquals($first['id'], $firstResult['item']['id']);
+    $this->assertEquals($first['queue'], $firstResult['item']['queue']);
+    $this->assertEquals(['first_ok'], \Civi::$statics[__CLASS__]['doSomethingLog']);
+
+    // Get item #2. Run it - but fail!
+    $second = Queue::claimItems()->setQueue($queueName)->execute()->single();
+    $this->assertCallback('doSomething', ['second'], $second);
+    \Civi::$statics[__CLASS__]['doSomethingResult'] = FALSE;
+    $secondResult = Queue::runItems(0)->setItems([$second])->execute()->single();
+    \Civi::$statics[__CLASS__]['doSomethingResult'] = TRUE;
+    $this->assertEquals('retry', $secondResult['outcome']);
+    $this->assertEquals(['first_ok', 'second_err'], \Civi::$statics[__CLASS__]['doSomethingLog']);
+
+    // Item #2 is delayed... it'll take a few seconds to come up...
+    $waitCount = $this->waitFor(1.0, 10, function() use ($queueName, &$retrySecond): bool {
+      $retrySecond = Queue::claimItems()->setQueue($queueName)->execute()->first();
+      return !empty($retrySecond);
+    });
+    $this->assertTrue($waitCount > 0, 'Failed task should not become available immediately. It should take a few seconds.');
+    $this->assertCallback('doSomething', ['second'], $retrySecond);
+    $retrySecondResult = Queue::runItems(0)->setItems([$retrySecond])->execute()->single();
+    $this->assertEquals('ok', $retrySecondResult['outcome']);
+    $this->assertEquals(['first_ok', 'second_err', 'second_ok'], \Civi::$statics[__CLASS__]['doSomethingLog']);
+
+    // All done.
+    $this->assertEquals(0, $queue->numberOfItems());
+  }
+
+  public function testBasicParallelPolling() {
+    $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_parallel';
+    $queue = \Civi::queue($queueName, ['type' => 'SqlParallel', 'runner' => 'task', 'error' => 'delete']);
+    $this->assertEquals(0, $queue->numberOfItems());
+
+    \Civi::queue($queueName)->createItem(new \CRM_Queue_Task(
+      [QueueTest::class, 'doSomething'],
+      ['first']
+    ));
+    \Civi::queue($queueName)->createItem(new \CRM_Queue_Task(
+      [QueueTest::class, 'doSomething'],
+      ['second']
+    ));
+
+    $first = Queue::claimItems()->setQueue($queueName)->execute()->single();
+    $second = Queue::claimItems()->setQueue($queueName)->execute()->single();
+
+    $this->assertCallback('doSomething', ['first'], $first);
+    $this->assertCallback('doSomething', ['second'], $second);
+
+    // Just for fun, let's run these tasks in opposite order.
+
+    Queue::runItems(0)->setItems([$second])->execute();
+    $this->assertEquals(['second_ok'], \Civi::$statics[__CLASS__]['doSomethingLog']);
+
+    Queue::runItems(0)->setItems([$first])->execute();
+    $this->assertEquals(['second_ok', 'first_ok'], \Civi::$statics[__CLASS__]['doSomethingLog']);
+
+    $this->assertEquals(0, $queue->numberOfItems());
+  }
+
+  /**
+   * Create a parallel queue. Claim and execute tasks as batches.
+   *
+   * Batches are executed via `hook_civicrm_queueRun_{runner}`.
+   *
+   * @throws \API_Exception
+   * @throws \Civi\API\Exception\UnauthorizedException
+   */
+  public function testBatchParallelPolling() {
+    $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_parallel';
+    \Civi::dispatcher()->addListener('hook_civicrm_queueRun_testStuff', [$this, 'onHookQueueRun']);
+    $queue = \Civi::queue($queueName, [
+      'type' => 'SqlParallel',
+      'runner' => 'testStuff',
+      'error' => 'delete',
+      'batch_limit' => 3,
+    ]);
+    $this->assertEquals(0, $queue->numberOfItems());
+
+    for ($i = 0; $i < 7; $i++) {
+      \Civi::queue($queueName)->createItem(['thingy' => $i]);
+    }
+
+    $result = Queue::runItems(0)->setQueue($queueName)->execute();
+    $this->assertEquals(3, count($result));
+    $this->assertEquals([0, 1, 2], \Civi::$statics[__CLASS__]['onHookQueueRunLog'][0]);
+
+    $result = Queue::runItems(0)->setQueue($queueName)->execute();
+    $this->assertEquals(3, count($result));
+    $this->assertEquals([3, 4, 5], \Civi::$statics[__CLASS__]['onHookQueueRunLog'][1]);
+
+    $result = Queue::runItems(0)->setQueue($queueName)->execute();
+    $this->assertEquals(1, count($result));
+    $this->assertEquals([6], \Civi::$statics[__CLASS__]['onHookQueueRunLog'][2]);
+  }
+
+  /**
+   * @param \Civi\Core\Event\GenericHookEvent $e
+   * @see CRM_Utils_Hook::queueRun()
+   */
+  public function onHookQueueRun(GenericHookEvent $e): void {
+    \Civi::$statics[__CLASS__]['onHookQueueRunLog'][] = array_map(
+      function($item) {
+        return $item->data['thingy'];
+      },
+      $e->items
+    );
+
+    foreach ($e->items as $itemKey => $item) {
+      $e->outcomes[$itemKey] = 'ok';
+      $e->queue->deleteItem($item);
+    }
+  }
+
+  public function testSelect() {
+    $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_parallel';
+    $queue = \Civi::queue($queueName, ['type' => 'SqlParallel', 'runner' => 'task', 'error' => 'delete']);
+    $this->assertEquals(0, $queue->numberOfItems());
+
+    \Civi::queue($queueName)->createItem(new \CRM_Queue_Task(
+      [QueueTest::class, 'doSomething'],
+      ['first']
+    ));
+
+    $first = Queue::claimItems()->setQueue($queueName)->setSelect(['id', 'queue'])->execute()->single();
+    $this->assertTrue(is_numeric($first['id']));
+    $this->assertEquals($queueName, $first['queue']);
+    $this->assertFalse(isset($first['data']));
+  }
+
+  public function testEmptyPoll() {
+    $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_linear';
+    $queue = \Civi::queue($queueName, ['type' => 'Sql', 'runner' => 'task', 'error' => 'delete']);
+    $this->assertEquals(0, $queue->numberOfItems());
+
+    $startResult = Queue::claimItems()->setQueue($queueName)->execute();
+    $this->assertEquals(0, $startResult->count());
+  }
+
+  public function getErrorModes(): array {
+    return [
+      'delete' => ['delete'],
+      'abort' => ['abort'],
+    ];
+  }
+
+  /**
+   * Add a task which is never going to succeed. Try it multiple times (until we run out
+   * of retries).
+   *
+   * @param string $errorMode
+   *   Either 'delete' or 'abort'
+   * @dataProvider getErrorModes
+   */
+  public function testRetryWithPoliteExhaustion(string $errorMode) {
+    $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_linear';
+    $queue = \Civi::queue($queueName, [
+      'type' => 'Sql',
+      'runner' => 'task',
+      'error' => $errorMode,
+      'retry_limit' => 2,
+      'retry_interval' => 1,
+    ]);
+    $this->assertEquals(0, $queue->numberOfItems());
+
+    \Civi::queue($queueName)->createItem(new \CRM_Queue_Task(
+      [QueueTest::class, 'doSomething'],
+      ['nogooddirtyscoundrel']
+    ));
+
+    \Civi::$statics[__CLASS__]['doSomethingResult'] = FALSE;
+    $outcomes = [];
+    $this->waitFor(0.5, 15, function() use ($queueName, &$outcomes) {
+      $claimed = Queue::claimItems(0)->setQueue($queueName)->execute()->first();
+      if (!$claimed) {
+        return FALSE;
+      }
+      $result = Queue::runItems(0)->setItems([$claimed])->execute()->first();
+      $outcomes[] = $result['outcome'];
+      return ($result['outcome'] !== 'retry');
+    });
+
+    $this->assertEquals(['retry', 'retry', $errorMode], $outcomes);
+    $this->assertEquals(
+      ['nogooddirtyscoundrel_err', 'nogooddirtyscoundrel_err', 'nogooddirtyscoundrel_err'],
+      \Civi::$statics[__CLASS__]['doSomethingLog']
+    );
+
+    $expectActive = ['delete' => TRUE, 'abort' => FALSE];
+    $this->assertEquals($expectActive[$errorMode], $queue->isActive());
+  }
+
+  /**
+   * Add a task. The task-running agent is a bit delinquent... so it forgets the first
+   * few tasks. But the third one works!
+   */
+  public function testRetryWithDelinquencyAndSuccess() {
+    $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_linear';
+    $queue = \Civi::queue($queueName, [
+      'type' => 'Sql',
+      'runner' => 'task',
+      'error' => 'delete',
+      'retry_limit' => 2,
+      'retry_interval' => 0,
+      'lease_time' => 1,
+    ]);
+    $this->assertEquals(0, $queue->numberOfItems());
+
+    \Civi::queue($queueName)->createItem(new \CRM_Queue_Task(
+      [QueueTest::class, 'doSomething'],
+      ['playinghooky']
+    ));
+    $this->assertEquals(1, $queue->numberOfItems());
+
+    $claim1 = $this->waitForClaim(0.5, 5, $queueName);
+    // Oops, don't do anything with claim #1!
+    $this->assertEquals(1, $queue->numberOfItems());
+    $this->assertEquals([], \Civi::$statics[__CLASS__]['doSomethingLog']);
+
+    $claim2 = $this->waitForClaim(0.5, 5, $queueName);
+    // Oops, don't do anything with claim #2!
+    $this->assertEquals(1, $queue->numberOfItems());
+    $this->assertEquals([], \Civi::$statics[__CLASS__]['doSomethingLog']);
+
+    $claim3 = $this->waitForClaim(0.5, 5, $queueName);
+    $this->assertEquals(1, $queue->numberOfItems());
+    $result = Queue::runItems(0)->setItems([$claim3])->execute()->first();
+    $this->assertEquals(0, $queue->numberOfItems());
+    $this->assertEquals(['playinghooky_ok'], \Civi::$statics[__CLASS__]['doSomethingLog']);
+    $this->assertEquals('ok', $result['outcome']);
+  }
+
+  /**
+   * Add a task which is never going to succeed. The task fails every time, and eventually
+   * we either delete it or abort the queue.
+   *
+   * @param string $errorMode
+   *   Either 'delete' or 'abort'
+   * @dataProvider getErrorModes
+   */
+  public function testRetryWithEventualFailure(string $errorMode) {
+    \Civi::$statics[__CLASS__]['doSomethingResult'] = FALSE;
+
+    $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_linear';
+    $queue = \Civi::queue($queueName, [
+      'type' => 'Sql',
+      'runner' => 'task',
+      'error' => $errorMode,
+      'retry_limit' => 2,
+      'retry_interval' => 0,
+      'lease_time' => 1,
+    ]);
+    $this->assertEquals(0, $queue->numberOfItems());
+
+    \Civi::queue($queueName)->createItem(new \CRM_Queue_Task(
+      [QueueTest::class, 'doSomething'],
+      ['playinghooky']
+    ));
+    $this->assertEquals(1, $queue->numberOfItems());
+
+    $claimAndRun = function($expectOutcome, $expectEndCount) use ($queue, $queueName) {
+      $claim = $this->waitForClaim(0.5, 5, $queueName);
+      $this->assertEquals(1, $queue->numberOfItems());
+      $result = Queue::runItems(0)->setItems([$claim])->execute()->first();
+      $this->assertEquals($expectEndCount, $queue->numberOfItems());
+      $this->assertEquals($expectOutcome, $result['outcome']);
+    };
+
+    $claimAndRun('retry', 1);
+    $claimAndRun('retry', 1);
+    switch ($errorMode) {
+      case 'delete':
+        $claimAndRun('delete', 0);
+        $this->assertEquals(TRUE, $queue->isActive());
+        break;
+
+      case 'abort':
+        $claimAndRun('abort', 1);
+        $this->assertEquals(FALSE, $queue->isActive());
+        break;
+    }
+
+    $this->assertEquals(['playinghooky_err', 'playinghooky_err', 'playinghooky_err'], \Civi::$statics[__CLASS__]['doSomethingLog']);
+  }
+
+  public static function doSomething(\CRM_Queue_TaskContext $ctx, string $something) {
+    $ok = \Civi::$statics[__CLASS__]['doSomethingResult'];
+    \Civi::$statics[__CLASS__]['doSomethingLog'][] = $something . ($ok ? '_ok' : '_err');
+    return $ok;
+  }
+
+  protected function assertCallback($expectMethod, $expectArgs, $actualTask) {
+    $this->assertEquals([QueueTest::class, $expectMethod], $actualTask['data']['callback'], 'Claimed task should have expected method');
+    $this->assertEquals($expectArgs, $actualTask['data']['arguments'], 'Claimed task should have expected arguments');
+  }
+
+  protected function waitForClaim(float $interval, float $timeout, string $queueName): ?array {
+    $claims = [];
+    $this->waitFor($interval, $timeout, function() use ($queueName, &$claims) {
+      $claimed = Queue::claimItems(0)->setQueue($queueName)->execute()->first();
+      if (!$claimed) {
+        return FALSE;
+      }
+      $claims[] = $claimed;
+      return TRUE;
+    });
+    return $claims[0] ?? NULL;
+  }
+
+  /**
+   * Repeatedly check $condition until it returns true (or until we exhaust timeout).
+   *
+   * @param float $interval
+   *   Seconds to wait between checks.
+   * @param float $timeout
+   *   Total maximum seconds to wait across all checks.
+   * @param callable $condition
+   *   The condition to check.
+   * @return int
+   *   Total number of intervals we had to wait/sleep.
+   */
+  protected function waitFor(float $interval, float $timeout, callable $condition): int {
+    $end = microtime(TRUE) + $timeout;
+    $interval *= round($interval * 1000 * 1000);
+    $waitCount = 0;
+    $ready = $condition();
+    while (!$ready && microtime(TRUE) <= $end) {
+      usleep($interval);
+      $waitCount++;
+      $ready = $condition();
+    }
+    $this->assertTrue($ready, 'Wait condition not met');
+    return $waitCount;
+  }
+
+}