Check latest and notify to upgrade
"""
try:
- current = pkg_resources.get_distribution("rainbowstream").version
+ current = pkg_resources.get_distribution("rainbowstream").version
url = 'https://raw.githubusercontent.com/DTVD/rainbowstream/master/setup.py'
- readme = requests.get(url).content
+ readme = requests.get(url).text
latest = readme.split("version = \'")[1].split("\'")[0]
if current != latest:
- notice = light_magenta('RainbowStream latest version is ')
+ notice = light_magenta('RainbowStream latest version is ')
notice += light_green(latest)
notice += light_magenta(' while your current version is ')
notice += light_yellow(current) + '\n'
- notice += light_magenta('You should upgrade with "pip install -U rainbowstream".')
+ notice += light_magenta('You should upgrade with ')
+ notice += light_green('pip install -U rainbowstream')
printNicely(notice)
except:
pass
c['original_name'] = g['original_name'] = screen_name[1:]
g['full_name'] = name
g['decorated_name'] = lambda x: color_func(
- c['DECORATED_NAME'])('[' + x + ']: ')
+ c['DECORATED_NAME'])('[' + x + ']: ', rl=True)
# Theme init
files = os.listdir(os.path.dirname(__file__) + '/colorset')
themes = [f.split('.')[0] for f in files if f.split('.')[-1] == 'json']
g['themes'] = themes
g['pause'] = False
g['message_threads'] = {}
- # Events
- g['events'] = []
# Startup cmd
g['cmd'] = ''
# Debug option default = True
g['debug'] = True
g['traceback'] = []
- # Retweet of mine events
+ # Events
c['events'] = []
# Semaphore init
c['lock'] = False
"""
Show notifications
"""
- g['events'] = g['events'] + c['events']
- if g['events']:
- for e in g['events']:
+ if c['events']:
+ for e in c['events']:
print_event(e)
printNicely('')
else:
num = int(g['stuff'].split()[1])
except:
num = c['HOME_TWEET_NUM']
- for tweet in reversed(t.statuses.user_timeline(count=num, screen_name=user[1:])):
+ for tweet in reversed(
+ t.statuses.user_timeline(count=num, screen_name=user[1:])):
draw(t=tweet)
printNicely('')
else:
if not formater:
return
# Get comment
- prefix = light_magenta('Compose your ') + light_green('#comment: ')
+ prefix = light_magenta('Compose your ', rl=True) + \
+ light_green('#comment: ', rl=True)
comment = raw_input(prefix)
if comment:
quote = comment.join(formater.split('#comment'))
def get_slug():
"""
- Get Slug Decorator
+ Get slug
"""
# Get list name
- list_name = raw_input(light_magenta('Give me the list\'s name: '))
+ list_name = raw_input(
+ light_magenta('Give me the list\'s name ("@owner/list_name"): ', rl=True))
# Get list name and owner
try:
owner, slug = list_name.split('/')
"""
owner, slug = get_slug()
# Add
- user_name = raw_input(light_magenta('Give me name of the newbie: '))
+ user_name = raw_input(
+ light_magenta(
+ 'Give me name of the newbie: ',
+ rl=True))
if user_name.startswith('@'):
user_name = user_name[1:]
try:
"""
owner, slug = get_slug()
# Remove
- user_name = raw_input(light_magenta('Give me name of the unlucky one: '))
+ user_name = raw_input(
+ light_magenta(
+ 'Give me name of the unlucky one: ',
+ rl=True))
if user_name.startswith('@'):
user_name = user_name[1:]
try:
"""
Create a new list
"""
- name = raw_input(light_magenta('New list\'s name: '))
- mode = raw_input(light_magenta('New list\'s mode (public/private): '))
- description = raw_input(light_magenta('New list\'s description: '))
+ name = raw_input(light_magenta('New list\'s name: ', rl=True))
+ mode = raw_input(
+ light_magenta(
+ 'New list\'s mode (public/private): ',
+ rl=True))
+ description = raw_input(
+ light_magenta(
+ 'New list\'s description: ',
+ rl=True))
try:
t.lists.create(
name=name,
"""
Update a list
"""
- slug = raw_input(light_magenta('Your list that you want to update: '))
- name = raw_input(light_magenta('Update name (leave blank to unchange): '))
- mode = raw_input(light_magenta('Update mode (public/private): '))
- description = raw_input(light_magenta('Update description: '))
+ slug = raw_input(
+ light_magenta(
+ 'Your list that you want to update: ',
+ rl=True))
+ name = raw_input(
+ light_magenta(
+ 'Update name (leave blank to unchange): ',
+ rl=True))
+ mode = raw_input(light_magenta('Update mode (public/private): ', rl=True))
+ description = raw_input(light_magenta('Update description: ', rl=True))
try:
if name:
t.lists.update(
"""
Delete a list
"""
- slug = raw_input(light_magenta('Your list that you want to delete: '))
+ slug = raw_input(
+ light_magenta(
+ 'Your list that you want to delete: ',
+ rl=True))
try:
t.lists.destroy(
slug='-'.join(slug.split()),
g['original_name']))
th.daemon = True
th.start()
+ # Stream base on list
+ elif target == 'list':
+ owner, slug = get_slug()
+ # Force python 2 not redraw readline buffer
+ g['cmd'] = '/'.join([owner, slug])
+ printNicely(light_yellow('getting list members ...'))
+ # Get members
+ t = Twitter(auth=authen())
+ members = []
+ next_cursor = -1
+ while next_cursor != 0:
+ m = t.lists.members(
+ slug=slug,
+ owner_screen_name=owner,
+ cursor=next_cursor,
+ include_entities=False)
+ for u in m['users']:
+ members.append('@' + u['screen_name'])
+ next_cursor = m['next_cursor']
+ printNicely(light_yellow('... done.'))
+ # Build thread filter array
+ args.filter = members
+ # Kill old thread
+ g['stream_stop'] = True
+ # Start new thread
+ th = threading.Thread(
+ target=stream,
+ args=(
+ c['USER_DOMAIN'],
+ args,
+ slug))
+ th.daemon = True
+ th.start()
printNicely('')
if args.filter:
- printNicely(cyan('Only: ' + str(args.filter)))
+ printNicely(cyan('Include: ' + str(len(args.filter)) + ' people.'))
if args.ignore:
- printNicely(red('Ignore: ' + str(args.ignore)))
+ printNicely(red('Ignore: ' + str(len(args.ignore)) + ' people.'))
printNicely('')
- except:
+ except Exception:
+ debug_option()
printNicely(red('Sorry I can\'t understand.'))
' filter will decide nicks will be EXCLUDE.\n'
usage += s * 2 + light_green('switch mine -d') + \
' will use the config\'s ONLY_LIST and IGNORE_LIST.\n'
+ usage += s * 2 + light_green('switch list') + \
+ ' will switch to a Twitter list\'s stream. You will be asked for list name\n'
printNicely(usage)
d = dict(zip(
cmdset,
[
- ['public', 'mine'], # switch
+ ['public', 'mine', 'list'], # switch
[], # trend
[], # home
[], # notification
time.sleep(0.5)
print_message(tweet['direct_message'])
elif tweet.get('event'):
- g['events'].append(tweet)
+ c['events'].append(tweet)
print_event(tweet)
except TwitterHTTPError:
printNicely('')