CONSUMER_SECRET)
+def build_mute_dict(dict_data=False):
+ """
+ Build muting list
+ """
+ t = Twitter(auth=authen())
+ # Init cursor
+ next_cursor = -1
+ screen_name_list = []
+ name_list = []
+ # Cursor loop
+ while next_cursor != 0:
+ list = t.mutes.users.list(
+ screen_name=g['original_name'],
+ cursor=next_cursor,
+ skip_status=True,
+ include_entities=False,
+ )
+ screen_name_list += ['@' + u['screen_name'] for u in list['users']]
+ name_list += [u['name'] for u in list['users']]
+ next_cursor = list['next_cursor']
+ # Return dict or list
+ if dict_data:
+ return dict(zip(screen_name_list, name_list))
+ else:
+ return screen_name_list
+
+
def init(args):
"""
Init function
set_config('PREFIX', name)
g['original_name'] = name[1:]
g['decorated_name'] = lambda x: color_func(
- c['DECORATED_NAME'])(
- '[' + x + ']: ')
+ c['DECORATED_NAME'])('[' + x + ']: ')
# Theme init
files = os.listdir(os.path.dirname(__file__) + '/colorset')
themes = [f.split('.')[0] for f in files if f.split('.')[-1] == 'json']
g['themes'] = themes
+ # Startup cmd
+ g['previous_cmd'] = ''
# Semaphore init
c['lock'] = False
c['pause'] = False
c['message_dict'] = []
# Image on term
c['IMAGE_ON_TERM'] = args.image_on_term
+ set_config('IMAGE_ON_TERM',str(c['IMAGE_ON_TERM']))
+ # Mute dict
+ c['IGNORE_LIST'] += build_mute_dict()
def switch():
g['stream_stop'] = True
args.track_keywords = keyword
# Start new thread
- th = threading.Thread(target=stream, args=(c['PUBLIC_DOMAIN'], args))
+ th = threading.Thread(
+ target=stream,
+ args=(
+ c['PUBLIC_DOMAIN'],
+ args))
th.daemon = True
th.start()
# Personal stream
# Kill old thread
g['stream_stop'] = True
# Start new thread
- th = threading.Thread(target=stream, args=(c['USER_DOMAIN'], args, g['original_name']))
+ th = threading.Thread(
+ target=stream,
+ args=(
+ c['USER_DOMAIN'],
+ args,
+ g['original_name']))
th.daemon = True
th.start()
printNicely('')
tid = c['tweet_dict'][id]
user = t.statuses.show(id=tid)['user']['screen_name']
status = ' '.join(g['stuff'].split()[1:])
- status = '@' + user + ' ' + status.decode('utf-8')
+ status = '@' + user + ' ' + unc(status)
t.statuses.update(status=status, in_reply_to_status_id=tid)
printNicely(red('A name should be specified. '))
return
if screen_name.startswith('@'):
- rel = t.mutes.users.create(screen_name=screen_name[1:])
- if isinstance(rel, dict):
- printNicely(green(screen_name + ' is muted.'))
- else:
- printNicely(red(rel))
+ try:
+ rel = t.mutes.users.create(screen_name=screen_name[1:])
+ if isinstance(rel, dict):
+ printNicely(green(screen_name + ' is muted.'))
+ c['IGNORE_LIST'] += [unc(screen_name)]
+ c['IGNORE_LIST'] = list(set(c['IGNORE_LIST']))
+ else:
+ printNicely(red(rel))
+ except:
+ printNicely(red('Something is wrong, can not mute now :('))
else:
printNicely(red('A name should begin with a \'@\''))
printNicely(red('A name should be specified. '))
return
if screen_name.startswith('@'):
- rel = t.mutes.users.destroy(screen_name=screen_name[1:])
- if isinstance(rel, dict):
- printNicely(green(screen_name + ' is unmuted.'))
- else:
- printNicely(red(rel))
+ try:
+ rel = t.mutes.users.destroy(screen_name=screen_name[1:])
+ if isinstance(rel, dict):
+ printNicely(green(screen_name + ' is unmuted.'))
+ c['IGNORE_LIST'].remove(screen_name)
+ else:
+ printNicely(red(rel))
+ except:
+ printNicely(red('Maybe you are not muting this person ?'))
else:
printNicely(red('A name should begin with a \'@\''))
"""
List muting user
"""
- t = Twitter(auth=authen())
- # Init cursor
- next_cursor = -1
- rel = {}
- # Cursor loop
- while next_cursor != 0:
- list = t.mutes.users.list(
- screen_name=g['original_name'],
- cursor=next_cursor,
- skip_status=True,
- include_entities=False,
- )
- for u in list['users']:
- rel[u['name']] = '@' + u['screen_name']
- next_cursor = list['next_cursor']
- # Print out result
- printNicely('All: ' + str(len(rel)) + ' people.')
- for name in rel:
- user = ' ' + cycle_color(name)
- user += color_func(c['TWEET']['nick'])(' ' + rel[name] + ' ')
+ # Get dict of muting users
+ md = build_mute_dict(dict_data=True)
+ printNicely('All: ' + str(len(md)) + ' people.')
+ for name in md:
+ user = ' ' + cycle_color(md[name])
+ user += color_func(c['TWEET']['nick'])(' ' + name + ' ')
printNicely(user)
+ # Update from Twitter
+ c['IGNORE_LIST'] = [n for n in md]
def block():
printNicely(red('Oops something is wrong with Twitter :('))
-def list():
+def twitterlist():
"""
Twitter's list
"""
value = get_default_config(key)
line = ' ' * 2 + green(key) + ': ' + light_magenta(value)
printNicely(line)
- except:
- printNicely(
- light_magenta('This config key does not exist in default.'))
+ except Exception as e:
+ printNicely(red(e))
# Delete specific config key in config file
elif len(g['stuff'].split()) == 2 and g['stuff'].split()[-1] == 'drop':
key = g['stuff'].split()[0]
try:
delete_config(key)
printNicely(green('Config key is dropped.'))
- except:
- printNicely(red('No such config key.'))
+ except Exception as e:
+ printNicely(red(e))
# Set specific config
elif len(g['stuff'].split()) == 3 and g['stuff'].split()[1] == '=':
key = g['stuff'].split()[0]
set_config(key, value)
# Apply theme immediately
if key == 'THEME':
- c['THEME'] = reload_theme(value,c['THEME'])
+ c['THEME'] = reload_theme(value, c['THEME'])
g['decorated_name'] = lambda x: color_func(
- c['DECORATED_NAME'])(
- '[' + x + ']: ')
+ c['DECORATED_NAME'])('[' + x + ']: ')
+ reload_config()
printNicely(green('Updated successfully.'))
- except:
- printNicely(light_magenta('Not valid value.'))
- return
- reload_config()
+ except Exception as e:
+ printNicely(red(e))
else:
printNicely(light_magenta('Sorry I can\'s understand.'))
# Change theme
try:
# Load new theme
- c['THEME'] = reload_theme(g['stuff'],c['THEME'])
+ c['THEME'] = reload_theme(g['stuff'], c['THEME'])
# Redefine decorated_name
g['decorated_name'] = lambda x: color_func(
c['DECORATED_NAME'])(
'stream': help_stream,
}
if g['stuff']:
- d[g['stuff'].strip()]()
+ d.get(
+ g['stuff'].strip(),
+ lambda: printNicely(red('No such command.'))
+ )()
else:
printNicely(usage)
Reset prefix of line
"""
if g['reset']:
+ if c.get('USER_JSON_ERROR'):
+ printNicely(red('Your ~/.rainbow_config.json is messed up:'))
+ printNicely(red('>>> ' + c['USER_JSON_ERROR']))
+ printNicely('')
printNicely(magenta('Need tips ? Type "h" and hit Enter key!'))
g['reset'] = False
try:
block,
unblock,
report,
- list,
+ twitterlist,
cal,
config,
theme,
line = raw_input(g['decorated_name'](c['PREFIX']))
else:
line = raw_input()
+ # Save previous cmd in order to compare with readline buffer
+ g['previous_cmd'] = line.strip()
try:
cmd = line.split()[0]
except:
ascii_art(art_dict[domain])
# These arguments are optional:
stream_args = dict(
- timeout=args.timeout,
- block=False,
+ timeout=0.5, # To check g['stream_stop'] after each 0.5 s
+ block=not args.no_block,
heartbeat_timeout=args.heartbeat_timeout)
# Track keyword
query_args = dict()
StreamLock.acquire()
g['stream_stop'] = False
for tweet in tweet_iter:
- if(g['stream_stop']):
- StreamLock.release()
- break
if tweet is None:
- pass
+ printNicely("-- None --")
elif tweet is Timeout:
- printNicely("-- Timeout --")
+ if(g['stream_stop']):
+ StreamLock.release()
+ break
elif tweet is HeartbeatTimeout:
printNicely("-- Heartbeat Timeout --")
elif tweet is Hangup:
fil=args.filter,
ig=args.ignore,
)
+ # Current readline buffer
+ current_buffer = readline.get_line_buffer().strip()
+ # There is an unexpected behaviour in MacOSX readline + Python 2:
+ # after completely delete a word after typing it,
+ # somehow readline buffer still contains
+ # the 1st character of that word
+ if current_buffer and g['previous_cmd'] != current_buffer:
+ sys.stdout.write(
+ g['decorated_name'](c['PREFIX']) + current_buffer)
+ sys.stdout.flush()
+ elif not c['HIDE_PROMPT']:
+ sys.stdout.write(g['decorated_name'](c['PREFIX']))
+ sys.stdout.flush()
elif tweet.get('direct_message'):
print_message(tweet['direct_message'], check_semaphore=True)
except TwitterHTTPError:
except TwitterHTTPError:
printNicely('')
printNicely(
- magenta("We have maximum connection problem with twitter'stream API right now :("))
+ magenta("We have connection problem with twitter'stream API right now :("))
printNicely(magenta("Let's try again later."))
save_history()
sys.exit()
# Spawn stream thread
- th = threading.Thread(target=stream, args=(c['USER_DOMAIN'], args, g['original_name']))
+ th = threading.Thread(
+ target=stream,
+ args=(
+ c['USER_DOMAIN'],
+ args,
+ g['original_name']))
th.daemon = True
th.start()
# Start listen process