Make PuSHing the PubSubHubbub server an async task (#436, #585)
[mediagoblin.git] / mediagoblin / processing / task.py
# GNU MediaGoblin -- federated, autonomous media hosting
# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import logging
import urllib
import urllib2

from celery import registry, task

from mediagoblin import mg_globals as mgg
from mediagoblin.db.models import MediaEntry
from mediagoblin.processing import mark_entry_failed, BaseProcessingFail
from mediagoblin.tools.processing import json_processing_callback

_log = logging.getLogger(__name__)
logging.basicConfig()
_log.setLevel(logging.DEBUG)


@task.task(default_retry_delay=2 * 60)
def handle_push_urls(feed_url):
    """Subtask, notifying the PuSH servers of new content

    Retries 3 times, every 2 minutes, before failing (when run in a
    separate process)."""
    if not mgg.app_config["push_urls"]:
        return  # Nothing to do
    _log.debug('Notifying Push servers for feed {0}'.format(feed_url))
    hubparameters = {
        'hub.mode': 'publish',
        'hub.url': feed_url}
    hubdata = urllib.urlencode(hubparameters)
    hubheaders = {
        "Content-type": "application/x-www-form-urlencoded",
        "Connection": "close"}
    for huburl in mgg.app_config["push_urls"]:
        hubrequest = urllib2.Request(huburl, hubdata, hubheaders)
        try:
            hubresponse = urllib2.urlopen(hubrequest)
        except (urllib2.HTTPError, urllib2.URLError) as exc:
            # We retry by default 3 times before failing
            _log.info("PuSH url %r gave error %r", huburl, exc)
            try:
                return handle_push_urls.retry(exc=exc, throw=False)
            except Exception as e:
                # All retries failed; failure is no tragedy here, probably.
                _log.warn('Failed to notify PuSH server for feed {0}. '
                          'Giving up.'.format(feed_url))
                return False


################################
# Media processing initial steps
################################

class ProcessMedia(task.Task):
    """
    Pass this entry off for processing.
    """
    def run(self, media_id, feed_url):
        """
        Pass the media entry off to the appropriate processing function
        (for now just process_image...)

        :param feed_url: The feed URL that the PuSH server needs to be
            updated for.
        """
        entry = MediaEntry.query.get(media_id)

        # Try to process, and handle expected errors.
        try:
            entry.state = u'processing'
            entry.save()

            _log.debug('Processing {0}'.format(entry))

            # run the processing code
            entry.media_manager['processor'](entry)

            # We set the state to processed and save the entry here so there's
            # no need to save at the end of the processing stage, probably ;)
            entry.state = u'processed'
            entry.save()

            # Notify the PuSH servers as an async task
            handle_push_urls.subtask().delay(feed_url)

            json_processing_callback(entry)
        except BaseProcessingFail as exc:
            mark_entry_failed(entry.id, exc)
            json_processing_callback(entry)
            return

        except ImportError as exc:
            _log.error(
                'Entry {0} failed to process due to an import error: {1}'
                .format(entry.title, exc))

            mark_entry_failed(entry.id, exc)
            json_processing_callback(entry)

        except Exception as exc:
            _log.error('An unhandled exception was raised while'
                       ' processing {0}'.format(entry))

            mark_entry_failed(entry.id, exc)
            json_processing_callback(entry)
            raise

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """
        If the processing failed we should mark that in the database.

        Assuming that the exception raised is a subclass of
        BaseProcessingFail, we can use that to get more information
        about the failure and store that for conveying information to
        users about the failure, etc.
        """
        entry_id = args[0]
        mark_entry_failed(entry_id, exc)

        entry = mgg.database.MediaEntry.query.filter_by(id=entry_id).first()
        json_processing_callback(entry)


# Register the task
process_media = registry.tasks[ProcessMedia.name]
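Usage note: a minimal sketch of how the registered process_media task might be
dispatched from a call site outside this file. The entry and feed_url values
below are placeholder assumptions (the real submit view builds them from the
request), not MediaGoblin's actual submission code; ProcessMedia.run() simply
expects a media id and the feed URL that the PuSH hubs should be pinged about.

    from mediagoblin.processing.task import process_media

    # Hypothetical call site: hand the work to a Celery worker instead of
    # running it in the web process. `entry` is assumed to be a saved
    # MediaEntry and `feed_url` the Atom feed URL for the uploading user.
    process_media.apply_async([entry.id, feed_url])

Once queued this way, the PuSH notification itself is a further subtask
(handle_push_urls), so a slow or failing hub only delays that subtask's
retries, not the media processing.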