--- /dev/null
+<?php
+
+namespace Civi\Test;
+
+/**
+ * Helper functions for testing queues.
+ */
+trait QueueTestTrait {
+
+ protected function assertQueueStats(int $total, int $ready, int $blocked, \CRM_Queue_Queue $queue) {
+ $format = 'total=%d ready=%d blocked=%d';
+ $expect = [$total, $ready, $blocked];
+ $actual = [$queue->getStatistic('total'), $queue->getStatistic('ready'), $queue->getStatistic('blocked')];
+ $this->assertEquals(sprintf($format, ...$expect), sprintf($format, ...$actual));
+
+ // Deprecated - but checking for continuity.
+ $this->assertEquals($total, $queue->numberOfItems());
+ }
+
+}
*/
class CRM_Queue_Queue_SqlTest extends CiviUnitTestCase {
+ use \Civi\Test\QueueTestTrait;
+
/* ----------------------- Queue providers ----------------------- */
/* Define a list of queue providers which should be tested */
'test-key' => 'c',
]);
- $this->assertEquals(3, $this->queue->numberOfItems());
+ $this->assertQueueStats(3, 3, 0, $this->queue);
$item = $this->queue->claimItem();
$this->assertEquals('a', $item->data['test-key']);
$this->queue->deleteItem($item);
- $this->assertEquals(2, $this->queue->numberOfItems());
+ $this->assertQueueStats(2, 2, 0, $this->queue);
$item = $this->queue->claimItem();
$this->assertEquals('b', $item->data['test-key']);
$this->queue->deleteItem($item);
'test-key' => 'd',
]);
- $this->assertEquals(4, $this->queue->numberOfItems());
+ $this->assertQueueStats(4, 4, 0, $this->queue);
$item = $this->queue->claimItem();
$this->assertEquals('start', $item->data['test-key']);
$this->queue->deleteItem($item);
- $this->assertEquals(3, $this->queue->numberOfItems());
+ $this->assertQueueStats(3, 3, 0, $this->queue);
$item = $this->queue->claimItem();
$this->assertEquals('c', $item->data['test-key']);
$this->queue->deleteItem($item);
- $this->assertEquals(2, $this->queue->numberOfItems());
+ $this->assertQueueStats(2, 2, 0, $this->queue);
$item = $this->queue->claimItem();
$this->assertEquals('d', $item->data['test-key']);
$this->queue->deleteItem($item);
- $this->assertEquals(1, $this->queue->numberOfItems());
+ $this->assertQueueStats(1, 1, 0, $this->queue);
$item = $this->queue->claimItem();
$this->assertEquals('end', $item->data['test-key']);
$this->queue->deleteItem($item);
- $this->assertEquals(0, $this->queue->numberOfItems());
+ $this->assertQueueStats(0, 0, 0, $this->queue);
}
}
*/
class CRM_Queue_QueueTest extends CiviUnitTestCase {
+ use \Civi\Test\QueueTestTrait;
+
/* ----------------------- Queue providers ----------------------- */
/* Define a list of queue providers which should be tested */
*/
public function getQueueSpecs() {
$queueSpecs = [];
- $queueSpecs[] = [
+ $queueSpecs['Sql'] = [
[
'type' => 'Sql',
'name' => 'test-queue-sql',
],
];
- $queueSpecs[] = [
+ $queueSpecs['Memory'] = [
[
'type' => 'Memory',
'name' => 'test-queue-mem',
],
];
- $queueSpecs[] = [
+ $queueSpecs['SqlParallel'] = [
[
'type' => 'SqlParallel',
'name' => 'test-queue-sqlparallel',
'test-key' => 'c',
]);
- $this->assertEquals(3, $this->queue->numberOfItems());
+ $this->assertQueueStats(3, 3, 0, $this->queue);
+
$item = $this->queue->claimItem();
+ $this->assertQueueStats(3, 2, 1, $this->queue);
$this->assertEquals('a', $item->data['test-key']);
$this->assertEquals(1, $item->run_count);
$this->queue->deleteItem($item);
- $this->assertEquals(2, $this->queue->numberOfItems());
+ $this->assertQueueStats(2, 2, 0, $this->queue);
$item = $this->queue->claimItem();
+ $this->assertQueueStats(2, 1, 1, $this->queue);
$this->assertEquals('b', $item->data['test-key']);
$this->assertEquals(1, $item->run_count);
$this->queue->deleteItem($item);
+ $this->assertQueueStats(1, 1, 0, $this->queue);
$this->queue->createItem([
'test-key' => 'd',
]);
- $this->assertEquals(2, $this->queue->numberOfItems());
+ $this->assertQueueStats(2, 2, 0, $this->queue);
+
$item = $this->queue->claimItem();
+ $this->assertQueueStats(2, 1, 1, $this->queue);
$this->assertEquals('c', $item->data['test-key']);
$this->assertEquals(1, $item->run_count);
$this->queue->deleteItem($item);
- $this->assertEquals(1, $this->queue->numberOfItems());
+ $this->assertQueueStats(1, 1, 0, $this->queue);
$item = $this->queue->claimItem();
$this->assertEquals('d', $item->data['test-key']);
$this->assertEquals(1, $item->run_count);
$this->queue->deleteItem($item);
- $this->assertEquals(0, $this->queue->numberOfItems());
+ $this->assertQueueStats(0, 0, 0, $this->queue);
}
/**
$item = $this->queue->claimItem();
$this->assertEquals('a', $item->data['test-key']);
$this->assertEquals(1, $item->run_count);
- $this->assertEquals(1, $this->queue->numberOfItems());
+ $this->assertQueueStats(1, 0, 1, $this->queue);
$this->queue->releaseItem($item);
- $this->assertEquals(1, $this->queue->numberOfItems());
+ $this->assertQueueStats(1, 1, 0, $this->queue);
$item = $this->queue->claimItem();
$this->assertEquals('a', $item->data['test-key']);
$this->assertEquals(2, $item->run_count);
$this->queue->deleteItem($item);
- $this->assertEquals(0, $this->queue->numberOfItems());
+ $this->assertQueueStats(0, 0, 0, $this->queue);
}
/**
$this->queue->createItem([
'test-key' => 'a',
]);
+ $this->assertQueueStats(1, 1, 0, $this->queue);
$item = $this->queue->claimItem();
$this->assertEquals('a', $item->data['test-key']);
$this->assertEquals(1, $item->run_count);
- $this->assertEquals(1, $this->queue->numberOfItems());
+ $this->assertQueueStats(1, 0, 1, $this->queue);
// forget to release
// haven't reach expiration yet
$item3 = $this->queue->claimItem();
$this->assertEquals('a', $item3->data['test-key']);
$this->assertEquals(2, $item3->run_count);
- $this->assertEquals(1, $this->queue->numberOfItems());
+ $this->assertQueueStats(1, 0, 1, $this->queue);
$this->queue->deleteItem($item3);
- $this->assertEquals(0, $this->queue->numberOfItems());
+ $this->assertQueueStats(0, 0, 0, $this->queue);
}
/**
$item = $this->queue->claimItem();
$this->assertEquals('a', $item->data['test-key']);
$this->assertEquals(1, $item->run_count);
- $this->assertEquals(1, $this->queue->numberOfItems());
+ $this->assertQueueStats(1, 0, 1, $this->queue);
// forget to release
// haven't reached expiration yet, so claimItem fails
$item3 = $this->queue->stealItem();
$this->assertEquals('a', $item3->data['test-key']);
$this->assertEquals(2, $item3->run_count);
- $this->assertEquals(1, $this->queue->numberOfItems());
+ $this->assertQueueStats(1, 0, 1, $this->queue);
$this->queue->deleteItem($item3);
- $this->assertEquals(0, $this->queue->numberOfItems());
+ $this->assertQueueStats(0, 0, 0, $this->queue);
}
/**
$this->queue->createItem([
'test-key' => 'b',
]);
- $this->assertEquals(2, $this->queue->numberOfItems());
+ $this->assertQueueStats(2, 2, 0, $this->queue);
unset($this->queue);
$queue2 = $this->queueService->create(
$queueSpec + ['reset' => TRUE]
);
- $this->assertEquals(0, $queue2->numberOfItems());
+ $this->assertQueueStats(0, 0, 0, $queue2);
}
/**
$this->queue->createItem([
'test-key' => 'b',
]);
- $this->assertEquals(2, $this->queue->numberOfItems());
+ $this->assertQueueStats(2, 2, 0, $this->queue);
unset($this->queue);
$queue2 = $this->queueService->create($queueSpec);
- $this->assertEquals(2, $queue2->numberOfItems());
+ $this->assertQueueStats(2, 2, 0, $queue2);
$item = $queue2->claimItem();
$this->assertEquals('a', $item->data['test-key']);
$this->queue->createItem([
'test-key' => 'b',
]);
- $this->assertEquals(2, $this->queue->numberOfItems());
+ $this->assertQueueStats(2, 2, 0, $this->queue);
unset($this->queue);
$queue2 = $this->queueService->create($queueSpec);
- $this->assertEquals(2, $queue2->numberOfItems());
+ $this->assertQueueStats(2, 2, 0, $queue2);
$item = $queue2->claimItem();
$this->assertEquals('a', $item->data['test-key']);
for ($i = 0; $i < 9; $i++) {
$this->queue->createItem('x' . $i);
}
- $this->assertEquals(9, $this->queue->numberOfItems());
+ $this->assertQueueStats(9, 9, 0, $this->queue);
// We expect this driver to be fully compliant with batching.
$claimsA = $this->queue->claimItems(3);
$claimsB = $this->queue->claimItems(3);
- $this->assertEquals(9, $this->queue->numberOfItems());
+ $this->assertQueueStats(9, 3, 6, $this->queue);
$this->assertEquals(['x0', 'x1', 'x2'], CRM_Utils_Array::collect('data', $claimsA));
$this->assertEquals(['x3', 'x4', 'x5'], CRM_Utils_Array::collect('data', $claimsB));
$this->queue->releaseItems([$claimsA[2]]); /* x2: will retry with next claimItems() */
$this->queue->deleteItems([$claimsB[0], $claimsB[1]]); /* x3, x4 */
/* claimsB[2]: x5: Oops, we're gonna take some time to finish this one. */
- $this->assertEquals(5, $this->queue->numberOfItems());
+ $this->assertQueueStats(5, 4, 1, $this->queue);
$claimsC = $this->queue->claimItems(3);
$this->assertEquals(['x2', 'x6', 'x7'], CRM_Utils_Array::collect('data', $claimsC));
$this->queue->deleteItem($claimsC[0]); /* x2 */
$this->queue->releaseItem($claimsC[1]); /* x6: will retry with next claimItems() */
$this->queue->deleteItem($claimsC[2]); /* x7 */
- $this->assertEquals(3, $this->queue->numberOfItems());
+ $this->assertQueueStats(3, 2, 1, $this->queue);
$claimsD = $this->queue->claimItems(3);
$this->assertEquals(['x6', 'x8'], CRM_Utils_Array::collect('data', $claimsD));
$this->queue->deleteItem($claimsD[0]); /* x6 */
$this->queue->deleteItem($claimsD[1]); /* x8 */
- $this->assertEquals(1, $this->queue->numberOfItems());
+ $this->assertQueueStats(1, 0, 1, $this->queue);
// claimsB took a while to wrap-up. But it finally did!
$this->queue->deleteItem($claimsB[2]); /* x5 */
- $this->assertEquals(0, $this->queue->numberOfItems());
+ $this->assertQueueStats(0, 0, 0, $this->queue);
}
public function testSetStatus() {
*/
class CRM_Queue_RunnerTest extends CiviUnitTestCase {
+ use \Civi\Test\QueueTestTrait;
+
public function setUp(): void {
parent::setUp();
$this->queueService = CRM_Queue_Service::singleton(TRUE);
'errorMode' => CRM_Queue_Runner::ERROR_ABORT,
]);
$this->assertEquals(self::$_recordedValues, []);
- $this->assertEquals(3, $this->queue->numberOfItems());
+ $this->assertQueueStats(3, 3, 0, $this->queue);
$result = $runner->runAll();
$this->assertEquals(TRUE, $result);
$this->assertEquals(self::$_recordedValues, ['a', 'b', 'c']);
- $this->assertEquals(0, $this->queue->numberOfItems());
+ $this->assertQueueStats(0, 0, 0, $this->queue);
}
/**
'errorMode' => CRM_Queue_Runner::ERROR_ABORT,
]);
$this->assertEquals(self::$_recordedValues, []);
- $this->assertEquals(3, $this->queue->numberOfItems());
+ $this->assertQueueStats(3, 3, 0, $this->queue);
$result = $runner->runAll();
$this->assertEquals(TRUE, $result);
$this->assertEquals(self::$_recordedValues, ['a', 1, 2, 3, 'b']);
- $this->assertEquals(0, $this->queue->numberOfItems());
+ $this->assertQueueStats(0, 0, 0, $this->queue);
}
/**
'errorMode' => CRM_Queue_Runner::ERROR_CONTINUE,
]);
$this->assertEquals(self::$_recordedValues, []);
- $this->assertEquals(3, $this->queue->numberOfItems());
+ $this->assertQueueStats(3, 3, 0, $this->queue);
+
$result = $runner->runAll();
// FIXME useless return
$this->assertEquals(TRUE, $result);
$this->assertEquals(self::$_recordedValues, ['a', 'c']);
- $this->assertEquals(0, $this->queue->numberOfItems());
+ $this->assertQueueStats(0, 0, 0, $this->queue);
}
/**
'errorMode' => CRM_Queue_Runner::ERROR_ABORT,
]);
$this->assertEquals(self::$_recordedValues, []);
- $this->assertEquals(3, $this->queue->numberOfItems());
+ $this->assertQueueStats(3, 3, 0, $this->queue);
+
$result = $runner->runAll();
$this->assertEquals(1, $result['is_error']);
// nothing from 'c'
$this->assertEquals(self::$_recordedValues, ['a']);
// 'b' and 'c' remain
- $this->assertEquals(2, $this->queue->numberOfItems());
+ $this->assertQueueStats(2, 2, 0, $this->queue);
}
/**
'errorMode' => CRM_Queue_Runner::ERROR_ABORT,
]);
$this->assertEquals(self::$_recordedValues, []);
- $this->assertEquals(3, $this->queue->numberOfItems());
+ $this->assertQueueStats(3, 3, 0, $this->queue);
$result = $runner->runAll();
$this->assertEquals(1, $result['is_error']);
// nothing from 'c'
$this->assertEquals(self::$_recordedValues, ['a']);
// 'b' and 'c' remain
- $this->assertEquals(2, $this->queue->numberOfItems());
+ $this->assertQueueStats(2, 2, 0, $this->queue);
}
/**
use Civi\Api4\Queue;
use Civi\Api4\UserJob;
use Civi\Core\Event\GenericHookEvent;
+use Civi\Test\QueueTestTrait;
/**
* @group headless
*/
class QueueTest extends Api4TestBase {
+ use QueueTestTrait;
+
protected function setUp(): void {
\Civi::$statics[__CLASS__] = [
'doSomethingResult' => TRUE,
'retry_limit' => 2,
'retry_interval' => 4,
]);
- $this->assertEquals(0, $queue->numberOfItems());
+ $this->assertQueueStats(0, 0, 0, $queue);
\Civi::queue($queueName)->createItem(new \CRM_Queue_Task(
[QueueTest::class, 'doSomething'],
$this->assertEquals(['first_ok', 'second_err', 'second_ok'], \Civi::$statics[__CLASS__]['doSomethingLog']);
// All done.
- $this->assertEquals(0, $queue->numberOfItems());
+ $this->assertQueueStats(0, 0, 0, $queue);
}
public function testBasicParallelPolling() {
$queueName = 'QueueTest_' . md5(random_bytes(32)) . '_parallel';
$queue = \Civi::queue($queueName, ['type' => 'SqlParallel', 'runner' => 'task', 'error' => 'delete']);
- $this->assertEquals(0, $queue->numberOfItems());
+ $this->assertQueueStats(0, 0, 0, $queue);
\Civi::queue($queueName)->createItem(new \CRM_Queue_Task(
[QueueTest::class, 'doSomething'],
Queue::runItems(0)->setItems([$first])->execute();
$this->assertEquals(['second_ok', 'first_ok'], \Civi::$statics[__CLASS__]['doSomethingLog']);
- $this->assertEquals(0, $queue->numberOfItems());
+ $this->assertQueueStats(0, 0, 0, $queue);
}
/**
'error' => 'delete',
'batch_limit' => 3,
]);
- $this->assertEquals(0, $queue->numberOfItems());
+ $this->assertQueueStats(0, 0, 0, $queue);
for ($i = 0; $i < 7; $i++) {
\Civi::queue($queueName)->createItem(['thingy' => $i]);
public function testSelect() {
$queueName = 'QueueTest_' . md5(random_bytes(32)) . '_parallel';
$queue = \Civi::queue($queueName, ['type' => 'SqlParallel', 'runner' => 'task', 'error' => 'delete']);
- $this->assertEquals(0, $queue->numberOfItems());
+ $this->assertQueueStats(0, 0, 0, $queue);
\Civi::queue($queueName)->createItem(new \CRM_Queue_Task(
[QueueTest::class, 'doSomething'],
public function testEmptyPoll() {
$queueName = 'QueueTest_' . md5(random_bytes(32)) . '_linear';
$queue = \Civi::queue($queueName, ['type' => 'Sql', 'runner' => 'task', 'error' => 'delete']);
- $this->assertEquals(0, $queue->numberOfItems());
+ $this->assertQueueStats(0, 0, 0, $queue);
$startResult = Queue::claimItems()->setQueue($queueName)->execute();
$this->assertEquals(0, $startResult->count());
public function testDelayedStart(array $queueSpec) {
$queueName = 'QueueTest_' . md5(random_bytes(32)) . '_delayed';
$queue = \Civi::queue($queueName, $queueSpec);
- $this->assertEquals(0, $queue->numberOfItems());
+ $this->assertQueueStats(0, 0, 0, $queue);
$releaseTime = \CRM_Utils_Time::strtotime('+3 seconds');
\Civi::queue($queueName)->createItem(new \CRM_Queue_Task(
[QueueTest::class, 'doSomething'],
['itwillstartanymomentnow']
), ['release_time' => $releaseTime]);
- $this->assertEquals(1, $queue->numberOfItems());
+ $this->assertQueueStats(1, 0, 1, $queue);
// Not available... yet...
$claim1 = $queue->claimItem();
'retry_limit' => 2,
'retry_interval' => 1,
]);
- $this->assertEquals(0, $queue->numberOfItems());
+ $this->assertQueueStats(0, 0, 0, $queue);
\Civi::queue($queueName)->createItem(new \CRM_Queue_Task(
[QueueTest::class, 'doSomething'],
'retry_interval' => 0,
'lease_time' => 1,
]);
- $this->assertEquals(0, $queue->numberOfItems());
+ $this->assertQueueStats(0, 0, 0, $queue);
\Civi::queue($queueName)->createItem(new \CRM_Queue_Task(
[QueueTest::class, 'doSomething'],
['playinghooky']
));
- $this->assertEquals(1, $queue->numberOfItems());
+ $this->assertQueueStats(1, 1, 0, $queue);
$claim1 = $this->waitForClaim(0.5, 5, $queueName);
// Oops, don't do anything with claim #1!
- $this->assertEquals(1, $queue->numberOfItems());
+ $this->assertQueueStats(1, 0, 1, $queue);
$this->assertEquals([], \Civi::$statics[__CLASS__]['doSomethingLog']);
$claim2 = $this->waitForClaim(0.5, 5, $queueName);
// Oops, don't do anything with claim #2!
- $this->assertEquals(1, $queue->numberOfItems());
+ $this->assertQueueStats(1, 0, 1, $queue);
$this->assertEquals([], \Civi::$statics[__CLASS__]['doSomethingLog']);
$claim3 = $this->waitForClaim(0.5, 5, $queueName);
- $this->assertEquals(1, $queue->numberOfItems());
+ $this->assertQueueStats(1, 0, 1, $queue);
$result = Queue::runItems(0)->setItems([$claim3])->execute()->first();
- $this->assertEquals(0, $queue->numberOfItems());
+ $this->assertQueueStats(0, 0, 0, $queue);
$this->assertEquals(['playinghooky_ok'], \Civi::$statics[__CLASS__]['doSomethingLog']);
$this->assertEquals('ok', $result['outcome']);
}
'retry_interval' => 0,
'lease_time' => 1,
]);
- $this->assertEquals(0, $queue->numberOfItems());
+ $this->assertQueueStats(0, 0, 0, $queue);
\Civi::queue($queueName)->createItem(new \CRM_Queue_Task(
[QueueTest::class, 'doSomething'],
['playinghooky']
));
- $this->assertEquals(1, $queue->numberOfItems());
+ $this->assertQueueStats(1, 1, 0, $queue);
$claimAndRun = function($expectOutcome, $expectEndCount) use ($queue, $queueName) {
$claim = $this->waitForClaim(0.5, 5, $queueName);
- $this->assertEquals(1, $queue->numberOfItems());
+ $this->assertQueueStats(1, 0, 1, $queue);
$result = Queue::runItems(0)->setItems([$claim])->execute()->first();
- $this->assertEquals($expectEndCount, $queue->numberOfItems());
+ $this->assertEquals($expectEndCount, $queue->getStatistic('total'));
$this->assertEquals($expectOutcome, $result['outcome']);
};
'runner' => 'task',
'error' => 'delete',
]);
- $this->assertEquals(0, $queue->numberOfItems());
+ $this->assertQueueStats(0, 0, 0, $queue);
$userJob = \Civi\Api4\UserJob::create(FALSE)->setValues([
'job_type:name' => 'contact_import',
));
// Verify initial status
- $this->assertEquals(2, $queue->numberOfItems());
+ $this->assertQueueStats(2, 2, 0, $queue);
$this->assertEquals(FALSE, isset($firedQueueStatus[$queueName]));
$this->assertEquals(TRUE, $queue->isActive());
$this->assertEquals(4, UserJob::get()->addWhere('id', '=', $userJob['id'])->execute()->first()['status_id']);
// OK, let's run both items - and check status afterward.
Queue::runItems(FALSE)->setQueue($queueName)->execute()->single();
- $this->assertEquals(1, $queue->numberOfItems());
+ $this->assertQueueStats(1, 1, 0, $queue);
$this->assertEquals(FALSE, isset($firedQueueStatus[$queueName]));
$this->assertEquals(TRUE, $queue->isActive());
$this->assertEquals(4, UserJob::get()->addWhere('id', '=', $userJob['id'])->execute()->first()['status_id']);
Queue::runItems(FALSE)->setQueue($queueName)->execute()->single();
- $this->assertEquals(0, $queue->numberOfItems());
+ $this->assertQueueStats(0, 0, 0, $queue);
$this->assertEquals('completed', $firedQueueStatus[$queueName]);
$this->assertEquals(FALSE, $queue->isActive());
$this->assertEquals(1, UserJob::get()->addWhere('id', '=', $userJob['id'])->execute()->first()['status_id']);
'runner' => 'task',
'error' => 'delete',
]);
- $this->assertEquals(0, $queue->numberOfItems());
+ $this->assertQueueStats(0, 0, 0, $queue);
\Civi::queue($queueName)->createItem(new \CRM_Queue_Task(
[QueueTest::class, 'doSomething'],
));
// Verify initial status
- $this->assertEquals(2, $queue->numberOfItems());
+ $this->assertQueueStats(2, 2, 0, $queue);
$this->assertEquals(FALSE, isset($firedQueueStatus[$queueName]));
$this->assertEquals(TRUE, $queue->isActive());
// OK, let's run both items - and check status afterward.
Queue::runItems(FALSE)->setQueue($queueName)->execute()->single();
- $this->assertEquals(1, $queue->numberOfItems());
+ $this->assertQueueStats(1, 1, 0, $queue);
$this->assertEquals(FALSE, isset($firedQueueStatus[$queueName]));
$this->assertEquals(TRUE, $queue->isActive());
Queue::runItems(FALSE)->setQueue($queueName)->execute()->single();
- $this->assertEquals(0, $queue->numberOfItems());
+ $this->assertQueueStats(0, 0, 0, $queue);
$this->assertEquals(FALSE, isset($firedQueueStatus[$queueName]));
$this->assertEquals(TRUE, $queue->isActive());
}