2 Colorful user's timeline stream
4 from __future__
import print_function
5 from multiprocessing
import Process
6 from dateutil
import parser
16 from twitter
.stream
import TwitterStream
, Timeout
, HeartbeatTimeout
, Hangup
17 from twitter
.api
import *
18 from twitter
.oauth
import OAuth
, read_token_file
19 from twitter
.oauth_dance
import oauth_dance
20 from twitter
.util
import printNicely
24 from .interactive
import *
46 def draw(t
, keyword
=None):
53 screen_name
= t
['user']['screen_name']
54 name
= t
['user']['name']
55 created_at
= t
['created_at']
56 date
= parser
.parse(created_at
)
57 date
= date
- datetime
.timedelta(seconds
=time
.timezone
)
58 clock
= date
.strftime('%Y/%m/%d %H:%M:%S')
60 res
= db
.tweet_query(tid
)
63 res
= db
.tweet_query(tid
)
64 rid
= res
[0].rainbow_id
67 user
= cycle_color(name
) + grey(' ' + '@' + screen_name
+ ' ')
68 meta
= grey('[' + clock
+ '] [id=' + str(rid
) + ']')
71 tweet
= map(lambda x
: grey(x
) if x
== 'RT' else x
, tweet
)
72 # Highlight screen_name
73 tweet
= map(lambda x
: cycle_color(x
) if x
[0] == '@' else x
, tweet
)
75 tweet
= map(lambda x
: cyan(x
) if x
[0:7] == 'http://' else x
, tweet
)
76 # Highlight search keyword
79 lambda x
: on_yellow(x
) if
80 ''.join(c
for c
in x
if c
.isalnum()).lower() == keyword
.lower()
84 tweet
= ' '.join(tweet
)
87 line1
= u
"{u:>{uw}}:".format(
91 line2
= u
"{c:>{cw}}".format(
103 def parse_arguments():
107 parser
= argparse
.ArgumentParser(description
=__doc__
or "")
111 help='Timeout for the stream (seconds).')
114 '--heartbeat-timeout',
115 help='Set heartbeat timeout.',
121 help='Set stream to non-blocking.')
125 help='Search the stream for specific text.')
126 return parser
.parse_args()
131 Authenticate with Twitter OAuth
133 # When using rainbow stream you must authorize.
134 twitter_credential
= os
.environ
.get(
138 '')) + os
.sep
+ '.rainbow_oauth'
139 if not os
.path
.exists(twitter_credential
):
140 oauth_dance("Rainbow Stream",
144 oauth_token
, oauth_token_secret
= read_token_file(twitter_credential
)
def get_decorated_name():
    """
    Build the prefix shown at the beginning of every input line.

    Asks the API (``account.verify_credentials``) for the authenticated
    account's screen name and records two entries in the module-level ``g``
    mapping:

    * ``g['original_name']``  -- the bare screen name (no leading ``@``)
    * ``g['decorated_name']`` -- ``[@screen_name]: `` with each piece passed
      through ``grey``

    Returns nothing; the results are only published through ``g``.
    """
    client = Twitter(auth=authen())
    screen_name = client.account.verify_credentials()['screen_name']
    handle = '@' + screen_name

    # The bare name is kept separately from the bracketed prompt form.
    g['original_name'] = screen_name

    opening = grey('[')
    middle = grey(handle)
    closing = grey(']: ')
    g['decorated_name'] = opening + middle + closing
167 target
= g
['stuff'].split()[0]
170 if target
== 'public':
171 keyword
= g
['stuff'].split()[1]
172 if keyword
[0] == '#':
173 keyword
= keyword
[1:]
176 os
.kill(g
['stream_pid'], signal
.SIGKILL
)
177 args
= parse_arguments()
178 args
.track_keywords
= keyword
187 g
['stream_pid'] = p
.pid
190 elif target
== 'mine':
193 os
.kill(g
['stream_pid'], signal
.SIGKILL
)
194 args
= parse_arguments()
204 g
['stream_pid'] = p
.pid
205 printNicely(green('stream switched.'))
207 printNicely(red('Sorry I can\'t understand.'))
215 t
= Twitter(auth
=authen())
217 if g
['stuff'].isdigit():
219 for tweet
in reversed(t
.statuses
.home_timeline(count
=num
)):
228 t
= Twitter(auth
=authen())
229 user
= g
['stuff'].split()[0]
232 num
= int(g
['stuff'].split()[1])
235 for tweet
in reversed(t
.statuses
.user_timeline(count
=num
, screen_name
=user
[1:])):
239 printNicely(red('A name should begin with a \'@\''))
246 t
= Twitter(auth
=authen())
247 t
.statuses
.update(status
=g
['stuff'])
255 t
= Twitter(auth
=authen())
257 id = int(g
['stuff'].split()[0])
258 tid
= db
.rainbow_query(id)[0].tweet_id
259 t
.statuses
.retweet(id=tid
, include_entities
=False, trim_user
=True)
261 printNicely(red('Sorry I can\'t retweet for you.'))
269 t
= Twitter(auth
=authen())
271 id = int(g
['stuff'].split()[0])
272 tid
= db
.rainbow_query(id)[0].tweet_id
273 user
= t
.statuses
.show(id=tid
)['user']['screen_name']
274 status
= ' '.join(g
['stuff'].split()[1:])
275 status
= '@' + user
+ ' ' + status
.decode('utf-8')
276 t
.statuses
.update(status
=status
, in_reply_to_status_id
=tid
)
278 printNicely(red('Sorry I can\'t understand.'))
286 t
= Twitter(auth
=authen())
288 id = int(g
['stuff'].split()[0])
289 tid
= db
.rainbow_query(id)[0].tweet_id
290 t
.statuses
.destroy(id=tid
)
291 printNicely(green('Okay it\'s gone.'))
293 printNicely(red('Sorry I can\'t delete this tweet for you.'))
300 t
= Twitter(auth
=authen())
302 if g
['stuff'][0] == '#':
303 rel
= t
.search
.tweets(q
=g
['stuff'])['statuses']
305 printNicely('Newest tweets:')
306 for i
in reversed(xrange(SEARCH_MAX_RECORD
)):
307 draw(t
=rel
[i
], keyword
=g
['stuff'].strip()[1:])
310 printNicely(magenta('I\'m afraid there is no result'))
312 printNicely(red('A keyword should be a hashtag (like \'#AKB48\')'))
314 printNicely(red('Sorry I can\'t understand.'))
319 List of friends (following)
321 t
= Twitter(auth
=authen())
322 g
['friends'] = t
.friends
.ids()['ids']
323 for i
in g
['friends']:
324 screen_name
= t
.users
.lookup(user_id
=i
)[0]['screen_name']
325 user
= cycle_color('@' + screen_name
)
334 t
= Twitter(auth
=authen())
335 g
['followers'] = t
.followers
.ids()['ids']
336 for i
in g
['followers']:
337 screen_name
= t
.users
.lookup(user_id
=i
)[0]['screen_name']
338 user
= cycle_color('@' + screen_name
)
348 Hi boss! I'm ready to serve you right now!
349 -------------------------------------------------------------
350 You are already see your personal stream:
351 "switch public #AKB" will switch to public stream and follow "AKB" keyword.
352 "switch mine" will switch back to your personal stream.
354 "home" will show your timeline. "home 7" will show 7 tweet.
355 "view @bob" will show your friend @bob's home.
356 "t oops" will tweet "oops" immediately.
357 "rt 12345" will retweet to tweet with id "12345".
358 "rep 12345 oops" will reply "oops" to tweet with id "12345".
359 "del 12345" will delete tweet with id "12345".
360 "s #AKB48" will search for "AKB48" and return 5 newest tweet.
361 "fr" will list out your following people.
362 "fl" will list out your followers.
363 "h" will show this help again.
364 "c" will clear the terminal.
366 -------------------------------------------------------------
367 Have fun and hang tight!
384 os
.system('rm -rf rainbow.db')
385 os
.kill(g
['stream_pid'], signal
.SIGKILL
)
394 printNicely(green('Need tips ? Type "h" and hit Enter key!'))
425 Listen to user's input
427 init_interactive_shell(cmdset
)
430 if g
['prefix'] and not first
:
431 line
= raw_input(g
['decorated_name'])
435 cmd
= line
.split()[0]
438 # Save cmd to global variable and call process
439 g
['stuff'] = ' '.join(line
.split()[1:])
444 def stream(domain
, args
, name
='Rainbow Stream'):
451 PUBLIC_DOMAIN
: args
.track_keywords
,
452 SITE_DOMAIN
: 'Site Stream',
454 ascii_art(art_dict
[domain
])
455 # These arguments are optional:
457 timeout
=args
.timeout
,
458 block
=not args
.no_block
,
459 heartbeat_timeout
=args
.heartbeat_timeout
)
463 if args
.track_keywords
:
464 query_args
['track'] = args
.track_keywords
467 stream
= TwitterStream(
472 if domain
== USER_DOMAIN
:
473 tweet_iter
= stream
.user(**query_args
)
474 elif domain
== SITE_DOMAIN
:
475 tweet_iter
= stream
.site(**query_args
)
477 if args
.track_keywords
:
478 tweet_iter
= stream
.statuses
.filter(**query_args
)
480 tweet_iter
= stream
.statuses
.sample()
482 # Iterate over the stream.
483 for tweet
in tweet_iter
:
485 printNicely("-- None --")
486 elif tweet
is Timeout
:
487 printNicely("-- Timeout --")
488 elif tweet
is HeartbeatTimeout
:
489 printNicely("-- Heartbeat Timeout --")
490 elif tweet
is Hangup
:
491 printNicely("-- Hangup --")
492 elif tweet
.get('text'):
500 # Spawn stream process
501 args
= parse_arguments()
503 p
= Process(target
=stream
, args
=(USER_DOMAIN
, args
, g
['original_name']))
506 # Start listen process
509 g
['stream_pid'] = p
.pid