3 +--------------------------------------------------------------------+
4 | Copyright CiviCRM LLC. All rights reserved. |
6 | This work is published under the GNU AGPLv3 license with some |
7 | permitted exceptions and without any warranty. For full license |
8 | and copyright information, see https://civicrm.org/licensing |
9 +--------------------------------------------------------------------+
13 * The queue service provides an interface for creating or locating
14 * queues. Note that this approach hides the details of data-storage:
15 * different queue-providers may store the queue content in different
16 * ways (in memory, in SQL, or in an external service).
19 * $queue = CRM_Queue_Service::singleton()->create(array(
20 * 'type' => 'interactive',
21 * 'name' => 'upgrade-tasks',
23 * $queue->createItem($myData);
25 * // Some time later...
26 * $item = $queue->claimItem();
28 * if (my_process($item->data)) {
29 * $queue->deleteItem($item);
31 * $queue->releaseItem($item);
36 class CRM_Queue_Service
{
38 protected static $_singleton;
41 * FIXME: Singleton pattern should be removed when dependency-injection
44 * @param bool $forceNew
45 * TRUE if a new instance must be created.
47 * @return \CRM_Queue_Service
49 public static function &singleton($forceNew = FALSE) {
50 if ($forceNew ||
!self
::$_singleton) {
51 self
::$_singleton = new CRM_Queue_Service();
53 return self
::$_singleton;
59 * Format is (string $queueName => CRM_Queue_Queue).
68 public function __construct() {
73 * Create a queue. If one already exists, then it will be reused.
75 * @param array $queueSpec
77 * - type: string, required, e.g. "interactive", "immediate", "stomp",
79 * - name: string, required, e.g. "upgrade-tasks"
80 * - reset: bool, optional; if a queue is found, then it should be
81 * flushed; default to TRUE
82 * - (additional keys depending on the queue provider).
84 * @return CRM_Queue_Queue
86 public function create($queueSpec) {
87 if (@is_object
($this->queues
[$queueSpec['name']]) && empty($queueSpec['reset'])) {
88 return $this->queues
[$queueSpec['name']];
91 $queue = $this->instantiateQueueObject($queueSpec);
92 $exists = $queue->existsQueue();
94 $queue->createQueue();
96 elseif (@$queueSpec['reset']) {
97 $queue->deleteQueue();
98 $queue->createQueue();
103 $this->queues
[$queueSpec['name']] = $queue;
108 * Look up an existing queue.
110 * @param array $queueSpec
112 * - type: string, required, e.g. "interactive", "immediate", "stomp",
114 * - name: string, required, e.g. "upgrade-tasks"
115 * - (additional keys depending on the queue provider).
117 * @return CRM_Queue_Queue
119 public function load($queueSpec) {
120 if (is_object($this->queues
[$queueSpec['name']] ??
NULL)) {
121 return $this->queues
[$queueSpec['name']];
123 $queue = $this->instantiateQueueObject($queueSpec);
125 $this->queues
[$queueSpec['name']] = $queue;
130 * Convert a queue "type" name to a class name.
132 * @param string $type
133 * E.g. "interactive", "immediate", "stomp", "beanstalk".
138 protected function getQueueClass($type) {
139 $type = preg_replace('/[^a-zA-Z0-9]/', '', $type);
140 $className = 'CRM_Queue_Queue_' . $type;
141 // FIXME: when used with class-autoloader, this may be unnecessary
142 if (!class_exists($className)) {
143 $classFile = 'CRM/Queue/Queue/' . $type . '.php';
144 require_once $classFile;
150 * @param array $queueSpec
153 * @return CRM_Queue_Queue
155 protected function instantiateQueueObject($queueSpec) {
156 // note: you should probably never do anything else here
157 $class = new ReflectionClass($this->getQueueClass($queueSpec['type']));
158 return $class->newInstance($queueSpec);