1 #!/usr/bin/env python3.4
2 # -*- coding: utf-8 -*-
4 # This file is part of ABYSS.
5 # ABYSS Broadcast Your Streaming Successfully
7 # ABYSS is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
12 # ABYSS is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with ABYSS. If not, see <http://www.gnu.org/licenses/>.
20 # Copyright (c) 2016 David Testé
23 from os
import listdir
25 from time
import localtime
, strftime
29 from gi
.repository
import Gst
30 from gi
.repository
import GstVideo
32 # Pathname has to be defined
33 AUDIO_DEFAULT
= 'AUDIO_DEFAULT'
34 RAWVIDEO_DEFAULT
= 'RAWVIDEO_DEFAULT'
35 STREAM_DEFAULT
= 'STREAM_DEFAULT'
36 BACKUP_SUFFIX
= '_BACKUP'
37 FAILED_SUFFIX
= '_FAILED_'
39 AUDIO_BACKUP
= AUDIO_DEFAULT
+ BACKUP_SUFFIX
40 RAWVIDEO_BACKUP
= RAWVIDEO_DEFAULT
+ BACKUP_SUFFIX
41 STREAM_BACKUP
= STREAM_DEFAULT
+ BACKUP_SUFFIX
47 sources
= {'RTSP_IP' : None,
48 'AUDIO_INPUT' : None,}
49 sinks
= {'AUDIO_OUTPUT' : None,
51 'STREAM_SERVER_IP' : None,
55 'VIDEO_MOUNT' : None,}
57 ##AUDIO_INPUT = 'alsa_input.usb-Burr-Brown_from_TI_USB_Audio_CODEC-00-CODEC.analog-stereo'
58 ##AUDIO_OUTPUT = 'alsa_output.pci-0000_00_1b.0.analog-stereo'
60 config
= configparser
.RawConfigParser()
61 if path
.exists(CONFIG
):
64 sources
= {key
: config
.get('sources', key
) for key
in sources
}
65 sinks
= {key
: config
.get('sinks', key
) for key
in sinks
}
67 print(ERROR
, gettime(), 'Failed to parse config file.')
69 print(ERROR
, gettime(), '".abyss" config file doesn\'t exist.')
72 class New_user_pipeline():
75 def __init__(self
, feed
='main'):
76 self
.rtsp_address
= 'rtsp://' + sources
['RTSP_IP']
78 self
.user_pipeline
= self
.create_gstreamer_pipeline()
80 def create_video_sources(self
):
81 """Create video inputs from various sources."""
82 self
.videosrc
= Gst
.ElementFactory
.make('rtspsrc', 'videosrc')
83 self
.videosrc
.set_property('location', self
.rtsp_address
)
84 self
.videosrc
.set_property('latency', 0)
85 ## self.videosrc.set_property('debug', True)
86 if self
.feed
== 'backup':
87 self
.videosrc_backup
= Gst
.ElementFactory
.make('v4l2src',
89 device_location
= self
.find_webcam_device()
90 self
.videosrc_backup
.set_property('device', device_location
)
92 def find_webcam_device(self
):
93 """Look out for the USB webcam device."""
94 devices
= [dev
for dev
in listdir('/dev/') if 'video' in dev
]
96 # In case of computer having a built-in webcam
97 if item
!= 'video0' and len(devices
) > 1:
99 # Without built-in webcam
100 elif len(devices
) == 1:
102 print(ERROR
, gettime(), 'No webcam device found.')
104 def find_mixingdesk_device(self
):
105 """Look out for the USB mixing desk device.
106 Product used here: Behringer XENYX Q1002USB.
108 # shell cmd : 'pactl list | grep alsa_input'
109 # AUDIO_INPUT --> const used currently
112 def create_pipeline_callbacks(self
):
113 """Callbacks to connect dynamically created pads."""
114 self
.videosrc
.connect('pad-added', self
.on_pad_added_to_rtspsrc
)
116 def on_pad_added_to_rtspsrc(self
, rtspsrc
, pad
):
117 """Connect the dynamic 'src'pad of an RTSP source."""
118 sinkpad
= self
.queuev_1
.get_static_pad('sink')
121 def create_audio_sources(self
):
122 """Create audio inputs from various sources."""
123 self
.audiosrc
= Gst
.ElementFactory
.make('pulsesrc', 'audiosrc')
124 self
.audiosrc
.set_property('device', sources
['AUDIO_INPUT'])
126 def create_audiolevel_plugin(self
):
127 """Create audio level plugin to feed a vu-meter."""
128 self
.audiolevel
= Gst
.ElementFactory
.make('level', 'audiolevel')
129 self
.audiolevel
.set_property('interval', 200000000)
131 def create_filesink(self
):
132 """Create storable output elements."""
133 self
.disksink_rawvideo
= Gst
.ElementFactory
.make('filesink')
134 #[TO DO]: File location has to be defined
135 self
.disksink_rawvideo
.set_property('location', RAWVIDEO_DEFAULT
)
136 self
.disksink_audio
= Gst
.ElementFactory
.make('filesink')
137 self
.disksink_audio
.set_property('location', AUDIO_DEFAULT
)
138 self
.disksink_stream
= Gst
.ElementFactory
.make('filesink')
139 self
.disksink_stream
.set_property('location', STREAM_DEFAULT
)
140 if self
.feed
== 'backup':
141 self
.disksink_rawvideo
.set_property('location', RAWVIDEO_BACKUP
)
142 self
.disksink_audio
.set_property('location', AUDIO_BACKUP
)
143 self
.disksink_stream
.set_property('location', STREAM_BACKUP
)
145 def create_streamsink(self
):
146 """Create streamable output elements."""
148 self
.screensink
= Gst
.ElementFactory
.make('xvimagesink', 'screensink')
149 self
.screensink
.set_property('sync', False)
150 # To local audio output (headphones):
151 self
.audiosink
= Gst
.ElementFactory
.make('pulsesink', 'audiosink')
152 self
.audiosink
.set_property('device', sinks
['AUDIO_OUTPUT'])
153 self
.audiosink
.set_property('sync', False)
155 self
.icecastsink_audio
= Gst
.ElementFactory
.make('shout2send', 'icecastsink_audio')
156 self
.icecastsink_audio
.set_property('sync', False)
157 self
.icecastsink_audio
.set_property('ip', sinks
['STREAM_SERVER_IP']) #'live2.fsf.org')
158 self
.icecastsink_audio
.set_property('port', int(sinks
['SERVER_PORT']))
159 self
.icecastsink_audio
.set_property('mount', sinks
['AUDIO_MOUNT'])# 'testaudio.ogg')
160 self
.icecastsink_audio
.set_property('password', sinks
['PASSWORD'])#'thahw3Wiez')
161 self
.icecastsink_stream
= Gst
.ElementFactory
.make('shout2send', 'icecastsink_stream')
162 self
.icecastsink_stream
.set_property('sync', False)
163 self
.icecastsink_stream
.set_property('ip', sinks
['STREAM_SERVER_IP'])#'live2.fsf.org')
164 self
.icecastsink_stream
.set_property('port', int(sinks
['SERVER_PORT']))#80)
165 self
.icecastsink_stream
.set_property('mount', sinks
['VIDEO_MOUNT'])#'teststream.webm')
166 self
.icecastsink_stream
.set_property('password', sinks
['PASSWORD'])#'thahw3Wiez')
168 def create_payloader_elements(self
):
171 def create_depayloader_elements(self
):
172 self
.rtpjpegdepay
= Gst
.ElementFactory
.make('rtpjpegdepay', 'rtpjpegdepay')
174 def create_encoder_elements(self
):
176 self
.vorbisenc
= Gst
.ElementFactory
.make('vorbisenc', 'vorbisenc')
178 self
.vp8enc
= Gst
.ElementFactory
.make('vp8enc', 'vp8enc')
179 self
.vp8enc
.set_property('min_quantizer', 1)
180 self
.vp8enc
.set_property('max_quantizer', 13)
181 self
.vp8enc
.set_property('cpu-used', 5)
182 self
.vp8enc
.set_property('deadline', 42000)
183 self
.vp8enc
.set_property('threads', 2)
184 self
.vp8enc
.set_property('sharpness', 7)
186 def create_decoder_elements(self
):
187 self
.jpegdec
= Gst
.ElementFactory
.make('jpegdec', 'jpegdec')
188 self
.jpegdec
.set_property('max-errors', -1)
190 def create_muxer_elements(self
):
191 self
.oggmux
= Gst
.ElementFactory
.make('oggmux', 'oggmux')
192 self
.mkvmux
= Gst
.ElementFactory
.make('matroskamux', 'mkvmux')
193 self
.webmmux
= Gst
.ElementFactory
.make('webmmux', 'webmmux')
194 self
.webmmux
.set_property('streamable', True)
196 def create_demuxer_elements(self
):
199 def create_filtering_elements(self
):
200 self
.scaling
= Gst
.ElementFactory
.make('videoscale', 'scaling')
201 caps
= Gst
.caps_from_string('video/x-raw, width=(int)640, height=(int)360')
202 self
.capsfilter
= Gst
.ElementFactory
.make('capsfilter', 'capsfilter')
203 self
.capsfilter
.set_property('caps', caps
)
205 caps_backup
= Gst
.caps_from_string('video/x-raw, width=(int)640, height=(int)360')
206 self
.capsfilter_backup
= Gst
.ElementFactory
.make('capsfilter', 'capsfilter_backup')
207 self
.capsfilter_backup
.set_property('caps', caps_backup
)
209 def create_tee_elements(self
):
210 """Create tee elements to divide feeds."""
211 self
.tee_rawvideo
= Gst
.ElementFactory
.make('tee', 'tee_rawvideo')
212 self
.tee_videodecoded
= Gst
.ElementFactory
.make('tee', 'tee_videodecoded')
213 self
.tee_streamfull
= Gst
.ElementFactory
.make('tee', 'tee_streamfull')
214 self
.tee_rawaudio
= Gst
.ElementFactory
.make('tee', 'tee_rawaudio')
215 self
.tee_streamaudio
= Gst
.ElementFactory
.make('tee', 'tee_streamaudio')
217 def connect_tee(self
,
222 output_element_3
=None,):
223 """Links input and outputs of a given tee element."""
224 # Find a way to check if the element given are in the pipeline
225 # then pass the result to the 'if' statement.
226 ## argcheck = [True for arg in locals() if arg in 'the_list_of_elements_added']
227 ## print('[DEBUG] ArgList check: ', argcheck)
228 ## if False not in argcheck
230 input_element
.link(tee_element
)
231 tee_element
.link(output_element_1
)
232 tee_element
.link(output_element_2
)
234 tee_element
.link(output_element_3
)
238 'Couldn\'t link the tee. Element(s) probably not in the pipeline ')
240 def create_queues(self
):
242 self
.queuev_1
= Gst
.ElementFactory
.make('queue', 'queuev_1')
243 self
.queuev_2
= Gst
.ElementFactory
.make('queue', 'queuev_2')
244 self
.queuev_3
= Gst
.ElementFactory
.make('queue', 'queuev_3')
245 self
.queuev_4
= Gst
.ElementFactory
.make('queue', 'queuev_4')
246 self
.queuev_5
= Gst
.ElementFactory
.make('queue', 'queuev_5')
248 self
.queuea_1
= Gst
.ElementFactory
.make('queue', 'queuea_1')
249 self
.queuea_2
= Gst
.ElementFactory
.make('queue', 'queuea_2')
250 self
.queuea_3
= Gst
.ElementFactory
.make('queue', 'queuea_3')
251 self
.queuea_4
= Gst
.ElementFactory
.make('queue', 'queuea_4')
252 self
.queuea_4
.set_property('leaky', 2)
253 self
.queuea_5
= Gst
.ElementFactory
.make('queue', 'queuea_5')
254 # For audio+video muxer:
255 self
.queuem_1
= Gst
.ElementFactory
.make('queue', 'queuem_1')
256 self
.queuem_2
= Gst
.ElementFactory
.make('queue', 'queuem_2')
257 self
.queuem_2
.set_property('leaky', 2)
259 def create_pipeline_elements(self
):
260 print(INFO
, gettime(), 'Pipeline creation state: creating elements... ', end
='')
262 self
.create_video_sources()
263 self
.create_audio_sources()
265 self
.create_audiolevel_plugin()
266 self
.create_payloader_elements()
267 self
.create_depayloader_elements()
268 self
.create_encoder_elements()
269 self
.create_decoder_elements()
270 self
.create_muxer_elements()
271 self
.create_filtering_elements()
272 self
.create_tee_elements()
275 self
.create_filesink()
276 self
.create_streamsink()
277 if self
.feed
== 'test':
278 print('TEST OK...', end
='')
280 if self
.feed
== 'backup':
283 'Webcam device location: ',
284 self
.videosrc_backup
.get_property('device'))
287 def add_elements_to_pipeline(self
):
288 print(INFO
, gettime(), 'Pipeline creation state: adding elements... ', end
='')
289 cond
= self
.feed
!= 'test'
292 self
.streampipe
.add(self
.audiosrc
)
294 self
.streampipe
.add(self
.audiolevel
)
295 self
.streampipe
.add(self
.queuea_1
)
296 self
.streampipe
.add(self
.queuev_3
)
298 self
.streampipe
.add(self
.vorbisenc
)
299 self
.streampipe
.add(self
.oggmux
)
300 self
.streampipe
.add(self
.queuea_2
)
301 self
.streampipe
.add(self
.queuea_3
)
302 self
.streampipe
.add(self
.vp8enc
)
303 self
.streampipe
.add(self
.mkvmux
)
304 self
.streampipe
.add(self
.webmmux
)
305 self
.streampipe
.add(self
.tee_rawaudio
)
306 self
.streampipe
.add(self
.tee_rawvideo
)
307 self
.streampipe
.add(self
.tee_streamaudio
)
308 self
.streampipe
.add(self
.tee_streamfull
)
309 self
.streampipe
.add(self
.queuev_2
)
310 self
.streampipe
.add(self
.queuev_4
)
311 self
.streampipe
.add(self
.queuev_5
)
312 self
.streampipe
.add(self
.queuea_4
)
313 self
.streampipe
.add(self
.queuea_5
)
314 self
.streampipe
.add(self
.queuem_1
)
315 self
.streampipe
.add(self
.queuem_2
)
317 self
.streampipe
.add(self
.screensink
)
319 self
.streampipe
.add(self
.disksink_rawvideo
)
320 self
.streampipe
.add(self
.disksink_audio
)
321 self
.streampipe
.add(self
.disksink_stream
)
322 self
.streampipe
.add(self
.icecastsink_audio
)
323 self
.streampipe
.add(self
.icecastsink_stream
)
325 self
.streampipe
.add(self
.audiosink
)
327 if self
.feed
== 'main' or self
.feed
== 'test':
329 self
.streampipe
.add(self
.videosrc
)
331 self
.streampipe
.add(self
.rtpjpegdepay
)
332 self
.streampipe
.add(self
.jpegdec
)
333 self
.streampipe
.add(self
.scaling
)
334 self
.streampipe
.add(self
.capsfilter
)
335 self
.streampipe
.add(self
.tee_videodecoded
)
336 self
.streampipe
.add(self
.queuev_1
)
337 if self
.feed
== 'test':
338 print ('TEST OK...', end
='')
339 elif self
.feed
== 'backup':
341 self
.streampipe
.add(self
.videosrc_backup
)
343 self
.streampipe
.add(self
.capsfilter_backup
)
344 print ('BACKUP OK...', end
='')
347 def link_pipeline_elements(self
):
348 """Link all elements with static pads."""
349 print(INFO
, gettime(), 'Pipeline creation state: linking elements... ', end
='')
350 cond
= self
.feed
!= 'test'
353 self
.audiosrc
.link(self
.audiolevel
)
354 self
.audiolevel
.link(self
.queuea_1
)
356 self
.queuea_1
.link(self
.vorbisenc
)
357 self
.connect_tee(self
.tee_rawaudio
,
361 self
.queuea_2
.link(self
.oggmux
)
362 self
.connect_tee(self
.tee_streamaudio
,
366 self
.queuea_3
.link(self
.disksink_audio
)
367 self
.queuea_4
.link(self
.icecastsink_audio
)
368 self
.queuea_5
.link(self
.webmmux
)
370 self
.queuea_1
.link(self
.audiosink
)
374 self
.queuev_2
.link(self
.mkvmux
)
375 self
.mkvmux
.link(self
.queuev_4
)
376 self
.queuev_4
.link(self
.disksink_rawvideo
)
378 self
.queuev_1
.link(self
.rtpjpegdepay
)
379 self
.rtpjpegdepay
.link(self
.jpegdec
)
380 self
.jpegdec
.link(self
.queuev_3
)
381 self
.queuev_3
.link(self
.screensink
)
383 # Stream (audio+video) feed:
385 self
.vp8enc
.link(self
.queuev_5
)
386 self
.queuev_5
.link(self
.webmmux
)
387 self
.connect_tee(self
.tee_streamfull
,
391 self
.queuem_1
.link(self
.disksink_stream
)
392 self
.queuem_2
.link(self
.icecastsink_stream
)
393 if self
.feed
== 'main':
394 # linking here RTSP feed
395 self
.queuev_1
.link(self
.rtpjpegdepay
)
396 self
.connect_tee(self
.tee_rawvideo
,
400 self
.connect_tee(self
.tee_videodecoded
,
404 # Stream (video) feed:
405 self
.scaling
.link(self
.capsfilter
)
406 self
.capsfilter
.link(self
.vp8enc
)
407 elif self
.feed
== 'backup':
408 # linking here backup feed (WEBCAM)
409 self
.videosrc_backup
.link(self
.capsfilter_backup
)
410 self
.connect_tee(self
.tee_rawvideo
,
411 self
.capsfilter_backup
,
414 output_element_3
=self
.vp8enc
)
415 ## self.capsfilter_backup.link(self.queuev_3)
416 print('BACKUP OK...', end
='')
418 print('TEST OK...', end
='')
421 def create_gstreamer_pipeline(self
):
422 # New empty pipeline:
423 self
.streampipe
= Gst
.Pipeline()
424 self
.create_pipeline_elements()
426 self
.add_elements_to_pipeline()
427 self
.link_pipeline_elements()
428 if self
.feed
== 'main' or self
.feed
== 'test':
429 self
.create_pipeline_callbacks()
432 bus
= self
.streampipe
.get_bus()
433 bus
.add_signal_watch()
434 bus
.enable_sync_message_emission()
435 # Used to get messages that GStreamer emits.
436 bus
.connect("message", self
.on_message
)
438 print(INFO
, gettime(), 'Pipeline creation state: successfully done.')
439 return self
.streampipe
441 def on_message(self
, bus
, message
):
443 if t
== Gst
.MessageType
.EOS
:
444 self
.streampipe
.set_state(Gst
.State
.NULL
)
445 elif t
== Gst
.MessageType
.ERROR
:
446 err
, debug
= message
.parse_error()
447 print (ERROR
, '%s' % err
, debug
)
449 def stream_play(self
):
450 self
.streampipe
.set_state(Gst
.State
.PLAYING
)
451 if self
.feed
== 'backup':
452 print(WARN
, gettime(), 'Backup pipeline started.')
453 print(INFO
, gettime(), 'PLAYING State resquested')
455 def stream_stop(self
):
456 self
.streampipe
.set_state(Gst
.State
.NULL
)
457 print(INFO
, gettime(), 'STOPPED State resquested')
459 def set_filenames(self
, string
, streamfailed
=False):
460 """Sets filename and location for each sink."""
463 audio
= sources
['DIR'] + filename
+ '_AUDIO'
464 rawvideo
= sources
['DIR'] + filename
+ '_RAWVIDEO'
465 stream
= sources
['DIR'] + filename
+ '_STREAM'
466 print('FEED STATE: ', self
.feed
)
467 if self
.feed
== 'main':
468 if streamfailed
and filename
:
469 audio
= audio
+ FAILED_SUFFIX
+ str(fail_counter
)
470 rawvideo
= rawvideo
+ FAILED_SUFFIX
+ str(fail_counter
)
471 stream
= stream
+ FAILED_SUFFIX
+ str(fail_counter
)
472 rename(AUDIO_DEFAULT
, audio
)
473 rename(RAWVIDEO_DEFAULT
, rawvideo
)
474 rename(STREAM_DEFAULT
, stream
)
477 audio
= AUDIO_DEFAULT
+ FAILED_SUFFIX
+ str(fail_counter
)
478 rawvideo
= RAWVIDEO_DEFAULT
+ FAILED_SUFFIX
+ str(fail_counter
)
479 stream
= STREAM_DEFAULT
+ FAILED_SUFFIX
+ str(fail_counter
)
480 rename(AUDIO_DEFAULT
, audio
)
481 rename(RAWVIDEO_DEFAULT
, rawvideo
)
482 rename(STREAM_DEFAULT
, stream
)
485 rename(AUDIO_DEFAULT
, audio
)
486 rename(RAWVIDEO_DEFAULT
, rawvideo
)
487 rename(STREAM_DEFAULT
, stream
)
488 elif self
.feed
== 'backup':
489 print('INSIDE BACKUP RENAMING')
490 rename(AUDIO_BACKUP
, audio
)
491 rename(RAWVIDEO_BACKUP
, rawvideo
)
492 rename(STREAM_BACKUP
, stream
)
497 def get_gstreamer_bus():
501 return strftime('%y-%m-%d_%H:%M:%S ', localtime())