mediagoblin/processing/task.py
# GNU MediaGoblin -- federated, autonomous media hosting
# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import logging

# HTTPError and URLError are exposed via six.moves.urllib.error, not
# six.moves.urllib.request, so import that submodule too.
from six.moves.urllib import request, parse, error

import celery
from celery.registry import tasks

from mediagoblin import mg_globals as mgg
from . import (mark_entry_failed, BaseProcessingFail,
               get_entry_and_processing_manager)
from mediagoblin.tools.processing import json_processing_callback

_log = logging.getLogger(__name__)
logging.basicConfig()
_log.setLevel(logging.DEBUG)


@celery.task(default_retry_delay=2 * 60)
def handle_push_urls(feed_url):
    """Subtask notifying the PuSH servers of new content.

    When run in a separate process, retries 3 times, every 2 minutes,
    before failing."""
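    # A PubSubHubbub "publish" ping is a form-encoded POST of
    # hub.mode=publish and hub.url=<feed_url> to each hub; the hub then
    # re-fetches the feed and pushes the update to its subscribers.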
    if not mgg.app_config["push_urls"]:
        return  # Nothing to do
    _log.debug('Notifying PuSH servers for feed {0}'.format(feed_url))
    hubparameters = {
        'hub.mode': 'publish',
        'hub.url': feed_url}
    # urlopen expects bytes for the POST body under Python 3
    hubdata = parse.urlencode(hubparameters).encode('ascii')
    hubheaders = {
        "Content-type": "application/x-www-form-urlencoded",
        "Connection": "close"}
    for huburl in mgg.app_config["push_urls"]:
        hubrequest = request.Request(huburl, hubdata, hubheaders)
        try:
            request.urlopen(hubrequest)
        except (error.HTTPError, error.URLError) as exc:
            # We retry by default 3 times before failing
            _log.info("PuSH url %r gave error %r", huburl, exc)
            try:
                return handle_push_urls.retry(exc=exc, throw=False)
            except Exception:
                # All retries failed; failure is probably no tragedy here.
                _log.warning('Failed to notify PuSH server for feed {0}. '
                             'Giving up.'.format(feed_url))
                return False


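# Configuration sketch (hypothetical values; "push_urls" is the
# app_config option this module reads, expected to be a list of hub URLs):
#
#     [mediagoblin]
#     push_urls = https://pubsubhubbub.example.org/
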
################################
# Media processing initial steps
################################
class ProcessMedia(celery.Task):
    """
    Celery task that passes a media entry off for processing.
    """
    def run(self, media_id, feed_url, reprocess_action, reprocess_info=None):
        """
        Pass the media entry off to the appropriate processing function.

        :param media_id: MediaEntry().id
        :param feed_url: The feed URL that the PuSH server needs to be
            updated for.
        :param reprocess_action: The particular action to run; for
            example, 'initial'.
        :param reprocess_info: A dict containing all of the necessary
            reprocessing info for the media_type.
        """
        reprocess_info = reprocess_info or {}
        entry, manager = get_entry_and_processing_manager(media_id)

        # Try to process, and handle expected errors.
        try:
            processor_class = manager.get_processor(reprocess_action, entry)

            with processor_class(manager, entry) as processor:
                # The initial state change has to happen here because
                # entry.state gets recorded on processor_class init
                entry.state = u'processing'
                entry.save()

                _log.debug('Processing {0}'.format(entry))

                try:
                    processor.process(**reprocess_info)
                except Exception as exc:
                    if processor.entry_orig_state == 'processed':
                        _log.error(
                            'Entry {0} failed to process due to the following'
                            ' error: {1}'.format(entry.id, exc))
                        _log.info(
                            'Setting entry.state back to "processed"')
                    else:
                        raise

            # Set the state to processed and save the entry here so the
            # processing stage itself doesn't need to save at the end.
            entry.state = u'processed'
            entry.save()

            # Notify the PuSH servers as an async task
            if mgg.app_config["push_urls"] and feed_url:
                handle_push_urls.subtask().delay(feed_url)

            json_processing_callback(entry)
        except BaseProcessingFail as exc:
            mark_entry_failed(entry.id, exc)
            json_processing_callback(entry)
            return

        except ImportError as exc:
            _log.error(
                'Entry {0} failed to process due to an import error: '
                '{1}'.format(entry.title, exc))

            mark_entry_failed(entry.id, exc)
            json_processing_callback(entry)

        except Exception as exc:
            _log.error('An unhandled exception was raised while'
                       ' processing {0}'.format(entry))

            mark_entry_failed(entry.id, exc)
            json_processing_callback(entry)
            raise

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """
        If the processing failed, we should mark that in the database.

        Assuming that the exception raised is a subclass of
        BaseProcessingFail, we can use that to get more information
        about the failure and store that for conveying information to
        users about the failure, etc.
        """
        entry_id = args[0]
        mark_entry_failed(entry_id, exc)

        entry = mgg.database.MediaEntry.query.filter_by(id=entry_id).first()
        json_processing_callback(entry)
        mgg.database.reset_after_request()

    def after_return(self, *args, **kwargs):
        """
        This is called after the task has returned; we should clean up.

        We need to rollback the database to prevent ProgrammingError
        exceptions from being raised.
        """
        # In eager mode the reset here raises DetachedInstanceError;
        # on_failure already does the rollback for that case, so skip
        # the reset when running eagerly.
        if not celery.app.default_app.conf['CELERY_ALWAYS_EAGER']:
            mgg.database.reset_after_request()


tasks.register(ProcessMedia)
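
# Dispatch sketch (illustrative only; loosely mirrors the
# run_process_media helper elsewhere in mediagoblin.processing, and the
# queued_task_id attribute on MediaEntry is assumed from the surrounding
# codebase rather than defined in this module):
#
#     entry, manager = get_entry_and_processing_manager(media_id)
#     ProcessMedia().apply_async(
#         [entry.id, feed_url, 'initial', {}], {},
#         task_id=entry.queued_task_id)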