From 9a27fa60a42cd39596cb8c4bb0331279b998bae7 Mon Sep 17 00:00:00 2001 From: vijeth-aradhya Date: Sun, 11 Jun 2017 19:07:58 +0530 Subject: [PATCH] Add additional celery config settings Fixes older webm_video backward compatibilty issue. Add 'default' queue to be used from now. Add other necessary celery settings for priority. --- mediagoblin/config_spec.ini | 5 +++-- mediagoblin/init/celery/__init__.py | 8 ++++++++ mediagoblin/media_types/video/processing.py | 5 ++++- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/mediagoblin/config_spec.ini b/mediagoblin/config_spec.ini index bd3003d0..e1a0d0c5 100644 --- a/mediagoblin/config_spec.ini +++ b/mediagoblin/config_spec.ini @@ -154,6 +154,7 @@ CELERY_RESULT_DBURI = string(default="sqlite:///%(here)s/celery.db") # default kombu stuff BROKER_URL = string(default="amqp://") +CELERY_DEFAULT_QUEUE = string(default="default") # known booleans CELERY_RESULT_PERSISTENT = boolean() @@ -165,7 +166,7 @@ CELERY_EAGER_PROPAGATES_EXCEPTIONS = boolean() CELERY_IGNORE_RESULT = boolean() CELERY_TRACK_STARTED = boolean() CELERY_DISABLE_RATE_LIMITS = boolean() -CELERY_ACKS_LATE = boolean() +CELERY_ACKS_LATE = boolean(default=True) CELERY_STORE_ERRORS_EVEN_IF_IGNORED = boolean() CELERY_SEND_TASK_ERROR_EMAILS = boolean() CELERY_SEND_EVENTS = boolean() @@ -175,7 +176,7 @@ CELERY_REDIRECT_STDOUTS = boolean() # known ints CELERYD_CONCURRENCY = integer() -CELERYD_PREFETCH_MULTIPLIER = integer() +CELERYD_PREFETCH_MULTIPLIER = integer(default=1) CELERY_AMQP_TASK_RESULT_EXPIRES = integer() CELERY_AMQP_TASK_RESULT_CONNECTION_MAX = integer() REDIS_PORT = integer() diff --git a/mediagoblin/init/celery/__init__.py b/mediagoblin/init/celery/__init__.py index 780e0055..9a67942c 100644 --- a/mediagoblin/init/celery/__init__.py +++ b/mediagoblin/init/celery/__init__.py @@ -22,6 +22,7 @@ import logging import six from celery import Celery +from kombu import Exchange, Queue from mediagoblin.tools.pluginapi import hook_runall @@ -32,6 +33,7 @@ MANDATORY_CELERY_IMPORTS = [ 'mediagoblin.processing.task', 'mediagoblin.notifications.task', 'mediagoblin.submit.task', + 'mediagoblin.media_types.video.processing', ] DEFAULT_SETTINGS_MODULE = 'mediagoblin.init.celery.dummy_settings_module' @@ -47,6 +49,12 @@ def get_celery_settings_dict(app_config, global_config, else: celery_conf = {} + # Add x-max-priority to config + celery_conf['CELERY_QUEUES'] = ( + Queue('default', Exchange('default'), routing_key='default', + queue_arguments={'x-max-priority': 10}), + ) + celery_settings = {} # Add all celery settings from config diff --git a/mediagoblin/media_types/video/processing.py b/mediagoblin/media_types/video/processing.py index 4dee8d55..5cae42f5 100644 --- a/mediagoblin/media_types/video/processing.py +++ b/mediagoblin/media_types/video/processing.py @@ -18,6 +18,7 @@ import argparse import os.path import logging import datetime +import celery import six @@ -163,6 +164,7 @@ def store_metadata(media_entry, metadata): # ===================== +@celery.task() def main_task(**process_info): processor = CommonVideoProcessor(process_info['manager'], process_info['entry']) processor.common_setup(process_info['resolution']) @@ -172,6 +174,7 @@ def main_task(**process_info): processor.store_orig_metadata() +@celery.task() def complimentary_task(**process_info): processor = CommonVideoProcessor(process_info['manager'], process_info['entry']) processor.common_setup(process_info['resolution']) @@ -179,6 +182,7 @@ def complimentary_task(**process_info): vp8_threads=process_info['vp8_threads'], vorbis_quality=process_info['vorbis_quality']) +@celery.task() def processing_cleanup(**process_info): processor = CommonVideoProcessor(process_info['manager'], process_info['entry']) processor.delete_queue_file() @@ -408,7 +412,6 @@ class InitialProcessor(CommonVideoProcessor): self.transcode(medium_size=medium_size, vp8_quality=vp8_quality, vp8_threads=vp8_threads, vorbis_quality=vorbis_quality) - self.copy_original() self.generate_thumb(thumb_size=thumb_size) self.delete_queue_file() -- 2.25.1