-"""
-Colorful user's timeline stream
-"""
import os
import os.path
import sys
# Lock for streams
StreamLock = threading.Lock()
-# Commands
-cmdset = [
- 'switch',
- 'trend',
- 'home',
- 'view',
- 'mentions',
- 't',
- 'rt',
- 'quote',
- 'allrt',
- 'fav',
- 'rep',
- 'del',
- 'ufav',
- 's',
- 'mes',
- 'show',
- 'open',
- 'ls',
- 'inbox',
- 'sent',
- 'trash',
- 'whois',
- 'fl',
- 'ufl',
- 'mute',
- 'unmute',
- 'muting',
- 'block',
- 'unblock',
- 'report',
- 'list',
- 'cal',
- 'config',
- 'theme',
- 'h',
- 'p',
- 'r',
- 'c',
- 'q'
-]
-
def parse_arguments():
"""
'-to',
'--timeout',
help='Timeout for the stream (seconds).')
- parser.add_argument(
- '-ht',
- '--heartbeat-timeout',
- help='Set heartbeat timeout.',
- default=90)
- parser.add_argument(
- '-nb',
- '--no-block',
- action='store_true',
- help='Set stream to non-blocking.')
parser.add_argument(
'-tt',
'--track-keywords',
CONSUMER_SECRET)
+def build_mute_dict(dict_data=False):
+ """
+ Build muting list
+ """
+ t = Twitter(auth=authen())
+ # Init cursor
+ next_cursor = -1
+ screen_name_list = []
+ name_list = []
+ # Cursor loop
+ while next_cursor != 0:
+ list = t.mutes.users.list(
+ screen_name=g['original_name'],
+ cursor=next_cursor,
+ skip_status=True,
+ include_entities=False,
+ )
+ screen_name_list += ['@' + u['screen_name'] for u in list['users']]
+ name_list += [u['name'] for u in list['users']]
+ next_cursor = list['next_cursor']
+ # Return dict or list
+ if dict_data:
+ return dict(zip(screen_name_list, name_list))
+ else:
+ return screen_name_list
+
+
def init(args):
"""
Init function
signal.signal(signal.SIGINT, ctrl_c_handler)
# Get name
t = Twitter(auth=authen())
- name = '@' + t.account.verify_credentials()['screen_name']
+ credential = t.account.verify_credentials()
+ screen_name = '@' + credential['screen_name']
+ name = credential['name']
if not get_config('PREFIX'):
- set_config('PREFIX', name)
- g['original_name'] = name[1:]
+ set_config('PREFIX', screen_name)
+ g['PREFIX'] = u2str(c['PREFIX'])
+ c['original_name'] = g['original_name'] = screen_name[1:]
+ g['full_name'] = name
g['decorated_name'] = lambda x: color_func(
- c['DECORATED_NAME'])(
- '[' + x + ']: ')
+ c['DECORATED_NAME'])('[' + x + ']: ')
# Theme init
files = os.listdir(os.path.dirname(__file__) + '/colorset')
themes = [f.split('.')[0] for f in files if f.split('.')[-1] == 'json']
g['themes'] = themes
+ g['pause'] = False
+ g['message_threads'] = {}
+ # Events
+ g['events'] = []
+ # Startup cmd
+ g['cmd'] = ''
+ # Retweet of mine events
+ c['events'] = []
# Semaphore init
c['lock'] = False
- c['pause'] = False
# Init tweet dict and message dict
c['tweet_dict'] = []
c['message_dict'] = []
# Image on term
c['IMAGE_ON_TERM'] = args.image_on_term
-
-
-def switch():
- """
- Switch stream
- """
- try:
- target = g['stuff'].split()[0]
- # Filter and ignore
- args = parse_arguments()
- try:
- if g['stuff'].split()[-1] == '-f':
- guide = 'To ignore an option, just hit Enter key.'
- printNicely(light_magenta(guide))
- only = raw_input('Only nicks [Ex: @xxx,@yy]: ')
- ignore = raw_input('Ignore nicks [Ex: @xxx,@yy]: ')
- args.filter = filter(None, only.split(','))
- args.ignore = filter(None, ignore.split(','))
- elif g['stuff'].split()[-1] == '-d':
- args.filter = c['ONLY_LIST']
- args.ignore = c['IGNORE_LIST']
- except:
- printNicely(red('Sorry, wrong format.'))
- return
- # Public stream
- if target == 'public':
- keyword = g['stuff'].split()[1]
- if keyword[0] == '#':
- keyword = keyword[1:]
- # Kill old thread
- g['stream_stop'] = True
- args.track_keywords = keyword
- # Start new thread
- th = threading.Thread(target=stream, args=(c['PUBLIC_DOMAIN'], args))
- th.daemon = True
- th.start()
- # Personal stream
- elif target == 'mine':
- # Kill old thread
- g['stream_stop'] = True
- # Start new thread
- th = threading.Thread(target=stream, args=(c['USER_DOMAIN'], args, g['original_name']))
- th.daemon = True
- th.start()
- printNicely('')
- if args.filter:
- printNicely(cyan('Only: ' + str(args.filter)))
- if args.ignore:
- printNicely(red('Ignore: ' + str(args.ignore)))
- printNicely('')
- except:
- printNicely(red('Sorry I can\'t understand.'))
+ set_config('IMAGE_ON_TERM', str(c['IMAGE_ON_TERM']))
+ # Mute dict
+ c['IGNORE_LIST'] += build_mute_dict()
def trend():
printNicely('')
+def notification():
+ """
+ Show notifications
+ """
+ g['events'] = g['events'] + c['events']
+ if g['events']:
+ for e in g['events']:
+ print_event(e)
+ printNicely('')
+ else:
+ printNicely(magenta('Nothing at this time.'))
+
+
+def mentions():
+ """
+ Mentions timeline
+ """
+ t = Twitter(auth=authen())
+ num = c['HOME_TWEET_NUM']
+ if g['stuff'].isdigit():
+ num = int(g['stuff'])
+ for tweet in reversed(t.statuses.mentions_timeline(count=num)):
+ draw(t=tweet)
+ printNicely('')
+
+
+def whois():
+ """
+ Show profile of a specific user
+ """
+ t = Twitter(auth=authen())
+ screen_name = g['stuff'].split()[0]
+ if screen_name.startswith('@'):
+ try:
+ user = t.users.show(
+ screen_name=screen_name[1:],
+ include_entities=False)
+ show_profile(user)
+ except:
+ printNicely(red('Omg no user.'))
+ else:
+ printNicely(red('A name should begin with a \'@\''))
+
+
def view():
"""
Friend view
printNicely(red('A name should begin with a \'@\''))
-def mentions():
+def search():
"""
- Mentions timeline
+ Search
"""
t = Twitter(auth=authen())
- num = c['HOME_TWEET_NUM']
- if g['stuff'].isdigit():
- num = int(g['stuff'])
- for tweet in reversed(t.statuses.mentions_timeline(count=num)):
- draw(t=tweet)
- printNicely('')
+ # Setup query
+ query = g['stuff'].strip()
+ type = c['SEARCH_TYPE']
+ if type not in ['mixed', 'recent', 'popular']:
+ type = 'mixed'
+ max_record = c['SEARCH_MAX_RECORD']
+ count = min(max_record, 100)
+ # Perform search
+ rel = t.search.tweets(
+ q=query,
+ type=type,
+ count=count
+ )['statuses']
+ # Return results
+ if rel:
+ printNicely('Newest tweets:')
+ for i in reversed(xrange(count)):
+ draw(t=rel[i], keyword=query)
+ printNicely('')
+ else:
+ printNicely(magenta('I\'m afraid there is no result'))
def tweet():
"""
Quote a tweet
"""
+ # Get tweet
t = Twitter(auth=authen())
try:
id = int(g['stuff'].split()[0])
return
tid = c['tweet_dict'][id]
tweet = t.statuses.show(id=tid)
- screen_name = tweet['user']['screen_name']
- text = tweet['text']
- quote = '\"@' + screen_name + ': ' + text + '\"'
- quote = quote.encode('utf8')
- notice = light_magenta('Compose mode ')
- notice += light_yellow('(Enter nothing will cancel the quote)')
- notice += light_magenta(':')
- printNicely(notice)
- extra = raw_input(quote)
- if extra:
- t.statuses.update(status=quote + extra)
+ # Get formater
+ formater = format_quote(tweet)
+ if not formater:
+ return
+ # Get comment
+ prefix = light_magenta('Compose your ') + light_green('#comment: ')
+ comment = raw_input(prefix)
+ if comment:
+ quote = comment.join(formater.split('#comment'))
+ t.statuses.update(status=quote)
else:
printNicely(light_magenta('No text added.'))
printNicely('')
-def favorite():
+def conversation():
"""
- Favorite
+ Conversation view
"""
t = Twitter(auth=authen())
try:
printNicely(red('Sorry I can\'t understand.'))
return
tid = c['tweet_dict'][id]
- t.favorites.create(_id=tid, include_entities=False)
- printNicely(green('Favorited.'))
- draw(t.statuses.show(id=tid))
+ tweet = t.statuses.show(id=tid)
+ limit = c['CONVERSATION_MAX']
+ thread_ref = []
+ thread_ref.append(tweet)
+ prev_tid = tweet['in_reply_to_status_id']
+ while prev_tid and limit:
+ limit -= 1
+ tweet = t.statuses.show(id=prev_tid)
+ prev_tid = tweet['in_reply_to_status_id']
+ thread_ref.append(tweet)
+
+ for tweet in reversed(thread_ref):
+ draw(t=tweet)
printNicely('')
tid = c['tweet_dict'][id]
user = t.statuses.show(id=tid)['user']['screen_name']
status = ' '.join(g['stuff'].split()[1:])
- status = '@' + user + ' ' + status.decode('utf-8')
+ status = '@' + user + ' ' + str2u(status)
t.statuses.update(status=status, in_reply_to_status_id=tid)
-def delete():
+def favorite():
"""
- Delete
+ Favorite
"""
t = Twitter(auth=authen())
try:
printNicely(red('Sorry I can\'t understand.'))
return
tid = c['tweet_dict'][id]
- t.statuses.destroy(id=tid)
- printNicely(green('Okay it\'s gone.'))
+ t.favorites.create(_id=tid, include_entities=False)
+ printNicely(green('Favorited.'))
+ draw(t.statuses.show(id=tid))
+ printNicely('')
def unfavorite():
printNicely('')
-def search():
- """
- Search
- """
- t = Twitter(auth=authen())
- g['stuff'] = g['stuff'].strip()
- rel = t.search.tweets(q=g['stuff'])['statuses']
- if rel:
- printNicely('Newest tweets:')
- for i in reversed(xrange(c['SEARCH_MAX_RECORD'])):
- draw(t=rel[i],
- keyword=g['stuff'])
- printNicely('')
- else:
- printNicely(magenta('I\'m afraid there is no result'))
-
-
-def message():
+def delete():
"""
- Send a direct message
+ Delete
"""
t = Twitter(auth=authen())
- user = g['stuff'].split()[0]
- if user[0].startswith('@'):
- try:
- content = g['stuff'].split()[1]
- except:
- printNicely(red('Sorry I can\'t understand.'))
- t.direct_messages.new(
- screen_name=user[1:],
- text=content
- )
- printNicely(green('Message sent.'))
- else:
- printNicely(red('A name should begin with a \'@\''))
+ try:
+ id = int(g['stuff'].split()[0])
+ except:
+ printNicely(red('Sorry I can\'t understand.'))
+ return
+ tid = c['tweet_dict'][id]
+ t.statuses.destroy(id=tid)
+ printNicely(green('Okay it\'s gone.'))
def show():
return
tid = c['tweet_dict'][int(g['stuff'])]
tweet = t.statuses.show(id=tid)
- link_ary = [
- u for u in tweet['text'].split() if u.startswith('http://')]
+ link_prefix = ('http://', 'https://')
+ link_ary = [u for u in tweet['text'].split()
+ if u.startswith(link_prefix)]
if not link_ary:
printNicely(light_magenta('No url here @.@!'))
return
printNicely(red('Sorry I can\'t open url in this tweet.'))
-def ls():
- """
- List friends for followers
- """
- t = Twitter(auth=authen())
- # Get name
- try:
- name = g['stuff'].split()[1]
- if name.startswith('@'):
- name = name[1:]
- else:
- printNicely(red('A name should begin with a \'@\''))
- raise Exception('Invalid name')
- except:
- name = g['original_name']
- # Get list followers or friends
- try:
- target = g['stuff'].split()[0]
- except:
- printNicely(red('Omg some syntax is wrong.'))
- # Init cursor
- d = {'fl': 'followers', 'fr': 'friends'}
- next_cursor = -1
- rel = {}
- # Cursor loop
- while next_cursor != 0:
- list = getattr(t, d[target]).list(
- screen_name=name,
- cursor=next_cursor,
- skip_status=True,
- include_entities=False,
- )
- for u in list['users']:
- rel[u['name']] = '@' + u['screen_name']
- next_cursor = list['next_cursor']
- # Print out result
- printNicely('All: ' + str(len(rel)) + ' ' + d[target] + '.')
- for name in rel:
- user = ' ' + cycle_color(name)
- user += color_func(c['TWEET']['nick'])(' ' + rel[name] + ' ')
- printNicely(user)
-
-
def inbox():
"""
- Inbox direct messages
+ Inbox threads
"""
t = Twitter(auth=authen())
num = c['MESSAGES_DISPLAY']
- rel = []
if g['stuff'].isdigit():
num = g['stuff']
+ # Get inbox messages
cur_page = 1
- # Max message per page is 20 so we have to loop
+ inbox = []
while num > 20:
- rel = rel + t.direct_messages(
+ inbox = inbox + t.direct_messages(
count=20,
page=cur_page,
include_entities=False,
)
num -= 20
cur_page += 1
- rel = rel + t.direct_messages(
+ inbox = inbox + t.direct_messages(
count=num,
page=cur_page,
include_entities=False,
skip_status=False
)
- # Display
- printNicely('Inbox: newest ' + str(len(rel)) + ' messages.')
- for m in reversed(rel):
- print_message(m)
- printNicely('')
-
-
-def sent():
- """
- Sent direct messages
- """
- t = Twitter(auth=authen())
+ # Get sent messages
num = c['MESSAGES_DISPLAY']
- rel = []
if g['stuff'].isdigit():
- num = int(g['stuff'])
+ num = g['stuff']
cur_page = 1
- # Max message per page is 20 so we have to loop
+ sent = []
while num > 20:
- rel = rel + t.direct_messages.sent(
+ sent = sent + t.direct_messages.sent(
count=20,
page=cur_page,
include_entities=False,
)
num -= 20
cur_page += 1
- rel = rel + t.direct_messages.sent(
+ sent = sent + t.direct_messages.sent(
count=num,
page=cur_page,
include_entities=False,
skip_status=False
)
- # Display
- printNicely('Sent: newest ' + str(len(rel)) + ' messages.')
- for m in reversed(rel):
- print_message(m)
- printNicely('')
+
+ d = {}
+ uniq_inbox = list(set(
+ [(m['sender_screen_name'], m['sender']['name']) for m in inbox]
+ ))
+ uniq_sent = list(set(
+ [(m['recipient_screen_name'], m['recipient']['name']) for m in sent]
+ ))
+ for partner in uniq_inbox:
+ inbox_ary = [m for m in inbox if m['sender_screen_name'] == partner[0]]
+ sent_ary = [
+ m for m in sent if m['recipient_screen_name'] == partner[0]]
+ d[partner] = inbox_ary + sent_ary
+ for partner in uniq_sent:
+ if partner not in d:
+ d[partner] = [
+ m for m in sent if m['recipient_screen_name'] == partner[0]]
+ g['message_threads'] = print_threads(d)
+
+
+def thread():
+ """
+ View a thread of message
+ """
+ try:
+ thread_id = int(g['stuff'])
+ print_thread(
+ g['message_threads'][thread_id],
+ g['original_name'],
+ g['full_name'])
+ except Exception:
+ printNicely(red('No such thread.'))
+
+
+def message():
+ """
+ Send a direct message
+ """
+ t = Twitter(auth=authen())
+ try:
+ user = g['stuff'].split()[0]
+ if user[0].startswith('@'):
+ content = ' '.join(g['stuff'].split()[1:])
+ t.direct_messages.new(
+ screen_name=user[1:],
+ text=content
+ )
+ printNicely(green('Message sent.'))
+ else:
+ printNicely(red('A name should begin with a \'@\''))
+ except:
+ printNicely(red('Sorry I can\'t understand.'))
def trash():
printNicely(green('Message deleted.'))
-def whois():
+def ls():
"""
- Show profile of a specific user
+ List friends for followers
"""
t = Twitter(auth=authen())
- screen_name = g['stuff'].split()[0]
- if screen_name.startswith('@'):
- try:
- user = t.users.show(
- screen_name=screen_name[1:],
- include_entities=False)
- show_profile(user)
- except:
- printNicely(red('Omg no user.'))
- else:
- printNicely(red('A name should begin with a \'@\''))
+ # Get name
+ try:
+ name = g['stuff'].split()[1]
+ if name.startswith('@'):
+ name = name[1:]
+ else:
+ printNicely(red('A name should begin with a \'@\''))
+ raise Exception('Invalid name')
+ except:
+ name = g['original_name']
+ # Get list followers or friends
+ try:
+ target = g['stuff'].split()[0]
+ except:
+ printNicely(red('Omg some syntax is wrong.'))
+ # Init cursor
+ d = {'fl': 'followers', 'fr': 'friends'}
+ next_cursor = -1
+ rel = {}
+ # Cursor loop
+ while next_cursor != 0:
+ list = getattr(t, d[target]).list(
+ screen_name=name,
+ cursor=next_cursor,
+ skip_status=True,
+ include_entities=False,
+ )
+ for u in list['users']:
+ rel[u['name']] = '@' + u['screen_name']
+ next_cursor = list['next_cursor']
+ # Print out result
+ printNicely('All: ' + str(len(rel)) + ' ' + d[target] + '.')
+ for name in rel:
+ user = ' ' + cycle_color(name)
+ user += color_func(c['TWEET']['nick'])(' ' + rel[name] + ' ')
+ printNicely(user)
def follow():
printNicely(red('A name should be specified. '))
return
if screen_name.startswith('@'):
- rel = t.mutes.users.create(screen_name=screen_name[1:])
- if isinstance(rel, dict):
- printNicely(green(screen_name + ' is muted.'))
- else:
- printNicely(red(rel))
+ try:
+ rel = t.mutes.users.create(screen_name=screen_name[1:])
+ if isinstance(rel, dict):
+ printNicely(green(screen_name + ' is muted.'))
+ c['IGNORE_LIST'] += [unc(screen_name)]
+ c['IGNORE_LIST'] = list(set(c['IGNORE_LIST']))
+ else:
+ printNicely(red(rel))
+ except:
+ printNicely(red('Something is wrong, can not mute now :('))
else:
printNicely(red('A name should begin with a \'@\''))
printNicely(red('A name should be specified. '))
return
if screen_name.startswith('@'):
- rel = t.mutes.users.destroy(screen_name=screen_name[1:])
- if isinstance(rel, dict):
- printNicely(green(screen_name + ' is unmuted.'))
- else:
- printNicely(red(rel))
+ try:
+ rel = t.mutes.users.destroy(screen_name=screen_name[1:])
+ if isinstance(rel, dict):
+ printNicely(green(screen_name + ' is unmuted.'))
+ c['IGNORE_LIST'].remove(screen_name)
+ else:
+ printNicely(red(rel))
+ except:
+ printNicely(red('Maybe you are not muting this person ?'))
else:
printNicely(red('A name should begin with a \'@\''))
"""
List muting user
"""
- t = Twitter(auth=authen())
- # Init cursor
- next_cursor = -1
- rel = {}
- # Cursor loop
- while next_cursor != 0:
- list = t.mutes.users.list(
- screen_name=g['original_name'],
- cursor=next_cursor,
- skip_status=True,
- include_entities=False,
- )
- for u in list['users']:
- rel[u['name']] = '@' + u['screen_name']
- next_cursor = list['next_cursor']
- # Print out result
- printNicely('All: ' + str(len(rel)) + ' people.')
- for name in rel:
- user = ' ' + cycle_color(name)
- user += color_func(c['TWEET']['nick'])(' ' + rel[name] + ' ')
+ # Get dict of muting users
+ md = build_mute_dict(dict_data=True)
+ printNicely('All: ' + str(len(md)) + ' people.')
+ for name in md:
+ user = ' ' + cycle_color(md[name])
+ user += color_func(c['TWEET']['nick'])(' ' + name + ' ')
printNicely(user)
+ # Update from Twitter
+ c['IGNORE_LIST'] = [n for n in md]
def block():
"""
Delete a list
"""
- slug = raw_input(light_magenta('Your list that you want to update: '))
+ slug = raw_input(light_magenta('Your list that you want to delete: '))
try:
t.lists.destroy(
slug='-'.join(slug.split()),
printNicely(red('Oops something is wrong with Twitter :('))
-def list():
+def twitterlist():
"""
Twitter's list
"""
printNicely(red('Please try again.'))
+def switch():
+ """
+ Switch stream
+ """
+ try:
+ target = g['stuff'].split()[0]
+ # Filter and ignore
+ args = parse_arguments()
+ try:
+ if g['stuff'].split()[-1] == '-f':
+ guide = 'To ignore an option, just hit Enter key.'
+ printNicely(light_magenta(guide))
+ only = raw_input('Only nicks [Ex: @xxx,@yy]: ')
+ ignore = raw_input('Ignore nicks [Ex: @xxx,@yy]: ')
+ args.filter = filter(None, only.split(','))
+ args.ignore = filter(None, ignore.split(','))
+ elif g['stuff'].split()[-1] == '-d':
+ args.filter = c['ONLY_LIST']
+ args.ignore = c['IGNORE_LIST']
+ except:
+ printNicely(red('Sorry, wrong format.'))
+ return
+ # Public stream
+ if target == 'public':
+ keyword = g['stuff'].split()[1]
+ if keyword[0] == '#':
+ keyword = keyword[1:]
+ # Kill old thread
+ g['stream_stop'] = True
+ args.track_keywords = keyword
+ # Start new thread
+ th = threading.Thread(
+ target=stream,
+ args=(
+ c['PUBLIC_DOMAIN'],
+ args))
+ th.daemon = True
+ th.start()
+ # Personal stream
+ elif target == 'mine':
+ # Kill old thread
+ g['stream_stop'] = True
+ # Start new thread
+ th = threading.Thread(
+ target=stream,
+ args=(
+ c['USER_DOMAIN'],
+ args,
+ g['original_name']))
+ th.daemon = True
+ th.start()
+ printNicely('')
+ if args.filter:
+ printNicely(cyan('Only: ' + str(args.filter)))
+ if args.ignore:
+ printNicely(red('Ignore: ' + str(args.ignore)))
+ printNicely('')
+ except:
+ printNicely(red('Sorry I can\'t understand.'))
+
+
def cal():
"""
Unix's command `cal`
show_calendar(month, date, rel)
+def theme():
+ """
+ List and change theme
+ """
+ if not g['stuff']:
+ # List themes
+ for theme in g['themes']:
+ line = light_magenta(theme)
+ if c['THEME'] == theme:
+ line = ' ' * 2 + light_yellow('* ') + line
+ else:
+ line = ' ' * 4 + line
+ printNicely(line)
+ else:
+ # Change theme
+ try:
+ # Load new theme
+ c['THEME'] = reload_theme(g['stuff'], c['THEME'])
+ # Redefine decorated_name
+ g['decorated_name'] = lambda x: color_func(
+ c['DECORATED_NAME'])(
+ '[' + x + ']: ')
+ printNicely(green('Theme changed.'))
+ except:
+ printNicely(red('No such theme exists.'))
+
+
def config():
"""
Browse and change config
value = get_default_config(key)
line = ' ' * 2 + green(key) + ': ' + light_magenta(value)
printNicely(line)
- except:
- printNicely(
- light_magenta('This config key does not exist in default.'))
+ except Exception as e:
+ printNicely(red(e))
# Delete specific config key in config file
elif len(g['stuff'].split()) == 2 and g['stuff'].split()[-1] == 'drop':
key = g['stuff'].split()[0]
try:
delete_config(key)
printNicely(green('Config key is dropped.'))
- except:
- printNicely(red('No such config key.'))
+ except Exception as e:
+ printNicely(red(e))
# Set specific config
elif len(g['stuff'].split()) == 3 and g['stuff'].split()[1] == '=':
key = g['stuff'].split()[0]
set_config(key, value)
# Apply theme immediately
if key == 'THEME':
- c['THEME'] = reload_theme(value,c['THEME'])
+ c['THEME'] = reload_theme(value, c['THEME'])
g['decorated_name'] = lambda x: color_func(
- c['DECORATED_NAME'])(
- '[' + x + ']: ')
+ c['DECORATED_NAME'])('[' + x + ']: ')
+ reload_config()
printNicely(green('Updated successfully.'))
- except:
- printNicely(light_magenta('Not valid value.'))
- return
- reload_config()
+ except Exception as e:
+ printNicely(red(e))
else:
printNicely(light_magenta('Sorry I can\'s understand.'))
-def theme():
- """
- List and change theme
- """
- if not g['stuff']:
- # List themes
- for theme in g['themes']:
- line = light_magenta(theme)
- if c['THEME'] == theme:
- line = ' ' * 2 + light_yellow('* ') + line
- else:
- line = ' ' * 4 + line
- printNicely(line)
- else:
- # Change theme
- try:
- # Load new theme
- c['THEME'] = reload_theme(g['stuff'],c['THEME'])
- # Redefine decorated_name
- g['decorated_name'] = lambda x: color_func(
- c['DECORATED_NAME'])(
- '[' + x + ']: ')
- printNicely(green('Theme changed.'))
- except:
- printNicely(red('No such theme exists.'))
-
-
def help_discover():
"""
Discover the world
light_green('trend JP Tokyo') + '.\n'
usage += s * 2 + light_green('home') + ' will show your timeline. ' + \
light_green('home 7') + ' will show 7 tweets.\n'
+ usage += s * 2 + \
+ light_green('notification') + ' will show your recent notification.\n'
usage += s * 2 + light_green('mentions') + ' will show mentions timeline. ' + \
light_green('mentions 7') + ' will show 7 mention tweets.\n'
usage += s * 2 + light_green('whois @mdo') + ' will show profile of ' + \
usage += s * 2 + \
light_green('allrt 12 20 ') + ' will list 20 newest retweet of the tweet with ' + \
light_yellow('[id=12]') + '.\n'
+ usage += s * 2 + light_green('conversation 12') + ' will show the chain of ' + \
+ 'replies prior to the tweet with ' + light_yellow('[id=12]') + '.\n'
usage += s * 2 + light_green('rep 12 oops') + ' will reply "' + \
light_yellow('oops') + '" to tweet with ' + \
light_yellow('[id=12]') + '.\n'
usage += s + grey(u'\u266A' + ' Direct messages \n')
usage += s * 2 + light_green('inbox') + ' will show inbox messages. ' + \
light_green('inbox 7') + ' will show newest 7 messages.\n'
- usage += s * 2 + light_green('sent') + ' will show sent messages. ' + \
- light_green('sent 7') + ' will show newest 7 messages.\n'
+ usage += s * 2 + light_green('thread 2') + ' will show full thread with ' + \
+ light_yellow('[thread_id=2]') + '.\n'
usage += s * 2 + light_green('mes @dtvd88 hi') + ' will send a "hi" messege to ' + \
magenta('@dtvd88') + '.\n'
usage += s * 2 + light_green('trash 5') + ' will remove message with ' + \
light_yellow('already') + ' on your personal stream.\n'
usage += s + 'Any update from Twitter will show up ' + \
light_yellow('immediately') + '.\n'
- usage += s + 'In addtion, following commands are available right now:\n'
+ usage += s + 'In addition, following commands are available right now:\n'
# Twitter help section
usage += '\n'
usage += s + grey(u'\u266A' + ' Twitter help\n')
'stream': help_stream,
}
if g['stuff']:
- d[g['stuff'].strip()]()
+ d.get(
+ g['stuff'].strip(),
+ lambda: printNicely(red('No such command.'))
+ )()
else:
printNicely(usage)
"""
Pause stream display
"""
- c['pause'] = True
+ g['pause'] = True
printNicely(green('Stream is paused'))
"""
Replay stream
"""
- c['pause'] = False
+ g['pause'] = False
printNicely(green('Stream is running back now'))
Reset prefix of line
"""
if g['reset']:
+ if c.get('USER_JSON_ERROR'):
+ printNicely(red('Your ~/.rainbow_config.json is messed up:'))
+ printNicely(red('>>> ' + c['USER_JSON_ERROR']))
+ printNicely('')
printNicely(magenta('Need tips ? Type "h" and hit Enter key!'))
g['reset'] = False
try:
pass
+# Command set
+cmdset = [
+ 'switch',
+ 'trend',
+ 'home',
+ 'notification',
+ 'view',
+ 'mentions',
+ 't',
+ 'rt',
+ 'quote',
+ 'allrt',
+ 'conversation',
+ 'fav',
+ 'rep',
+ 'del',
+ 'ufav',
+ 's',
+ 'mes',
+ 'show',
+ 'open',
+ 'ls',
+ 'inbox',
+ 'thread',
+ 'trash',
+ 'whois',
+ 'fl',
+ 'ufl',
+ 'mute',
+ 'unmute',
+ 'muting',
+ 'block',
+ 'unblock',
+ 'report',
+ 'list',
+ 'cal',
+ 'config',
+ 'theme',
+ 'h',
+ 'p',
+ 'r',
+ 'c',
+ 'q'
+]
+
+# Handle function set
+funcset = [
+ switch,
+ trend,
+ home,
+ notification,
+ view,
+ mentions,
+ tweet,
+ retweet,
+ quote,
+ allretweet,
+ conversation,
+ favorite,
+ reply,
+ delete,
+ unfavorite,
+ search,
+ message,
+ show,
+ urlopen,
+ ls,
+ inbox,
+ thread,
+ trash,
+ whois,
+ follow,
+ unfollow,
+ mute,
+ unmute,
+ muting,
+ block,
+ unblock,
+ report,
+ twitterlist,
+ cal,
+ config,
+ theme,
+ help,
+ pause,
+ replay,
+ clear,
+ quit
+]
+
+
def process(cmd):
"""
Process switch
"""
- return dict(zip(
- cmdset,
- [
- switch,
- trend,
- home,
- view,
- mentions,
- tweet,
- retweet,
- quote,
- allretweet,
- favorite,
- reply,
- delete,
- unfavorite,
- search,
- message,
- show,
- urlopen,
- ls,
- inbox,
- sent,
- trash,
- whois,
- follow,
- unfollow,
- mute,
- unmute,
- muting,
- block,
- unblock,
- report,
- list,
- cal,
- config,
- theme,
- help,
- pause,
- replay,
- clear,
- quit
- ]
- )).get(cmd, reset)
+ return dict(zip(cmdset, funcset)).get(cmd, reset)
def listen():
['public', 'mine'], # switch
[], # trend
[], # home
+ [], # notification
['@'], # view
[], # mentions
[], # tweet
[], # retweet
[], # quote
[], # allretweet
+ [], # conversation
[], # favorite
[], # reply
[], # delete
[''], # open url
['fl', 'fr'], # list
[], # inbox
- [], # sent
+ [i for i in g['message_threads']], # sent
[], # trash
['@'], # whois
['@'], # follow
read_history()
reset()
while True:
- # raw_input
- if g['prefix']:
- line = raw_input(g['decorated_name'](c['PREFIX']))
- else:
- line = raw_input()
- try:
- cmd = line.split()[0]
- except:
- cmd = ''
- g['cmd'] = cmd
try:
+ # raw_input
+ if g['prefix']:
+ line = raw_input(g['decorated_name'](g['PREFIX']))
+ else:
+ line = raw_input()
+ # Save cmd to compare with readline buffer
+ g['cmd'] = line.strip()
+ # Get short cmd to pass to handle function
+ try:
+ cmd = line.split()[0]
+ except:
+ cmd = ''
# Lock the semaphore
c['lock'] = True
# Save cmd to global variable and call process
g['prefix'] = True
# Release the semaphore lock
c['lock'] = False
+ except EOFError:
+ printNicely('')
except Exception:
printNicely(red('OMG something is wrong with Twitter right now.'))
ascii_art(art_dict[domain])
# These arguments are optional:
stream_args = dict(
- timeout=args.timeout,
- block=False,
- heartbeat_timeout=args.heartbeat_timeout)
+ timeout=0.5, # To check g['stream_stop'] after each 0.5 s
+ block=True,
+ heartbeat_timeout=c['HEARTBEAT_TIMEOUT'] * 60)
# Track keyword
query_args = dict()
if args.track_keywords:
StreamLock.acquire()
g['stream_stop'] = False
for tweet in tweet_iter:
- if(g['stream_stop']):
- StreamLock.release()
- break
if tweet is None:
- pass
+ printNicely("-- None --")
elif tweet is Timeout:
- printNicely("-- Timeout --")
+ if(g['stream_stop']):
+ StreamLock.release()
+ break
elif tweet is HeartbeatTimeout:
printNicely("-- Heartbeat Timeout --")
+ guide = light_magenta("You can use ") + \
+ light_green("switch") + \
+ light_magenta(" command to return to your stream.\n")
+ guide += light_magenta("Type ") + \
+ light_green("h stream") + \
+ light_magenta(" for more details.")
+ printNicely(guide)
+ sys.stdout.write(g['decorated_name'](g['PREFIX']))
+ sys.stdout.flush()
+ StreamLock.release()
+ break
elif tweet is Hangup:
printNicely("-- Hangup --")
elif tweet.get('text'):
+ # Check the semaphore pause and lock (stream process only)
+ if g['pause']:
+ continue
+ while c['lock']:
+ time.sleep(0.5)
+ # Draw the tweet
draw(
t=tweet,
keyword=args.track_keywords,
- check_semaphore=True,
+ humanize=False,
fil=args.filter,
ig=args.ignore,
)
+ # Current readline buffer
+ current_buffer = readline.get_line_buffer().strip()
+ # There is an unexpected behaviour in MacOSX readline + Python 2:
+ # after completely delete a word after typing it,
+ # somehow readline buffer still contains
+ # the 1st character of that word
+ if current_buffer and g['cmd'] != current_buffer:
+ sys.stdout.write(
+ g['decorated_name'](g['PREFIX']) + str2u(current_buffer))
+ sys.stdout.flush()
+ elif not c['HIDE_PROMPT']:
+ sys.stdout.write(g['decorated_name'](g['PREFIX']))
+ sys.stdout.flush()
elif tweet.get('direct_message'):
- print_message(tweet['direct_message'], check_semaphore=True)
+ # Check the semaphore pause and lock (stream process only)
+ if g['pause']:
+ continue
+ while c['lock']:
+ time.sleep(0.5)
+ print_message(tweet['direct_message'])
+ elif tweet.get('event'):
+ g['events'].append(tweet)
+ print_event(tweet)
except TwitterHTTPError:
printNicely('')
printNicely(
except TwitterHTTPError:
printNicely('')
printNicely(
- magenta("We have maximum connection problem with twitter'stream API right now :("))
+ magenta("We have connection problem with twitter'stream API right now :("))
printNicely(magenta("Let's try again later."))
save_history()
sys.exit()
# Spawn stream thread
- th = threading.Thread(target=stream, args=(c['USER_DOMAIN'], args, g['original_name']))
+ th = threading.Thread(
+ target=stream,
+ args=(
+ c['USER_DOMAIN'],
+ args,
+ g['original_name']))
th.daemon = True
th.start()
# Start listen process