Pull changes and resolve merge conflict.
[mediagoblin.git] / mediagoblin / processing / task.py
CommitLineData
eace050a
E
1# GNU MediaGoblin -- federated, autonomous media hosting
2# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
3#
4# This program is free software: you can redistribute it and/or modify
5# it under the terms of the GNU Affero General Public License as published by
6# the Free Software Foundation, either version 3 of the License, or
7# (at your option) any later version.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12# GNU Affero General Public License for more details.
13#
14# You should have received a copy of the GNU Affero General Public License
15# along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17import logging
2cfffd5e
SS
18import urllib
19import urllib2
eace050a 20
b5059525 21import celery
bf2dafd1 22from celery.registry import tasks
eace050a
E
23
24from mediagoblin import mg_globals as mgg
77ea4c9b 25from . import mark_entry_failed, BaseProcessingFail
5354f954 26from mediagoblin.tools.processing import json_processing_callback
55cfa340 27from mediagoblin.processing import get_entry_and_processing_manager
eace050a
E
28
# Module-level logger for processing tasks.
_log = logging.getLogger(__name__)
# NOTE(review): configuring the root logger and forcing DEBUG at import time
# overrides the host application's logging setup; looks like leftover
# debugging aid — confirm before removing.
logging.basicConfig()
_log.setLevel(logging.DEBUG)
@celery.task(default_retry_delay=2 * 60)
def handle_push_urls(feed_url):
    """Subtask, notifying the PuSH servers of new content

    Retry 3 times every 2 minutes if run in separate process before failing.

    :param feed_url: the feed URL the configured PuSH hubs should be told
        has been updated.
    :returns: ``False`` once all retries are exhausted for a hub;
        otherwise ``None`` (or the celery retry result).
    """
    if not mgg.app_config["push_urls"]:
        return  # Nothing to do
    _log.debug('Notifying Push servers for feed {0}'.format(feed_url))
    hubparameters = {
        'hub.mode': 'publish',
        'hub.url': feed_url}
    hubdata = urllib.urlencode(hubparameters)
    hubheaders = {
        "Content-type": "application/x-www-form-urlencoded",
        "Connection": "close"}
    for huburl in mgg.app_config["push_urls"]:
        hubrequest = urllib2.Request(huburl, hubdata, hubheaders)
        try:
            # Response body is irrelevant; success is simply "no exception".
            urllib2.urlopen(hubrequest)
        except (urllib2.HTTPError, urllib2.URLError) as exc:
            # We retry by default 3 times before failing
            _log.info("PuSH url %r gave error %r", huburl, exc)
            try:
                return handle_push_urls.retry(exc=exc, throw=False)
            except Exception:
                # All retries failed, Failure is no tragedy here, probably.
                _log.warning('Failed to notify PuSH server for feed {0}. '
                             'Giving up.'.format(feed_url))
                return False
63
b5059525 64
eace050a
E
65################################
66# Media processing initial steps
67################################
b5059525
RE
class ProcessMedia(celery.Task):
    """
    Pass this entry off for processing.
    """
    def run(self, media_id, feed_url, reprocess_action, reprocess_info=None):
        """
        Pass the media entry off to the appropriate processing function
        (for now just process_image...)

        :param media_id: id of the media entry to look up via
            get_entry_and_processing_manager.
        :param feed_url: The feed URL that the PuSH server needs to be
            updated for.
        :param reprocess_action: name of the processor to request from the
            entry's processing manager.
        :param reprocess_info: A dict containing all of the necessary
            reprocessing info for the media_type; defaults to an empty dict.
        """
        reprocess_info = reprocess_info or {}
        entry, manager = get_entry_and_processing_manager(media_id)

        # Try to process, and handle expected errors.
        try:
            processor_class = manager.get_processor(reprocess_action, entry)

            with processor_class(manager, entry) as processor:
                # Initial state change has to be here because
                # the entry.state gets recorded on processor_class init
                entry.state = u'processing'
                entry.save()

                _log.debug('Processing {0}'.format(entry))

                try:
                    processor.process(**reprocess_info)
                except Exception as exc:
                    # If the entry was already fully processed before this
                    # (re)run, swallow the error: falling through to the
                    # assignment below restores state to 'processed'.
                    if processor.entry_orig_state == 'processed':
                        _log.error(
                            'Entry {0} failed to process due to the following'
                            ' error: {1}'.format(entry.id, exc))
                        _log.info(
                            'Setting entry.state back to "processed"')
                        pass
                    else:
                        raise

            # We set the state to processed and save the entry here so there's
            # no need to save at the end of the processing stage, probably ;)
            entry.state = u'processed'
            entry.save()

            # Notify the PuSH servers as async task
            if mgg.app_config["push_urls"] and feed_url:
                handle_push_urls.subtask().delay(feed_url)

            json_processing_callback(entry)
        except BaseProcessingFail as exc:
            # Expected, structured processing failure: record it and stop.
            mark_entry_failed(entry.id, exc)
            json_processing_callback(entry)
            return

        except ImportError as exc:
            # Presumably a media-type processor module failed to import —
            # TODO(review) confirm against manager.get_processor behavior.
            _log.error(
                'Entry {0} failed to process due to an import error: {1}'\
                    .format(
                        entry.title,
                        exc))

            mark_entry_failed(entry.id, exc)
            json_processing_callback(entry)

        except Exception as exc:
            # Unexpected failure: mark the entry failed, then re-raise so
            # celery also records the task as failed (see on_failure).
            _log.error('An unhandled exception was raised while'
                       + ' processing {0}'.format(
                           entry))

            mark_entry_failed(entry.id, exc)
            json_processing_callback(entry)
            raise

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """
        If the processing failed we should mark that in the database.

        Assuming that the exception raised is a subclass of
        BaseProcessingFail, we can use that to get more information
        about the failure and store that for conveying information to
        users about the failure, etc.
        """
        # args[0] is media_id — matches run()'s first positional parameter.
        entry_id = args[0]
        mark_entry_failed(entry_id, exc)

        entry = mgg.database.MediaEntry.query.filter_by(id=entry_id).first()
        json_processing_callback(entry)
2cfffd5e 158
bf2dafd1 159tasks.register(ProcessMedia)