4 +--------------------------------------------------------------------+
5 | Copyright CiviCRM LLC. All rights reserved. |
7 | This work is published under the GNU AGPLv3 license with some |
8 | permitted exceptions and without any warranty. For full license |
9 | and copyright information, see https://civicrm.org/licensing |
10 +--------------------------------------------------------------------+
16 * @copyright CiviCRM LLC https://civicrm.org/licensing
19 namespace api\v
4\Entity
;
21 use api\v
4\Api4TestBase
;
23 use Civi\Api4\UserJob
;
24 use Civi\Core\Event\GenericHookEvent
;
30 class QueueTest
extends Api4TestBase
{
32 protected function setUp(): void
{
33 \Civi
::$statics[__CLASS__
] = [
34 'doSomethingResult' => TRUE,
35 'doSomethingLog' => [],
36 'onHookQueueRunLog' => [],
42 * Setup a queue with a line of back-to-back tasks.
44 * The first task runs normally. The second task fails at first, but it is retried, and then
47 * @throws \API_Exception
48 * @throws \Civi\API\Exception\UnauthorizedException
50 public function testBasicLinearPolling() {
51 $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_linear';
52 $queue = \Civi
::queue($queueName, [
57 'retry_interval' => 4,
59 $this->assertEquals(0, $queue->numberOfItems());
61 \Civi
::queue($queueName)->createItem(new \
CRM_Queue_Task(
62 [QueueTest
::class, 'doSomething'],
65 \Civi
::queue($queueName)->createItem(new \
CRM_Queue_Task(
66 [QueueTest
::class, 'doSomething'],
70 // Get item #1. Run it. Finish it.
71 $first = Queue
::claimItems()->setQueue($queueName)->execute()->single();
72 $this->assertCallback('doSomething', ['first'], $first);
73 $this->assertEquals(0, count(Queue
::claimItems()->setQueue($queueName)->execute()), 'Linear queue should not return more items while first item is pending.');
74 $firstResult = Queue
::runItems(0)->setItems([$first])->execute()->single();
75 $this->assertEquals('ok', $firstResult['outcome']);
76 $this->assertEquals($first['id'], $firstResult['item']['id']);
77 $this->assertEquals($first['queue'], $firstResult['item']['queue']);
78 $this->assertEquals(['first_ok'], \Civi
::$statics[__CLASS__
]['doSomethingLog']);
80 // Get item #2. Run it - but fail!
81 $second = Queue
::claimItems()->setQueue($queueName)->execute()->single();
82 $this->assertCallback('doSomething', ['second'], $second);
83 \Civi
::$statics[__CLASS__
]['doSomethingResult'] = FALSE;
84 $secondResult = Queue
::runItems(0)->setItems([$second])->execute()->single();
85 \Civi
::$statics[__CLASS__
]['doSomethingResult'] = TRUE;
86 $this->assertEquals('retry', $secondResult['outcome']);
87 $this->assertEquals(['first_ok', 'second_err'], \Civi
::$statics[__CLASS__
]['doSomethingLog']);
89 // Item #2 is delayed... it'll take a few seconds to come up...
90 $waitCount = $this->waitFor(1.0, 10, function() use ($queueName, &$retrySecond): bool {
91 $retrySecond = Queue
::claimItems()->setQueue($queueName)->execute()->first();
92 return !empty($retrySecond);
94 $this->assertTrue($waitCount > 0, 'Failed task should not become available immediately. It should take a few seconds.');
95 $this->assertCallback('doSomething', ['second'], $retrySecond);
96 $retrySecondResult = Queue
::runItems(0)->setItems([$retrySecond])->execute()->single();
97 $this->assertEquals('ok', $retrySecondResult['outcome']);
98 $this->assertEquals(['first_ok', 'second_err', 'second_ok'], \Civi
::$statics[__CLASS__
]['doSomethingLog']);
101 $this->assertEquals(0, $queue->numberOfItems());
104 public function testBasicParallelPolling() {
105 $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_parallel';
106 $queue = \Civi
::queue($queueName, ['type' => 'SqlParallel', 'runner' => 'task', 'error' => 'delete']);
107 $this->assertEquals(0, $queue->numberOfItems());
109 \Civi
::queue($queueName)->createItem(new \
CRM_Queue_Task(
110 [QueueTest
::class, 'doSomething'],
113 \Civi
::queue($queueName)->createItem(new \
CRM_Queue_Task(
114 [QueueTest
::class, 'doSomething'],
118 $first = Queue
::claimItems()->setQueue($queueName)->execute()->single();
119 $second = Queue
::claimItems()->setQueue($queueName)->execute()->single();
121 $this->assertCallback('doSomething', ['first'], $first);
122 $this->assertCallback('doSomething', ['second'], $second);
124 // Just for fun, let's run these tasks in opposite order.
126 Queue
::runItems(0)->setItems([$second])->execute();
127 $this->assertEquals(['second_ok'], \Civi
::$statics[__CLASS__
]['doSomethingLog']);
129 Queue
::runItems(0)->setItems([$first])->execute();
130 $this->assertEquals(['second_ok', 'first_ok'], \Civi
::$statics[__CLASS__
]['doSomethingLog']);
132 $this->assertEquals(0, $queue->numberOfItems());
136 * Create a parallel queue. Claim and execute tasks as batches.
138 * Batches are executed via `hook_civicrm_queueRun_{runner}`.
140 * @throws \API_Exception
141 * @throws \Civi\API\Exception\UnauthorizedException
143 public function testBatchParallelPolling() {
144 $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_parallel';
145 \Civi
::dispatcher()->addListener('hook_civicrm_queueRun_testStuff', [$this, 'onHookQueueRun']);
146 $queue = \Civi
::queue($queueName, [
147 'type' => 'SqlParallel',
148 'runner' => 'testStuff',
152 $this->assertEquals(0, $queue->numberOfItems());
154 for ($i = 0; $i < 7; $i++
) {
155 \Civi
::queue($queueName)->createItem(['thingy' => $i]);
158 $result = Queue
::runItems(0)->setQueue($queueName)->execute();
159 $this->assertEquals(3, count($result));
160 $this->assertEquals([0, 1, 2], \Civi
::$statics[__CLASS__
]['onHookQueueRunLog'][0]);
162 $result = Queue
::runItems(0)->setQueue($queueName)->execute();
163 $this->assertEquals(3, count($result));
164 $this->assertEquals([3, 4, 5], \Civi
::$statics[__CLASS__
]['onHookQueueRunLog'][1]);
166 $result = Queue
::runItems(0)->setQueue($queueName)->execute();
167 $this->assertEquals(1, count($result));
168 $this->assertEquals([6], \Civi
::$statics[__CLASS__
]['onHookQueueRunLog'][2]);
172 * @param \Civi\Core\Event\GenericHookEvent $e
173 * @see CRM_Utils_Hook::queueRun()
175 public function onHookQueueRun(GenericHookEvent
$e): void
{
176 \Civi
::$statics[__CLASS__
]['onHookQueueRunLog'][] = array_map(
178 return $item->data
['thingy'];
183 foreach ($e->items
as $itemKey => $item) {
184 $e->outcomes
[$itemKey] = 'ok';
185 $e->queue
->deleteItem($item);
189 public function testSelect() {
190 $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_parallel';
191 $queue = \Civi
::queue($queueName, ['type' => 'SqlParallel', 'runner' => 'task', 'error' => 'delete']);
192 $this->assertEquals(0, $queue->numberOfItems());
194 \Civi
::queue($queueName)->createItem(new \
CRM_Queue_Task(
195 [QueueTest
::class, 'doSomething'],
199 $first = Queue
::claimItems()->setQueue($queueName)->setSelect(['id', 'queue'])->execute()->single();
200 $this->assertTrue(is_numeric($first['id']));
201 $this->assertEquals($queueName, $first['queue']);
202 $this->assertFalse(isset($first['data']));
205 public function testEmptyPoll() {
206 $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_linear';
207 $queue = \Civi
::queue($queueName, ['type' => 'Sql', 'runner' => 'task', 'error' => 'delete']);
208 $this->assertEquals(0, $queue->numberOfItems());
210 $startResult = Queue
::claimItems()->setQueue($queueName)->execute();
211 $this->assertEquals(0, $startResult->count());
214 public function getErrorModes(): array {
216 'delete' => ['delete'],
217 'abort' => ['abort'],
222 * Add a task which is never going to succeed. Try it multiple times (until we run out
225 * @param string $errorMode
226 * Either 'delete' or 'abort'
227 * @dataProvider getErrorModes
229 public function testRetryWithPoliteExhaustion(string $errorMode) {
230 $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_linear';
231 $queue = \Civi
::queue($queueName, [
234 'error' => $errorMode,
236 'retry_interval' => 1,
238 $this->assertEquals(0, $queue->numberOfItems());
240 \Civi
::queue($queueName)->createItem(new \
CRM_Queue_Task(
241 [QueueTest
::class, 'doSomething'],
242 ['nogooddirtyscoundrel']
245 \Civi
::$statics[__CLASS__
]['doSomethingResult'] = FALSE;
247 $this->waitFor(0.5, 15, function() use ($queueName, &$outcomes) {
248 $claimed = Queue
::claimItems(0)->setQueue($queueName)->execute()->first();
252 $result = Queue
::runItems(0)->setItems([$claimed])->execute()->first();
253 $outcomes[] = $result['outcome'];
254 return ($result['outcome'] !== 'retry');
257 $this->assertEquals(['retry', 'retry', $errorMode], $outcomes);
259 ['nogooddirtyscoundrel_err', 'nogooddirtyscoundrel_err', 'nogooddirtyscoundrel_err'],
260 \Civi
::$statics[__CLASS__
]['doSomethingLog']
263 $expectActive = ['delete' => TRUE, 'abort' => FALSE];
264 $this->assertEquals($expectActive[$errorMode], $queue->isActive());
268 * Add a task. The task-running agent is a bit delinquent... so it forgets the first
269 * few tasks. But the third one works!
271 public function testRetryWithDelinquencyAndSuccess() {
272 $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_linear';
273 $queue = \Civi
::queue($queueName, [
278 'retry_interval' => 0,
281 $this->assertEquals(0, $queue->numberOfItems());
283 \Civi
::queue($queueName)->createItem(new \
CRM_Queue_Task(
284 [QueueTest
::class, 'doSomething'],
287 $this->assertEquals(1, $queue->numberOfItems());
289 $claim1 = $this->waitForClaim(0.5, 5, $queueName);
290 // Oops, don't do anything with claim #1!
291 $this->assertEquals(1, $queue->numberOfItems());
292 $this->assertEquals([], \Civi
::$statics[__CLASS__
]['doSomethingLog']);
294 $claim2 = $this->waitForClaim(0.5, 5, $queueName);
295 // Oops, don't do anything with claim #2!
296 $this->assertEquals(1, $queue->numberOfItems());
297 $this->assertEquals([], \Civi
::$statics[__CLASS__
]['doSomethingLog']);
299 $claim3 = $this->waitForClaim(0.5, 5, $queueName);
300 $this->assertEquals(1, $queue->numberOfItems());
301 $result = Queue
::runItems(0)->setItems([$claim3])->execute()->first();
302 $this->assertEquals(0, $queue->numberOfItems());
303 $this->assertEquals(['playinghooky_ok'], \Civi
::$statics[__CLASS__
]['doSomethingLog']);
304 $this->assertEquals('ok', $result['outcome']);
308 * Add a task which is never going to succeed. The task fails every time, and eventually
309 * we either delete it or abort the queue.
311 * @param string $errorMode
312 * Either 'delete' or 'abort'
313 * @dataProvider getErrorModes
315 public function testRetryWithEventualFailure(string $errorMode) {
316 \Civi
::$statics[__CLASS__
]['doSomethingResult'] = FALSE;
318 $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_linear';
319 $queue = \Civi
::queue($queueName, [
322 'error' => $errorMode,
324 'retry_interval' => 0,
327 $this->assertEquals(0, $queue->numberOfItems());
329 \Civi
::queue($queueName)->createItem(new \
CRM_Queue_Task(
330 [QueueTest
::class, 'doSomething'],
333 $this->assertEquals(1, $queue->numberOfItems());
335 $claimAndRun = function($expectOutcome, $expectEndCount) use ($queue, $queueName) {
336 $claim = $this->waitForClaim(0.5, 5, $queueName);
337 $this->assertEquals(1, $queue->numberOfItems());
338 $result = Queue
::runItems(0)->setItems([$claim])->execute()->first();
339 $this->assertEquals($expectEndCount, $queue->numberOfItems());
340 $this->assertEquals($expectOutcome, $result['outcome']);
343 $claimAndRun('retry', 1);
344 $claimAndRun('retry', 1);
345 switch ($errorMode) {
347 $claimAndRun('delete', 0);
348 $this->assertEquals(TRUE, $queue->isActive());
352 $claimAndRun('abort', 1);
353 $this->assertEquals(FALSE, $queue->isActive());
357 $this->assertEquals(['playinghooky_err', 'playinghooky_err', 'playinghooky_err'], \Civi
::$statics[__CLASS__
]['doSomethingLog']);
361 * If a queue is created as part of a user-job, then it has a fixed scope-of-work. The status
362 * should flip after completing its work.
364 * @throws \API_Exception
365 * @throws \CRM_Core_Exception
366 * @throws \Civi\API\Exception\UnauthorizedException
368 public function testUserJobQueue_Completion() {
369 $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_userjob';
371 $firedQueueStatus = [];
372 \Civi
::dispatcher()->addListener('hook_civicrm_queueStatus', function($e) use (&$firedQueueStatus) {
373 $firedQueueStatus[$e->queue
->getName()] = $e->status
;
376 $queue = \Civi
::queue($queueName, [
381 $this->assertEquals(0, $queue->numberOfItems());
383 $userJob = \Civi\Api4\UserJob
::create(FALSE)->setValues([
384 'job_type:label' => 'Contact Import',
385 'status_id:name' => 'in_progress',
386 'queue_id.name' => $queue->getName(),
387 ])->execute()->single();
389 \Civi
::queue($queueName)->createItem(new \
CRM_Queue_Task(
390 [QueueTest
::class, 'doSomething'],
393 \Civi
::queue($queueName)->createItem(new \
CRM_Queue_Task(
394 [QueueTest
::class, 'doSomething'],
398 // Verify initial status
399 $this->assertEquals(2, $queue->numberOfItems());
400 $this->assertEquals(FALSE, isset($firedQueueStatus[$queueName]));
401 $this->assertEquals(TRUE, $queue->isActive());
402 $this->assertEquals(4, UserJob
::get()->addWhere('id', '=', $userJob['id'])->execute()->first()['status_id']);
404 // OK, let's run both items - and check status afterward.
405 Queue
::runItems(FALSE)->setQueue($queueName)->execute()->single();
406 $this->assertEquals(1, $queue->numberOfItems());
407 $this->assertEquals(FALSE, isset($firedQueueStatus[$queueName]));
408 $this->assertEquals(TRUE, $queue->isActive());
409 $this->assertEquals(4, UserJob
::get()->addWhere('id', '=', $userJob['id'])->execute()->first()['status_id']);
411 Queue
::runItems(FALSE)->setQueue($queueName)->execute()->single();
412 $this->assertEquals(0, $queue->numberOfItems());
413 $this->assertEquals('completed', $firedQueueStatus[$queueName]);
414 $this->assertEquals(FALSE, $queue->isActive());
415 $this->assertEquals(1, UserJob
::get()->addWhere('id', '=', $userJob['id'])->execute()->first()['status_id']);
419 * If a queue is created as a long-term service, then its work is never complete.
421 * @throws \API_Exception
422 * @throws \CRM_Core_Exception
423 * @throws \Civi\API\Exception\UnauthorizedException
425 public function testServiceQueue_NeverComplete() {
426 $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_service';
428 $firedQueueStatus = [];
429 \Civi
::dispatcher()->addListener('hook_civicrm_queueStatus', function($e) use (&$firedQueueStatus) {
430 $firedQueueStatus[$e->queue
->getName()] = $e->status
;
433 $queue = \Civi
::queue($queueName, [
438 $this->assertEquals(0, $queue->numberOfItems());
440 \Civi
::queue($queueName)->createItem(new \
CRM_Queue_Task(
441 [QueueTest
::class, 'doSomething'],
444 \Civi
::queue($queueName)->createItem(new \
CRM_Queue_Task(
445 [QueueTest
::class, 'doSomething'],
449 // Verify initial status
450 $this->assertEquals(2, $queue->numberOfItems());
451 $this->assertEquals(FALSE, isset($firedQueueStatus[$queueName]));
452 $this->assertEquals(TRUE, $queue->isActive());
454 // OK, let's run both items - and check status afterward.
455 Queue
::runItems(FALSE)->setQueue($queueName)->execute()->single();
456 $this->assertEquals(1, $queue->numberOfItems());
457 $this->assertEquals(FALSE, isset($firedQueueStatus[$queueName]));
458 $this->assertEquals(TRUE, $queue->isActive());
460 Queue
::runItems(FALSE)->setQueue($queueName)->execute()->single();
461 $this->assertEquals(0, $queue->numberOfItems());
462 $this->assertEquals(FALSE, isset($firedQueueStatus[$queueName]));
463 $this->assertEquals(TRUE, $queue->isActive());
466 public static function doSomething(\CRM_Queue_TaskContext
$ctx, string $something) {
467 $ok = \Civi
::$statics[__CLASS__
]['doSomethingResult'];
468 \Civi
::$statics[__CLASS__
]['doSomethingLog'][] = $something . ($ok ?
'_ok' : '_err');
472 protected function assertCallback($expectMethod, $expectArgs, $actualTask) {
473 $this->assertEquals([QueueTest
::class, $expectMethod], $actualTask['data']['callback'], 'Claimed task should have expected method');
474 $this->assertEquals($expectArgs, $actualTask['data']['arguments'], 'Claimed task should have expected arguments');
477 protected function waitForClaim(float $interval, float $timeout, string $queueName): ?
array {
479 $this->waitFor($interval, $timeout, function() use ($queueName, &$claims) {
480 $claimed = Queue
::claimItems(0)->setQueue($queueName)->execute()->first();
484 $claims[] = $claimed;
487 return $claims[0] ??
NULL;
491 * Repeatedly check $condition until it returns true (or until we exhaust timeout).
493 * @param float $interval
494 * Seconds to wait between checks.
495 * @param float $timeout
496 * Total maximum seconds to wait across all checks.
497 * @param callable $condition
498 * The condition to check.
500 * Total number of intervals we had to wait/sleep.
502 protected function waitFor(float $interval, float $timeout, callable
$condition): int {
503 $end = microtime(TRUE) +
$timeout;
504 $interval *= round($interval * 1000 * 1000);
506 $ready = $condition();
507 while (!$ready && microtime(TRUE) <= $end) {
510 $ready = $condition();
512 $this->assertTrue($ready, 'Wait condition not met');