Commit | Line | Data |
---|---|---|
e231d9e8 | 1 | # GNU MediaGoblin -- federated, autonomous media hosting |
cf29e8a8 | 2 | # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. |
e231d9e8 CAW |
3 | # |
4 | # This program is free software: you can redistribute it and/or modify | |
5 | # it under the terms of the GNU Affero General Public License as published by | |
6 | # the Free Software Foundation, either version 3 of the License, or | |
7 | # (at your option) any later version. | |
8 | # | |
9 | # This program is distributed in the hope that it will be useful, | |
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
12 | # GNU Affero General Public License for more details. | |
13 | # | |
14 | # You should have received a copy of the GNU Affero General Public License | |
15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
16 | ||
4f9f969d LS |
17 | import os |
18 | import sys | |
0679545f | 19 | import datetime |
2d7b6bde | 20 | import logging |
4f9f969d | 21 | |
386c9c7c BP |
22 | import six |
23 | ||
d693f6bd | 24 | from celery import Celery |
9a27fa60 | 25 | from kombu import Exchange, Queue |
c5d8d301 | 26 | from mediagoblin.tools.pluginapi import hook_runall |
d693f6bd | 27 | |
4f9f969d | 28 | |
2d7b6bde JW |
29 | _log = logging.getLogger(__name__) |
30 | ||
31 | ||
32 | MANDATORY_CELERY_IMPORTS = [ | |
33 | 'mediagoblin.processing.task', | |
9246a6ba JT |
34 | 'mediagoblin.notifications.task', |
35 | 'mediagoblin.submit.task', | |
9a27fa60 | 36 | 'mediagoblin.media_types.video.processing', |
9246a6ba | 37 | ] |
4f9f969d LS |
38 | |
39 | DEFAULT_SETTINGS_MODULE = 'mediagoblin.init.celery.dummy_settings_module' | |
40 | ||
41 | ||
d693f6bd CAW |
42 | def get_celery_settings_dict(app_config, global_config, |
43 | force_celery_always_eager=False): | |
4f9f969d | 44 | """ |
d693f6bd | 45 | Get a celery settings dictionary from reading the config |
4f9f969d LS |
46 | """ |
47 | if 'celery' in global_config: | |
48 | celery_conf = global_config['celery'] | |
49 | else: | |
50 | celery_conf = {} | |
51 | ||
9a27fa60 | 52 | # Add x-max-priority to config |
53 | celery_conf['CELERY_QUEUES'] = ( | |
54 | Queue('default', Exchange('default'), routing_key='default', | |
55 | queue_arguments={'x-max-priority': 10}), | |
56 | ) | |
57 | ||
d77eb562 | 58 | print "CELERY_ACKS_LATE", celery_conf['CELERY_ACKS_LATE'] |
59 | print "CELERYD_PREFETCH_MULTIPLIER", celery_conf['CELERYD_PREFETCH_MULTIPLIER'] | |
60 | ||
4f9f969d LS |
61 | celery_settings = {} |
62 | ||
63 | # Add all celery settings from config | |
386c9c7c | 64 | for key, value in six.iteritems(celery_conf): |
4f9f969d LS |
65 | celery_settings[key] = value |
66 | ||
67 | # TODO: use default result stuff here if it exists | |
68 | ||
69 | # add mandatory celery imports | |
70 | celery_imports = celery_settings.setdefault('CELERY_IMPORTS', []) | |
71 | celery_imports.extend(MANDATORY_CELERY_IMPORTS) | |
72 | ||
73 | if force_celery_always_eager: | |
74 | celery_settings['CELERY_ALWAYS_EAGER'] = True | |
75 | celery_settings['CELERY_EAGER_PROPAGATES_EXCEPTIONS'] = True | |
76 | ||
0679545f JT |
77 | # Garbage collection periodic task |
78 | frequency = app_config.get('garbage_collection', 60) | |
79 | if frequency: | |
d8f55f2b | 80 | frequency = int(frequency) |
0679545f JT |
81 | celery_settings['CELERYBEAT_SCHEDULE'] = { |
82 | 'garbage-collection': { | |
584520e3 | 83 | 'task': 'mediagoblin.submit.task.collect_garbage', |
0679545f JT |
84 | 'schedule': datetime.timedelta(minutes=frequency), |
85 | } | |
86 | } | |
87 | celery_settings['BROKER_HEARTBEAT'] = 1 | |
88 | ||
d693f6bd CAW |
89 | return celery_settings |
90 | ||
91 | ||
92 | def setup_celery_app(app_config, global_config, | |
93 | settings_module=DEFAULT_SETTINGS_MODULE, | |
94 | force_celery_always_eager=False): | |
95 | """ | |
96 | Setup celery without using terrible setup-celery-module hacks. | |
97 | """ | |
98 | celery_settings = get_celery_settings_dict( | |
99 | app_config, global_config, force_celery_always_eager) | |
100 | celery_app = Celery() | |
101 | celery_app.config_from_object(celery_settings) | |
102 | ||
c5d8d301 | 103 | hook_runall('celery_setup', celery_app) |
f3f53028 | 104 | |
d693f6bd CAW |
105 | |
106 | def setup_celery_from_config(app_config, global_config, | |
107 | settings_module=DEFAULT_SETTINGS_MODULE, | |
108 | force_celery_always_eager=False, | |
109 | set_environ=True): | |
110 | """ | |
111 | Take a mediagoblin app config and try to set up a celery settings | |
112 | module from this. | |
113 | ||
114 | Args: | |
115 | - app_config: the application config section | |
116 | - global_config: the entire ConfigObj loaded config, all sections | |
117 | - settings_module: the module to populate, as a string | |
118 | - force_celery_always_eager: whether or not to force celery into | |
119 | always eager mode; good for development and small installs | |
120 | - set_environ: if set, this will CELERY_CONFIG_MODULE to the | |
121 | settings_module | |
122 | """ | |
123 | celery_settings = get_celery_settings_dict( | |
124 | app_config, global_config, force_celery_always_eager) | |
125 | ||
4f9f969d LS |
126 | __import__(settings_module) |
127 | this_module = sys.modules[settings_module] | |
128 | ||
386c9c7c | 129 | for key, value in six.iteritems(celery_settings): |
4f9f969d LS |
130 | setattr(this_module, key, value) |
131 | ||
132 | if set_environ: | |
133 | os.environ['CELERY_CONFIG_MODULE'] = settings_module | |
2d7b6bde JW |
134 | |
135 | # Replace the default celery.current_app.conf if celery has already been | |
136 | # initiated | |
137 | from celery import current_app | |
138 | ||
139 | _log.info('Setting celery configuration from object "{0}"'.format( | |
140 | settings_module)) | |
141 | current_app.config_from_object(this_module) | |
142 | ||
143 | _log.debug('Celery broker host: {0}'.format(current_app.conf['BROKER_HOST'])) |