1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 from celery
import registry
, task
23 from mediagoblin
import mg_globals
as mgg
24 from mediagoblin
.db
.models
import MediaEntry
25 from . import mark_entry_failed
, BaseProcessingFail
, ProcessingState
26 from mediagoblin
.tools
.processing
import json_processing_callback
28 _log
= logging
.getLogger(__name__
)
30 _log
.setLevel(logging
.DEBUG
)
33 @task.task(default_retry_delay
=2 * 60)
34 def handle_push_urls(feed_url
):
35 """Subtask, notifying the PuSH servers of new content
37 Retry 3 times every 2 minutes if run in separate process before failing."""
38 if not mgg
.app_config
["push_urls"]:
39 return # Nothing to do
40 _log
.debug('Notifying Push servers for feed {0}'.format(feed_url
))
42 'hub.mode': 'publish',
44 hubdata
= urllib
.urlencode(hubparameters
)
46 "Content-type": "application/x-www-form-urlencoded",
47 "Connection": "close"}
48 for huburl
in mgg
.app_config
["push_urls"]:
49 hubrequest
= urllib2
.Request(huburl
, hubdata
, hubheaders
)
51 hubresponse
= urllib2
.urlopen(hubrequest
)
52 except (urllib2
.HTTPError
, urllib2
.URLError
) as exc
:
53 # We retry by default 3 times before failing
54 _log
.info("PuSH url %r gave error %r", huburl
, exc
)
56 return handle_push_urls
.retry(exc
=exc
, throw
=False)
57 except Exception as e
:
58 # All retries failed, Failure is no tragedy here, probably.
59 _log
.warn('Failed to notify PuSH server for feed {0}. '
60 'Giving up.'.format(feed_url
))
63 ################################
64 # Media processing initial steps
65 ################################
67 class ProcessMedia(task
.Task
):
69 Pass this entry off for processing.
71 def run(self
, media_id
, feed_url
):
73 Pass the media entry off to the appropriate processing function
74 (for now just process_image...)
76 :param feed_url: The feed URL that the PuSH server needs to be
79 entry
= MediaEntry
.query
.get(media_id
)
81 # Try to process, and handle expected errors.
83 entry
.state
= u
'processing'
86 _log
.debug('Processing {0}'.format(entry
))
88 proc_state
= ProcessingState(entry
)
89 with mgg
.workbench_manager
.create() as workbench
:
90 proc_state
.set_workbench(workbench
)
91 # run the processing code
92 entry
.media_manager
.processor(proc_state
)
94 # We set the state to processed and save the entry here so there's
95 # no need to save at the end of the processing stage, probably ;)
96 entry
.state
= u
'processed'
99 # Notify the PuSH servers as async task
100 if mgg
.app_config
["push_urls"] and feed_url
:
101 handle_push_urls
.subtask().delay(feed_url
)
103 json_processing_callback(entry
)
104 except BaseProcessingFail
as exc
:
105 mark_entry_failed(entry
.id, exc
)
106 json_processing_callback(entry
)
109 except ImportError as exc
:
111 'Entry {0} failed to process due to an import error: {1}'\
116 mark_entry_failed(entry
.id, exc
)
117 json_processing_callback(entry
)
119 except Exception as exc
:
120 _log
.error('An unhandled exception was raised while'
121 + ' processing {0}'.format(
124 mark_entry_failed(entry
.id, exc
)
125 json_processing_callback(entry
)
128 def on_failure(self
, exc
, task_id
, args
, kwargs
, einfo
):
130 If the processing failed we should mark that in the database.
132 Assuming that the exception raised is a subclass of
133 BaseProcessingFail, we can use that to get more information
134 about the failure and store that for conveying information to
135 users about the failure, etc.
138 mark_entry_failed(entry_id
, exc
)
140 entry
= mgg
.database
.MediaEntry
.query
.filter_by(id=entry_id
).first()
141 json_processing_callback(entry
)
144 process_media
= registry
.tasks
[ProcessMedia
.name
]