Use of deprecated fromstring when processing videos causes the task to fail
[mediagoblin.git] / mediagoblin / media_types / video / transcoders.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17 from __future__ import division
18
19 import os
20 import sys
21 import logging
22 import multiprocessing
23
24 from mediagoblin.media_types.tools import discover
25 from mediagoblin.tools.translate import lazy_pass_to_ugettext as _
26
27 #os.environ['GST_DEBUG'] = '4,python:4'
28
29 old_argv = sys.argv
30 sys.argv = []
31
32 import gi
33 gi.require_version('Gst', '1.0')
34 from gi.repository import GObject, Gst, GstPbutils
35 Gst.init(None)
36
37 sys.argv = old_argv
38 import struct
39 try:
40 from PIL import Image
41 except ImportError:
42 import Image
43
_log = logging.getLogger(__name__)

# Fallback used when the number of CPUs cannot be determined.
CPU_COUNT = 2

try:
    CPU_COUNT = multiprocessing.cpu_count()
except NotImplementedError:
    _log.warning('multiprocessing.cpu_count not implemented')

# Directory for GStreamer pipeline .dot dumps.  Assign through os.environ
# instead of calling os.putenv() directly: the assignment still calls
# putenv() and additionally keeps os.environ in sync with the real
# environment.
# NOTE(review): if this module has already initialised GStreamer by the time
# this line runs, the setting may only be seen by child processes -- confirm.
os.environ['GST_DEBUG_DUMP_DOT_DIR'] = '/tmp'
54
55
def capture_thumb(video_path, dest_path, width=None, height=None, percent=0.5):
    '''
    Grab a single frame from a video file and save it as an image.

    Builds the pipeline ``uridecodebin ! videoconvert ! videoscale ! appsink``
    (the appsink is restricted to RGB caps with square pixels), pauses it,
    seeks to ``percent`` of the duration and writes the prerolled frame to
    ``dest_path`` with PIL.

    :param video_path: path of the source video; it is interpolated into a
        ``file://`` URI as-is.
    :param dest_path: destination handed to ``Image.save``.
    :param width: optional width (pixels) requested through the appsink caps.
    :param height: optional height (pixels) requested through the appsink
        caps.
    :param percent: position of the frame as a fraction of the duration
        (``0.5`` seeks to the middle of the file).
    :returns: ``None`` in every case; failures are reported as log warnings.
    '''
    def pad_added(element, pad, connect_to):
        '''Callback linking a newly exposed video pad to ``connect_to``.'''
        caps = pad.query_caps(None)
        name = caps.to_string()
        _log.debug('on_pad_added: {0}'.format(name))
        if name.startswith('video') and not connect_to.is_linked():
            pad.link(connect_to)

    # construct pipeline: uridecodebin ! videoconvert ! videoscale ! \
    # ! CAPS ! appsink
    pipeline = Gst.Pipeline()
    uridecodebin = Gst.ElementFactory.make('uridecodebin', None)
    uridecodebin.set_property('uri', 'file://{0}'.format(video_path))
    videoconvert = Gst.ElementFactory.make('videoconvert', None)
    # uridecodebin only exposes its source pads once the stream has been
    # inspected, so the link to videoconvert is made from the callback.
    uridecodebin.connect('pad-added', pad_added,
                         videoconvert.get_static_pad('sink'))
    videoscale = Gst.ElementFactory.make('videoscale', None)

    # create caps for video scaling
    caps_struct = Gst.Structure.new_empty('video/x-raw')
    caps_struct.set_value('pixel-aspect-ratio', Gst.Fraction(1, 1))
    caps_struct.set_value('format', 'RGB')
    if height:
        caps_struct.set_value('height', height)
    if width:
        caps_struct.set_value('width', width)
    caps = Gst.Caps.new_empty()
    caps.append_structure(caps_struct)

    # sink everything to memory
    appsink = Gst.ElementFactory.make('appsink', None)
    appsink.set_property('caps', caps)

    # add everything to pipeline
    for element in (uridecodebin, videoconvert, videoscale, appsink):
        pipeline.add(element)
    videoconvert.link(videoscale)
    videoscale.link(appsink)

    # Everything from here on runs with a live pipeline; the ``finally``
    # clause guarantees it is shut down on every exit path, including the
    # early returns and exceptions raised while saving the image.
    try:
        # pipeline constructed, starting playing, but first some preparations
        # seek to 50% of the file is required
        pipeline.set_state(Gst.State.PAUSED)
        # timeout of 3 seconds below was set experimentally
        state = pipeline.get_state(Gst.SECOND * 3)
        if state[0] != Gst.StateChangeReturn.SUCCESS:
            _log.warning('state change failed, {0}'.format(state))
            return

        # get duration
        (success, duration) = pipeline.query_duration(Gst.Format.TIME)
        if not success:
            _log.warning('query_duration failed')
            return

        seek_to = int(duration * int(percent * 100) / 100)
        _log.debug('Seeking to {0} of {1}'.format(
            float(seek_to) / Gst.SECOND, float(duration) / Gst.SECOND))
        seek = pipeline.seek_simple(
            Gst.Format.TIME, Gst.SeekFlags.FLUSH, seek_to)
        if not seek:
            _log.warning('seek failed')
            return

        # get sample, retrieve its format and save
        sample = appsink.emit("pull-preroll")
        if not sample:
            _log.warning('could not get sample')
            return
        sample_caps = sample.get_caps()
        if not sample_caps:
            _log.warning('could not get snapshot format')
            return
        structure = sample_caps.get_structure(0)
        (got_width, width) = structure.get_int('width')
        (got_height, height) = structure.get_int('height')
        if not (got_width and got_height):
            _log.warning('could not get snapshot dimensions')
            return
        frame_buffer = sample.get_buffer()

        # ``Image.fromstring`` is deprecated in favour of ``Image.frombytes``
        # and fails on current Pillow releases; fall back to it only for old
        # PIL versions that do not provide ``frombytes`` yet.
        frombytes = getattr(Image, 'frombytes', None) or Image.fromstring

        # get the image from the buffer and save it to disk
        # NOTE(review): assumes the RGB rows are tightly packed (no stride
        # padding) -- confirm for widths that are not a multiple of 4.
        im = frombytes('RGB', (width, height),
                       frame_buffer.extract_dup(0, frame_buffer.get_size()))
        im.save(dest_path)
        _log.info('thumbnail saved to {0}'.format(dest_path))
    finally:
        # cleanup
        pipeline.set_state(Gst.State.NULL)
142
143
class VideoTranscoder(object):
    '''
    Video transcoder

    Transcodes the SRC video file to a VP8 WebM video file at DST

     - Produces a WebM vp8 and vorbis video file.
    '''
    def __init__(self):
        _log.info('Initializing VideoTranscoder...')
        # Last percentage reported by the 'progressreport' element.
        self.progress_percentage = None
        # Main loop that transcode() blocks on until EOS or an error.
        self.loop = GObject.MainLoop()

    def transcode(self, src, dst, **kwargs):
        '''
        Transcode a video file into a 'medium'-sized version.

        Blocks in the GObject main loop until the pipeline reports EOS or
        an error.  Afterwards ``self.dst_data`` holds the result of
        ``discover()`` on the destination file, or ``None`` on error.

        :param src: path of the source video.
        :param dst: path of the WebM file to write.
        :keyword dimensions: ``(width, height)`` tuple, default
            ``(640, 640)``.
        :keyword vp8_quality: stored on the instance (default ``8``); it is
            not applied to the encoder by this class.
        :keyword vp8_threads: encoder thread count, default
            ``CPU_COUNT - 1``; ``0`` selects ``CPU_COUNT``.
        :keyword vorbis_quality: ``quality`` property of vorbisenc, default
            ``0.3``.
        :keyword progress_callback: optional callable invoked with the
            integer percentage whenever it changes.
        :raises Exception: if ``dimensions`` is not a tuple.
        '''
        self.source_path = src
        self.destination_path = dst

        # vp8enc options
        self.destination_dimensions = kwargs.get('dimensions', (640, 640))
        self.vp8_quality = kwargs.get('vp8_quality', 8)
        # Number of threads used by vp8enc:
        # number of real cores - 1 as per recommendation on
        # <http://www.webmproject.org/tools/encoder-parameters/#6-multi-threaded-encode-and-decode>
        self.vp8_threads = kwargs.get('vp8_threads', CPU_COUNT - 1)

        # 0 means auto-detect, but dict.get() only falls back to CPU_COUNT
        # if value is None, this will correct our incompatibility with
        # dict.get()
        # This will also correct cases where there's only 1 CPU core, see
        # original self.vp8_threads assignment above.
        if self.vp8_threads == 0:
            self.vp8_threads = CPU_COUNT

        # vorbisenc options
        self.vorbis_quality = kwargs.get('vorbis_quality', 0.3)

        self._progress_callback = kwargs.get('progress_callback') or None

        if not isinstance(self.destination_dimensions, tuple):
            raise Exception('dimensions must be tuple: (width, height)')

        self._setup_pipeline()
        # Stream information of the source; needed by _link_elements() and
        # the capsfilter setup below.
        self.data = discover(self.source_path)
        self._link_elements()
        self.__setup_videoscale_capsfilter()
        self.pipeline.set_state(Gst.State.PLAYING)
        _log.info('Transcoding...')
        _log.debug('Initializing MainLoop()')
        self.loop.run()

    def _setup_pipeline(self):
        '''
        Create the pipeline and all of its elements.

        The elements are only added to the pipeline here; they are linked
        in _link_elements().
        '''
        _log.debug('Setting up transcoding pipeline')
        # Create the pipeline bin.
        self.pipeline = Gst.Pipeline.new('VideoTranscoderPipeline')

        # Create all GStreamer elements, starting with
        # filesrc & decoder
        self.filesrc = Gst.ElementFactory.make('filesrc', 'filesrc')
        self.filesrc.set_property('location', self.source_path)
        self.pipeline.add(self.filesrc)

        self.decoder = Gst.ElementFactory.make('decodebin', 'decoder')
        self.decoder.connect('pad-added', self._on_dynamic_pad)
        self.pipeline.add(self.decoder)

        # Video elements
        self.videoqueue = Gst.ElementFactory.make('queue', 'videoqueue')
        self.pipeline.add(self.videoqueue)

        self.videorate = Gst.ElementFactory.make('videorate', 'videorate')
        self.pipeline.add(self.videorate)

        self.videoconvert = Gst.ElementFactory.make('videoconvert',
                                                    'videoconvert')
        self.pipeline.add(self.videoconvert)

        self.videoscale = Gst.ElementFactory.make('videoscale', 'videoscale')
        self.pipeline.add(self.videoscale)

        self.capsfilter = Gst.ElementFactory.make('capsfilter', 'capsfilter')
        self.pipeline.add(self.capsfilter)

        self.vp8enc = Gst.ElementFactory.make('vp8enc', 'vp8enc')
        self.vp8enc.set_property('threads', self.vp8_threads)
        self.pipeline.add(self.vp8enc)

        # Audio elements
        self.audioqueue = Gst.ElementFactory.make('queue', 'audioqueue')
        self.pipeline.add(self.audioqueue)

        self.audiorate = Gst.ElementFactory.make('audiorate', 'audiorate')
        self.audiorate.set_property('tolerance', 80000000)
        self.pipeline.add(self.audiorate)

        self.audioconvert = Gst.ElementFactory.make('audioconvert',
                                                    'audioconvert')
        self.pipeline.add(self.audioconvert)
        self.audiocapsfilter = Gst.ElementFactory.make('capsfilter',
                                                       'audiocapsfilter')
        audiocaps = Gst.Caps.new_empty()
        audiocaps_struct = Gst.Structure.new_empty('audio/x-raw')
        audiocaps.append_structure(audiocaps_struct)
        self.audiocapsfilter.set_property('caps', audiocaps)
        self.pipeline.add(self.audiocapsfilter)

        self.vorbisenc = Gst.ElementFactory.make('vorbisenc', 'vorbisenc')
        self.vorbisenc.set_property('quality', self.vorbis_quality)
        self.pipeline.add(self.vorbisenc)

        # WebMmux & filesink
        self.webmmux = Gst.ElementFactory.make('webmmux', 'webmmux')
        self.pipeline.add(self.webmmux)

        self.filesink = Gst.ElementFactory.make('filesink', 'filesink')
        self.filesink.set_property('location', self.destination_path)
        self.pipeline.add(self.filesink)

        # Progressreport
        self.progressreport = Gst.ElementFactory.make(
            'progressreport', 'progressreport')
        # Update every second
        self.progressreport.set_property('update-freq', 1)
        self.progressreport.set_property('silent', True)
        self.pipeline.add(self.progressreport)

    def _link_elements(self):
        '''
        Link all the elements

        This code depends on data from the discoverer (``self.data``), so
        it must run after transcode() has stored the discovery result.
        '''
        _log.debug('linking elements')
        # Link the filesrc element to the decoder. The decoder then emits
        # 'pad-added', whose handler links decoded src pads to either the
        # video or the audio queue
        self.filesrc.link(self.decoder)
        # link the rest
        self.videoqueue.link(self.videorate)
        self.videorate.link(self.videoconvert)
        self.videoconvert.link(self.videoscale)
        self.videoscale.link(self.capsfilter)
        self.capsfilter.link(self.vp8enc)
        self.vp8enc.link(self.webmmux)

        # The audio branch is only connected to the muxer when the source
        # actually has audio streams.
        if self.data.get_audio_streams():
            self.audioqueue.link(self.audiorate)
            self.audiorate.link(self.audioconvert)
            self.audioconvert.link(self.audiocapsfilter)
            self.audiocapsfilter.link(self.vorbisenc)
            self.vorbisenc.link(self.webmmux)
        self.webmmux.link(self.progressreport)
        self.progressreport.link(self.filesink)

        # Setup the message bus and connect _on_message to the pipeline
        self._setup_bus()

    def _on_dynamic_pad(self, dbin, pad):
        '''
        Callback called when ``decodebin`` has a pad that we can connect to
        '''
        # Intersect the capabilities of the video sink and the pad src
        # Then check if they have no common capabilities.
        video_template_caps = (self.videorate.get_static_pad('sink')
                               .get_pad_template().get_caps())
        # Pass the (empty) filter explicitly, as capture_thumb() does.
        pad_caps = pad.query_caps(None)
        if video_template_caps.intersect(pad_caps).is_empty():
            # It is NOT a video src pad.
            _log.debug('linking audio to the pad dynamically')
            pad.link(self.audioqueue.get_static_pad('sink'))
        else:
            # It IS a video src pad.
            _log.debug('linking video to the pad dynamically')
            pad.link(self.videoqueue.get_static_pad('sink'))

    def _setup_bus(self):
        '''Route the pipeline's bus messages to _on_message().'''
        self.bus = self.pipeline.get_bus()
        self.bus.add_signal_watch()
        self.bus.connect('message', self._on_message)

    def __setup_videoscale_capsfilter(self):
        '''
        Sets up the output format (width, height) for the video

        Only one dimension is fixed: the height for portrait sources and
        the width for landscape ones.  The first video stream found by the
        discoverer decides the orientation.
        '''
        caps_struct = Gst.Structure.new_empty('video/x-raw')
        caps_struct.set_value('pixel-aspect-ratio', Gst.Fraction(1, 1))
        caps_struct.set_value('framerate', Gst.Fraction(30, 1))
        video_info = self.data.get_video_streams()[0]
        if video_info.get_height() > video_info.get_width():
            # portrait
            caps_struct.set_value('height', self.destination_dimensions[1])
        else:
            # landscape
            caps_struct.set_value('width', self.destination_dimensions[0])
        caps = Gst.Caps.new_empty()
        caps.append_structure(caps_struct)
        self.capsfilter.set_property('caps', caps)

    def _on_message(self, bus, message):
        '''
        Bus message handler.

        - EOS: run discover() on the destination file and stop.
        - ELEMENT named 'progress': record the percentage and notify the
          progress callback when it changed.
        - ERROR: log it, set ``self.dst_data`` to ``None`` and stop.
        '''
        _log.debug((bus, message, message.type))
        if message.type == Gst.MessageType.EOS:
            self.dst_data = discover(self.destination_path)
            self.__stop()
            _log.info('Done')
        elif message.type == Gst.MessageType.ELEMENT:
            if message.has_name('progress'):
                structure = message.get_structure()
                # Update progress state if it has changed
                (success, percent) = structure.get_int('percent')
                if self.progress_percentage != percent and success:
                    self.progress_percentage = percent
                    if self._progress_callback:
                        self._progress_callback(percent)
                    _log.info('{percent}% done...'.format(percent=percent))
        elif message.type == Gst.MessageType.ERROR:
            _log.error('Got error: {0}'.format(message.parse_error()))
            self.dst_data = None
            self.__stop()

    def __stop(self):
        '''Set the pipeline to NULL and schedule the main loop to quit.'''
        _log.debug(self.loop)

        if hasattr(self, 'pipeline'):
            # Stop executing the pipeline
            self.pipeline.set_state(Gst.State.NULL)

        # This kills the loop, mercifully
        GObject.idle_add(self.__stop_mainloop)

    def __stop_mainloop(self):
        '''
        Wrapper for GObject.MainLoop.quit()

        This wrapper makes us able to see if self.loop.quit has been called
        '''
        _log.info('Terminating MainLoop')

        self.loop.quit()
383
384
if __name__ == '__main__':
    # Command-line entry point: transcode a video, capture a thumbnail or
    # print discovery information, depending on --action.

    # Raise the niceness so the work yields to other processes.
    os.nice(19)
    from optparse import OptionParser

    parser = OptionParser(
        usage='%prog [-v] -a [ video | thumbnail | discover ] SRC [ DEST ]')

    parser.add_option('-a', '--action',
                      dest='action',
                      help='One of "video", "discover" or "thumbnail"')

    parser.add_option('-v',
                      dest='verbose',
                      action='store_true',
                      help='Output debug information')

    parser.add_option('-q',
                      dest='quiet',
                      action='store_true',
                      help='Dear program, please be quiet unless *error*')

    parser.add_option('-w', '--width',
                      type=int,
                      default=180)

    (options, args) = parser.parse_args()

    if options.verbose:
        _log.setLevel(logging.DEBUG)
    else:
        _log.setLevel(logging.INFO)

    # -q wins over -v.
    if options.quiet:
        _log.setLevel(logging.ERROR)

    _log.debug(args)

    # Every action except 'discover' needs both SRC and DEST.
    if not len(args) == 2 and not options.action == 'discover':
        parser.print_help()
        sys.exit()

    if options.action == 'thumbnail':
        # Becomes capture_thumb(SRC, DEST, width).
        args.append(options.width)
        capture_thumb(*args)
    elif options.action == 'video':
        def cb(data):
            print('I\'m a callback!')
        transcoder = VideoTranscoder()
        transcoder.transcode(*args, progress_callback=cb)
    elif options.action == 'discover':
        # discover() is the module-level helper imported at the top of the
        # file; VideoTranscoder has no method of that name.
        print(discover(*args))
    else:
        # Unknown or missing action: show usage instead of doing nothing.
        parser.print_help()
        sys.exit()