From 755700f58164cdffad8e4a77af32939470eee89c Mon Sep 17 00:00:00 2001 From: Josh Roesslein Date: Fri, 7 Aug 2009 13:36:46 -0500 Subject: [PATCH] Adding stream object to library. Got stream reading loop working. Added test program to watch stream of statuses from commandline. --- stream_watcher.py | 26 ++++++++++++++++++ tweepy/__init__.py | 1 + tweepy/streaming.py | 67 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 94 insertions(+) create mode 100644 stream_watcher.py create mode 100644 tweepy/streaming.py diff --git a/stream_watcher.py b/stream_watcher.py new file mode 100644 index 0000000..949c90a --- /dev/null +++ b/stream_watcher.py @@ -0,0 +1,26 @@ +import time + +import tweepy + +def callback(stream_object): + + if 'text' in stream_object: + print stream_object['text'] + + +stream = tweepy.Stream('spritzer', 'tweebly', 'omega1987twitter') +stream.connect(callback) + +while True: + + try: + if not stream.running: + print 'Stream stopped!' + break + + time.sleep(1) + + except KeyboardInterrupt: + break + +stream.disconnect() diff --git a/tweepy/__init__.py b/tweepy/__init__.py index e3543c7..8b01bb7 100644 --- a/tweepy/__init__.py +++ b/tweepy/__init__.py @@ -12,6 +12,7 @@ from error import TweepError from api import API from cache import * from auth import BasicAuthHandler, OAuthHandler +from streaming import Stream # Global, unauthenticated instance of API api = API() diff --git a/tweepy/streaming.py b/tweepy/streaming.py new file mode 100644 index 0000000..afa8cab --- /dev/null +++ b/tweepy/streaming.py @@ -0,0 +1,67 @@ +# Tweepy +# Copyright 2009 Joshua Roesslein +# See LICENSE + +import httplib +from threading import Thread + +from . auth import BasicAuthHandler + +try: + import json +except ImportError: + import simplejson as json + +class Stream(object): + + def __init__(self, method, username, password, host='stream.twitter.com', buffer_size=1500): + self.method = method if method[0] == '/' else '/' + method + self.host = host + self.auth = BasicAuthHandler(username, password) + self.running = False + self.buffer_size = buffer_size + + def _run(self): + conn = httplib.HTTPConnection(self.host) + headers = {} + self.auth.apply_auth(None, None, headers, None) + conn.request('POST', self.method + '.json?delimited=length', headers=headers) + resp = conn.getresponse() + data = '' + while self.running: + if resp.isclosed(): + break + + # read length + length = '' + while True: + c = resp.read(1) + if c == '\n': + break + length += c + length = length.strip() + if length.isdigit(): + length = int(length) + else: + continue + + # read data + data = resp.read(length) + jobject = json.loads(data) + self.callback(jobject) + + conn.close() + self.running = False + + def connect(self, callback): + if self.running: + raise TweepError('Stream object already connected!') + self.callback = callback + self.running = True + Thread(target=self._run).start() + + def disconnect(self): + if not self.running: + raise TweepError('Stream object not connected!') + self.running = False + -- 2.25.1