import webbrowser
import traceback
import pkg_resources
+import socks
+import socket
+from io import BytesIO
from twitter.stream import TwitterStream, Timeout, HeartbeatTimeout, Hangup
from twitter.api import *
from twitter.oauth import OAuth, read_token_file
'--image-on-term',
action='store_true',
help='Display all image on terminal.')
+ parser.add_argument(
+ '-ph',
+ '--proxy-host',
+ help='Use HTTP/SOCKS proxy for network connections.')
+ parser.add_argument(
+ '-pp',
+ '--proxy-port',
+ default=8080,
+ help='HTTP/SOCKS proxy port (Default: 8080).')
+ parser.add_argument(
+ '-pt',
+ '--proxy-type',
+ default='SOCKS5',
+ help='Proxy type (HTTP, SOCKS4, SOCKS5; Default: SOCKS5).')
return parser.parse_args()
+def proxy_connect(args):
+ """
+ Connect to specified proxy
+ """
+ if args.proxy_host:
+ # Setup proxy by monkeypatching the standard lib
+ if args.proxy_type.lower() == "socks5" or not args.proxy_type:
+ socks.set_default_proxy(
+ socks.SOCKS5, args.proxy_host,
+ int(args.proxy_port))
+ elif args.proxy_type.lower() == "http":
+ socks.set_default_proxy(
+ socks.HTTP, args.proxy_host,
+ int(args.proxy_port))
+ elif args.proxy_type.lower() == "socks4":
+ socks.set_default_proxy(
+ socks.SOCKS4, args.proxy_host,
+ int(args.proxy_port))
+ else:
+ printNicely(
+ magenta("Sorry, wrong proxy type specified! Aborting..."))
+ sys.exit()
+ socket.socket = socks.socksocket
+
+
def authen():
"""
Authenticate with Twitter OAuth
try:
current = pkg_resources.get_distribution("rainbowstream").version
url = 'https://raw.githubusercontent.com/DTVD/rainbowstream/master/setup.py'
- readme = requests.get(url).content
+ readme = requests.get(url).text
latest = readme.split("version = \'")[1].split("\'")[0]
if current != latest:
notice = light_magenta('RainbowStream latest version is ')
notice += light_magenta('You should upgrade with ')
notice += light_green('pip install -U rainbowstream')
printNicely(notice)
+ else:
+ notice = light_yellow('You are running latest version (')
+ notice += light_green(current)
+ notice += light_yellow(')')
+ printNicely(notice)
except:
pass
c['original_name'] = g['original_name'] = screen_name[1:]
g['full_name'] = name
g['decorated_name'] = lambda x: color_func(
- c['DECORATED_NAME'])('[' + x + ']: ')
+ c['DECORATED_NAME'])('[' + x + ']: ', rl=True)
# Theme init
files = os.listdir(os.path.dirname(__file__) + '/colorset')
themes = [f.split('.')[0] for f in files if f.split('.')[-1] == 'json']
# Image on term
c['IMAGE_ON_TERM'] = args.image_on_term
set_config('IMAGE_ON_TERM', str(c['IMAGE_ON_TERM']))
+ # Check type of ONLY_LIST and IGNORE_LIST
+ if not isinstance(c['ONLY_LIST'], list):
+ printNicely(red('ONLY_LIST is not a valid list value.'))
+ c['ONLY_LIST'] = []
+ if not isinstance(c['IGNORE_LIST'], list):
+ printNicely(red('IGNORE_LIST is not a valid list value.'))
+ c['IGNORE_LIST'] = []
# Mute dict
c['IGNORE_LIST'] += build_mute_dict()
num = int(g['stuff'].split()[1])
except:
num = c['HOME_TWEET_NUM']
- for tweet in reversed(t.statuses.user_timeline(count=num, screen_name=user[1:])):
+ for tweet in reversed(
+ t.statuses.user_timeline(count=num, screen_name=user[1:])):
draw(t=tweet)
printNicely('')
else:
if not formater:
return
# Get comment
- prefix = light_magenta('Compose your ') + light_green('#comment: ')
+ prefix = light_magenta('Compose your ', rl=True) + \
+ light_green('#comment: ', rl=True)
comment = raw_input(prefix)
if comment:
quote = comment.join(formater.split('#comment'))
printNicely('')
+def share():
+ """
+ Copy url of a tweet to clipboard
+ """
+ t = Twitter(auth=authen())
+ try:
+ id = int(g['stuff'].split()[0])
+ tid = c['tweet_dict'][id]
+ except:
+ printNicely(red('Tweet id is not valid.'))
+ return
+ tweet = t.statuses.show(id=tid)
+ url = 'https://twitter.com/' + \
+ tweet['user']['screen_name'] + '/status/' + str(tid)
+ import platform
+ if platform.system().lower() == 'darwin':
+ os.system("echo '%s' | pbcopy" % url)
+ printNicely(green('Copied tweet\'s url to clipboard.'))
+ else:
+ printNicely('Direct link: ' + yellow(url))
+
+
def delete():
"""
Delete
def get_slug():
"""
- Get Slug Decorator
+ Get slug
"""
# Get list name
- list_name = raw_input(light_magenta('Give me the list\'s name: '))
+ list_name = raw_input(
+ light_magenta('Give me the list\'s name ("@owner/list_name"): ', rl=True))
# Get list name and owner
try:
owner, slug = list_name.split('/')
owner_screen_name=owner,
count=c['LIST_MAX'],
include_entities=False)
- for tweet in res:
+ for tweet in reversed(res):
draw(t=tweet)
printNicely('')
"""
owner, slug = get_slug()
# Add
- user_name = raw_input(light_magenta('Give me name of the newbie: '))
+ user_name = raw_input(
+ light_magenta(
+ 'Give me name of the newbie: ',
+ rl=True))
if user_name.startswith('@'):
user_name = user_name[1:]
try:
"""
owner, slug = get_slug()
# Remove
- user_name = raw_input(light_magenta('Give me name of the unlucky one: '))
+ user_name = raw_input(
+ light_magenta(
+ 'Give me name of the unlucky one: ',
+ rl=True))
if user_name.startswith('@'):
user_name = user_name[1:]
try:
"""
Create a new list
"""
- name = raw_input(light_magenta('New list\'s name: '))
- mode = raw_input(light_magenta('New list\'s mode (public/private): '))
- description = raw_input(light_magenta('New list\'s description: '))
+ name = raw_input(light_magenta('New list\'s name: ', rl=True))
+ mode = raw_input(
+ light_magenta(
+ 'New list\'s mode (public/private): ',
+ rl=True))
+ description = raw_input(
+ light_magenta(
+ 'New list\'s description: ',
+ rl=True))
try:
t.lists.create(
name=name,
"""
Update a list
"""
- slug = raw_input(light_magenta('Your list that you want to update: '))
- name = raw_input(light_magenta('Update name (leave blank to unchange): '))
- mode = raw_input(light_magenta('Update mode (public/private): '))
- description = raw_input(light_magenta('Update description: '))
+ slug = raw_input(
+ light_magenta(
+ 'Your list that you want to update: ',
+ rl=True))
+ name = raw_input(
+ light_magenta(
+ 'Update name (leave blank to unchange): ',
+ rl=True))
+ mode = raw_input(light_magenta('Update mode (public/private): ', rl=True))
+ description = raw_input(light_magenta('Update description: ', rl=True))
try:
if name:
t.lists.update(
"""
Delete a list
"""
- slug = raw_input(light_magenta('Your list that you want to delete: '))
+ slug = raw_input(
+ light_magenta(
+ 'Your list that you want to delete: ',
+ rl=True))
try:
t.lists.destroy(
slug='-'.join(slug.split()),
ignore = raw_input('Ignore nicks [Ex: @xxx,@yy]: ')
args.filter = filter(None, only.split(','))
args.ignore = filter(None, ignore.split(','))
- elif g['stuff'].split()[-1] == '-d':
- args.filter = c['ONLY_LIST']
- args.ignore = c['IGNORE_LIST']
except:
printNicely(red('Sorry, wrong format.'))
return
# Kill old thread
g['stream_stop'] = True
args.track_keywords = keyword
+ # Reset prefix
+ g['PREFIX'] = u2str(emojize(c['PREFIX']))
# Start new thread
th = threading.Thread(
target=stream,
elif target == 'mine':
# Kill old thread
g['stream_stop'] = True
+ # Reset prefix
+ g['PREFIX'] = u2str(emojize(c['PREFIX']))
# Start new thread
th = threading.Thread(
target=stream,
g['original_name']))
th.daemon = True
th.start()
+ # Stream base on list
+ elif target == 'list':
+ owner, slug = get_slug()
+ # Force python 2 not redraw readline buffer
+ g['PREFIX'] = g['cmd'] = '/'.join([owner, slug])
+ printNicely(light_yellow('getting list members ...'))
+ # Get members
+ t = Twitter(auth=authen())
+ members = []
+ next_cursor = -1
+ while next_cursor != 0:
+ m = t.lists.members(
+ slug=slug,
+ owner_screen_name=owner,
+ cursor=next_cursor,
+ include_entities=False)
+ for u in m['users']:
+ members.append('@' + u['screen_name'])
+ next_cursor = m['next_cursor']
+ printNicely(light_yellow('... done.'))
+ # Build thread filter array
+ args.filter = members
+ # Kill old thread
+ g['stream_stop'] = True
+ # Start new thread
+ th = threading.Thread(
+ target=stream,
+ args=(
+ c['USER_DOMAIN'],
+ args,
+ slug))
+ th.daemon = True
+ th.start()
printNicely('')
if args.filter:
- printNicely(cyan('Only: ' + str(args.filter)))
+ printNicely(cyan('Include: ' + str(len(args.filter)) + ' people.'))
if args.ignore:
- printNicely(red('Ignore: ' + str(args.ignore)))
+ printNicely(red('Ignore: ' + str(len(args.ignore)) + ' people.'))
printNicely('')
except:
+ debug_option()
printNicely(red('Sorry I can\'t understand.'))
value = get_default_config(key)
line = ' ' * 2 + green(key) + ': ' + light_magenta(value)
printNicely(line)
- except Exception as e:
- printNicely(red(e))
+ except:
+ debug_option()
+ printNicely(red('Just can not get the default.'))
# Delete specific config key in config file
elif len(g['stuff'].split()) == 2 and g['stuff'].split()[-1] == 'drop':
key = g['stuff'].split()[0]
try:
delete_config(key)
printNicely(green('Config key is dropped.'))
- except Exception as e:
- printNicely(red(e))
+ except:
+ debug_option()
+ printNicely(red('Just can not drop the key.'))
# Set specific config
elif len(g['stuff'].split()) == 3 and g['stuff'].split()[1] == '=':
key = g['stuff'].split()[0]
return
try:
set_config(key, value)
- # Apply theme immediately
+ # Keys that needs to be apply immediately
if key == 'THEME':
c['THEME'] = reload_theme(value, c['THEME'])
g['decorated_name'] = lambda x: color_func(
c['DECORATED_NAME'])('[' + x + ']: ')
+ elif key == 'PREFIX':
+ g['PREFIX'] = u2str(emojize(c['PREFIX']))
reload_config()
printNicely(green('Updated successfully.'))
- except Exception as e:
- printNicely(red(e))
+ except:
+ debug_option()
+ printNicely(red('Just can not set the key.'))
else:
printNicely(light_magenta('Sorry I can\'s understand.'))
usage += s * 2 + \
light_green('ufav 12 ') + ' will unfavorite tweet with ' + \
light_yellow('[id=12]') + '.\n'
+ usage += s * 2 + \
+ light_green('share 12 ') + ' will get the direct link of the tweet with ' + \
+ light_yellow('[id=12]') + '.\n'
usage += s * 2 + \
light_green('del 12 ') + ' will delete tweet with ' + \
light_yellow('[id=12]') + '.\n'
' filter will decide nicks will be INCLUDE ONLY.\n'
usage += s * 3 + light_yellow('Ignore nicks') + \
' filter will decide nicks will be EXCLUDE.\n'
- usage += s * 2 + light_green('switch mine -d') + \
- ' will use the config\'s ONLY_LIST and IGNORE_LIST.\n'
+ usage += s * 2 + light_green('switch list') + \
+ ' will switch to a Twitter list\'s stream. You will be asked for list name\n'
printNicely(usage)
usage += s * 2 + light_green('p') + ' will pause the stream.\n'
usage += s * 2 + light_green('r') + ' will unpause the stream.\n'
usage += s * 2 + light_green('c') + ' will clear the screen.\n'
+ usage += s * 2 + light_green('v') + ' will show version info.\n'
usage += s * 2 + light_green('q') + ' will quit.\n'
# End
usage += '\n'
'rep',
'del',
'ufav',
+ 'share',
's',
'mes',
'show',
'p',
'r',
'c',
- 'q'
+ 'v',
+ 'q',
]
# Handle function set
reply,
delete,
unfavorite,
+ share,
search,
message,
show,
pause,
replay,
clear,
- quit
+ upgrade_center,
+ quit,
]
d = dict(zip(
cmdset,
[
- ['public', 'mine'], # switch
+ ['public', 'mine', 'list'], # switch
[], # trend
[], # home
[], # notification
[], # reply
[], # delete
[], # unfavorite
+ [], # url
['#'], # search
['@'], # message
['image'], # show image
[], # pause
[], # reconnect
[], # clear
+ [], # version
[], # quit
]
))
printNicely(red('OMG something is wrong with Twitter right now.'))
+def reconn_notice():
+ """
+ Notice when Hangup or Timeout
+ """
+ guide = light_magenta("You can use ") + \
+ light_green("switch") + \
+ light_magenta(" command to return to your stream.\n")
+ guide += light_magenta("Type ") + \
+ light_green("h stream") + \
+ light_magenta(" for more details.")
+ printNicely(guide)
+ sys.stdout.write(g['decorated_name'](c['PREFIX']))
+ sys.stdout.flush()
+
+
def stream(domain, args, name='Rainbow Stream'):
"""
Track the stream
# Block new stream until other one exits
StreamLock.acquire()
g['stream_stop'] = False
+ last_tweet_time = time.time()
for tweet in tweet_iter:
if tweet is None:
printNicely("-- None --")
elif tweet is Timeout:
+ # Because the stream check for each 0.3s
+ # so we shouldn't output anything here
if(g['stream_stop']):
StreamLock.release()
break
elif tweet is HeartbeatTimeout:
printNicely("-- Heartbeat Timeout --")
- guide = light_magenta("You can use ") + \
- light_green("switch") + \
- light_magenta(" command to return to your stream.\n")
- guide += light_magenta("Type ") + \
- light_green("h stream") + \
- light_magenta(" for more details.")
- printNicely(guide)
- sys.stdout.write(g['decorated_name'](c['PREFIX']))
- sys.stdout.flush()
+ reconn_notice()
StreamLock.release()
break
elif tweet is Hangup:
printNicely("-- Hangup --")
+ reconn_notice()
+ StreamLock.release()
+ break
elif tweet.get('text'):
+ # Slow down the stream by STREAM_DELAY config key
+ if time.time() - last_tweet_time < c['STREAM_DELAY']:
+ continue
+ last_tweet_time = time.time()
# Check the semaphore pause and lock (stream process only)
if g['pause']:
continue
# Initial
args = parse_arguments()
try:
+ proxy_connect(args)
init(args)
+ # Twitter API connection problem
except TwitterHTTPError:
printNicely('')
printNicely(
printNicely(magenta("Let's try again later."))
save_history()
sys.exit()
+ # Proxy connection problem
+ except (socks.ProxyConnectionError, URLError):
+ printNicely(
+ magenta("There seems to be a connection problem."))
+ printNicely(
+ magenta("You might want to check your proxy settings (host, port and type)!"))
+ save_history()
+ sys.exit()
+
# Spawn stream thread
th = threading.Thread(
target=stream,