1 #!/usr/bin/env python3.4
2 # -*- coding: utf-8 -*-
4 # This file is part of ABYSS.
5 # ABYSS Broadcast Your Streaming Successfully
7 # ABYSS is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
12 # ABYSS is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with ABYSS. If not, see <http://www.gnu.org/licenses/>.
20 # Copyright (c) 2016 David Testé
23 from os
import listdir
26 from time
import localtime
, strftime
30 from gi
.repository
import Gst
31 from gi
.repository
import GstVideo
# Directory, relative to the working directory, that holds the recordings.
DIR_NAME = 'FILES_RECORDED'

# Locations the file sinks write to before the files get their final names.
AUDIO_DEFAULT = './{}/{}'.format(DIR_NAME, 'AUDIO_DEFAULT')
RAWVIDEO_DEFAULT = './{}/{}'.format(DIR_NAME, 'RAWVIDEO_DEFAULT')
STREAM_DEFAULT = './{}/{}'.format(DIR_NAME, 'STREAM_DEFAULT')

# Suffixes appended to the locations above.
BACKUP_SUFFIX = '_BACKUP'
FAILED_SUFFIX = '_FAILED_'

# Locations used instead of the defaults when the feed is 'backup'.
AUDIO_BACKUP = ''.join((AUDIO_DEFAULT, BACKUP_SUFFIX))
RAWVIDEO_BACKUP = ''.join((RAWVIDEO_DEFAULT, BACKUP_SUFFIX))
STREAM_BACKUP = ''.join((STREAM_DEFAULT, BACKUP_SUFFIX))
# Options looked up in the 'sources' section of the config file.
# The keys listed here are the ones requested from the parser; the values
# stay None until the config file has been read.
sources = {
    'RTSP_IP': None,
    'AUDIO_INPUT': None,
}

# Options looked up in the 'sinks' section of the config file.
# Every key the pipeline reads must be listed, otherwise it is never fetched
# from the config file and the later lookup raises KeyError.
sinks = {
    'AUDIO_OUTPUT': None,
    'STREAM_SERVER_IP': None,
    'SERVER_PORT': None,
    'PASSWORD': None,
    'AUDIO_MOUNT': None,
    'VIDEO_MOUNT': None,
}
58 ##AUDIO_INPUT = 'alsa_input.usb-Burr-Brown_from_TI_USB_Audio_CODEC-00-CODEC.analog-stereo'
59 ##AUDIO_OUTPUT = 'alsa_output.pci-0000_00_1b.0.analog-stereo'
61 config
= configparser
.RawConfigParser()
62 if path
.exists(CONFIG
):
65 sources
= {key
: config
.get('sources', key
) for key
in sources
}
66 sinks
= {key
: config
.get('sinks', key
) for key
in sinks
}
68 print(ERROR
, gettime(), 'Failed to parse config file.')
70 print(ERROR
, gettime(), '".abyss" config file doesn\'t exist.')
72 if not path
.isdir(DIR_NAME
):
75 class New_user_pipeline():
def __init__(self, feed='main'):
    """Store the feed type and build the GStreamer pipeline.

    feed -- selects the pipeline variant; the methods of this class
            compare it against 'main', 'backup' and 'test'.
    """
    # Has to be stored before the pipeline is built: the element
    # creation, adding and linking steps all read self.feed.
    self.feed = feed
    self.rtsp_address = 'rtsp://' + sources['RTSP_IP']
    self.user_pipeline = self.create_gstreamer_pipeline()
def create_video_sources(self):
    """Create video inputs from various sources.

    The RTSP source is always created. When the feed is 'backup', a
    'v4l2src' element is created as well and pointed at the device
    reported by find_webcam_device().
    """
    self.videosrc = Gst.ElementFactory.make('rtspsrc', 'videosrc')
    self.videosrc.set_property('location', self.rtsp_address)
    self.videosrc.set_property('latency', 0)
    ## self.videosrc.set_property('debug', True)
    if self.feed == 'backup':
        # Element name mirrors the attribute name, as for every other
        # element created by this class.
        self.videosrc_backup = Gst.ElementFactory.make('v4l2src',
                                                       'videosrc_backup')
        device_location = self.find_webcam_device()
        self.videosrc_backup.set_property('device', device_location)
def find_webcam_device(self):
    """Look out for the USB webcam device.

    Returns the device path (e.g. '/dev/video1'), or None after printing
    an error when no suitable entry exists in '/dev/'.
    """
    # Sorted so the choice does not depend on the arbitrary order in
    # which listdir() reports the entries.
    devices = sorted(dev for dev in listdir('/dev/') if 'video' in dev)
    for item in devices:
        # In case of computer having a built-in webcam
        if item != 'video0' and len(devices) > 1:
            return '/dev/' + item
        # Without built-in webcam
        elif len(devices) == 1:
            return '/dev/' + item
    print(ERROR, gettime(), 'No webcam device found.')
    return None
def find_mixingdesk_device(self):
    """Look out for the USB mixing desk device.

    Product used here: Behringer XENYX Q1002USB.

    Not implemented yet: the method does nothing and returns None.
    """
    # shell cmd : 'pactl list | grep alsa_input'
    # AUDIO_INPUT --> const used currently
def create_pipeline_callbacks(self):
    """Register the handler for pads created dynamically by the RTSP source."""
    source = self.videosrc
    # 'pad-added' is connected to the method that deals with the new pad.
    source.connect('pad-added', self.on_pad_added_to_rtspsrc)
def on_pad_added_to_rtspsrc(self, rtspsrc, pad):
    """Connect the dynamic 'src' pad of an RTSP source.

    rtspsrc -- element that emitted 'pad-added' (not used here).
    pad     -- the pad that has just been created.
    """
    sinkpad = self.queuev_1.get_static_pad('sink')
    # The queue has a single sink pad, so only link it once; a second
    # 'pad-added' emission must not try to link it again.
    if not sinkpad.is_linked():
        pad.link(sinkpad)
def create_audio_sources(self):
    """Create the 'pulsesrc' audio input and select the configured device."""
    make = Gst.ElementFactory.make
    self.audiosrc = make('pulsesrc', 'audiosrc')
    # Device name comes from the 'AUDIO_INPUT' entry of the sources dict.
    input_device = sources['AUDIO_INPUT']
    self.audiosrc.set_property('device', input_device)
def create_audiolevel_plugin(self):
    """Create the 'level' element used to feed a vu-meter."""
    level = Gst.ElementFactory.make('level', 'audiolevel')
    self.audiolevel = level
    # presumably nanoseconds, i.e. one report every 0.2 s -- confirm
    # against the 'level' element documentation.
    report_interval = 200000000
    level.set_property('interval', report_interval)
def create_filesink(self):
    """Create the three 'filesink' elements and set where they write.

    Each sink first gets its default location; when the feed is 'backup'
    the locations are then replaced by the backup ones.
    """
    default_locations = (
        ('disksink_rawvideo', RAWVIDEO_DEFAULT),
        ('disksink_audio', AUDIO_DEFAULT),
        ('disksink_stream', STREAM_DEFAULT),
    )
    for attribute, location in default_locations:
        sink = Gst.ElementFactory.make('filesink')
        setattr(self, attribute, sink)
        sink.set_property('location', location)

    if self.feed != 'backup':
        return

    backup_locations = (
        (self.disksink_rawvideo, RAWVIDEO_BACKUP),
        (self.disksink_audio, AUDIO_BACKUP),
        (self.disksink_stream, STREAM_BACKUP),
    )
    for sink, location in backup_locations:
        sink.set_property('location', location)
def create_streamsink(self):
    """Create the screen, audio and 'shout2send' output elements."""
    make = Gst.ElementFactory.make

    # Video output on screen:
    self.screensink = make('xvimagesink', 'screensink')
    self.screensink.set_property('sync', False)

    # To local audio output (headphones):
    self.audiosink = make('pulsesink', 'audiosink')
    self.audiosink.set_property('device', sinks['AUDIO_OUTPUT'])
    self.audiosink.set_property('sync', False)

    # Both 'shout2send' sinks share server, port and password; only the
    # mount point differs.
    mounts = (
        ('icecastsink_audio', 'AUDIO_MOUNT'),
        ('icecastsink_stream', 'VIDEO_MOUNT'),
    )
    for name, mount_key in mounts:
        shout = make('shout2send', name)
        setattr(self, name, shout)
        shout.set_property('sync', False)
        shout.set_property('ip', sinks['STREAM_SERVER_IP'])
        shout.set_property('port', int(sinks['SERVER_PORT']))
        shout.set_property('mount', sinks[mount_key])
        shout.set_property('password', sinks['PASSWORD'])
def create_payloader_elements(self):
    """Placeholder for payloader elements: creates nothing, returns None."""
def create_depayloader_elements(self):
    """Create the 'rtpjpegdepay' element and keep it on the instance."""
    element_name = 'rtpjpegdepay'
    # Factory name and element name are the same string here.
    self.rtpjpegdepay = Gst.ElementFactory.make(element_name, element_name)
def create_encoder_elements(self):
    """Create the 'vorbisenc' and 'vp8enc' elements and tune the latter."""
    make = Gst.ElementFactory.make
    # Audio encoder:
    self.vorbisenc = make('vorbisenc', 'vorbisenc')
    # Video encoder:
    self.vp8enc = make('vp8enc', 'vp8enc')
    vp8_settings = (
        ('min_quantizer', 1),
        ('max_quantizer', 13),
        ('cpu-used', 5),
        ('deadline', 1),
        ## ('target-bitrate', 2000000),
        ('threads', 2),
        ('sharpness', 7),
    )
    for property_name, value in vp8_settings:
        self.vp8enc.set_property(property_name, value)
def create_decoder_elements(self):
    """Create the 'jpegdec' element with 'max-errors' set to -1."""
    decoder = Gst.ElementFactory.make('jpegdec', 'jpegdec')
    self.jpegdec = decoder
    decoder.set_property('max-errors', -1)
def create_muxer_elements(self):
    """Create the 'oggmux', 'matroskamux' and 'webmmux' elements."""
    factory = Gst.ElementFactory
    self.oggmux = factory.make('oggmux', 'oggmux')
    self.mkvmux = factory.make('matroskamux', 'mkvmux')
    webm = factory.make('webmmux', 'webmmux')
    self.webmmux = webm
    # Only the WebM muxer gets its 'streamable' property switched on.
    webm.set_property('streamable', True)
def create_demuxer_elements(self):
    """Placeholder for demuxer elements: creates nothing, returns None."""
def create_filtering_elements(self):
    """Create the video scaler and one caps filter per feed type."""
    make = Gst.ElementFactory.make
    parse_caps = Gst.caps_from_string

    self.scaling = make('videoscale', 'scaling')

    # NOTE(review): GStreamer frame rates are usually written with the
    # (fraction) type -- confirm that '(float)25/1' parses as intended.
    main_caps = parse_caps(
        'video/x-raw, width=(int)640, height=(int)360, framerate=(float)25/1')
    self.capsfilter = make('capsfilter', 'capsfilter')
    self.capsfilter.set_property('caps', main_caps)

    # Same frame size, without any frame rate constraint.
    backup_caps = parse_caps('video/x-raw, width=(int)640, height=(int)360')
    self.capsfilter_backup = make('capsfilter', 'capsfilter_backup')
    self.capsfilter_backup.set_property('caps', backup_caps)
def create_tee_elements(self):
    """Create tee elements to divide feeds."""
    tee_names = (
        'tee_rawvideo',
        'tee_videodecoded',
        'tee_streamfull',
        'tee_rawaudio',
        'tee_streamaudio',
    )
    # Each tee is stored under an attribute named like the element itself.
    for tee_name in tee_names:
        setattr(self, tee_name, Gst.ElementFactory.make('tee', tee_name))
def connect_tee(self,
                tee_element,
                input_element,
                output_element_1,
                output_element_2,
                output_element_3=None,):
    """Links input and outputs of a given tee element.

    tee_element      -- the tee to wire up.
    input_element    -- element linked in front of the tee.
    output_element_1 -- first element fed by the tee.
    output_element_2 -- second element fed by the tee.
    output_element_3 -- optional third element fed by the tee.

    Returns None. An error is printed when an element is missing or when
    one of the link() calls reports a failure.
    """
    # Find a way to check if the element given are in the pipeline
    # then pass the result to the 'if' statement.
    outputs = [output_element_1, output_element_2]
    if output_element_3 is not None:
        outputs.append(output_element_3)
    try:
        all_linked = bool(input_element.link(tee_element))
        for output in outputs:
            # Keep linking the remaining outputs even after a failure.
            all_linked = bool(tee_element.link(output)) and all_linked
    except (AttributeError, TypeError):
        # An element that could not be created is None and has no link().
        all_linked = False
    if not all_linked:
        print(ERROR,
              gettime(),
              'Couldn\'t link the tee. Element(s) probably not in the pipeline ')
def create_queues(self):
    """Create every 'queue' element used by the pipeline.

    Five video queues (queuev_*), five audio queues (queuea_*) and two
    queues for the audio+video muxer (queuem_*) are created, each stored
    under an attribute named like the element.
    """
    families = (
        ('queuev_', 5),   # video feed
        ('queuea_', 5),   # audio feed
        ('queuem_', 2),   # audio+video muxer
    )
    # Queues whose 'leaky' property is set to 2.
    leaky_queues = ('queuea_4', 'queuem_2')
    for prefix, count in families:
        for number in range(1, count + 1):
            name = prefix + str(number)
            queue = Gst.ElementFactory.make('queue', name)
            setattr(self, name, queue)
            if name in leaky_queues:
                queue.set_property('leaky', 2)
263 def create_pipeline_elements(self
):
264 print(INFO
, gettime(), 'Pipeline creation state: creating elements... ', end
='')
266 self
.create_video_sources()
267 self
.create_audio_sources()
269 self
.create_audiolevel_plugin()
270 self
.create_payloader_elements()
271 self
.create_depayloader_elements()
272 self
.create_encoder_elements()
273 self
.create_decoder_elements()
274 self
.create_muxer_elements()
275 self
.create_filtering_elements()
276 self
.create_tee_elements()
279 self
.create_filesink()
280 self
.create_streamsink()
281 if self
.feed
== 'test':
282 print('TEST OK...', end
='')
284 if self
.feed
== 'backup':
287 'Webcam device location: ',
288 self
.videosrc_backup
.get_property('device'))
291 def add_elements_to_pipeline(self
):
292 print(INFO
, gettime(), 'Pipeline creation state: adding elements... ', end
='')
293 cond
= self
.feed
!= 'test'
296 self
.streampipe
.add(self
.audiosrc
)
298 self
.streampipe
.add(self
.audiolevel
)
299 self
.streampipe
.add(self
.queuea_1
)
300 self
.streampipe
.add(self
.queuev_3
)
302 self
.streampipe
.add(self
.vorbisenc
)
303 self
.streampipe
.add(self
.oggmux
)
304 self
.streampipe
.add(self
.queuea_2
)
305 self
.streampipe
.add(self
.queuea_3
)
306 self
.streampipe
.add(self
.vp8enc
)
307 self
.streampipe
.add(self
.mkvmux
)
308 self
.streampipe
.add(self
.webmmux
)
309 self
.streampipe
.add(self
.tee_rawaudio
)
310 self
.streampipe
.add(self
.tee_rawvideo
)
311 self
.streampipe
.add(self
.tee_streamaudio
)
312 self
.streampipe
.add(self
.tee_streamfull
)
313 self
.streampipe
.add(self
.queuev_2
)
314 self
.streampipe
.add(self
.queuev_4
)
315 self
.streampipe
.add(self
.queuev_5
)
316 self
.streampipe
.add(self
.queuea_4
)
317 self
.streampipe
.add(self
.queuea_5
)
318 self
.streampipe
.add(self
.queuem_1
)
319 self
.streampipe
.add(self
.queuem_2
)
321 self
.streampipe
.add(self
.screensink
)
323 self
.streampipe
.add(self
.disksink_rawvideo
)
324 self
.streampipe
.add(self
.disksink_audio
)
325 self
.streampipe
.add(self
.disksink_stream
)
326 self
.streampipe
.add(self
.icecastsink_audio
)
327 self
.streampipe
.add(self
.icecastsink_stream
)
329 self
.streampipe
.add(self
.audiosink
)
331 if self
.feed
== 'main' or self
.feed
== 'test':
333 self
.streampipe
.add(self
.videosrc
)
335 self
.streampipe
.add(self
.rtpjpegdepay
)
336 self
.streampipe
.add(self
.jpegdec
)
337 self
.streampipe
.add(self
.scaling
)
338 self
.streampipe
.add(self
.capsfilter
)
339 self
.streampipe
.add(self
.tee_videodecoded
)
340 self
.streampipe
.add(self
.queuev_1
)
341 if self
.feed
== 'test':
342 print ('TEST OK...', end
='')
343 elif self
.feed
== 'backup':
345 self
.streampipe
.add(self
.videosrc_backup
)
347 self
.streampipe
.add(self
.capsfilter_backup
)
348 print ('BACKUP OK...', end
='')
351 def link_pipeline_elements(self
):
352 """Link all elements with static pads."""
353 print(INFO
, gettime(), 'Pipeline creation state: linking elements... ', end
='')
354 cond
= self
.feed
!= 'test'
357 self
.audiosrc
.link(self
.audiolevel
)
358 self
.audiolevel
.link(self
.queuea_1
)
360 self
.queuea_1
.link(self
.vorbisenc
)
361 self
.connect_tee(self
.tee_rawaudio
,
365 self
.queuea_2
.link(self
.oggmux
)
366 self
.connect_tee(self
.tee_streamaudio
,
370 self
.queuea_3
.link(self
.disksink_audio
)
371 self
.queuea_4
.link(self
.icecastsink_audio
)
372 self
.queuea_5
.link(self
.webmmux
)
374 self
.queuea_1
.link(self
.audiosink
)
378 self
.queuev_2
.link(self
.mkvmux
)
379 self
.mkvmux
.link(self
.queuev_4
)
380 self
.queuev_4
.link(self
.disksink_rawvideo
)
382 self
.queuev_1
.link(self
.rtpjpegdepay
)
383 self
.rtpjpegdepay
.link(self
.jpegdec
)
384 self
.jpegdec
.link(self
.queuev_3
)
385 self
.queuev_3
.link(self
.screensink
)
387 # Stream (audio+video) feed:
389 self
.vp8enc
.link(self
.queuev_5
)
390 self
.queuev_5
.link(self
.webmmux
)
391 self
.connect_tee(self
.tee_streamfull
,
395 self
.queuem_1
.link(self
.disksink_stream
)
396 self
.queuem_2
.link(self
.icecastsink_stream
)
397 if self
.feed
== 'main':
398 # linking here RTSP feed
399 self
.queuev_1
.link(self
.rtpjpegdepay
)
400 self
.connect_tee(self
.tee_rawvideo
,
404 self
.connect_tee(self
.tee_videodecoded
,
408 # Stream (video) feed:
409 self
.scaling
.link(self
.capsfilter
)
410 self
.capsfilter
.link(self
.vp8enc
)
411 elif self
.feed
== 'backup':
412 # linking here backup feed (WEBCAM)
413 self
.videosrc_backup
.link(self
.capsfilter_backup
)
414 self
.connect_tee(self
.tee_rawvideo
,
415 self
.capsfilter_backup
,
418 output_element_3
=self
.vp8enc
)
419 print('BACKUP OK...', end
='')
421 print('TEST OK...', end
='')
def create_gstreamer_pipeline(self):
    """Assemble the pipeline and return it.

    The elements are created, added and linked in that order, the
    'pad-added' callback is registered for the 'main' and 'test' feeds,
    and on_message() is connected to the pipeline bus.
    """
    # New empty pipeline:
    self.streampipe = Gst.Pipeline()
    build_steps = (
        self.create_pipeline_elements,
        self.add_elements_to_pipeline,
        self.link_pipeline_elements,
    )
    for step in build_steps:
        step()
    if self.feed in ('main', 'test'):
        self.create_pipeline_callbacks()

    message_bus = self.streampipe.get_bus()
    message_bus.add_signal_watch()
    message_bus.enable_sync_message_emission()
    # Used to get messages that GStreamer emits.
    message_bus.connect("message", self.on_message)

    print(INFO, gettime(), 'Pipeline creation state: successfully done.')
    return self.streampipe
def on_message(self, bus, message):
    """Handle a message received from the pipeline bus.

    bus     -- the bus that delivered the message (not used here).
    message -- the message to inspect.

    On end-of-stream the pipeline is set to the NULL state; on error the
    error and its debug information are printed.
    """
    t = message.type
    if t == Gst.MessageType.EOS:
        self.streampipe.set_state(Gst.State.NULL)
    elif t == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        print(ERROR, '%s' % err, debug)
def stream_play(self):
    """Request the PLAYING state on the pipeline and log the request."""
    playing = Gst.State.PLAYING
    self.streampipe.set_state(playing)
    on_backup_feed = self.feed == 'backup'
    if on_backup_feed:
        # Extra warning so a run on the backup feed stands out in the log.
        print(WARN, gettime(), 'Backup pipeline started.')
    print(INFO, gettime(), 'PLAYING State resquested.')
def stream_stop(self):
    """Set the pipeline to the NULL state and log the request."""
    null_state = Gst.State.NULL
    self.streampipe.set_state(null_state)
    print(INFO, gettime(), 'STOPPED State resquested.')
462 def set_filenames(self
, string
, streamfailed
=False):
463 """Sets filename and location for each sink."""
466 audio
= './' + DIR_NAME
+ '/' + filename
+ '_AUDIO'
467 rawvideo
= './' + DIR_NAME
+ '/' + filename
+ '_RAWVIDEO'
468 stream
= './' + DIR_NAME
+ '/' + filename
+ '_STREAM'
469 if self
.feed
== 'main':
470 if streamfailed
and filename
:
471 audio
= audio
+ FAILED_SUFFIX
+ str(fail_counter
)
472 rawvideo
= rawvideo
+ FAILED_SUFFIX
+ str(fail_counter
)
473 stream
= stream
+ FAILED_SUFFIX
+ str(fail_counter
)
474 self
.rename_files(audio
, rawvideo
, stream
)
477 audio
= AUDIO_DEFAULT
+ FAILED_SUFFIX
+ str(fail_counter
)
478 rawvideo
= RAWVIDEO_DEFAULT
+ FAILED_SUFFIX
+ str(fail_counter
)
479 stream
= STREAM_DEFAULT
+ FAILED_SUFFIX
+ str(fail_counter
)
480 self
.rename_files(audio
, rawvideo
, stream
)
483 self
.rename_files(audio
, rawvideo
, stream
)
484 elif self
.feed
== 'backup':
485 ## print('INSIDE BACKUP RENAMING')
486 rename(AUDIO_BACKUP
, audio
)
487 rename(RAWVIDEO_BACKUP
, rawvideo
)
488 rename(STREAM_BACKUP
, stream
)
490 print(INFO
, gettime(), 'Audio file written on disk.')
491 print(INFO
, gettime(), 'Raw video file written on disk.')
492 print(INFO
, gettime(), 'Streamed file written on disk.')
def rename_files(self, audio_name, rawvideo_name, stream_name):
    """Rename the three default recording files.

    audio_name    -- new path for the file at AUDIO_DEFAULT.
    rawvideo_name -- new path for the file at RAWVIDEO_DEFAULT.
    stream_name   -- new path for the file at STREAM_DEFAULT.
    """
    current_paths = (AUDIO_DEFAULT, RAWVIDEO_DEFAULT, STREAM_DEFAULT)
    new_paths = (audio_name, rawvideo_name, stream_name)
    for current, new in zip(current_paths, new_paths):
        rename(current, new)
def get_gstreamer_bus():
    """Placeholder: not implemented, does nothing and returns None."""


def gettime():
    """Return the local time as 'yy-mm-dd_HH:MM:SS ' (with trailing space).

    Used as the timestamp field of the messages printed by this module.
    """
    return strftime('%y-%m-%d_%H:%M:%S ', localtime())