Added support for http callbacks on processing
[mediagoblin.git] / mediagoblin / processing / task.py
CommitLineData
eace050a
E
1# GNU MediaGoblin -- federated, autonomous media hosting
2# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
3#
4# This program is free software: you can redistribute it and/or modify
5# it under the terms of the GNU Affero General Public License as published by
6# the Free Software Foundation, either version 3 of the License, or
7# (at your option) any later version.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12# GNU Affero General Public License for more details.
13#
14# You should have received a copy of the GNU Affero General Public License
15# along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17import logging
18
19from celery.task import Task
20
21from mediagoblin import mg_globals as mgg
22from mediagoblin.db.util import ObjectId
23from mediagoblin.media_types import get_media_manager
24from mediagoblin.processing import mark_entry_failed, BaseProcessingFail
5354f954 25from mediagoblin.tools.processing import json_processing_callback
eace050a
E
26
27_log = logging.getLogger(__name__)
51eb0267
JW
28logging.basicConfig()
29_log.setLevel(logging.DEBUG)
eace050a
E
30
31
32################################
33# Media processing initial steps
34################################
35
36class ProcessMedia(Task):
37 """
eace050a
E
38 Pass this entry off for processing.
39 """
40 def run(self, media_id):
41 """
42 Pass the media entry off to the appropriate processing function
43 (for now just process_image...)
44 """
45 entry = mgg.database.MediaEntry.one(
46 {'_id': ObjectId(media_id)})
47
48 # Try to process, and handle expected errors.
49 try:
eace050a 50 manager = get_media_manager(entry.media_type)
64712915
JW
51
52 entry.state = u'processing'
53 entry.save()
54
eace050a 55 _log.debug('Processing {0}'.format(entry))
64712915 56
eace050a 57 manager['processor'](entry)
64712915
JW
58
59 entry.state = u'processed'
60 entry.save()
61
5354f954 62 json_processing_callback(entry)
51eb0267 63 except BaseProcessingFail as exc:
eace050a 64 mark_entry_failed(entry._id, exc)
5354f954 65 json_processing_callback(entry)
eace050a 66 return
64712915 67
51eb0267 68 except ImportError as exc:
eace050a
E
69 _log.error(
70 'Entry {0} failed to process due to an import error: {1}'\
71 .format(
72 entry.title,
73 exc))
74
75 mark_entry_failed(entry._id, exc)
5354f954 76 json_processing_callback(entry)
eace050a 77
2891b2c6
JW
78 except Exception as exc:
79 _log.error('An unhandled exception was raised while'
80 + ' processing {0}'.format(
81 entry))
82
83 mark_entry_failed(entry._id, exc)
5354f954 84 json_processing_callback(entry)
2891b2c6
JW
85 raise
86
eace050a
E
87 def on_failure(self, exc, task_id, args, kwargs, einfo):
88 """
89 If the processing failed we should mark that in the database.
90
91 Assuming that the exception raised is a subclass of
92 BaseProcessingFail, we can use that to get more information
93 about the failure and store that for conveying information to
94 users about the failure, etc.
95 """
96 entry_id = args[0]
97 mark_entry_failed(entry_id, exc)
5354f954
JW
98
99 entry = mgg.database.MediaEntry.query.filter_by(id=entry_id)
100 json_processing_callback(entry)