1 #!/usr/bin/env python3.4
2 # -*- coding: utf-8 -*-
4 # This file is part of ABYSS.
5 # ABYSS Broadcast Your Streaming Successfully
7 # ABYSS is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
12 # ABYSS is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with ABYSS. If not, see <http://www.gnu.org/licenses/>.
20 # Copyright (c) 2016 David Testé
23 from os
import listdir
24 from time
import localtime
, strftime
27 from gi
.repository
import Gst
28 from gi
.repository
import GstVideo
30 # Pathname has to be defined
32 AUDIO_DEFAULT
= PATHNAME
+ 'AUDIO_DEFAULT'
33 RAWVIDEO_DEFAULT
= PATHNAME
+ 'RAWVIDEO_DEFAULT'
34 STREAM_DEFAULT
= PATHNAME
+ 'STREAM_DEFAULT'
35 BACKUP_SUFFIX
= '_BACKUP'
36 FAILED_SUFFIX
= '_FAILED_'
38 AUDIO_BACKUP
= AUDIO_DEFAULT
+ BACKUP_SUFFIX
39 RAWVIDEO_BACKUP
= RAWVIDEO_DEFAULT
+ BACKUP_SUFFIX
40 STREAM_BACKUP
= STREAM_DEFAULT
+ BACKUP_SUFFIX
45 AUDIO_INPUT
= 'alsa_input.usb-Burr-Brown_from_TI_USB_Audio_CODEC-00-CODEC.analog-stereo'
46 AUDIO_OUTPUT
= 'alsa_output.pci-0000_00_1b.0.analog-stereo'
49 class New_user_pipeline():
52 def __init__(self
, rtsp_address
, feed
='main'):
53 self
.rtsp_address
= 'rtsp://' + rtsp_address
55 self
.user_pipeline
= self
.create_gstreamer_pipeline()
57 def create_video_sources(self
):
58 """Create video inputs from various sources."""
59 self
.videosrc
= Gst
.ElementFactory
.make('rtspsrc', 'videosrc')
60 self
.videosrc
.set_property('location', self
.rtsp_address
)
61 ## self.videosrc.set_property('location', 'rtsp://192.168.48.2:554')
62 self
.videosrc
.set_property('latency', 0)
63 ## self.videosrc.set_property('debug', True)
64 if self
.feed
== 'backup':
65 self
.videosrc_backup
= Gst
.ElementFactory
.make('v4l2src',
67 device_location
= self
.find_webcam_device()
68 self
.videosrc_backup
.set_property('device', device_location
)
70 def find_webcam_device(self
):
71 """Look out for the USB webcam device."""
72 devices
= [dev
for dev
in listdir('/dev/') if 'video' in dev
]
74 # In case of computer having a built-in webcam
75 if item
!= 'video0' and len(devices
) > 1:
77 # Without built-in webcam
78 elif len(devices
) == 1:
80 print(ERROR
, gettime(), 'No webcam device found.')
82 def find_mixingdesk_device(self
):
83 """Look out for the USB mixing desk device.
84 Product used here: Behringer XENYX Q1002USB.
86 # shell cmd : 'pactl list | grep alsa_input'
87 # AUDIO_INPUT --> const used currently
90 def create_pipeline_callbacks(self
):
91 """Callbacks to connect dynamically created pads."""
92 self
.videosrc
.connect('pad-added', self
.on_pad_added_to_rtspsrc
)
94 def on_pad_added_to_rtspsrc(self
, rtspsrc
, pad
):
95 """Connect the dynamic 'src'pad of an RTSP source."""
96 sinkpad
= self
.queuev_1
.get_static_pad('sink')
99 def create_audio_sources(self
):
100 """Create audio inputs from various sources."""
101 self
.audiosrc
= Gst
.ElementFactory
.make('pulsesrc', 'audiosrc')
102 self
.audiosrc
.set_property('device', AUDIO_INPUT
)
104 def create_audiolevel_plugin(self
):
105 """Create audio level plugin to feed a vu-meter."""
106 self
.audiolevel
= Gst
.ElementFactory
.make('level', 'audiolevel')
107 self
.audiolevel
.set_property('interval', 200000000)
109 def create_filesink(self
):
110 """Create storable output elements."""
111 self
.disksink_rawvideo
= Gst
.ElementFactory
.make('filesink')
112 #[TO DO]: File location has to be defined
113 self
.disksink_rawvideo
.set_property('location', RAWVIDEO_DEFAULT
)
114 self
.disksink_audio
= Gst
.ElementFactory
.make('filesink')
115 self
.disksink_audio
.set_property('location', AUDIO_DEFAULT
)
116 self
.disksink_stream
= Gst
.ElementFactory
.make('filesink')
117 self
.disksink_stream
.set_property('location', STREAM_DEFAULT
)
118 if self
.feed
== 'backup':
119 self
.disksink_rawvideo
.set_property('location', RAWVIDEO_BACKUP
)
120 self
.disksink_audio
.set_property('location', AUDIO_BACKUP
)
121 self
.disksink_stream
.set_property('location', STREAM_BACKUP
)
123 def create_streamsink(self
):
124 """Create streamable output elements."""
126 self
.screensink
= Gst
.ElementFactory
.make('xvimagesink', 'screensink')
127 self
.screensink
.set_property('sync', False)
128 # To local audio output (headphones):
129 self
.audiosink
= Gst
.ElementFactory
.make('pulsesink', 'audiosink')
130 self
.audiosink
.set_property('device', AUDIO_OUTPUT
)
131 self
.audiosink
.set_property('sync', False)
133 self
.icecastsink_audio
= Gst
.ElementFactory
.make('shout2send', 'icecastsink_audio')
134 self
.icecastsink_audio
.set_property('sync', False)
135 ## Configuration should be written on a file locally to keep safe private addresses
136 self
.icecastsink_audio
.set_property('ip', 'live2.fsf.org')
137 self
.icecastsink_audio
.set_property('port', 80)
138 self
.icecastsink_audio
.set_property('mount', 'testaudio.ogg')
139 self
.icecastsink_audio
.set_property('password', 'thahw3Wiez')
140 self
.icecastsink_stream
= Gst
.ElementFactory
.make('shout2send', 'icecastsink_stream')
141 self
.icecastsink_stream
.set_property('sync', False)
142 self
.icecastsink_stream
.set_property('ip', 'live2.fsf.org')
143 self
.icecastsink_stream
.set_property('port', 80)
144 self
.icecastsink_stream
.set_property('mount', 'teststream.webm')
145 self
.icecastsink_stream
.set_property('password', 'thahw3Wiez')
147 def create_payloader_elements(self
):
150 def create_depayloader_elements(self
):
151 self
.rtpjpegdepay
= Gst
.ElementFactory
.make('rtpjpegdepay', 'rtpjpegdepay')
153 def create_encoder_elements(self
):
155 self
.vorbisenc
= Gst
.ElementFactory
.make('vorbisenc', 'vorbisenc')
157 self
.vp8enc
= Gst
.ElementFactory
.make('vp8enc', 'vp8enc')
158 self
.vp8enc
.set_property('min_quantizer', 1)
159 self
.vp8enc
.set_property('max_quantizer', 13)
160 self
.vp8enc
.set_property('cpu-used', 5)
161 self
.vp8enc
.set_property('deadline', 42000)
162 self
.vp8enc
.set_property('threads', 2)
163 self
.vp8enc
.set_property('sharpness', 7)
165 def create_decoder_elements(self
):
166 self
.jpegdec
= Gst
.ElementFactory
.make('jpegdec', 'jpegdec')
167 self
.jpegdec
.set_property('max-errors', -1)
169 def create_muxer_elements(self
):
170 self
.oggmux
= Gst
.ElementFactory
.make('oggmux', 'oggmux')
171 self
.mkvmux
= Gst
.ElementFactory
.make('matroskamux', 'mkvmux')
172 self
.webmmux
= Gst
.ElementFactory
.make('webmmux', 'webmmux')
173 self
.webmmux
.set_property('streamable', True)
175 def create_demuxer_elements(self
):
178 def create_filtering_elements(self
):
179 self
.scaling
= Gst
.ElementFactory
.make('videoscale', 'scaling')
180 caps
= Gst
.caps_from_string('video/x-raw, width=(int)640, height=(int)360')
181 self
.capsfilter
= Gst
.ElementFactory
.make('capsfilter', 'capsfilter')
182 self
.capsfilter
.set_property('caps', caps
)
184 caps_backup
= Gst
.caps_from_string('video/x-raw, width=(int)640, height=(int)360')
185 self
.capsfilter_backup
= Gst
.ElementFactory
.make('capsfilter', 'capsfilter_backup')
186 self
.capsfilter_backup
.set_property('caps', caps_backup
)
188 def create_tee_elements(self
):
189 """Create tee elements to divide feeds."""
190 self
.tee_rawvideo
= Gst
.ElementFactory
.make('tee', 'tee_rawvideo')
191 self
.tee_videodecoded
= Gst
.ElementFactory
.make('tee', 'tee_videodecoded')
192 self
.tee_streamfull
= Gst
.ElementFactory
.make('tee', 'tee_streamfull')
193 self
.tee_rawaudio
= Gst
.ElementFactory
.make('tee', 'tee_rawaudio')
194 self
.tee_streamaudio
= Gst
.ElementFactory
.make('tee', 'tee_streamaudio')
196 def connect_tee(self
,
201 output_element_3
=None,):
202 """Links input and outputs of a given tee element."""
203 # Find a way to check if the element given are in the pipeline
204 # then pass the result to the 'if' statement.
205 ## argcheck = [True for arg in locals() if arg in 'the_list_of_elements_added']
206 ## print('[DEBUG] ArgList check: ', argcheck)
207 ## if False not in argcheck
209 input_element
.link(tee_element
)
210 tee_element
.link(output_element_1
)
211 tee_element
.link(output_element_2
)
213 tee_element
.link(output_element_3
)
217 'Couldn\'t link the tee. Element(s) probably not in the pipeline ')
219 def create_queues(self
):
221 self
.queuev_1
= Gst
.ElementFactory
.make('queue', 'queuev_1')
222 self
.queuev_2
= Gst
.ElementFactory
.make('queue', 'queuev_2')
223 self
.queuev_3
= Gst
.ElementFactory
.make('queue', 'queuev_3')
224 self
.queuev_4
= Gst
.ElementFactory
.make('queue', 'queuev_4')
225 self
.queuev_5
= Gst
.ElementFactory
.make('queue', 'queuev_5')
226 self
.queuev_6
= Gst
.ElementFactory
.make('queue', 'queuev_6')
228 self
.queuea_1
= Gst
.ElementFactory
.make('queue', 'queuea_1')
229 self
.queuea_2
= Gst
.ElementFactory
.make('queue', 'queuea_2')
230 self
.queuea_3
= Gst
.ElementFactory
.make('queue', 'queuea_3')
231 self
.queuea_4
= Gst
.ElementFactory
.make('queue', 'queuea_4')
232 self
.queuea_4
.set_property('leaky', 2)
233 self
.queuea_5
= Gst
.ElementFactory
.make('queue', 'queuea_5')
234 # For audio+video muxer:
235 self
.queuem_1
= Gst
.ElementFactory
.make('queue', 'queuem_1')
236 self
.queuem_2
= Gst
.ElementFactory
.make('queue', 'queuem_2')
237 self
.queuem_2
.set_property('leaky', 2)
239 def create_pipeline_elements(self
):
240 print(INFO
, gettime(), 'Pipeline creation state: creating elements... ', end
='')
242 self
.create_video_sources()
243 self
.create_audio_sources()
245 self
.create_audiolevel_plugin()
246 self
.create_payloader_elements()
247 self
.create_depayloader_elements()
248 self
.create_encoder_elements()
249 self
.create_decoder_elements()
250 self
.create_muxer_elements()
251 self
.create_filtering_elements()
252 self
.create_tee_elements()
255 self
.create_filesink()
256 self
.create_streamsink()
257 if self
.feed
== 'test':
258 print('TEST OK...', end
='')
260 if self
.feed
== 'backup':
263 'Webcam device location: ',
264 self
.videosrc_backup
.get_property('device'))
267 def add_elements_to_pipeline(self
):
268 print(INFO
, gettime(), 'Pipeline creation state: adding elements... ', end
='')
269 cond
= self
.feed
!= 'test'
272 self
.streampipe
.add(self
.audiosrc
)
274 self
.streampipe
.add(self
.audiolevel
)
275 self
.streampipe
.add(self
.queuea_1
)
276 self
.streampipe
.add(self
.queuev_3
)
278 self
.streampipe
.add(self
.vorbisenc
)
279 self
.streampipe
.add(self
.oggmux
)
280 self
.streampipe
.add(self
.queuea_2
)
281 self
.streampipe
.add(self
.queuea_3
)
282 self
.streampipe
.add(self
.vp8enc
)
283 self
.streampipe
.add(self
.mkvmux
)
284 self
.streampipe
.add(self
.webmmux
)
285 self
.streampipe
.add(self
.tee_rawaudio
)
286 self
.streampipe
.add(self
.tee_rawvideo
)
287 self
.streampipe
.add(self
.tee_streamaudio
)
288 self
.streampipe
.add(self
.tee_streamfull
)
289 self
.streampipe
.add(self
.queuev_2
)
290 self
.streampipe
.add(self
.queuev_4
)
291 self
.streampipe
.add(self
.queuev_5
)
292 self
.streampipe
.add(self
.queuea_4
)
293 self
.streampipe
.add(self
.queuea_5
)
294 self
.streampipe
.add(self
.queuem_1
)
295 self
.streampipe
.add(self
.queuem_2
)
297 self
.streampipe
.add(self
.screensink
)
299 self
.streampipe
.add(self
.disksink_rawvideo
)
300 self
.streampipe
.add(self
.disksink_audio
)
301 self
.streampipe
.add(self
.disksink_stream
)
302 self
.streampipe
.add(self
.icecastsink_audio
)
303 self
.streampipe
.add(self
.icecastsink_stream
)
305 self
.streampipe
.add(self
.audiosink
)
307 if self
.feed
== 'main' or self
.feed
== 'test':
309 self
.streampipe
.add(self
.videosrc
)
311 self
.streampipe
.add(self
.rtpjpegdepay
)
312 self
.streampipe
.add(self
.jpegdec
)
313 self
.streampipe
.add(self
.scaling
)
314 self
.streampipe
.add(self
.capsfilter
)
315 self
.streampipe
.add(self
.tee_videodecoded
)
316 self
.streampipe
.add(self
.queuev_1
)
317 if self
.feed
== 'test':
318 print ('TEST OK...', end
='')
319 elif self
.feed
== 'backup':
321 self
.streampipe
.add(self
.videosrc_backup
)
323 self
.streampipe
.add(self
.capsfilter_backup
)
324 print ('BACKUP OK...', end
='')
327 def link_pipeline_elements(self
):
328 """Link all elements with static pads."""
329 print(INFO
, gettime(), 'Pipeline creation state: linking elements... ', end
='')
330 cond
= self
.feed
!= 'test'
333 self
.audiosrc
.link(self
.audiolevel
)
334 self
.audiolevel
.link(self
.queuea_1
)
336 self
.queuea_1
.link(self
.vorbisenc
)
337 self
.connect_tee(self
.tee_rawaudio
,
341 self
.queuea_2
.link(self
.oggmux
)
342 self
.connect_tee(self
.tee_streamaudio
,
346 self
.queuea_3
.link(self
.disksink_audio
)
347 self
.queuea_4
.link(self
.icecastsink_audio
)
348 self
.queuea_5
.link(self
.webmmux
)
350 self
.queuea_1
.link(self
.audiosink
)
354 self
.queuev_2
.link(self
.mkvmux
)
355 self
.mkvmux
.link(self
.queuev_4
)
356 self
.queuev_4
.link(self
.disksink_rawvideo
)
358 self
.queuev_1
.link(self
.rtpjpegdepay
)
359 self
.rtpjpegdepay
.link(self
.jpegdec
)
360 self
.jpegdec
.link(self
.queuev_3
)
361 self
.queuev_3
.link(self
.screensink
)
363 # Stream (audio+video) feed:
365 self
.vp8enc
.link(self
.queuev_5
)
366 self
.queuev_5
.link(self
.webmmux
)
367 self
.connect_tee(self
.tee_streamfull
,
371 self
.queuem_1
.link(self
.disksink_stream
)
372 self
.queuem_2
.link(self
.icecastsink_stream
)
373 if self
.feed
== 'main':
374 # linking here RTSP feed
375 self
.queuev_1
.link(self
.rtpjpegdepay
)
376 self
.connect_tee(self
.tee_rawvideo
,
380 self
.connect_tee(self
.tee_videodecoded
,
384 # Stream (video) feed:
385 self
.scaling
.link(self
.capsfilter
)
386 self
.capsfilter
.link(self
.vp8enc
)
387 elif self
.feed
== 'backup':
388 # linking here backup feed (WEBCAM)
389 self
.videosrc_backup
.link(self
.capsfilter_backup
)
390 self
.connect_tee(self
.tee_rawvideo
,
391 self
.capsfilter_backup
,
394 output_element_3
=self
.vp8enc
)
395 ## self.capsfilter_backup.link(self.queuev_3)
396 print('BACKUP OK...', end
='')
398 print('TEST OK...', end
='')
401 def create_gstreamer_pipeline(self
):
402 # New empty pipeline:
403 self
.streampipe
= Gst
.Pipeline()
404 self
.create_pipeline_elements()
406 self
.add_elements_to_pipeline()
407 self
.link_pipeline_elements()
408 if self
.feed
== 'main' or self
.feed
== 'test':
409 self
.create_pipeline_callbacks()
412 bus
= self
.streampipe
.get_bus()
413 bus
.add_signal_watch()
414 bus
.enable_sync_message_emission()
415 # Used to get messages that GStreamer emits.
416 bus
.connect("message", self
.on_message
)
418 print(INFO
, gettime(), 'Pipeline creation state: successfully done.')
419 return self
.streampipe
421 def on_message(self
, bus
, message
):
423 if t
== Gst
.MessageType
.EOS
:
424 self
.streampipe
.set_state(Gst
.State
.NULL
)
425 elif t
== Gst
.MessageType
.ERROR
:
426 err
, debug
= message
.parse_error()
427 print (ERROR
, '%s' % err
, debug
)
429 def stream_play(self
):
430 self
.streampipe
.set_state(Gst
.State
.PLAYING
)
431 if self
.feed
== 'backup':
432 print(WARN
, gettime(), 'Backup pipeline started.')
433 print(INFO
, gettime(), 'PLAYING State resquested')
435 def stream_stop(self
):
436 self
.streampipe
.set_state(Gst
.State
.NULL
)
437 print(INFO
, gettime(), 'STOPPED State resquested')
439 def set_filenames(self
, string
, streamfailed
=False):
440 """Sets filename and location for each sink."""
443 audio
= PATHNAME
+ filename
+ '_AUDIO'
444 rawvideo
= PATHNAME
+ filename
+ '_RAWVIDEO'
445 stream
= PATHNAME
+ filename
+ '_STREAM'
446 print('FEED STATE: ', self
.feed
)
447 if self
.feed
== 'main':
448 if streamfailed
and filename
:
449 audio
= audio
+ FAILED_SUFFIX
+ str(fail_counter
)
450 rawvideo
= rawvideo
+ FAILED_SUFFIX
+ str(fail_counter
)
451 stream
= stream
+ FAILED_SUFFIX
+ str(fail_counter
)
452 rename(AUDIO_DEFAULT
, audio
)
453 rename(RAWVIDEO_DEFAULT
, rawvideo
)
454 rename(STREAM_DEFAULT
, stream
)
457 audio
= AUDIO_DEFAULT
+ FAILED_SUFFIX
+ str(fail_counter
)
458 rawvideo
= RAWVIDEO_DEFAULT
+ FAILED_SUFFIX
+ str(fail_counter
)
459 stream
= STREAM_DEFAULT
+ FAILED_SUFFIX
+ str(fail_counter
)
460 rename(AUDIO_DEFAULT
, audio
)
461 rename(RAWVIDEO_DEFAULT
, rawvideo
)
462 rename(STREAM_DEFAULT
, stream
)
465 rename(AUDIO_DEFAULT
, audio
)
466 rename(RAWVIDEO_DEFAULT
, rawvideo
)
467 rename(STREAM_DEFAULT
, stream
)
468 elif self
.feed
== 'backup':
469 print('INSIDE BACKUP RENAMING')
470 rename(AUDIO_BACKUP
, audio
)
471 rename(RAWVIDEO_BACKUP
, rawvideo
)
472 rename(STREAM_BACKUP
, stream
)
477 def get_gstreamer_bus():
481 return strftime('%y-%m-%d_%H:%M:%S ', localtime())