fixed issue 19, stream is thread now
authorBryan Salas <bryans5252@sbcglobal.net>
Sat, 2 Aug 2014 20:53:50 +0000 (15:53 -0500)
committerBryan Salas <bryans5252@sbcglobal.net>
Sat, 2 Aug 2014 20:53:50 +0000 (15:53 -0500)
rainbowstream/db.py
rainbowstream/rainbow.py

index 192448f..e9f2ccc 100644 (file)
@@ -1,6 +1,7 @@
 import os
 from sqlalchemy import create_engine
 from sqlalchemy.orm import sessionmaker
+from sqlalchemy.pool import StaticPool
 from .table_def import *
 
 
@@ -15,7 +16,7 @@ class RainbowDB():
         if os.path.isfile('rainbow.db'):
             os.system('rm -rf rainbow.db')
         init_db()
-        self.engine = create_engine('sqlite:///rainbow.db', echo=False)
+        self.engine = create_engine('sqlite:///rainbow.db', echo=False, connect_args={'check_same_thread':False}, poolclass=StaticPool)
 
     def tweet_store(self, tweet_id):
         """
index c0ca8fe..257b75e 100644 (file)
@@ -9,6 +9,7 @@ import sys
 import signal
 import argparse
 import time
+import threading
 import requests
 import webbrowser
 
@@ -33,6 +34,10 @@ g = {}
 # Database
 db = RainbowDB()
 
+# Lock for streams
+
+StreamLock = threading.Lock()
+
 # Commands
 cmdset = [
     'switch',
@@ -193,37 +198,29 @@ def switch():
             keyword = g['stuff'].split()[1]
             if keyword[0] == '#':
                 keyword = keyword[1:]
-            # Kill old process
-            os.kill(g['stream_pid'], signal.SIGKILL)
+            # Kill old thread
+            g['stream_stop'] = True
             args.track_keywords = keyword
-            # Start new process
-            p = Process(
-                target=stream,
-                args=(
-                    c['PUBLIC_DOMAIN'],
-                    args))
-            p.start()
-            g['stream_pid'] = p.pid
+            # Start new thread
+            th = threading.Thread(target=stream, args=(c['PUBLIC_DOMAIN'], args))
+            th.daemon = True
+            th.start()
         # Personal stream
         elif target == 'mine':
-            # Kill old process
-            os.kill(g['stream_pid'], signal.SIGKILL)
-            # Start new process
-            p = Process(
-                target=stream,
-                args=(
-                    c['USER_DOMAIN'],
-                    args,
-                    g['original_name']))
-            p.start()
-            g['stream_pid'] = p.pid
+            # Kill old thread
+            g['stream_stop'] = True
+            # Start new thread
+            th = threading.Thread(target=stream, args=(c['USER_DOMAIN'], args, g['original_name']))
+            th.daemon = True
+            th.start()
+        g['prefix'] = True
         printNicely('')
         if args.filter:
             printNicely(cyan('Only: ' + str(args.filter)))
         if args.ignore:
             printNicely(red('Ignore: ' + str(args.ignore)))
         printNicely('')
-    except:
+    except Exception:
         printNicely(red('Sorry I can\'t understand.'))
 
 
@@ -1474,7 +1471,6 @@ def quit():
     try:
         save_history()
         os.system('rm -rf rainbow.db')
-        os.kill(g['stream_pid'], signal.SIGKILL)
         printNicely(green('See you next time :)'))
     except:
         pass
@@ -1618,6 +1614,7 @@ def listen():
         if g['prefix']:
             line = raw_input(g['decorated_name'](c['PREFIX']))
         else:
+            print('prefix is false')
             line = raw_input()
         try:
             cmd = line.split()[0]
@@ -1632,7 +1629,7 @@ def listen():
             # Process the command
             process(cmd)()
             # Not re-display
-            if cmd in ['switch', 't', 'rt', 'rep']:
+            if cmd in ['t', 'rt', 'rep']:
                 g['prefix'] = False
             else:
                 g['prefix'] = True
@@ -1657,7 +1654,7 @@ def stream(domain, args, name='Rainbow Stream'):
     # These arguments are optional:
     stream_args = dict(
         timeout=args.timeout,
-        block=not args.no_block,
+        block=False,
         heartbeat_timeout=args.heartbeat_timeout)
     # Track keyword
     query_args = dict()
@@ -1678,9 +1675,15 @@ def stream(domain, args, name='Rainbow Stream'):
                 tweet_iter = stream.statuses.filter(**query_args)
             else:
                 tweet_iter = stream.statuses.sample()
+        # Block new stream until other one exits
+        StreamLock.acquire()
+        g['stream_stop'] = False
         for tweet in tweet_iter:
+            if(g['stream_stop'] == True):
+                StreamLock.release()
+                break
             if tweet is None:
-                printNicely("-- None --")
+                pass
             elif tweet is Timeout:
                 printNicely("-- Timeout --")
             elif tweet is HeartbeatTimeout:
@@ -1695,6 +1698,8 @@ def stream(domain, args, name='Rainbow Stream'):
                     fil=args.filter,
                     ig=args.ignore,
                 )
+                sys.stdout.write(g['decorated_name'](c['PREFIX']) + readline.get_line_buffer())
+                sys.stdout.flush()
             elif tweet.get('direct_message'):
                 print_message(tweet['direct_message'], check_semaphore=True)
     except TwitterHTTPError:
@@ -1719,17 +1724,12 @@ def fly():
         save_history()
         os.system('rm -rf rainbow.db')
         sys.exit()
-    # Spawn stream process
-    p = Process(
-        target=stream,
-        args=(
-            c['USER_DOMAIN'],
-            args,
-            g['original_name']))
-    p.start()
+    # Spawn stream thread
+    th = threading.Thread(target=stream, args=(c['USER_DOMAIN'], args, g['original_name']))
+    th.daemon = True
+    th.start()
     # Start listen process
     time.sleep(0.5)
     g['reset'] = True
     g['prefix'] = True
-    g['stream_pid'] = p.pid
     listen()