Commit | Line | Data |
---|---|---|
eace050a E |
1 | # GNU MediaGoblin -- federated, autonomous media hosting |
2 | # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. | |
3 | # | |
4 | # This program is free software: you can redistribute it and/or modify | |
5 | # it under the terms of the GNU Affero General Public License as published by | |
6 | # the Free Software Foundation, either version 3 of the License, or | |
7 | # (at your option) any later version. | |
8 | # | |
9 | # This program is distributed in the hope that it will be useful, | |
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
12 | # GNU Affero General Public License for more details. | |
13 | # | |
14 | # You should have received a copy of the GNU Affero General Public License | |
15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
16 | ||
17 | import logging | |
2cfffd5e SS |
18 | import urllib |
19 | import urllib2 | |
eace050a | 20 | |
2cfffd5e | 21 | from celery import registry, task |
eace050a E |
22 | |
23 | from mediagoblin import mg_globals as mgg | |
b0c8328e | 24 | from mediagoblin.db.models import MediaEntry |
eace050a | 25 | from mediagoblin.processing import mark_entry_failed, BaseProcessingFail |
5354f954 | 26 | from mediagoblin.tools.processing import json_processing_callback |
eace050a E |
27 | |
28 | _log = logging.getLogger(__name__) | |
51eb0267 JW |
29 | logging.basicConfig() |
30 | _log.setLevel(logging.DEBUG) | |
eace050a E |
31 | |
32 | ||
2cfffd5e SS |
33 | @task.task(default_retry_delay=2 * 60) |
34 | def handle_push_urls(feed_url): | |
35 | """Subtask, notifying the PuSH servers of new content | |
36 | ||
37 | Retry 3 times every 2 minutes if run in separate process before failing.""" | |
38 | if not mgg.app_config["push_urls"]: | |
39 | return # Nothing to do | |
40 | _log.debug('Notifying Push servers for feed {0}'.format(feed_url)) | |
41 | hubparameters = { | |
42 | 'hub.mode': 'publish', | |
43 | 'hub.url': feed_url} | |
44 | hubdata = urllib.urlencode(hubparameters) | |
45 | hubheaders = { | |
46 | "Content-type": "application/x-www-form-urlencoded", | |
47 | "Connection": "close"} | |
48 | for huburl in mgg.app_config["push_urls"]: | |
49 | hubrequest = urllib2.Request(huburl, hubdata, hubheaders) | |
50 | try: | |
51 | hubresponse = urllib2.urlopen(hubrequest) | |
52 | except (urllib2.HTTPError, urllib2.URLError) as exc: | |
53 | # We retry by default 3 times before failing | |
54 | _log.info("PuSH url %r gave error %r", huburl, exc) | |
55 | try: | |
56 | return handle_push_urls.retry(exc=exc, throw=False) | |
57 | except Exception as e: | |
58 | # All retries failed, Failure is no tragedy here, probably. | |
59 | _log.warn('Failed to notify PuSH server for feed {0}. ' | |
60 | 'Giving up.'.format(feed_url)) | |
61 | return False | |
62 | ||
eace050a E |
63 | ################################ |
64 | # Media processing initial steps | |
65 | ################################ | |
66 | ||
2cfffd5e | 67 | class ProcessMedia(task.Task): |
eace050a | 68 | """ |
eace050a E |
69 | Pass this entry off for processing. |
70 | """ | |
2cfffd5e | 71 | def run(self, media_id, feed_url): |
eace050a E |
72 | """ |
73 | Pass the media entry off to the appropriate processing function | |
74 | (for now just process_image...) | |
2cfffd5e SS |
75 | |
76 | :param feed_url: The feed URL that the PuSH server needs to be | |
77 | updated for. | |
eace050a | 78 | """ |
71717fd5 | 79 | entry = MediaEntry.query.get(media_id) |
eace050a E |
80 | |
81 | # Try to process, and handle expected errors. | |
82 | try: | |
64712915 JW |
83 | entry.state = u'processing' |
84 | entry.save() | |
85 | ||
eace050a | 86 | _log.debug('Processing {0}'.format(entry)) |
64712915 | 87 | |
6af6bc05 | 88 | # run the processing code |
5f8b4ae8 | 89 | entry.media_manager['processor'](entry) |
64712915 | 90 | |
6af6bc05 CAW |
91 | # We set the state to processed and save the entry here so there's |
92 | # no need to save at the end of the processing stage, probably ;) | |
64712915 JW |
93 | entry.state = u'processed' |
94 | entry.save() | |
95 | ||
2cfffd5e SS |
96 | # Notify the PuSH servers as async task |
97 | handle_push_urls.subtask().delay(feed_url) | |
98 | ||
5354f954 | 99 | json_processing_callback(entry) |
51eb0267 | 100 | except BaseProcessingFail as exc: |
5c2b8486 | 101 | mark_entry_failed(entry.id, exc) |
5354f954 | 102 | json_processing_callback(entry) |
eace050a | 103 | return |
64712915 | 104 | |
51eb0267 | 105 | except ImportError as exc: |
eace050a E |
106 | _log.error( |
107 | 'Entry {0} failed to process due to an import error: {1}'\ | |
108 | .format( | |
109 | entry.title, | |
110 | exc)) | |
111 | ||
5c2b8486 | 112 | mark_entry_failed(entry.id, exc) |
5354f954 | 113 | json_processing_callback(entry) |
eace050a | 114 | |
2891b2c6 JW |
115 | except Exception as exc: |
116 | _log.error('An unhandled exception was raised while' | |
117 | + ' processing {0}'.format( | |
118 | entry)) | |
119 | ||
5c2b8486 | 120 | mark_entry_failed(entry.id, exc) |
5354f954 | 121 | json_processing_callback(entry) |
2891b2c6 JW |
122 | raise |
123 | ||
eace050a E |
124 | def on_failure(self, exc, task_id, args, kwargs, einfo): |
125 | """ | |
126 | If the processing failed we should mark that in the database. | |
127 | ||
128 | Assuming that the exception raised is a subclass of | |
129 | BaseProcessingFail, we can use that to get more information | |
130 | about the failure and store that for conveying information to | |
131 | users about the failure, etc. | |
132 | """ | |
133 | entry_id = args[0] | |
134 | mark_entry_failed(entry_id, exc) | |
5354f954 | 135 | |
939d57a0 | 136 | entry = mgg.database.MediaEntry.query.filter_by(id=entry_id).first() |
5354f954 | 137 | json_processing_callback(entry) |
2cfffd5e SS |
138 | |
139 | # Register the task | |
140 | process_media = registry.tasks[ProcessMedia.name] | |
141 |