From 929839450b0745b22e08c76839f3b6dcf1100c48 Mon Sep 17 00:00:00 2001 From: Bryan Salas Date: Sat, 2 Aug 2014 15:53:50 -0500 Subject: [PATCH] fixed issue 19, stream is thread now --- rainbowstream/db.py | 3 +- rainbowstream/rainbow.py | 70 ++++++++++++++++++++-------------------- 2 files changed, 37 insertions(+), 36 deletions(-) diff --git a/rainbowstream/db.py b/rainbowstream/db.py index 192448f..e9f2ccc 100644 --- a/rainbowstream/db.py +++ b/rainbowstream/db.py @@ -1,6 +1,7 @@ import os from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker +from sqlalchemy.pool import StaticPool from .table_def import * @@ -15,7 +16,7 @@ class RainbowDB(): if os.path.isfile('rainbow.db'): os.system('rm -rf rainbow.db') init_db() - self.engine = create_engine('sqlite:///rainbow.db', echo=False) + self.engine = create_engine('sqlite:///rainbow.db', echo=False, connect_args={'check_same_thread':False}, poolclass=StaticPool) def tweet_store(self, tweet_id): """ diff --git a/rainbowstream/rainbow.py b/rainbowstream/rainbow.py index c0ca8fe..257b75e 100644 --- a/rainbowstream/rainbow.py +++ b/rainbowstream/rainbow.py @@ -9,6 +9,7 @@ import sys import signal import argparse import time +import threading import requests import webbrowser @@ -33,6 +34,10 @@ g = {} # Database db = RainbowDB() +# Lock for streams + +StreamLock = threading.Lock() + # Commands cmdset = [ 'switch', @@ -193,37 +198,29 @@ def switch(): keyword = g['stuff'].split()[1] if keyword[0] == '#': keyword = keyword[1:] - # Kill old process - os.kill(g['stream_pid'], signal.SIGKILL) + # Kill old thread + g['stream_stop'] = True args.track_keywords = keyword - # Start new process - p = Process( - target=stream, - args=( - c['PUBLIC_DOMAIN'], - args)) - p.start() - g['stream_pid'] = p.pid + # Start new thread + th = threading.Thread(target=stream, args=(c['PUBLIC_DOMAIN'], args)) + th.daemon = True + th.start() # Personal stream elif target == 'mine': - # Kill old process - os.kill(g['stream_pid'], signal.SIGKILL) - # Start new process - p = Process( - target=stream, - args=( - c['USER_DOMAIN'], - args, - g['original_name'])) - p.start() - g['stream_pid'] = p.pid + # Kill old thread + g['stream_stop'] = True + # Start new thread + th = threading.Thread(target=stream, args=(c['USER_DOMAIN'], args, g['original_name'])) + th.daemon = True + th.start() + g['prefix'] = True printNicely('') if args.filter: printNicely(cyan('Only: ' + str(args.filter))) if args.ignore: printNicely(red('Ignore: ' + str(args.ignore))) printNicely('') - except: + except Exception: printNicely(red('Sorry I can\'t understand.')) @@ -1474,7 +1471,6 @@ def quit(): try: save_history() os.system('rm -rf rainbow.db') - os.kill(g['stream_pid'], signal.SIGKILL) printNicely(green('See you next time :)')) except: pass @@ -1618,6 +1614,7 @@ def listen(): if g['prefix']: line = raw_input(g['decorated_name'](c['PREFIX'])) else: + print('prefix is false') line = raw_input() try: cmd = line.split()[0] @@ -1632,7 +1629,7 @@ def listen(): # Process the command process(cmd)() # Not re-display - if cmd in ['switch', 't', 'rt', 'rep']: + if cmd in ['t', 'rt', 'rep']: g['prefix'] = False else: g['prefix'] = True @@ -1657,7 +1654,7 @@ def stream(domain, args, name='Rainbow Stream'): # These arguments are optional: stream_args = dict( timeout=args.timeout, - block=not args.no_block, + block=False, heartbeat_timeout=args.heartbeat_timeout) # Track keyword query_args = dict() @@ -1678,9 +1675,15 @@ def stream(domain, args, name='Rainbow Stream'): tweet_iter = stream.statuses.filter(**query_args) else: tweet_iter = stream.statuses.sample() + # Block new stream until other one exits + StreamLock.acquire() + g['stream_stop'] = False for tweet in tweet_iter: + if(g['stream_stop'] == True): + StreamLock.release() + break if tweet is None: - printNicely("-- None --") + pass elif tweet is Timeout: printNicely("-- Timeout --") elif tweet is HeartbeatTimeout: @@ -1695,6 +1698,8 @@ def stream(domain, args, name='Rainbow Stream'): fil=args.filter, ig=args.ignore, ) + sys.stdout.write(g['decorated_name'](c['PREFIX']) + readline.get_line_buffer()) + sys.stdout.flush() elif tweet.get('direct_message'): print_message(tweet['direct_message'], check_semaphore=True) except TwitterHTTPError: @@ -1719,17 +1724,12 @@ def fly(): save_history() os.system('rm -rf rainbow.db') sys.exit() - # Spawn stream process - p = Process( - target=stream, - args=( - c['USER_DOMAIN'], - args, - g['original_name'])) - p.start() + # Spawn stream thread + th = threading.Thread(target=stream, args=(c['USER_DOMAIN'], args, g['original_name'])) + th.daemon = True + th.start() # Start listen process time.sleep(0.5) g['reset'] = True g['prefix'] = True - g['stream_pid'] = p.pid listen() -- 2.25.1