Commit | Line | Data |
---|---|---|
eace050a E |
1 | # GNU MediaGoblin -- federated, autonomous media hosting |
2 | # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. | |
3 | # | |
4 | # This program is free software: you can redistribute it and/or modify | |
5 | # it under the terms of the GNU Affero General Public License as published by | |
6 | # the Free Software Foundation, either version 3 of the License, or | |
7 | # (at your option) any later version. | |
8 | # | |
9 | # This program is distributed in the hope that it will be useful, | |
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
12 | # GNU Affero General Public License for more details. | |
13 | # | |
14 | # You should have received a copy of the GNU Affero General Public License | |
15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
16 | ||
17 | import logging | |
2cfffd5e SS |
18 | import urllib |
19 | import urllib2 | |
eace050a | 20 | |
b5059525 | 21 | import celery |
bf2dafd1 | 22 | from celery.registry import tasks |
eace050a E |
23 | |
24 | from mediagoblin import mg_globals as mgg | |
77ea4c9b | 25 | from . import mark_entry_failed, BaseProcessingFail |
5354f954 | 26 | from mediagoblin.tools.processing import json_processing_callback |
55cfa340 | 27 | from mediagoblin.processing import get_entry_and_processing_manager |
eace050a E |
28 | |
# Module-level logger for the processing tasks.
_log = logging.getLogger(__name__)
# NOTE(review): calling basicConfig() at import time from a library module
# configures the root logger for the whole process, and forcing DEBUG here
# overrides any deployment logging config — confirm this is intentional.
logging.basicConfig()
_log.setLevel(logging.DEBUG)
eace050a E |
32 | |
33 | ||
@celery.task(default_retry_delay=2 * 60)
def handle_push_urls(feed_url):
    """Subtask, notifying the PuSH servers of new content

    Sends a ``hub.mode=publish`` POST to every hub configured in
    ``push_urls``, asking each one to re-fetch *feed_url*.

    Retry 3 times every 2 minutes if run in separate process before failing.

    :param feed_url: URL of the feed that was just updated; passed to the
        hubs as the ``hub.url`` parameter.
    :returns: ``False`` once all retries are exhausted, otherwise ``None``
        (or the value of ``retry()`` when a retry is scheduled).
    """
    if not mgg.app_config["push_urls"]:
        return  # Nothing to do

    _log.debug('Notifying Push servers for feed {0}'.format(feed_url))

    # PubSubHubbub "publish" ping: form-encoded mode + feed URL.
    hubparameters = {
        'hub.mode': 'publish',
        'hub.url': feed_url}
    hubdata = urllib.urlencode(hubparameters)
    hubheaders = {
        "Content-type": "application/x-www-form-urlencoded",
        "Connection": "close"}

    for huburl in mgg.app_config["push_urls"]:
        hubrequest = urllib2.Request(huburl, hubdata, hubheaders)
        try:
            # Response body is irrelevant; we only care whether the POST
            # succeeded, so don't keep a reference to it.
            urllib2.urlopen(hubrequest)
        except (urllib2.HTTPError, urllib2.URLError) as exc:
            # We retry by default 3 times before failing
            _log.info("PuSH url %r gave error %r", huburl, exc)
            try:
                # throw=False: when run eagerly (in-process), retry() returns
                # instead of raising, so we propagate its return value.
                return handle_push_urls.retry(exc=exc, throw=False)
            except Exception:
                # All retries failed; failure is no tragedy here, probably.
                _log.warn('Failed to notify PuSH server for feed {0}. '
                          'Giving up.'.format(feed_url))
                return False
63 | ||
b5059525 | 64 | |
eace050a E |
################################
# Media processing initial steps
################################
class ProcessMedia(celery.Task):
    """
    Celery task driving the processing pipeline for one media entry.

    ``run`` performs the processing; ``on_failure`` is celery's failure
    hook and records the failure state in the database.
    """
    def run(self, media_id, feed_url, reprocess_action, reprocess_info=None):
        """
        Pass the media entry off to the appropriate processing function
        (for now just process_image...)

        :param media_id: id of the MediaEntry to process; resolved to an
            entry and its processing manager via
            ``get_entry_and_processing_manager``.
        :param feed_url: The feed URL that the PuSH server needs to be
            updated for.
        :param reprocess_action: name of the processor to fetch from the
            entry's processing manager.
        :param reprocess_info: A dict containing all of the necessary reprocessing
            info for the media_type; expanded as keyword arguments to
            ``processor.process()``.  Defaults to an empty dict.
        """
        reprocess_info = reprocess_info or {}
        entry, manager = get_entry_and_processing_manager(media_id)

        # Try to process, and handle expected errors.
        try:
            processor_class = manager.get_processor(reprocess_action, entry)

            with processor_class(manager, entry) as processor:
                # Initial state change has to be here because
                # the entry.state gets recorded on processor_class init
                entry.state = u'processing'
                entry.save()

                _log.debug('Processing {0}'.format(entry))

                try:
                    processor.process(**reprocess_info)
                except Exception as exc:
                    # If the entry had already been processed successfully
                    # once (a reprocess run), swallow the error: the code
                    # after the ``with`` block flips entry.state back to
                    # 'processed'.  Otherwise re-raise so the outer
                    # handlers mark the entry as failed.
                    if processor.entry_orig_state == 'processed':
                        _log.error(
                            'Entry {0} failed to process due to the following'
                            ' error: {1}'.format(entry.id, exc))
                        _log.info(
                            'Setting entry.state back to "processed"')
                        pass
                    else:
                        raise

            # We set the state to processed and save the entry here so there's
            # no need to save at the end of the processing stage, probably ;)
            entry.state = u'processed'
            entry.save()

            # Notify the PuSH servers as async task
            if mgg.app_config["push_urls"] and feed_url:
                handle_push_urls.subtask().delay(feed_url)

            json_processing_callback(entry)
        except BaseProcessingFail as exc:
            # Expected, media-type-level failure: record it and notify the
            # callback, but don't re-raise.
            mark_entry_failed(entry.id, exc)
            json_processing_callback(entry)
            return

        except ImportError as exc:
            _log.error(
                'Entry {0} failed to process due to an import error: {1}'\
                    .format(
                        entry.title,
                        exc))

            mark_entry_failed(entry.id, exc)
            json_processing_callback(entry)

        except Exception as exc:
            # Unexpected failure: record it, notify, then re-raise so
            # celery also sees the error (and calls on_failure).
            _log.error('An unhandled exception was raised while'
                       + ' processing {0}'.format(
                           entry))

            mark_entry_failed(entry.id, exc)
            json_processing_callback(entry)
            raise

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """
        If the processing failed we should mark that in the database.

        Assuming that the exception raised is a subclass of
        BaseProcessingFail, we can use that to get more information
        about the failure and store that for conveying information to
        users about the failure, etc.
        """
        # args mirrors run()'s positional arguments, so args[0] is media_id.
        entry_id = args[0]
        mark_entry_failed(entry_id, exc)

        entry = mgg.database.MediaEntry.query.filter_by(id=entry_id).first()
        json_processing_callback(entry)
2cfffd5e | 158 | |
bf2dafd1 | 159 | tasks.register(ProcessMedia) |