a809266fe8a9937af3729d8800625023137e428d
[civicrm-core.git] / tests / phpunit / api / v4 / Entity / QueueTest.php
1 <?php
2
3 /*
4 +--------------------------------------------------------------------+
5 | Copyright CiviCRM LLC. All rights reserved. |
6 | |
7 | This work is published under the GNU AGPLv3 license with some |
8 | permitted exceptions and without any warranty. For full license |
9 | and copyright information, see https://civicrm.org/licensing |
10 +--------------------------------------------------------------------+
11 */
12
13 /**
14 *
15 * @package CRM
16 * @copyright CiviCRM LLC https://civicrm.org/licensing
17 */
18
19 namespace api\v4\Entity;
20
21 use api\v4\Api4TestBase;
22 use Civi\Api4\Queue;
23 use Civi\Api4\UserJob;
24 use Civi\Core\Event\GenericHookEvent;
25
26 /**
27 * @group headless
28 * @group queue
29 */
30 class QueueTest extends Api4TestBase {
31
32 protected function setUp(): void {
33 \Civi::$statics[__CLASS__] = [
34 'doSomethingResult' => TRUE,
35 'doSomethingLog' => [],
36 'onHookQueueRunLog' => [],
37 ];
38 parent::setUp();
39 }
40
41 /**
42 * Setup a queue with a line of back-to-back tasks.
43 *
44 * The first task runs normally. The second task fails at first, but it is retried, and then
45 * succeeds.
46 *
47 * @throws \API_Exception
48 * @throws \Civi\API\Exception\UnauthorizedException
49 */
50 public function testBasicLinearPolling() {
51 $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_linear';
52 $queue = \Civi::queue($queueName, [
53 'type' => 'Sql',
54 'runner' => 'task',
55 'error' => 'delete',
56 'retry_limit' => 2,
57 'retry_interval' => 4,
58 ]);
59 $this->assertEquals(0, $queue->numberOfItems());
60
61 \Civi::queue($queueName)->createItem(new \CRM_Queue_Task(
62 [QueueTest::class, 'doSomething'],
63 ['first']
64 ));
65 \Civi::queue($queueName)->createItem(new \CRM_Queue_Task(
66 [QueueTest::class, 'doSomething'],
67 ['second']
68 ));
69
70 // Get item #1. Run it. Finish it.
71 $first = Queue::claimItems()->setQueue($queueName)->execute()->single();
72 $this->assertCallback('doSomething', ['first'], $first);
73 $this->assertEquals(0, count(Queue::claimItems()->setQueue($queueName)->execute()), 'Linear queue should not return more items while first item is pending.');
74 $firstResult = Queue::runItems(0)->setItems([$first])->execute()->single();
75 $this->assertEquals('ok', $firstResult['outcome']);
76 $this->assertEquals($first['id'], $firstResult['item']['id']);
77 $this->assertEquals($first['queue'], $firstResult['item']['queue']);
78 $this->assertEquals(['first_ok'], \Civi::$statics[__CLASS__]['doSomethingLog']);
79
80 // Get item #2. Run it - but fail!
81 $second = Queue::claimItems()->setQueue($queueName)->execute()->single();
82 $this->assertCallback('doSomething', ['second'], $second);
83 \Civi::$statics[__CLASS__]['doSomethingResult'] = FALSE;
84 $secondResult = Queue::runItems(0)->setItems([$second])->execute()->single();
85 \Civi::$statics[__CLASS__]['doSomethingResult'] = TRUE;
86 $this->assertEquals('retry', $secondResult['outcome']);
87 $this->assertEquals(['first_ok', 'second_err'], \Civi::$statics[__CLASS__]['doSomethingLog']);
88
89 // Item #2 is delayed... it'll take a few seconds to come up...
90 $waitCount = $this->waitFor(1.0, 10, function() use ($queueName, &$retrySecond): bool {
91 $retrySecond = Queue::claimItems()->setQueue($queueName)->execute()->first();
92 return !empty($retrySecond);
93 });
94 $this->assertTrue($waitCount > 0, 'Failed task should not become available immediately. It should take a few seconds.');
95 $this->assertCallback('doSomething', ['second'], $retrySecond);
96 $retrySecondResult = Queue::runItems(0)->setItems([$retrySecond])->execute()->single();
97 $this->assertEquals('ok', $retrySecondResult['outcome']);
98 $this->assertEquals(['first_ok', 'second_err', 'second_ok'], \Civi::$statics[__CLASS__]['doSomethingLog']);
99
100 // All done.
101 $this->assertEquals(0, $queue->numberOfItems());
102 }
103
104 public function testBasicParallelPolling() {
105 $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_parallel';
106 $queue = \Civi::queue($queueName, ['type' => 'SqlParallel', 'runner' => 'task', 'error' => 'delete']);
107 $this->assertEquals(0, $queue->numberOfItems());
108
109 \Civi::queue($queueName)->createItem(new \CRM_Queue_Task(
110 [QueueTest::class, 'doSomething'],
111 ['first']
112 ));
113 \Civi::queue($queueName)->createItem(new \CRM_Queue_Task(
114 [QueueTest::class, 'doSomething'],
115 ['second']
116 ));
117
118 $first = Queue::claimItems()->setQueue($queueName)->execute()->single();
119 $second = Queue::claimItems()->setQueue($queueName)->execute()->single();
120
121 $this->assertCallback('doSomething', ['first'], $first);
122 $this->assertCallback('doSomething', ['second'], $second);
123
124 // Just for fun, let's run these tasks in opposite order.
125
126 Queue::runItems(0)->setItems([$second])->execute();
127 $this->assertEquals(['second_ok'], \Civi::$statics[__CLASS__]['doSomethingLog']);
128
129 Queue::runItems(0)->setItems([$first])->execute();
130 $this->assertEquals(['second_ok', 'first_ok'], \Civi::$statics[__CLASS__]['doSomethingLog']);
131
132 $this->assertEquals(0, $queue->numberOfItems());
133 }
134
135 /**
136 * Create a parallel queue. Claim and execute tasks as batches.
137 *
138 * Batches are executed via `hook_civicrm_queueRun_{runner}`.
139 *
140 * @throws \API_Exception
141 * @throws \Civi\API\Exception\UnauthorizedException
142 */
143 public function testBatchParallelPolling() {
144 $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_parallel';
145 \Civi::dispatcher()->addListener('hook_civicrm_queueRun_testStuff', [$this, 'onHookQueueRun']);
146 $queue = \Civi::queue($queueName, [
147 'type' => 'SqlParallel',
148 'runner' => 'testStuff',
149 'error' => 'delete',
150 'batch_limit' => 3,
151 ]);
152 $this->assertEquals(0, $queue->numberOfItems());
153
154 for ($i = 0; $i < 7; $i++) {
155 \Civi::queue($queueName)->createItem(['thingy' => $i]);
156 }
157
158 $result = Queue::runItems(0)->setQueue($queueName)->execute();
159 $this->assertEquals(3, count($result));
160 $this->assertEquals([0, 1, 2], \Civi::$statics[__CLASS__]['onHookQueueRunLog'][0]);
161
162 $result = Queue::runItems(0)->setQueue($queueName)->execute();
163 $this->assertEquals(3, count($result));
164 $this->assertEquals([3, 4, 5], \Civi::$statics[__CLASS__]['onHookQueueRunLog'][1]);
165
166 $result = Queue::runItems(0)->setQueue($queueName)->execute();
167 $this->assertEquals(1, count($result));
168 $this->assertEquals([6], \Civi::$statics[__CLASS__]['onHookQueueRunLog'][2]);
169 }
170
171 /**
172 * @param \Civi\Core\Event\GenericHookEvent $e
173 * @see CRM_Utils_Hook::queueRun()
174 */
175 public function onHookQueueRun(GenericHookEvent $e): void {
176 \Civi::$statics[__CLASS__]['onHookQueueRunLog'][] = array_map(
177 function($item) {
178 return $item->data['thingy'];
179 },
180 $e->items
181 );
182
183 foreach ($e->items as $itemKey => $item) {
184 $e->outcomes[$itemKey] = 'ok';
185 $e->queue->deleteItem($item);
186 }
187 }
188
189 public function testSelect() {
190 $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_parallel';
191 $queue = \Civi::queue($queueName, ['type' => 'SqlParallel', 'runner' => 'task', 'error' => 'delete']);
192 $this->assertEquals(0, $queue->numberOfItems());
193
194 \Civi::queue($queueName)->createItem(new \CRM_Queue_Task(
195 [QueueTest::class, 'doSomething'],
196 ['first']
197 ));
198
199 $first = Queue::claimItems()->setQueue($queueName)->setSelect(['id', 'queue'])->execute()->single();
200 $this->assertTrue(is_numeric($first['id']));
201 $this->assertEquals($queueName, $first['queue']);
202 $this->assertFalse(isset($first['data']));
203 }
204
205 public function testEmptyPoll() {
206 $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_linear';
207 $queue = \Civi::queue($queueName, ['type' => 'Sql', 'runner' => 'task', 'error' => 'delete']);
208 $this->assertEquals(0, $queue->numberOfItems());
209
210 $startResult = Queue::claimItems()->setQueue($queueName)->execute();
211 $this->assertEquals(0, $startResult->count());
212 }
213
214 public function getErrorModes(): array {
215 return [
216 'delete' => ['delete'],
217 'abort' => ['abort'],
218 ];
219 }
220
221 /**
222 * Add a task which is never going to succeed. Try it multiple times (until we run out
223 * of retries).
224 *
225 * @param string $errorMode
226 * Either 'delete' or 'abort'
227 * @dataProvider getErrorModes
228 */
229 public function testRetryWithPoliteExhaustion(string $errorMode) {
230 $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_linear';
231 $queue = \Civi::queue($queueName, [
232 'type' => 'Sql',
233 'runner' => 'task',
234 'error' => $errorMode,
235 'retry_limit' => 2,
236 'retry_interval' => 1,
237 ]);
238 $this->assertEquals(0, $queue->numberOfItems());
239
240 \Civi::queue($queueName)->createItem(new \CRM_Queue_Task(
241 [QueueTest::class, 'doSomething'],
242 ['nogooddirtyscoundrel']
243 ));
244
245 \Civi::$statics[__CLASS__]['doSomethingResult'] = FALSE;
246 $outcomes = [];
247 $this->waitFor(0.5, 15, function() use ($queueName, &$outcomes) {
248 $claimed = Queue::claimItems(0)->setQueue($queueName)->execute()->first();
249 if (!$claimed) {
250 return FALSE;
251 }
252 $result = Queue::runItems(0)->setItems([$claimed])->execute()->first();
253 $outcomes[] = $result['outcome'];
254 return ($result['outcome'] !== 'retry');
255 });
256
257 $this->assertEquals(['retry', 'retry', $errorMode], $outcomes);
258 $this->assertEquals(
259 ['nogooddirtyscoundrel_err', 'nogooddirtyscoundrel_err', 'nogooddirtyscoundrel_err'],
260 \Civi::$statics[__CLASS__]['doSomethingLog']
261 );
262
263 $expectActive = ['delete' => TRUE, 'abort' => FALSE];
264 $this->assertEquals($expectActive[$errorMode], $queue->isActive());
265 }
266
267 /**
268 * Add a task. The task-running agent is a bit delinquent... so it forgets the first
269 * few tasks. But the third one works!
270 */
271 public function testRetryWithDelinquencyAndSuccess() {
272 $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_linear';
273 $queue = \Civi::queue($queueName, [
274 'type' => 'Sql',
275 'runner' => 'task',
276 'error' => 'delete',
277 'retry_limit' => 2,
278 'retry_interval' => 0,
279 'lease_time' => 1,
280 ]);
281 $this->assertEquals(0, $queue->numberOfItems());
282
283 \Civi::queue($queueName)->createItem(new \CRM_Queue_Task(
284 [QueueTest::class, 'doSomething'],
285 ['playinghooky']
286 ));
287 $this->assertEquals(1, $queue->numberOfItems());
288
289 $claim1 = $this->waitForClaim(0.5, 5, $queueName);
290 // Oops, don't do anything with claim #1!
291 $this->assertEquals(1, $queue->numberOfItems());
292 $this->assertEquals([], \Civi::$statics[__CLASS__]['doSomethingLog']);
293
294 $claim2 = $this->waitForClaim(0.5, 5, $queueName);
295 // Oops, don't do anything with claim #2!
296 $this->assertEquals(1, $queue->numberOfItems());
297 $this->assertEquals([], \Civi::$statics[__CLASS__]['doSomethingLog']);
298
299 $claim3 = $this->waitForClaim(0.5, 5, $queueName);
300 $this->assertEquals(1, $queue->numberOfItems());
301 $result = Queue::runItems(0)->setItems([$claim3])->execute()->first();
302 $this->assertEquals(0, $queue->numberOfItems());
303 $this->assertEquals(['playinghooky_ok'], \Civi::$statics[__CLASS__]['doSomethingLog']);
304 $this->assertEquals('ok', $result['outcome']);
305 }
306
307 /**
308 * Add a task which is never going to succeed. The task fails every time, and eventually
309 * we either delete it or abort the queue.
310 *
311 * @param string $errorMode
312 * Either 'delete' or 'abort'
313 * @dataProvider getErrorModes
314 */
315 public function testRetryWithEventualFailure(string $errorMode) {
316 \Civi::$statics[__CLASS__]['doSomethingResult'] = FALSE;
317
318 $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_linear';
319 $queue = \Civi::queue($queueName, [
320 'type' => 'Sql',
321 'runner' => 'task',
322 'error' => $errorMode,
323 'retry_limit' => 2,
324 'retry_interval' => 0,
325 'lease_time' => 1,
326 ]);
327 $this->assertEquals(0, $queue->numberOfItems());
328
329 \Civi::queue($queueName)->createItem(new \CRM_Queue_Task(
330 [QueueTest::class, 'doSomething'],
331 ['playinghooky']
332 ));
333 $this->assertEquals(1, $queue->numberOfItems());
334
335 $claimAndRun = function($expectOutcome, $expectEndCount) use ($queue, $queueName) {
336 $claim = $this->waitForClaim(0.5, 5, $queueName);
337 $this->assertEquals(1, $queue->numberOfItems());
338 $result = Queue::runItems(0)->setItems([$claim])->execute()->first();
339 $this->assertEquals($expectEndCount, $queue->numberOfItems());
340 $this->assertEquals($expectOutcome, $result['outcome']);
341 };
342
343 $claimAndRun('retry', 1);
344 $claimAndRun('retry', 1);
345 switch ($errorMode) {
346 case 'delete':
347 $claimAndRun('delete', 0);
348 $this->assertEquals(TRUE, $queue->isActive());
349 break;
350
351 case 'abort':
352 $claimAndRun('abort', 1);
353 $this->assertEquals(FALSE, $queue->isActive());
354 break;
355 }
356
357 $this->assertEquals(['playinghooky_err', 'playinghooky_err', 'playinghooky_err'], \Civi::$statics[__CLASS__]['doSomethingLog']);
358 }
359
360 /**
361 * If a queue is created as part of a user-job, then it has a fixed scope-of-work. The status
362 * should flip after completing its work.
363 *
364 * @throws \API_Exception
365 * @throws \CRM_Core_Exception
366 * @throws \Civi\API\Exception\UnauthorizedException
367 */
368 public function testUserJobQueue_Completion() {
369 $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_userjob';
370
371 $firedQueueStatus = [];
372 \Civi::dispatcher()->addListener('hook_civicrm_queueStatus', function($e) use (&$firedQueueStatus) {
373 $firedQueueStatus[$e->queue->getName()] = $e->status;
374 });
375
376 $queue = \Civi::queue($queueName, [
377 'type' => 'Sql',
378 'runner' => 'task',
379 'error' => 'delete',
380 ]);
381 $this->assertEquals(0, $queue->numberOfItems());
382
383 $userJob = \Civi\Api4\UserJob::create(FALSE)->setValues([
384 'job_type:label' => 'Contact Import',
385 'status_id:name' => 'in_progress',
386 'queue_id.name' => $queue->getName(),
387 ])->execute()->single();
388
389 \Civi::queue($queueName)->createItem(new \CRM_Queue_Task(
390 [QueueTest::class, 'doSomething'],
391 ['first']
392 ));
393 \Civi::queue($queueName)->createItem(new \CRM_Queue_Task(
394 [QueueTest::class, 'doSomething'],
395 ['second']
396 ));
397
398 // Verify initial status
399 $this->assertEquals(2, $queue->numberOfItems());
400 $this->assertEquals(FALSE, isset($firedQueueStatus[$queueName]));
401 $this->assertEquals(TRUE, $queue->isActive());
402 $this->assertEquals(4, UserJob::get()->addWhere('id', '=', $userJob['id'])->execute()->first()['status_id']);
403
404 // OK, let's run both items - and check status afterward.
405 Queue::runItems(FALSE)->setQueue($queueName)->execute()->single();
406 $this->assertEquals(1, $queue->numberOfItems());
407 $this->assertEquals(FALSE, isset($firedQueueStatus[$queueName]));
408 $this->assertEquals(TRUE, $queue->isActive());
409 $this->assertEquals(4, UserJob::get()->addWhere('id', '=', $userJob['id'])->execute()->first()['status_id']);
410
411 Queue::runItems(FALSE)->setQueue($queueName)->execute()->single();
412 $this->assertEquals(0, $queue->numberOfItems());
413 $this->assertEquals('completed', $firedQueueStatus[$queueName]);
414 $this->assertEquals(FALSE, $queue->isActive());
415 $this->assertEquals(1, UserJob::get()->addWhere('id', '=', $userJob['id'])->execute()->first()['status_id']);
416 }
417
418 /**
419 * If a queue is created as a long-term service, then its work is never complete.
420 *
421 * @throws \API_Exception
422 * @throws \CRM_Core_Exception
423 * @throws \Civi\API\Exception\UnauthorizedException
424 */
425 public function testServiceQueue_NeverComplete() {
426 $queueName = 'QueueTest_' . md5(random_bytes(32)) . '_service';
427
428 $firedQueueStatus = [];
429 \Civi::dispatcher()->addListener('hook_civicrm_queueStatus', function($e) use (&$firedQueueStatus) {
430 $firedQueueStatus[$e->queue->getName()] = $e->status;
431 });
432
433 $queue = \Civi::queue($queueName, [
434 'type' => 'Sql',
435 'runner' => 'task',
436 'error' => 'delete',
437 ]);
438 $this->assertEquals(0, $queue->numberOfItems());
439
440 \Civi::queue($queueName)->createItem(new \CRM_Queue_Task(
441 [QueueTest::class, 'doSomething'],
442 ['first']
443 ));
444 \Civi::queue($queueName)->createItem(new \CRM_Queue_Task(
445 [QueueTest::class, 'doSomething'],
446 ['second']
447 ));
448
449 // Verify initial status
450 $this->assertEquals(2, $queue->numberOfItems());
451 $this->assertEquals(FALSE, isset($firedQueueStatus[$queueName]));
452 $this->assertEquals(TRUE, $queue->isActive());
453
454 // OK, let's run both items - and check status afterward.
455 Queue::runItems(FALSE)->setQueue($queueName)->execute()->single();
456 $this->assertEquals(1, $queue->numberOfItems());
457 $this->assertEquals(FALSE, isset($firedQueueStatus[$queueName]));
458 $this->assertEquals(TRUE, $queue->isActive());
459
460 Queue::runItems(FALSE)->setQueue($queueName)->execute()->single();
461 $this->assertEquals(0, $queue->numberOfItems());
462 $this->assertEquals(FALSE, isset($firedQueueStatus[$queueName]));
463 $this->assertEquals(TRUE, $queue->isActive());
464 }
465
466 public static function doSomething(\CRM_Queue_TaskContext $ctx, string $something) {
467 $ok = \Civi::$statics[__CLASS__]['doSomethingResult'];
468 \Civi::$statics[__CLASS__]['doSomethingLog'][] = $something . ($ok ? '_ok' : '_err');
469 return $ok;
470 }
471
472 protected function assertCallback($expectMethod, $expectArgs, $actualTask) {
473 $this->assertEquals([QueueTest::class, $expectMethod], $actualTask['data']['callback'], 'Claimed task should have expected method');
474 $this->assertEquals($expectArgs, $actualTask['data']['arguments'], 'Claimed task should have expected arguments');
475 }
476
477 protected function waitForClaim(float $interval, float $timeout, string $queueName): ?array {
478 $claims = [];
479 $this->waitFor($interval, $timeout, function() use ($queueName, &$claims) {
480 $claimed = Queue::claimItems(0)->setQueue($queueName)->execute()->first();
481 if (!$claimed) {
482 return FALSE;
483 }
484 $claims[] = $claimed;
485 return TRUE;
486 });
487 return $claims[0] ?? NULL;
488 }
489
490 /**
491 * Repeatedly check $condition until it returns true (or until we exhaust timeout).
492 *
493 * @param float $interval
494 * Seconds to wait between checks.
495 * @param float $timeout
496 * Total maximum seconds to wait across all checks.
497 * @param callable $condition
498 * The condition to check.
499 * @return int
500 * Total number of intervals we had to wait/sleep.
501 */
502 protected function waitFor(float $interval, float $timeout, callable $condition): int {
503 $end = microtime(TRUE) + $timeout;
504 $interval *= round($interval * 1000 * 1000);
505 $waitCount = 0;
506 $ready = $condition();
507 while (!$ready && microtime(TRUE) <= $end) {
508 usleep($interval);
509 $waitCount++;
510 $ready = $condition();
511 }
512 $this->assertTrue($ready, 'Wait condition not met');
513 return $waitCount;
514 }
515
516 }