create(array(
 *   'type' => 'interactive',
 *   'name' => 'upgrade-tasks',
 * ));
 * $queue->createItem($myData);
 *
 * // Some time later...
 * $item = $queue->claimItem();
 * if ($item) {
 *   if (my_process($item->data)) {
 *     $queue->deleteItem($item);
 *   }
 *   else {
 *     $queue->releaseItem($item);
 *   }
 * }
 * @endcode
 */
class CRM_Queue_Service {

  /**
   * Shared instance handed out by singleton().
   *
   * @var CRM_Queue_Service
   */
  public static $_singleton;

  /**
   * Get the shared service instance, creating it on first use.
   *
   * FIXME: Singleton pattern should be removed when dependency-injection
   * becomes available.
   *
   * @param bool $forceNew
   *   If TRUE, discard the current shared instance (and with it the cache of
   *   queue objects it holds) and build a new one.
   *
   * @return CRM_Queue_Service
   */
  public static function &singleton($forceNew = FALSE) {
    if ($forceNew || !self::$_singleton) {
      self::$_singleton = new CRM_Queue_Service();
    }
    return self::$_singleton;
  }

  /**
   * Queue objects already created or loaded through this service,
   * indexed by queue name.
   *
   * @var array(queueName => CRM_Queue_Queue)
   */
  public $queues;

  /**
   * Start with an empty cache of queue objects.
   */
  public function __construct() {
    $this->queues = array();
  }

  /**
   * Create a queue. If the queue already exists it is either flushed and
   * re-created (reset) or loaded as-is.
   *
   * @param array $queueSpec
   *   Array with keys:
   *   - type: string, required, e.g. "interactive", "immediate", "stomp", "beanstalk"
   *   - name: string, required, e.g. "upgrade-tasks"
   *   - reset: bool, optional; if TRUE and a queue is found, the queue is
   *     deleted and created again. When omitted it is treated as FALSE: a
   *     queue object already held by this service is returned unchanged, and
   *     a queue that exists in the backend is loaded rather than flushed.
   *   - (additional keys depending on the queue provider)
   *
   * @return CRM_Queue_Queue
   */
  public function create($queueSpec) {
    $name = $queueSpec['name'];
    // 'reset' is optional, so test it with empty() rather than silencing the
    // undefined-index notice with "@".
    $reset = !empty($queueSpec['reset']);

    // Reuse the object we already hold unless the caller asked for a reset.
    if (!$reset && isset($this->queues[$name]) && is_object($this->queues[$name])) {
      return $this->queues[$name];
    }

    $queue = $this->instantiateQueueObject($queueSpec);
    if (!$queue->existsQueue()) {
      $queue->createQueue();
    }
    elseif ($reset) {
      $queue->deleteQueue();
      $queue->createQueue();
    }
    else {
      $queue->loadQueue();
    }

    $this->queues[$name] = $queue;
    return $queue;
  }

  /**
   * Look up an existing queue without creating or resetting it.
   *
   * @param array $queueSpec
   *   Array with keys:
   *   - type: string, required, e.g. "interactive", "immediate", "stomp", "beanstalk"
   *   - name: string, required, e.g. "upgrade-tasks"
   *   - (additional keys depending on the queue provider)
   *
   * @return CRM_Queue_Queue
   */
  public function load($queueSpec) {
    $name = $queueSpec['name'];

    // isset() guards the lookup: on the first load of a given name there is
    // no cache entry yet, and reading it directly raises an undefined-index
    // notice.
    if (isset($this->queues[$name]) && is_object($this->queues[$name])) {
      return $this->queues[$name];
    }

    $queue = $this->instantiateQueueObject($queueSpec);
    $queue->loadQueue();

    $this->queues[$name] = $queue;
    return $queue;
  }

  /**
   * Convert a queue "type" name to a class name, loading the class file if
   * the class is not yet defined.
   *
   * @param string $type
   *   E.g. "interactive", "immediate", "stomp", "beanstalk".
   *
   * @return string
   *   Class name.
   */
  protected function getQueueClass($type) {
    // Keep only alphanumerics so the type cannot inject path separators or
    // other characters into the class name / include path built below.
    $type = preg_replace('/[^a-zA-Z0-9]/', '', $type);
    $className = 'CRM_Queue_Queue_' . $type;
    // FIXME: when used with class-autoloader, this may be unnecessary
    if (!class_exists($className)) {
      $classFile = 'CRM/Queue/Queue/' . $type . '.php';
      require_once $classFile;
    }
    return $className;
  }

  /**
   * Build the queue object described by a spec. The whole spec is passed to
   * the queue class constructor.
   *
   * @param array $queueSpec
   *   See create().
   *
   * @return CRM_Queue_Queue
   */
  protected function instantiateQueueObject($queueSpec) {
    // note: you should probably never do anything else here
    $class = new ReflectionClass($this->getQueueClass($queueSpec['type']));
    return $class->newInstance($queueSpec);
  }

}