1 #!/usr/bin/env python3.4
2 # -*- coding: utf-8 -*-
4 # This file is part of ABYSS.
5 # ABYSS Broadcast Your Streaming Successfully
7 # ABYSS is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
12 # ABYSS is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with ABYSS. If not, see <http://www.gnu.org/licenses/>.
20 # Copyright (c) 2016 David Testé
23 from os
import listdir
25 from time
import localtime
, strftime
29 from gi
.repository
import Gst
30 from gi
.repository
import GstVideo
32 # Pathname has to be defined
33 AUDIO_DEFAULT
= 'AUDIO_DEFAULT'
34 RAWVIDEO_DEFAULT
= 'RAWVIDEO_DEFAULT'
35 STREAM_DEFAULT
= 'STREAM_DEFAULT'
36 BACKUP_SUFFIX
= '_BACKUP'
37 FAILED_SUFFIX
= '_FAILED_'
39 AUDIO_BACKUP
= AUDIO_DEFAULT
+ BACKUP_SUFFIX
40 RAWVIDEO_BACKUP
= RAWVIDEO_DEFAULT
+ BACKUP_SUFFIX
41 STREAM_BACKUP
= STREAM_DEFAULT
+ BACKUP_SUFFIX
47 sources
= {'RTSP_IP' : None,
48 'AUDIO_INPUT' : None,}
49 sinks
= {'AUDIO_OUTPUT' : None,
51 'STREAM_SERVER_IP' : None,
55 'VIDEO_MOUNT' : None,}
57 ##AUDIO_INPUT = 'alsa_input.usb-Burr-Brown_from_TI_USB_Audio_CODEC-00-CODEC.analog-stereo'
58 ##AUDIO_OUTPUT = 'alsa_output.pci-0000_00_1b.0.analog-stereo'
60 config
= configparser
.RawConfigParser()
61 if path
.exists(CONFIG
):
64 sources
= {key
: config
.get('sources', key
) for key
in sources
}
65 sinks
= {key
: config
.get('sinks', key
) for key
in sinks
}
67 print(ERROR
, gettime(), 'Failed to parse config file.')
69 print(ERROR
, gettime(), '".abyss" config file doesn\'t exist.')
72 class New_user_pipeline():
75 def __init__(self
, feed
='main'):
76 self
.rtsp_address
= 'rtsp://' + sources
['RTSP_IP']
78 self
.user_pipeline
= self
.create_gstreamer_pipeline()
80 def create_video_sources(self
):
81 """Create video inputs from various sources."""
82 self
.videosrc
= Gst
.ElementFactory
.make('rtspsrc', 'videosrc')
83 self
.videosrc
.set_property('location', self
.rtsp_address
)
84 self
.videosrc
.set_property('latency', 0)
85 ## self.videosrc.set_property('debug', True)
86 if self
.feed
== 'backup':
87 self
.videosrc_backup
= Gst
.ElementFactory
.make('v4l2src',
89 device_location
= self
.find_webcam_device()
90 self
.videosrc_backup
.set_property('device', device_location
)
92 def find_webcam_device(self
):
93 """Look out for the USB webcam device."""
94 devices
= [dev
for dev
in listdir('/dev/') if 'video' in dev
]
96 # In case of computer having a built-in webcam
97 if item
!= 'video0' and len(devices
) > 1:
99 # Without built-in webcam
100 elif len(devices
) == 1:
102 print(ERROR
, gettime(), 'No webcam device found.')
104 def find_mixingdesk_device(self
):
105 """Look out for the USB mixing desk device.
106 Product used here: Behringer XENYX Q1002USB.
108 # shell cmd : 'pactl list | grep alsa_input'
109 # AUDIO_INPUT --> const used currently
112 def create_pipeline_callbacks(self
):
113 """Callbacks to connect dynamically created pads."""
114 self
.videosrc
.connect('pad-added', self
.on_pad_added_to_rtspsrc
)
116 def on_pad_added_to_rtspsrc(self
, rtspsrc
, pad
):
117 """Connect the dynamic 'src'pad of an RTSP source."""
118 sinkpad
= self
.queuev_1
.get_static_pad('sink')
121 def create_audio_sources(self
):
122 """Create audio inputs from various sources."""
123 self
.audiosrc
= Gst
.ElementFactory
.make('pulsesrc', 'audiosrc')
124 self
.audiosrc
.set_property('device', sources
['AUDIO_INPUT'])
126 def create_audiolevel_plugin(self
):
127 """Create audio level plugin to feed a vu-meter."""
128 self
.audiolevel
= Gst
.ElementFactory
.make('level', 'audiolevel')
129 self
.audiolevel
.set_property('interval', 200000000)
131 def create_filesink(self
):
132 """Create storable output elements."""
133 self
.disksink_rawvideo
= Gst
.ElementFactory
.make('filesink')
134 #[TO DO]: File location has to be defined
135 self
.disksink_rawvideo
.set_property('location', RAWVIDEO_DEFAULT
)
136 self
.disksink_audio
= Gst
.ElementFactory
.make('filesink')
137 self
.disksink_audio
.set_property('location', AUDIO_DEFAULT
)
138 self
.disksink_stream
= Gst
.ElementFactory
.make('filesink')
139 self
.disksink_stream
.set_property('location', STREAM_DEFAULT
)
140 if self
.feed
== 'backup':
141 self
.disksink_rawvideo
.set_property('location', RAWVIDEO_BACKUP
)
142 self
.disksink_audio
.set_property('location', AUDIO_BACKUP
)
143 self
.disksink_stream
.set_property('location', STREAM_BACKUP
)
145 def create_streamsink(self
):
146 """Create streamable output elements."""
148 self
.screensink
= Gst
.ElementFactory
.make('xvimagesink', 'screensink')
149 self
.screensink
.set_property('sync', False)
150 # To local audio output (headphones):
151 self
.audiosink
= Gst
.ElementFactory
.make('pulsesink', 'audiosink')
152 self
.audiosink
.set_property('device', sinks
['AUDIO_OUTPUT'])
153 self
.audiosink
.set_property('sync', False)
155 self
.icecastsink_audio
= Gst
.ElementFactory
.make('shout2send', 'icecastsink_audio')
156 self
.icecastsink_audio
.set_property('sync', False)
157 self
.icecastsink_audio
.set_property('ip', sinks
['STREAM_SERVER_IP']) #'live2.fsf.org')
158 self
.icecastsink_audio
.set_property('port', int(sinks
['SERVER_PORT']))
159 self
.icecastsink_audio
.set_property('mount', sinks
['AUDIO_MOUNT'])# 'testaudio.ogg')
160 self
.icecastsink_audio
.set_property('password', sinks
['PASSWORD'])#'thahw3Wiez')
161 self
.icecastsink_stream
= Gst
.ElementFactory
.make('shout2send', 'icecastsink_stream')
162 self
.icecastsink_stream
.set_property('sync', False)
163 self
.icecastsink_stream
.set_property('ip', sinks
['STREAM_SERVER_IP'])#'live2.fsf.org')
164 self
.icecastsink_stream
.set_property('port', int(sinks
['SERVER_PORT']))#80)
165 self
.icecastsink_stream
.set_property('mount', sinks
['VIDEO_MOUNT'])#'teststream.webm')
166 self
.icecastsink_stream
.set_property('password', sinks
['PASSWORD'])#'thahw3Wiez')
168 def create_payloader_elements(self
):
171 def create_depayloader_elements(self
):
172 self
.rtpjpegdepay
= Gst
.ElementFactory
.make('rtpjpegdepay', 'rtpjpegdepay')
174 def create_encoder_elements(self
):
176 self
.vorbisenc
= Gst
.ElementFactory
.make('vorbisenc', 'vorbisenc')
178 self
.vp8enc
= Gst
.ElementFactory
.make('vp8enc', 'vp8enc')
179 self
.vp8enc
.set_property('min_quantizer', 1)
180 self
.vp8enc
.set_property('max_quantizer', 13)
181 self
.vp8enc
.set_property('cpu-used', 5)
182 self
.vp8enc
.set_property('deadline', 42000)
183 self
.vp8enc
.set_property('threads', 2)
184 self
.vp8enc
.set_property('sharpness', 7)
186 def create_decoder_elements(self
):
187 self
.jpegdec
= Gst
.ElementFactory
.make('jpegdec', 'jpegdec')
188 self
.jpegdec
.set_property('max-errors', -1)
190 def create_muxer_elements(self
):
191 self
.oggmux
= Gst
.ElementFactory
.make('oggmux', 'oggmux')
192 self
.mkvmux
= Gst
.ElementFactory
.make('matroskamux', 'mkvmux')
193 self
.webmmux
= Gst
.ElementFactory
.make('webmmux', 'webmmux')
194 self
.webmmux
.set_property('streamable', True)
196 def create_demuxer_elements(self
):
199 def create_filtering_elements(self
):
200 self
.scaling
= Gst
.ElementFactory
.make('videoscale', 'scaling')
201 caps
= Gst
.caps_from_string('video/x-raw, width=(int)640, height=(int)360')
202 self
.capsfilter
= Gst
.ElementFactory
.make('capsfilter', 'capsfilter')
203 self
.capsfilter
.set_property('caps', caps
)
205 caps_backup
= Gst
.caps_from_string('video/x-raw, width=(int)640, height=(int)360')
206 self
.capsfilter_backup
= Gst
.ElementFactory
.make('capsfilter', 'capsfilter_backup')
207 self
.capsfilter_backup
.set_property('caps', caps_backup
)
209 def create_tee_elements(self
):
210 """Create tee elements to divide feeds."""
211 self
.tee_rawvideo
= Gst
.ElementFactory
.make('tee', 'tee_rawvideo')
212 self
.tee_videodecoded
= Gst
.ElementFactory
.make('tee', 'tee_videodecoded')
213 self
.tee_streamfull
= Gst
.ElementFactory
.make('tee', 'tee_streamfull')
214 self
.tee_rawaudio
= Gst
.ElementFactory
.make('tee', 'tee_rawaudio')
215 self
.tee_streamaudio
= Gst
.ElementFactory
.make('tee', 'tee_streamaudio')
217 def connect_tee(self
,
222 output_element_3
=None,):
223 """Links input and outputs of a given tee element."""
224 # Find a way to check if the element given are in the pipeline
225 # then pass the result to the 'if' statement.
226 ## argcheck = [True for arg in locals() if arg in 'the_list_of_elements_added']
227 ## print('[DEBUG] ArgList check: ', argcheck)
228 ## if False not in argcheck
230 input_element
.link(tee_element
)
231 tee_element
.link(output_element_1
)
232 tee_element
.link(output_element_2
)
234 tee_element
.link(output_element_3
)
238 'Couldn\'t link the tee. Element(s) probably not in the pipeline ')
240 def create_queues(self
):
242 self
.queuev_1
= Gst
.ElementFactory
.make('queue', 'queuev_1')
243 self
.queuev_2
= Gst
.ElementFactory
.make('queue', 'queuev_2')
244 self
.queuev_3
= Gst
.ElementFactory
.make('queue', 'queuev_3')
245 self
.queuev_4
= Gst
.ElementFactory
.make('queue', 'queuev_4')
246 self
.queuev_5
= Gst
.ElementFactory
.make('queue', 'queuev_5')
247 self
.queuev_6
= Gst
.ElementFactory
.make('queue', 'queuev_6')
249 self
.queuea_1
= Gst
.ElementFactory
.make('queue', 'queuea_1')
250 self
.queuea_2
= Gst
.ElementFactory
.make('queue', 'queuea_2')
251 self
.queuea_3
= Gst
.ElementFactory
.make('queue', 'queuea_3')
252 self
.queuea_4
= Gst
.ElementFactory
.make('queue', 'queuea_4')
253 self
.queuea_4
.set_property('leaky', 2)
254 self
.queuea_5
= Gst
.ElementFactory
.make('queue', 'queuea_5')
255 # For audio+video muxer:
256 self
.queuem_1
= Gst
.ElementFactory
.make('queue', 'queuem_1')
257 self
.queuem_2
= Gst
.ElementFactory
.make('queue', 'queuem_2')
258 self
.queuem_2
.set_property('leaky', 2)
260 def create_pipeline_elements(self
):
261 print(INFO
, gettime(), 'Pipeline creation state: creating elements... ', end
='')
263 self
.create_video_sources()
264 self
.create_audio_sources()
266 self
.create_audiolevel_plugin()
267 self
.create_payloader_elements()
268 self
.create_depayloader_elements()
269 self
.create_encoder_elements()
270 self
.create_decoder_elements()
271 self
.create_muxer_elements()
272 self
.create_filtering_elements()
273 self
.create_tee_elements()
276 self
.create_filesink()
277 self
.create_streamsink()
278 if self
.feed
== 'test':
279 print('TEST OK...', end
='')
281 if self
.feed
== 'backup':
284 'Webcam device location: ',
285 self
.videosrc_backup
.get_property('device'))
288 def add_elements_to_pipeline(self
):
289 print(INFO
, gettime(), 'Pipeline creation state: adding elements... ', end
='')
290 cond
= self
.feed
!= 'test'
293 self
.streampipe
.add(self
.audiosrc
)
295 self
.streampipe
.add(self
.audiolevel
)
296 self
.streampipe
.add(self
.queuea_1
)
297 self
.streampipe
.add(self
.queuev_3
)
299 self
.streampipe
.add(self
.vorbisenc
)
300 self
.streampipe
.add(self
.oggmux
)
301 self
.streampipe
.add(self
.queuea_2
)
302 self
.streampipe
.add(self
.queuea_3
)
303 self
.streampipe
.add(self
.vp8enc
)
304 self
.streampipe
.add(self
.mkvmux
)
305 self
.streampipe
.add(self
.webmmux
)
306 self
.streampipe
.add(self
.tee_rawaudio
)
307 self
.streampipe
.add(self
.tee_rawvideo
)
308 self
.streampipe
.add(self
.tee_streamaudio
)
309 self
.streampipe
.add(self
.tee_streamfull
)
310 self
.streampipe
.add(self
.queuev_2
)
311 self
.streampipe
.add(self
.queuev_4
)
312 self
.streampipe
.add(self
.queuev_5
)
313 ## self.streampipe.add(self.queuev_6)
314 self
.streampipe
.add(self
.queuea_4
)
315 self
.streampipe
.add(self
.queuea_5
)
316 self
.streampipe
.add(self
.queuem_1
)
317 self
.streampipe
.add(self
.queuem_2
)
319 self
.streampipe
.add(self
.screensink
)
321 self
.streampipe
.add(self
.disksink_rawvideo
)
322 self
.streampipe
.add(self
.disksink_audio
)
323 self
.streampipe
.add(self
.disksink_stream
)
324 self
.streampipe
.add(self
.icecastsink_audio
)
325 self
.streampipe
.add(self
.icecastsink_stream
)
327 self
.streampipe
.add(self
.audiosink
)
329 if self
.feed
== 'main' or self
.feed
== 'test':
331 self
.streampipe
.add(self
.videosrc
)
333 self
.streampipe
.add(self
.rtpjpegdepay
)
334 self
.streampipe
.add(self
.jpegdec
)
335 self
.streampipe
.add(self
.scaling
)
336 self
.streampipe
.add(self
.capsfilter
)
337 self
.streampipe
.add(self
.tee_videodecoded
)
338 self
.streampipe
.add(self
.queuev_1
)
339 if self
.feed
== 'test':
340 print ('TEST OK...', end
='')
341 elif self
.feed
== 'backup':
343 self
.streampipe
.add(self
.videosrc_backup
)
345 self
.streampipe
.add(self
.capsfilter_backup
)
346 print ('BACKUP OK...', end
='')
349 def link_pipeline_elements(self
):
350 """Link all elements with static pads."""
351 print(INFO
, gettime(), 'Pipeline creation state: linking elements... ', end
='')
352 cond
= self
.feed
!= 'test'
355 self
.audiosrc
.link(self
.audiolevel
)
356 self
.audiolevel
.link(self
.queuea_1
)
358 self
.queuea_1
.link(self
.vorbisenc
)
359 self
.connect_tee(self
.tee_rawaudio
,
363 self
.queuea_2
.link(self
.oggmux
)
364 self
.connect_tee(self
.tee_streamaudio
,
368 self
.queuea_3
.link(self
.disksink_audio
)
369 self
.queuea_4
.link(self
.icecastsink_audio
)
370 self
.queuea_5
.link(self
.webmmux
)
372 self
.queuea_1
.link(self
.audiosink
)
376 self
.queuev_2
.link(self
.mkvmux
)
377 self
.mkvmux
.link(self
.queuev_4
)
378 self
.queuev_4
.link(self
.disksink_rawvideo
)
380 self
.queuev_1
.link(self
.rtpjpegdepay
)
381 self
.rtpjpegdepay
.link(self
.jpegdec
)
382 self
.jpegdec
.link(self
.queuev_3
)
383 self
.queuev_3
.link(self
.screensink
)
385 # Stream (audio+video) feed:
387 self
.vp8enc
.link(self
.queuev_5
)
388 self
.queuev_5
.link(self
.webmmux
)
389 self
.connect_tee(self
.tee_streamfull
,
393 self
.queuem_1
.link(self
.disksink_stream
)
394 self
.queuem_2
.link(self
.icecastsink_stream
)
395 if self
.feed
== 'main':
396 # linking here RTSP feed
397 self
.queuev_1
.link(self
.rtpjpegdepay
)
398 self
.connect_tee(self
.tee_rawvideo
,
403 ## self.queuev_6.link(self.jpegdec)
404 self
.connect_tee(self
.tee_videodecoded
,
408 # Stream (video) feed:
409 self
.scaling
.link(self
.capsfilter
)
410 self
.capsfilter
.link(self
.vp8enc
)
411 elif self
.feed
== 'backup':
412 # linking here backup feed (WEBCAM)
413 self
.videosrc_backup
.link(self
.capsfilter_backup
)
414 self
.connect_tee(self
.tee_rawvideo
,
415 self
.capsfilter_backup
,
418 output_element_3
=self
.vp8enc
)
419 ## self.capsfilter_backup.link(self.queuev_3)
420 print('BACKUP OK...', end
='')
422 print('TEST OK...', end
='')
425 def create_gstreamer_pipeline(self
):
426 # New empty pipeline:
427 self
.streampipe
= Gst
.Pipeline()
428 self
.create_pipeline_elements()
430 self
.add_elements_to_pipeline()
431 self
.link_pipeline_elements()
432 if self
.feed
== 'main' or self
.feed
== 'test':
433 self
.create_pipeline_callbacks()
436 bus
= self
.streampipe
.get_bus()
437 bus
.add_signal_watch()
438 bus
.enable_sync_message_emission()
439 # Used to get messages that GStreamer emits.
440 bus
.connect("message", self
.on_message
)
442 print(INFO
, gettime(), 'Pipeline creation state: successfully done.')
443 return self
.streampipe
445 def on_message(self
, bus
, message
):
447 if t
== Gst
.MessageType
.EOS
:
448 self
.streampipe
.set_state(Gst
.State
.NULL
)
449 elif t
== Gst
.MessageType
.ERROR
:
450 err
, debug
= message
.parse_error()
451 print (ERROR
, '%s' % err
, debug
)
453 def stream_play(self
):
454 self
.streampipe
.set_state(Gst
.State
.PLAYING
)
455 if self
.feed
== 'backup':
456 print(WARN
, gettime(), 'Backup pipeline started.')
457 print(INFO
, gettime(), 'PLAYING State resquested')
459 def stream_stop(self
):
460 self
.streampipe
.set_state(Gst
.State
.NULL
)
461 print(INFO
, gettime(), 'STOPPED State resquested')
463 def set_filenames(self
, string
, streamfailed
=False):
464 """Sets filename and location for each sink."""
467 audio
= sources
['DIR'] + filename
+ '_AUDIO'
468 rawvideo
= sources
['DIR'] + filename
+ '_RAWVIDEO'
469 stream
= sources
['DIR'] + filename
+ '_STREAM'
470 print('FEED STATE: ', self
.feed
)
471 if self
.feed
== 'main':
472 if streamfailed
and filename
:
473 audio
= audio
+ FAILED_SUFFIX
+ str(fail_counter
)
474 rawvideo
= rawvideo
+ FAILED_SUFFIX
+ str(fail_counter
)
475 stream
= stream
+ FAILED_SUFFIX
+ str(fail_counter
)
476 rename(AUDIO_DEFAULT
, audio
)
477 rename(RAWVIDEO_DEFAULT
, rawvideo
)
478 rename(STREAM_DEFAULT
, stream
)
481 audio
= AUDIO_DEFAULT
+ FAILED_SUFFIX
+ str(fail_counter
)
482 rawvideo
= RAWVIDEO_DEFAULT
+ FAILED_SUFFIX
+ str(fail_counter
)
483 stream
= STREAM_DEFAULT
+ FAILED_SUFFIX
+ str(fail_counter
)
484 rename(AUDIO_DEFAULT
, audio
)
485 rename(RAWVIDEO_DEFAULT
, rawvideo
)
486 rename(STREAM_DEFAULT
, stream
)
489 rename(AUDIO_DEFAULT
, audio
)
490 rename(RAWVIDEO_DEFAULT
, rawvideo
)
491 rename(STREAM_DEFAULT
, stream
)
492 elif self
.feed
== 'backup':
493 print('INSIDE BACKUP RENAMING')
494 rename(AUDIO_BACKUP
, audio
)
495 rename(RAWVIDEO_BACKUP
, rawvideo
)
496 rename(STREAM_BACKUP
, stream
)
501 def get_gstreamer_bus():
505 return strftime('%y-%m-%d_%H:%M:%S ', localtime())