Commit | Line | Data |
---|---|---|
eace050a E |
1 | # GNU MediaGoblin -- federated, autonomous media hosting |
2 | # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. | |
3 | # | |
4 | # This program is free software: you can redistribute it and/or modify | |
5 | # it under the terms of the GNU Affero General Public License as published by | |
6 | # the Free Software Foundation, either version 3 of the License, or | |
7 | # (at your option) any later version. | |
8 | # | |
9 | # This program is distributed in the hope that it will be useful, | |
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
12 | # GNU Affero General Public License for more details. | |
13 | # | |
14 | # You should have received a copy of the GNU Affero General Public License | |
15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
16 | ||
17 | import logging | |
120fa4ae BP |
18 | |
19 | from six.moves.urllib import request, parse | |
eace050a | 20 | |
b5059525 | 21 | import celery |
bf2dafd1 | 22 | from celery.registry import tasks |
eace050a E |
23 | |
24 | from mediagoblin import mg_globals as mgg | |
77ea4c9b | 25 | from . import mark_entry_failed, BaseProcessingFail |
5354f954 | 26 | from mediagoblin.tools.processing import json_processing_callback |
55cfa340 | 27 | from mediagoblin.processing import get_entry_and_processing_manager |
eace050a E |
28 | |
# Module-level logger for media-processing tasks.
_log = logging.getLogger(__name__)
# Ensure a handler exists even when the host app hasn't configured logging
# (no-op if the root logger is already configured).
logging.basicConfig()
# NOTE(review): forces DEBUG verbosity for this module at import time --
# presumably a development aid; confirm this is intended in production.
_log.setLevel(logging.DEBUG)
eace050a E |
32 | |
33 | ||
@celery.task(default_retry_delay=2 * 60)
def handle_push_urls(feed_url):
    """Subtask, notifying the PuSH servers of new content

    Retry 3 times every 2 minutes if run in separate process before failing.

    :param feed_url: the feed URL that was updated; sent to each configured
        PuSH hub as the ``hub.url`` parameter of a publish ping.
    :returns: None if there is nothing to do or all hubs were notified
        successfully, the retry result while retrying, or False once all
        retries are exhausted.
    """
    # HTTPError/URLError live in six.moves.urllib.error, not .request --
    # the previous `request.HTTPError` reference raised AttributeError
    # inside the except clause instead of catching the network failure.
    from six.moves.urllib import error

    if not mgg.app_config["push_urls"]:
        return  # Nothing to do
    _log.debug('Notifying Push servers for feed {0}'.format(feed_url))
    hubparameters = {
        'hub.mode': 'publish',
        'hub.url': feed_url}
    # urlopen() requires a bytes POST body on Python 3; encoding is also
    # harmless on Python 2 (utf-8 of ascii-safe form data).
    hubdata = parse.urlencode(hubparameters).encode('utf-8')
    hubheaders = {
        "Content-type": "application/x-www-form-urlencoded",
        "Connection": "close"}
    for huburl in mgg.app_config["push_urls"]:
        hubrequest = request.Request(huburl, hubdata, hubheaders)
        try:
            # Response body is irrelevant; only success/failure matters.
            request.urlopen(hubrequest)
        except (error.HTTPError, error.URLError) as exc:
            # We retry by default 3 times before failing
            _log.info("PuSH url %r gave error %r", huburl, exc)
            try:
                return handle_push_urls.retry(exc=exc, throw=False)
            except Exception:
                # All retries failed, Failure is no tragedy here, probably.
                _log.warning('Failed to notify PuSH server for feed {0}. '
                             'Giving up.'.format(feed_url))
                return False
63 | ||
b5059525 | 64 | |
eace050a E |
65 | ################################ |
66 | # Media processing initial steps | |
67 | ################################ | |
b5059525 RE |
class ProcessMedia(celery.Task):
    """
    Pass this entry off for processing.

    Celery Task subclass driving the full media-processing pipeline:
    it looks up the entry and its media-type ProcessingManager, runs the
    selected processor, advances ``entry.state``, pings the configured
    PuSH hubs, and reports results through ``json_processing_callback``.
    """

    # Explicit task name registered with celery.
    name = 'process_media'

    def run(self, media_id, feed_url, reprocess_action, reprocess_info=None):
        """
        Pass the media entry off to the appropriate processing function
        (for now just process_image...)

        :param media_id: MediaEntry().id
        :param feed_url: The feed URL that the PuSH server needs to be
            updated for.
        :param reprocess_action: What particular action should be run. For
            example, 'initial'.
        :param reprocess_info: A dict containing all of the necessary
            reprocessing info for the media_type; passed as keyword
            arguments to ``processor.process()``.
        """
        reprocess_info = reprocess_info or {}
        entry, manager = get_entry_and_processing_manager(media_id)

        # Try to process, and handle expected errors.
        try:
            processor_class = manager.get_processor(reprocess_action, entry)

            with processor_class(manager, entry) as processor:
                # Initial state change has to be here because
                # the entry.state gets recorded on processor_class init
                entry.state = u'processing'
                entry.save()

                _log.debug('Processing {0}'.format(entry))

                try:
                    processor.process(**reprocess_info)
                except Exception as exc:
                    # If the entry was already fully processed before this
                    # (re)run, a failure is non-fatal: log it and fall
                    # through so the state is restored to 'processed'
                    # below.  Otherwise re-raise into the outer handlers.
                    if processor.entry_orig_state == 'processed':
                        _log.error(
                            'Entry {0} failed to process due to the following'
                            ' error: {1}'.format(entry.id, exc))
                        _log.info(
                            'Setting entry.state back to "processed"')
                        pass
                    else:
                        raise

            # We set the state to processed and save the entry here so there's
            # no need to save at the end of the processing stage, probably ;)
            entry.state = u'processed'
            entry.save()

            # Notify the PuSH servers as async task
            if mgg.app_config["push_urls"] and feed_url:
                handle_push_urls.subtask().delay(feed_url)

            json_processing_callback(entry)
        except BaseProcessingFail as exc:
            # Expected, media-type-specific failure: record it on the
            # entry and notify the callback URL, but don't propagate.
            mark_entry_failed(entry.id, exc)
            json_processing_callback(entry)
            return

        except ImportError as exc:
            # Likely a missing media-type plugin/dependency; treated the
            # same as a processing failure (marked, callback, swallowed).
            _log.error(
                'Entry {0} failed to process due to an import error: {1}'\
                    .format(
                    entry.title,
                    exc))

            mark_entry_failed(entry.id, exc)
            json_processing_callback(entry)

        except Exception as exc:
            # Truly unexpected error: mark the entry failed, notify, then
            # re-raise so celery records the task as failed as well.
            _log.error('An unhandled exception was raised while'
                       + ' processing {0}'.format(
                           entry))

            mark_entry_failed(entry.id, exc)
            json_processing_callback(entry)
            raise

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """
        If the processing failed we should mark that in the database.

        Assuming that the exception raised is a subclass of
        BaseProcessingFail, we can use that to get more information
        about the failure and store that for conveying information to
        users about the failure, etc.
        """
        # args[0] is media_id, per run()'s signature.
        entry_id = args[0]
        mark_entry_failed(entry_id, exc)

        entry = mgg.database.MediaEntry.query.filter_by(id=entry_id).first()
        json_processing_callback(entry)
        # Roll back / clean up the DB session for this worker.
        mgg.database.reset_after_request()

    def after_return(self, *args, **kwargs):
        """
        This is called after the task has returned, we should clean up.

        We need to rollback the database to prevent ProgrammingError exceptions
        from being raised.
        """
        # In eager mode we get DetachedInstanceError, we do rollback on_failure
        # to deal with that case though when in eager mode.
        if not celery.app.default_app.conf['CELERY_ALWAYS_EAGER']:
            mgg.database.reset_after_request()
177 | ||
2cfffd5e | 178 | |
# Register the task class with celery's global task registry
# (pre-Celery-3.1 `celery.registry` API).
tasks.register(ProcessMedia)