Record the original state of the media entry in the processor
[mediagoblin.git] / mediagoblin / processing / task.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 import logging
18 import urllib
19 import urllib2
20
21 from celery import registry, task
22
23 from mediagoblin import mg_globals as mgg
24 from . import mark_entry_failed, BaseProcessingFail
25 from mediagoblin.tools.processing import json_processing_callback
26 from mediagoblin.processing import get_entry_and_processing_manager
27
# Make sure the root logger has a handler before this module starts
# logging, then create the module-level logger and let it emit everything
# from DEBUG upwards.
logging.basicConfig()
_log = logging.getLogger(__name__)
_log.setLevel(logging.DEBUG)
31
32
@task.task(default_retry_delay=2 * 60)
def handle_push_urls(feed_url):
    """Subtask, notifying the PuSH servers of new content

    Sends a ``hub.mode=publish`` POST for ``feed_url`` to every hub listed
    in ``mgg.app_config["push_urls"]``.  When a hub cannot be reached the
    task asks celery to retry it (by default 3 times, every 2 minutes, if
    run in a separate process) and stops walking the remaining hubs for
    this run.

    :param feed_url: URL of the feed the PuSH hubs should be told about.
    :returns: ``None`` when no hubs are configured or all of them were
        notified, the result of ``retry()`` when a retry was requested,
        or ``False`` when ``retry()`` itself raised (retries exhausted).
    """
    push_urls = mgg.app_config["push_urls"]
    if not push_urls:
        return  # Nothing to do

    # Lazy %-style arguments: avoids str.format() choking on a non-ASCII
    # unicode feed_url under Python 2.
    _log.debug('Notifying Push servers for feed %s', feed_url)

    hubdata = urllib.urlencode({
        'hub.mode': 'publish',
        'hub.url': feed_url})
    hubheaders = {
        "Content-type": "application/x-www-form-urlencoded",
        "Connection": "close"}

    for huburl in push_urls:
        hubrequest = urllib2.Request(huburl, hubdata, hubheaders)
        try:
            hubresponse = urllib2.urlopen(hubrequest)
        except urllib2.URLError as exc:
            # urllib2.HTTPError is a subclass of URLError, so this covers
            # both.  We retry by default 3 times before failing.
            _log.info("PuSH url %r gave error %r", huburl, exc)
            try:
                return handle_push_urls.retry(exc=exc, throw=False)
            except Exception:
                # All retries failed, Failure is no tragedy here, probably.
                _log.warning('Failed to notify PuSH server for feed %s. '
                             'Giving up.', feed_url)
                return False
        else:
            # The response body is not needed; close it so the underlying
            # connection is released right away instead of leaking.
            hubresponse.close()
62
63 ################################
64 # Media processing initial steps
65 ################################
66
class ProcessMedia(task.Task):
    """
    Pass this entry off for processing.
    """
    def run(self, media_id, feed_url, reprocess_action, reprocess_info=None):
        """
        Pass the media entry off to the appropriate processor.

        :param media_id: Id used to look up the media entry and its
                         processing manager.
        :param feed_url: The feed URL that the PuSH server needs to be
                         updated for.
        :param reprocess_action: The action handed to
                         ``manager.get_processor`` to pick the processor.
        :param reprocess_info: A dict of keyword arguments forwarded to
                         ``processor.process``; ``None`` means none.
        """
        reprocess_info = reprocess_info or {}
        entry, manager = get_entry_and_processing_manager(media_id)

        # Try to process, and handle expected errors.
        try:
            processor_class = manager.get_processor(reprocess_action, entry)

            with processor_class(manager, entry) as processor:
                # Initial state change has to be here because
                # the entry.state gets recorded on processor_class init
                entry.state = u'processing'
                entry.save()

                _log.debug('Processing %s', entry)

                processor.process(**reprocess_info)

            # We set the state to processed and save the entry here so there's
            # no need to save at the end of the processing stage, probably ;)
            entry.state = u'processed'
            entry.save()

            # Notify the PuSH servers as async task
            if mgg.app_config["push_urls"] and feed_url:
                handle_push_urls.subtask().delay(feed_url)

            json_processing_callback(entry)
        except BaseProcessingFail as exc:
            # Expected processing failure: record it and report it through
            # the callback without propagating the exception.
            mark_entry_failed(entry.id, exc)
            json_processing_callback(entry)
            return

        except ImportError as exc:
            # Lazy %-style arguments instead of str.format(): under
            # Python 2 a non-ASCII unicode title would make format() raise
            # UnicodeEncodeError here, hiding the original error and
            # skipping mark_entry_failed() below.
            _log.error(
                'Entry %s failed to process due to an import error: %s',
                entry.title,
                exc)

            mark_entry_failed(entry.id, exc)
            json_processing_callback(entry)

        except Exception as exc:
            _log.error(
                'An unhandled exception was raised while processing %s',
                entry)

            mark_entry_failed(entry.id, exc)
            json_processing_callback(entry)
            # Unknown errors are re-raised after the entry has been
            # marked as failed.
            raise

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """
        If the processing failed we should mark that in the database.

        Assuming that the exception raised is a subclass of
        BaseProcessingFail, we can use that to get more information
        about the failure and store that for conveying information to
        users about the failure, etc.

        :param exc: The exception that made the task fail.
        :param args: Positional arguments the task was called with; the
                     first one is taken to be the media entry id.
        """
        entry_id = args[0]
        mark_entry_failed(entry_id, exc)

        # NOTE(review): .first() returns None when no row matches; confirm
        # json_processing_callback copes with a missing entry.
        entry = mgg.database.MediaEntry.query.filter_by(id=entry_id).first()
        json_processing_callback(entry)
146
# Fetch the task registered under ProcessMedia's name from celery's task
# registry and expose it as the module-level ``process_media``.
process_media = registry.tasks[ProcessMedia.name]
149