parser.add_argument(
'-s',
'--stream',
- default="mine",
+ default="mine",
help='Default stream after program start. (Default: mine)')
parser.add_argument(
'-to',
light_magenta('List name should follow "@owner/list_name" format.'))
raise Exception('Wrong list name')
+
def check_slug(list_name):
"""
Check slug
light_magenta('List name should follow "@owner/list_name" format.'))
raise Exception('Wrong list name')
+
def show_lists(t):
"""
List list
except:
printNicely(red('Sorry, wrong format.'))
return
- # Public stream
- if target == 'public':
- keyword = g['stuff'].split()[1]
- if keyword[0] == '#':
- keyword = keyword[1:]
- # Kill old thread
- g['stream_stop'] = True
- args.track_keywords = keyword
- # Set the variable to tracked keyword
- # and reset the listname
- g['keyword'] = keyword
- g['listname'] = ''
- # Reset prefix
- g['PREFIX'] = u2str(emojize(format_prefix(keyword=g['keyword'])))
- # Start new thread
- th = threading.Thread(
- target=stream,
- args=(
- c['PUBLIC_DOMAIN'],
- args))
- th.daemon = True
- th.start()
- # Personal stream
- elif target == 'mine':
- # Kill old thread
- g['stream_stop'] = True
- # Reset the tracked keyword and listname
- g['keyword'] = g['listname'] = ''
- # Reset prefix
- g['PREFIX'] = u2str(emojize(format_prefix()))
- # Start new thread
- th = threading.Thread(
- target=stream,
- args=(
- c['USER_DOMAIN'],
- args,
- g['original_name']))
- th.daemon = True
- th.start()
- # Stream base on list
- elif target == 'list':
- try:
- owner, slug = check_slug(g['stuff'].split()[1])
- except:
- owner, slug = get_slug()
- # Force python 2 not redraw readline buffer
- listname = '/'.join([owner, slug])
- # Set the listname variable
- # and reset tracked keyword
- g['listname'] = listname
- g['keyword'] = ''
- g['PREFIX'] = g['cmd'] = u2str(emojize(format_prefix(
- listname=g['listname']
- )))
- printNicely(light_yellow('getting list members ...'))
- # Get members
- t = Twitter(auth=authen())
- members = []
- next_cursor = -1
- while next_cursor != 0:
- m = t.lists.members(
- slug=slug,
- owner_screen_name=owner,
- cursor=next_cursor,
- include_entities=False)
- for u in m['users']:
- members.append('@' + u['screen_name'])
- next_cursor = m['next_cursor']
- printNicely(light_yellow('... done.'))
- # Build thread filter array
- args.filter = members
- # Kill old thread
- g['stream_stop'] = True
- # Start new thread
- th = threading.Thread(
- target=stream,
- args=(
- c['USER_DOMAIN'],
- args,
- slug))
- th.daemon = True
- th.start()
- printNicely('')
- if args.filter:
- printNicely(cyan('Include: ' + str(len(args.filter)) + ' people.'))
- if args.ignore:
- printNicely(red('Ignore: ' + str(len(args.ignore)) + ' people.'))
- printNicely('')
+ # Kill old thread
+ g['stream_stop'] = True
+ try:
+ stuff = g['stuff'].split()[1]
+ except:
+ stuff = None
+ # Spawn new thread
+ spawn_dict = {
+ 'public': spawn_public_stream,
+ 'list': spawn_list_stream,
+ 'mine': spawn_personal_stream,
+ }
+ spawn_dict.get(target)(args, stuff)
except:
debug_option()
printNicely(red('Sorry I can\'t understand.'))
c['SITE_DOMAIN']: name,
}
if c['ASCII_ART']:
- ascii_art(art_dict[domain])
+ ascii_art(art_dict.get(domain, name))
# These arguments are optional:
stream_args = dict(
timeout=0.5, # To check g['stream_stop'] after each 0.5 s
sys.exit()
+def spawn_public_stream(args, keyword=None):
+ """
+ Spawn a new public stream
+ """
+ # Only set keyword if specified
+ if keyword:
+ if keyword[0] == '#':
+ keyword = keyword[1:]
+ args.track_keywords = keyword
+ # Set the variable to tracked keyword
+ g['keyword'] = keyword
+ g['listname'] = ''
+ # Reset prefix
+ g['PREFIX'] = u2str(emojize(format_prefix(keyword=g['keyword'])))
+ # Start new thread
+ th = threading.Thread(
+ target=stream,
+ args=(
+ c['PUBLIC_DOMAIN'],
+ args))
+ th.daemon = True
+ th.start()
+
+
+def spawn_list_stream(args, stuff=None):
+ """
+ Spawn a new list stream
+ """
+ try:
+ owner, slug = check_slug(stuff)
+ except:
+ owner, slug = get_slug()
+
+ # Force python 2 not redraw readline buffer
+ listname = '/'.join([owner, slug])
+ # Set the listname variable
+ # and reset tracked keyword
+ g['listname'] = listname
+ g['keyword'] = ''
+ g['PREFIX'] = g['cmd'] = u2str(emojize(format_prefix(
+ listname=g['listname']
+ )))
+ printNicely(light_yellow('getting list members ...'))
+ # Get members
+ t = Twitter(auth=authen())
+ members = []
+ next_cursor = -1
+ while next_cursor != 0:
+ m = t.lists.members(
+ slug=slug,
+ owner_screen_name=owner,
+ cursor=next_cursor,
+ include_entities=False)
+ for u in m['users']:
+ members.append('@' + u['screen_name'])
+ next_cursor = m['next_cursor']
+ printNicely(light_yellow('... done.'))
+ # Build thread filter array
+ args.filter = members
+ # Start new thread
+ th = threading.Thread(
+ target=stream,
+ args=(
+ c['USER_DOMAIN'],
+ args,
+ slug))
+ th.daemon = True
+ th.start()
+ printNicely('')
+ if args.filter:
+ printNicely(cyan('Include: ' + str(len(args.filter)) + ' people.'))
+ if args.ignore:
+ printNicely(red('Ignore: ' + str(len(args.ignore)) + ' people.'))
+ printNicely('')
+
+
+def spawn_personal_stream(args, stuff=None):
+ """
+ Spawn a new personal stream
+ """
+ # Reset the tracked keyword and listname
+ g['keyword'] = g['listname'] = ''
+ # Reset prefix
+ g['PREFIX'] = u2str(emojize(format_prefix()))
+ # Start new thread
+ th = threading.Thread(
+ target=stream,
+ args=(
+ c['USER_DOMAIN'],
+ args,
+ g['original_name']))
+ th.daemon = True
+ th.start()
+
+
def fly():
"""
Main function
# Spawn stream thread
target = args.stream.split()[0]
- if target == 'public':
+ if target == 'mine' :
+ spawn_personal_stream(args)
+ else :
try:
- keyword = args.stream.split()[1]
- if keyword[0] == '#':
- keyword = keyword[1:]
- args.track_keywords = keyword
- # Set the variable to tracked keyword
- g['keyword'] = keyword
- # Reset prefix
- g['PREFIX'] = u2str(emojize(format_prefix(keyword=g['keyword'])))
- # Start new thread
- th = threading.Thread(
- target=stream,
- args=(
- c['PUBLIC_DOMAIN'],
- args))
- th.daemon = True
- th.start()
+ stuff = args.stream.split()[1]
+ spawn_dict = {
+ 'public': spawn_public_stream,
+ 'list': spawn_list_stream,
+ }
+ spawn_dict.get(target)(args, stuff)
except:
- printNicely(red('Public requires a keyword! Loading your personal stream.'))
- # Start new thread
- th = threading.Thread(
- target=stream,
- args=(
- c['USER_DOMAIN'],
- args,
- g['original_name']))
- th.daemon = True
- th.start()
- elif target == "list":
- try:
- owner, slug = check_slug(args.stream.split()[1])
- # Force python 2 not redraw readline buffer
- listname = '/'.join([owner, slug])
- # Set the listname variable
- # and reset tracked keyword
- g['listname'] = listname
- g['keyword'] = ''
- g['PREFIX'] = g['cmd'] = u2str(emojize(format_prefix(
- listname=g['listname']
- )))
- printNicely(light_yellow('getting list members ...'))
- # Get members
- t = Twitter(auth=authen())
- members = []
- next_cursor = -1
- while next_cursor != 0:
- m = t.lists.members(
- slug=slug,
- owner_screen_name=owner,
- cursor=next_cursor,
- include_entities=False)
- for u in m['users']:
- members.append('@' + u['screen_name'])
- next_cursor = m['next_cursor']
- printNicely(light_yellow('... done.'))
- # Build thread filter array
- args.filter = members
- # Kill old thread
- g['stream_stop'] = True
- # Start new thread
- th = threading.Thread(
- target=stream,
- args=(
- c['USER_DOMAIN'],
- args,
- slug))
- th.daemon = True
- th.start()
- printNicely('')
- if args.filter:
- printNicely(cyan('Include: ' + str(len(args.filter)) + ' people.'))
- except:
- printNicely(red('List requieres a correct name of a list! Loading your personal stream.'))
- # Start new thread
- th = threading.Thread(
- target=stream,
- args=(
- c['USER_DOMAIN'],
- args,
- g['original_name']))
- th.daemon = True
- th.start()
- elif target == "mine":
- # Start new thread
- th = threading.Thread(
- target=stream,
- args=(
- c['USER_DOMAIN'],
- args,
- g['original_name']))
- th.daemon = True
- th.start()
- else:
- printNicely(red('Wrong -s/--stream argument given. Loading your personal stream.'))
- # Start new thread
- th = threading.Thread(
- target=stream,
- args=(
- c['USER_DOMAIN'],
- args,
- g['original_name']))
- th.daemon = True
- th.start()
-
+ printNicely(red('Wrong -s/--stream argument given. Loading your personal stream.'))
+ spawn_personal_stream(args)
+
# Start listen process
time.sleep(0.5)
g['reset'] = True