Multimedia support - Commiting from a not yet finished state - Details below
[mediagoblin.git] / mediagoblin / media_types / video / processing.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011 MediaGoblin contributors. See AUTHORS.
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 import Image
18 import tempfile
19
20 from celery.task import Task
21 from celery import registry
22
23 from mediagoblin.db.util import ObjectId
24 from mediagoblin import mg_globals as mgg
25
26 from mediagoblin.util import lazy_pass_to_ugettext as _
27
28 import gobject
29
30 import gst
31 import arista
32
33 from arista.transcoder import TranscoderOptions
34
35 THUMB_SIZE = 180, 180
36 MEDIUM_SIZE = 640, 640
37 ARISTA_DEVICE_KEY = 'web'
38
39
40 loop = None
41
42
43 def process_video(entry):
44 """
45 Code to process a video
46 """
47 info = {}
48 workbench = mgg.workbench_manager.create_workbench()
49
50 queued_filepath = entry['queued_media_file']
51 queued_filename = workbench.localized_file(
52 mgg.queue_store, queued_filepath,
53 'source')
54
55 arista.init()
56
57 devices = arista.presets.get()
58 device = devices[ARISTA_DEVICE_KEY]
59
60 queue = arista.queue.TranscodeQueue()
61
62 info['tmp_file'] = tmp_file = tempfile.NamedTemporaryFile()
63
64 info['medium_filepath'] = medium_filepath = create_pub_filepath(entry, 'video.webm')
65
66 output = tmp_file.name
67
68 uri = 'file://' + queued_filename
69
70 preset = device.presets[device.default]
71
72 opts = TranscoderOptions(uri, preset, output)
73
74 queue.append(opts)
75
76 info['entry'] = entry
77
78 queue.connect("entry-start", entry_start, info)
79 # queue.connect("entry-pass-setup", entry_pass_setup, options)
80 queue.connect("entry-error", entry_error, info)
81 queue.connect("entry-complete", entry_complete, info)
82
83 info['loop'] = loop = gobject.MainLoop()
84
85 loop.run()
86
87 # we have to re-read because unlike PIL, not everything reads
88 # things in string representation :)
89 queued_file = file(queued_filename, 'rb')
90
91 with queued_file:
92 original_filepath = create_pub_filepath(entry, queued_filepath[-1])
93
94 with mgg.public_store.get_file(original_filepath, 'wb') as original_file:
95 original_file.write(queued_file.read())
96
97 mgg.queue_store.delete_file(queued_filepath)
98 entry['queued_media_file'] = []
99 media_files_dict = entry.setdefault('media_files', {})
100 media_files_dict['original'] = original_filepath
101
102 # clean up workbench
103 workbench.destroy_self()
104
105
106 def create_pub_filepath(entry, filename):
107 return mgg.public_store.get_unique_filepath(
108 ['media_entries',
109 unicode(entry['_id']),
110 filename])
111
112
113 class BaseProcessingFail(Exception):
114 """
115 Base exception that all other processing failure messages should
116 subclass from.
117
118 You shouldn't call this itself; instead you should subclass it
119 and provid the exception_path and general_message applicable to
120 this error.
121 """
122 general_message = u''
123
124 @property
125 def exception_path(self):
126 return u"%s:%s" % (
127 self.__class__.__module__, self.__class__.__name__)
128
129 def __init__(self, **metadata):
130 self.metadata = metadata or {}
131
132
133 class BadMediaFail(BaseProcessingFail):
134 """
135 Error that should be raised when an inappropriate file was given
136 for the media type specified.
137 """
138 general_message = _(u'Invalid file given for media type.')
139
140
141 ################################
142 # Media processing initial steps
143 ################################
144
145 class ProcessMedia(Task):
146 """
147 Pass this entry off for processing.
148 """
149 def run(self, media_id):
150 """
151 Pass the media entry off to the appropriate processing function
152 (for now just process_image...)
153 """
154 entry = mgg.database.MediaEntry.one(
155 {'_id': ObjectId(media_id)})
156
157 # Try to process, and handle expected errors.
158 try:
159 process_video(entry)
160 except BaseProcessingFail, exc:
161 mark_entry_failed(entry[u'_id'], exc)
162 return
163
164 entry['state'] = u'processed'
165 entry.save()
166
167 def on_failure(self, exc, task_id, args, kwargs, einfo):
168 """
169 If the processing failed we should mark that in the database.
170
171 Assuming that the exception raised is a subclass of BaseProcessingFail,
172 we can use that to get more information about the failure and store that
173 for conveying information to users about the failure, etc.
174 """
175 entry_id = args[0]
176 mark_entry_failed(entry_id, exc)
177
178
179 process_media = registry.tasks[ProcessMedia.name]
180
181
182 def mark_entry_failed(entry_id, exc):
183 """
184 Mark a media entry as having failed in its conversion.
185
186 Uses the exception that was raised to mark more information. If the
187 exception is a derivative of BaseProcessingFail then we can store extra
188 information that can be useful for users telling them why their media failed
189 to process.
190
191 Args:
192 - entry_id: The id of the media entry
193
194 """
195 # Was this a BaseProcessingFail? In other words, was this a
196 # type of error that we know how to handle?
197 if isinstance(exc, BaseProcessingFail):
198 # Looks like yes, so record information about that failure and any
199 # metadata the user might have supplied.
200 mgg.database['media_entries'].update(
201 {'_id': entry_id},
202 {'$set': {u'state': u'failed',
203 u'fail_error': exc.exception_path,
204 u'fail_metadata': exc.metadata}})
205 else:
206 # Looks like no, so just mark it as failed and don't record a
207 # failure_error (we'll assume it wasn't handled) and don't record
208 # metadata (in fact overwrite it if somehow it had previous info
209 # here)
210 mgg.database['media_entries'].update(
211 {'_id': entry_id},
212 {'$set': {u'state': u'failed',
213 u'fail_error': None,
214 u'fail_metadata': {}}})
215
216
217 def entry_start(queue, entry, options):
218 print(queue, entry, options)
219
220 def entry_complete(queue, entry, info):
221 entry.transcoder.stop()
222 gobject.idle_add(info['loop'].quit)
223
224 with info['tmp_file'] as tmp_file:
225 mgg.public_store.get_file(info['medium_filepath'], 'wb').write(
226 tmp_file.read())
227 info['entry']['media_files']['medium'] = info['medium_filepath']
228
229 print('\n=== DONE! ===\n')
230
231 print(queue, entry, info)
232
233 def entry_error(queue, entry, options):
234 print(queue, entry, options)
235
236 def signal_handler(signum, frame):
237 """
238 Handle Ctr-C gracefully and shut down the transcoder.
239 """
240 global interrupted
241 print
242 print _("Interrupt caught. Cleaning up... (Ctrl-C to force exit)")
243 interrupted = True
244 signal.signal(signal.SIGINT, signal.SIG_DFL)
245
246 def check_interrupted():
247 """
248 Check whether we have been interrupted by Ctrl-C and stop the
249 transcoder.
250 """
251 if interrupted:
252 try:
253 source = transcoder.pipe.get_by_name("source")
254 source.send_event(gst.event_new_eos())
255 except:
256 # Something pretty bad happened... just exit!
257 gobject.idle_add(loop.quit)
258
259 return False
260 return True