Switch to rabbitmq by default and in docs
[mediagoblin.git] / mediagoblin / processing / task.py
CommitLineData
eace050a
E
1# GNU MediaGoblin -- federated, autonomous media hosting
2# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
3#
4# This program is free software: you can redistribute it and/or modify
5# it under the terms of the GNU Affero General Public License as published by
6# the Free Software Foundation, either version 3 of the License, or
7# (at your option) any later version.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12# GNU Affero General Public License for more details.
13#
14# You should have received a copy of the GNU Affero General Public License
15# along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17import logging
120fa4ae
BP
18
19from six.moves.urllib import request, parse
eace050a 20
b5059525 21import celery
bf2dafd1 22from celery.registry import tasks
eace050a
E
23
24from mediagoblin import mg_globals as mgg
77ea4c9b 25from . import mark_entry_failed, BaseProcessingFail
5354f954 26from mediagoblin.tools.processing import json_processing_callback
55cfa340 27from mediagoblin.processing import get_entry_and_processing_manager
eace050a
E
28
# Root logging setup followed by this module's own logger.
# NOTE(review): configuring the root logger and forcing DEBUG at import
# time is a side effect for every importer of this module; kept as-is
# because other code may rely on it -- consider moving to app config.
logging.basicConfig()
_log = logging.getLogger(__name__)
_log.setLevel(logging.DEBUG)
eace050a
E
32
33
@celery.task(default_retry_delay=2 * 60)
def handle_push_urls(feed_url):
    """Subtask notifying the configured PuSH hubs that ``feed_url`` changed.

    Sends a ``hub.mode=publish`` form POST to every URL listed in
    ``mgg.app_config["push_urls"]``. If a hub answers with an error the
    task is retried after ``default_retry_delay`` (2 minutes); the number
    of retries is celery's default (presumably 3 -- confirm for the celery
    version in use). Once retries are exhausted the failure is logged and
    ``False`` is returned.

    :param feed_url: URL of the feed whose content was updated.
    """
    # six.moves.urllib.request does not re-export URLError/HTTPError on
    # Python 2, so take them from six.moves.urllib.error, which works on
    # both Python 2 and 3.
    from six.moves.urllib import error

    push_urls = mgg.app_config["push_urls"]
    if not push_urls:
        return  # Nothing to do
    _log.debug('Notifying Push servers for feed {0}'.format(feed_url))
    hubparameters = {
        'hub.mode': 'publish',
        'hub.url': feed_url}
    # On Python 3 urlopen() requires the POST body to be bytes (a str
    # raises TypeError); the urlencoded form is pure ASCII.
    hubdata = parse.urlencode(hubparameters).encode('ascii')
    hubheaders = {
        "Content-type": "application/x-www-form-urlencoded",
        "Connection": "close"}
    for huburl in push_urls:
        hubrequest = request.Request(huburl, hubdata, hubheaders)
        try:
            hubresponse = request.urlopen(hubrequest)
        except error.URLError as exc:  # HTTPError is a subclass of URLError
            # NOTE(review): a failing hub aborts this attempt, so later hubs
            # are not notified until the retry, and hubs already notified
            # are notified again on retry.
            # We retry by default 3 times before failing
            _log.info("PuSH url %r gave error %r", huburl, exc)
            try:
                return handle_push_urls.retry(exc=exc, throw=False)
            except Exception:
                # All retries failed, Failure is no tragedy here, probably.
                _log.warning('Failed to notify PuSH server for feed {0}. '
                             'Giving up.'.format(feed_url))
                return False
        else:
            # Only success matters; close the response right away instead
            # of leaving the socket to the garbage collector.
            hubresponse.close()
63
b5059525 64
eace050a
E
65################################
66# Media processing initial steps
67################################
b5059525
RE
class ProcessMedia(celery.Task):
    """
    Celery task that runs one processing action on a single MediaEntry.

    While the processor runs the entry's state is u'processing'; on success
    it becomes u'processed'. Failures are recorded with
    ``mark_entry_failed`` and reported through ``json_processing_callback``.
    """

    name = 'process_media'

    def run(self, media_id, feed_url, reprocess_action, reprocess_info=None):
        """
        Pass the media entry off to the processor that its processing
        manager selects for ``reprocess_action``.

        :param media_id: MediaEntry().id
        :param feed_url: The feed URL that the PuSH server needs to be
            updated for. If falsy, no PuSH notification is queued.
        :param reprocess_action: What particular action should be run. For
            example, 'initial'.
        :param reprocess_info: A dict containing all of the necessary
            reprocessing info for the media_type; it is passed to
            ``processor.process()`` as keyword arguments. ``None`` is
            treated as an empty dict.
        :raises: any exception other than BaseProcessingFail or ImportError
            is re-raised after the entry has been marked as failed.
        """
        reprocess_info = reprocess_info or {}
        entry, manager = get_entry_and_processing_manager(media_id)

        # Try to process, and handle expected errors.
        try:
            processor_class = manager.get_processor(reprocess_action, entry)

            with processor_class(manager, entry) as processor:
                # Initial state change has to be here because
                # the entry.state gets recorded on processor_class init
                entry.state = u'processing'
                entry.save()

                _log.debug('Processing {0}'.format(entry))

                try:
                    processor.process(**reprocess_info)
                except Exception as exc:
                    # Only a failed *re*processing of an entry that was
                    # already processed is tolerated: it is logged and the
                    # entry falls through to being set back to 'processed'.
                    if processor.entry_orig_state != 'processed':
                        raise
                    _log.error(
                        'Entry {0} failed to process due to the following'
                        ' error: {1}'.format(entry.id, exc))
                    _log.info(
                        'Setting entry.state back to "processed"')

            # We set the state to processed and save the entry here so there's
            # no need to save at the end of the processing stage, probably ;)
            entry.state = u'processed'
            entry.save()

            # Notify the PuSH servers as async task
            if mgg.app_config["push_urls"] and feed_url:
                handle_push_urls.subtask().delay(feed_url)

            json_processing_callback(entry)
        except BaseProcessingFail as exc:
            mark_entry_failed(entry.id, exc)
            json_processing_callback(entry)
            return

        except ImportError as exc:
            _log.error(
                'Entry {0} failed to process due to an import error: {1}'
                .format(entry.title, exc))

            mark_entry_failed(entry.id, exc)
            json_processing_callback(entry)

        except Exception as exc:
            # _log.exception logs at ERROR level and includes the traceback,
            # which is the only record of an unexpected failure.
            _log.exception('An unhandled exception was raised while'
                           ' processing {0}'.format(entry))

            mark_entry_failed(entry.id, exc)
            json_processing_callback(entry)
            raise

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """
        If the processing failed we should mark that in the database.

        Assuming that the exception raised is a subclass of
        BaseProcessingFail, we can use that to get more information
        about the failure and store that for conveying information to
        users about the failure, etc.
        """
        # ``media_id`` is normally passed positionally; fall back to the
        # keyword form so a kwargs-style invocation does not IndexError here.
        entry_id = args[0] if args else kwargs['media_id']
        try:
            mark_entry_failed(entry_id, exc)

            entry = mgg.database.MediaEntry.query.filter_by(
                id=entry_id).first()
            # The entry may no longer exist (e.g. deleted while being
            # processed); there is nothing to report for it then.
            if entry is not None:
                json_processing_callback(entry)
        finally:
            # Reset the database session even if reporting the failure
            # raised; after_return below explains that a rollback is needed
            # to avoid ProgrammingError exceptions.
            mgg.database.reset_after_request()

    def after_return(self, *args, **kwargs):
        """
        This is called after the task has returned, we should clean up.

        We need to rollback the database to prevent ProgrammingError exceptions
        from being raised.
        """
        # In eager mode we get DetachedInstanceError, so there the rollback
        # is done in on_failure instead.
        if not celery.app.default_app.conf['CELERY_ALWAYS_EAGER']:
            mgg.database.reset_after_request()
177
2cfffd5e 178
# Register the class-based ProcessMedia task (name 'process_media') in
# celery's task registry imported from celery.registry.
# NOTE(review): presumably required because the targeted celery version
# does not auto-register Task subclasses -- confirm when upgrading celery.
tasks.register(ProcessMedia)