Add additional celery config settings
authorvijeth-aradhya <vijthaaa@gmail.com>
Sun, 11 Jun 2017 13:37:58 +0000 (19:07 +0530)
committervijeth-aradhya <vijthaaa@gmail.com>
Sun, 11 Jun 2017 13:37:58 +0000 (19:07 +0530)
Fixes older webm_video backward compatibilty issue.
Add 'default' queue to be used from now.
Add other necessary celery settings for priority.

mediagoblin/config_spec.ini
mediagoblin/init/celery/__init__.py
mediagoblin/media_types/video/processing.py

index bd3003d0e599b701f9d4be3a258df898e5010d07..e1a0d0c5e8a8a5c1586003b2aedd6d10a04ea670 100644 (file)
@@ -154,6 +154,7 @@ CELERY_RESULT_DBURI = string(default="sqlite:///%(here)s/celery.db")
 
 # default kombu stuff
 BROKER_URL = string(default="amqp://")
+CELERY_DEFAULT_QUEUE = string(default="default")
 
 # known booleans
 CELERY_RESULT_PERSISTENT = boolean()
@@ -165,7 +166,7 @@ CELERY_EAGER_PROPAGATES_EXCEPTIONS = boolean()
 CELERY_IGNORE_RESULT = boolean()
 CELERY_TRACK_STARTED = boolean()
 CELERY_DISABLE_RATE_LIMITS = boolean()
-CELERY_ACKS_LATE = boolean()
+CELERY_ACKS_LATE = boolean(default=True)
 CELERY_STORE_ERRORS_EVEN_IF_IGNORED = boolean()
 CELERY_SEND_TASK_ERROR_EMAILS = boolean()
 CELERY_SEND_EVENTS = boolean()
@@ -175,7 +176,7 @@ CELERY_REDIRECT_STDOUTS = boolean()
 
 # known ints
 CELERYD_CONCURRENCY = integer()
-CELERYD_PREFETCH_MULTIPLIER = integer()
+CELERYD_PREFETCH_MULTIPLIER = integer(default=1)
 CELERY_AMQP_TASK_RESULT_EXPIRES = integer()
 CELERY_AMQP_TASK_RESULT_CONNECTION_MAX = integer()
 REDIS_PORT = integer()
index 780e00553da6b19929b9aa3635a319bca94a219e..9a67942cbf2b23e1633ced8decdbf65ffb523b13 100644 (file)
@@ -22,6 +22,7 @@ import logging
 import six
 
 from celery import Celery
+from kombu import Exchange, Queue
 from mediagoblin.tools.pluginapi import hook_runall
 
 
@@ -32,6 +33,7 @@ MANDATORY_CELERY_IMPORTS = [
     'mediagoblin.processing.task',
     'mediagoblin.notifications.task',
     'mediagoblin.submit.task',
+    'mediagoblin.media_types.video.processing',
 ]
 
 DEFAULT_SETTINGS_MODULE = 'mediagoblin.init.celery.dummy_settings_module'
@@ -47,6 +49,12 @@ def get_celery_settings_dict(app_config, global_config,
     else:
         celery_conf = {}
 
+    # Add x-max-priority to config
+    celery_conf['CELERY_QUEUES'] = (
+        Queue('default', Exchange('default'), routing_key='default',
+              queue_arguments={'x-max-priority': 10}),
+    )
+
     celery_settings = {}
 
     # Add all celery settings from config
index 4dee8d555fb5c279a5ba9e3ae6b389ae3068e0fc..5cae42f5454a23fb7952c98b09f5fcf69cdd95b9 100644 (file)
@@ -18,6 +18,7 @@ import argparse
 import os.path
 import logging
 import datetime
+import celery
 
 import six
 
@@ -163,6 +164,7 @@ def store_metadata(media_entry, metadata):
 # =====================
 
 
+@celery.task()
 def main_task(**process_info):
     processor = CommonVideoProcessor(process_info['manager'], process_info['entry'])
     processor.common_setup(process_info['resolution'])
@@ -172,6 +174,7 @@ def main_task(**process_info):
     processor.store_orig_metadata()
 
 
+@celery.task()
 def complimentary_task(**process_info):
     processor = CommonVideoProcessor(process_info['manager'], process_info['entry'])
     processor.common_setup(process_info['resolution'])
@@ -179,6 +182,7 @@ def complimentary_task(**process_info):
                         vp8_threads=process_info['vp8_threads'], vorbis_quality=process_info['vorbis_quality'])
 
 
+@celery.task()
 def processing_cleanup(**process_info):
     processor = CommonVideoProcessor(process_info['manager'], process_info['entry'])
     processor.delete_queue_file()
@@ -408,7 +412,6 @@ class InitialProcessor(CommonVideoProcessor):
         self.transcode(medium_size=medium_size, vp8_quality=vp8_quality,
                        vp8_threads=vp8_threads, vorbis_quality=vorbis_quality)
 
-        self.copy_original()
         self.generate_thumb(thumb_size=thumb_size)
         self.delete_queue_file()