# GNU MediaGoblin -- federated, autonomous media hosting
# Copyright (C) 2011, 2012 MediaGoblin contributors.  See AUTHORS.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
import logging

from six.moves.urllib import request, parse

import celery
from celery.registry import tasks

from mediagoblin import mg_globals as mgg
from . import mark_entry_failed, BaseProcessingFail
from mediagoblin.tools.processing import json_processing_callback
from mediagoblin.processing import get_entry_and_processing_manager
_log = logging.getLogger(__name__)
# Media processing problems are hard to debug after the fact, so this module
# logs at DEBUG regardless of the application's configured log level.
_log.setLevel(logging.DEBUG)
@celery.task(default_retry_delay=2 * 60)
def handle_push_urls(feed_url):
    """Subtask, notifying the PuSH servers of new content

    Retry 3 times every 2 minutes if run in separate process before failing.

    :param feed_url: the feed URL to announce to every configured PuSH hub

    Returns ``False`` when all retries have been exhausted; ``None`` when
    there is nothing to do or every hub was notified successfully.
    """
    if not mgg.app_config["push_urls"]:
        return  # Nothing to do

    _log.debug('Notifying Push servers for feed {0}'.format(feed_url))
    hubparameters = {
        'hub.mode': 'publish',
        'hub.url': feed_url}
    # urlencode() returns text; Request/urlopen need a bytes body on
    # Python 3 (on Python 2 the encoded str is unchanged).
    hubdata = parse.urlencode(hubparameters).encode('utf-8')
    hubheaders = {
        "Content-type": "application/x-www-form-urlencoded",
        "Connection": "close"}
    for huburl in mgg.app_config["push_urls"]:
        hubrequest = request.Request(huburl, hubdata, hubheaders)
        try:
            request.urlopen(hubrequest)
        except (request.HTTPError, request.URLError) as exc:
            # We retry by default 3 times before failing
            _log.info("PuSH url %r gave error %r", huburl, exc)
            try:
                # throw=False: when retries remain this re-queues the task
                # instead of raising Retry at the caller.
                return handle_push_urls.retry(exc=exc, throw=False)
            except Exception:
                # All retries failed. Failure is no tragedy here, probably.
                _log.warning('Failed to notify PuSH server for feed {0}. '
                             'Giving up.'.format(feed_url))
                return False
################################
# Media processing initial steps
################################
class ProcessMedia(celery.Task):
    """
    Celery task: pass this media entry off for processing.
    """
    name = 'process_media'

    def run(self, media_id, feed_url, reprocess_action, reprocess_info=None):
        """
        Pass the media entry off to the appropriate processing function
        (for now just process_image...)

        :param media_id: MediaEntry().id
        :param feed_url: The feed URL that the PuSH server needs to be
            updated for.
        :param reprocess_action: What particular action should be run. For
            example, 'initial'.
        :param reprocess_info: A dict containing all of the necessary
            reprocessing info for the media_type.
        """
        reprocess_info = reprocess_info or {}
        entry, manager = get_entry_and_processing_manager(media_id)

        # Try to process, and handle expected errors.
        try:
            processor_class = manager.get_processor(reprocess_action, entry)

            with processor_class(manager, entry) as processor:
                # Initial state change has to be here because
                # the entry.state gets recorded on processor_class init
                entry.state = u'processing'
                entry.save()

                _log.debug('Processing {0}'.format(entry))

                try:
                    processor.process(**reprocess_info)
                except Exception as exc:
                    if processor.entry_orig_state == 'processed':
                        # Reprocessing failed but the original media is still
                        # good: log and fall through so the state is restored
                        # to "processed" below instead of marking a failure.
                        _log.error(
                            'Entry {0} failed to process due to the following'
                            ' error: {1}'.format(entry.id, exc))
                        _log.info(
                            'Setting entry.state back to "processed"')
                    else:
                        raise

            # We set the state to processed and save the entry here so there's
            # no need to save at the end of the processing stage, probably ;)
            entry.state = u'processed'
            entry.save()

            # Notify the PuSH servers as async task
            if mgg.app_config["push_urls"] and feed_url:
                handle_push_urls.subtask().delay(feed_url)

            json_processing_callback(entry)
        except BaseProcessingFail as exc:
            # Expected, media-type-specific failure: record it and stop.
            mark_entry_failed(entry.id, exc)
            json_processing_callback(entry)
            return

        except ImportError as exc:
            _log.error(
                'Entry {0} failed to process due to an import error: {1}'
                .format(entry.title, exc))

            mark_entry_failed(entry.id, exc)
            json_processing_callback(entry)

        except Exception as exc:
            _log.error('An unhandled exception was raised while'
                       + ' processing {0}'.format(entry))

            mark_entry_failed(entry.id, exc)
            json_processing_callback(entry)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """
        If the processing failed we should mark that in the database.

        Assuming that the exception raised is a subclass of
        BaseProcessingFail, we can use that to get more information
        about the failure and store that for conveying information to
        users about the failure, etc.
        """
        # run()'s first positional argument is media_id.
        entry_id = args[0]
        mark_entry_failed(entry_id, exc)

        entry = mgg.database.MediaEntry.query.filter_by(id=entry_id).first()
        json_processing_callback(entry)
        mgg.database.reset_after_request()

    def after_return(self, *args, **kwargs):
        """
        This is called after the task has returned, we should clean up.

        We need to rollback the database to prevent ProgrammingError
        exceptions after a failed task.
        """
        # In eager mode we get DetachedInstanceError, we do rollback on_failure
        # to deal with that case though when in eager mode.
        if not celery.app.default_app.conf['CELERY_ALWAYS_EAGER']:
            mgg.database.reset_after_request()
# Register ProcessMedia with celery's task registry so it can be dispatched
# by its declared name ('process_media').
tasks.register(ProcessMedia)