import os
import mimetypes
-import urllib
from tweepy.binder import bind_api
from tweepy.error import TweepError
parser_type = Parser
if not isinstance(self.parser, parser_type):
raise TypeError(
- '"parser" argument has to be an instance of "{}". It is currently a {}.'.format(
- parser_type.__name__, type(self.parser)
- )
+ '"parser" argument has to be an instance of "{}".'
+ ' It is currently a {}.'.format(parser_type.__name__,
+ type(self.parser))
)
@property
require_auth=True
)
- def statuses_lookup(self, id_, include_entities=None, trim_user=None, map_=None):
+ def statuses_lookup(self, id_, include_entities=None,
+ trim_user=None, map_=None):
return self._statuses_lookup(list_to_csv(id_), include_entities,
trim_user, map_)
allowed_param=['id']
)
-
@property
def update_status(self):
""" :reference: https://dev.twitter.com/docs/api/1.1/post/statuses/update
@property
def retweeters(self):
- """
+ """
:allowed_param:'id', 'cursor', 'stringify_ids
"""
return bind_api(
@property
def update_profile_colors(self):
""" :reference: https://dev.twitter.com/docs/api/1.1/post/account/update_profile_colors
- :allowed_param:'profile_background_color', 'profile_text_color', 'profile_link_color', 'profile_sidebar_fill_color', 'profile_sidebar_border_color'],
+ :allowed_param:'profile_background_color', 'profile_text_color',
+ 'profile_link_color', 'profile_sidebar_fill_color',
+ 'profile_sidebar_border_color'],
"""
return bind_api(
api=self,
@property
def list_timeline(self):
""" :reference: https://dev.twitter.com/docs/api/1.1/get/lists/statuses
- :allowed_param:'owner_screen_name', 'slug', 'owner_id', 'list_id', 'since_id', 'max_id', 'count', 'include_rts
+ :allowed_param:'owner_screen_name', 'slug', 'owner_id', 'list_id',
+ 'since_id', 'max_id', 'count', 'include_rts
"""
return bind_api(
api=self,
path='/lists/statuses.json',
payload_type='status', payload_list=True,
- allowed_param=['owner_screen_name', 'slug', 'owner_id', 'list_id', 'since_id', 'max_id', 'count', 'include_rts']
+ allowed_param=['owner_screen_name', 'slug', 'owner_id',
+ 'list_id', 'since_id', 'max_id', 'count',
+ 'include_rts']
)
@property
@property
def add_list_member(self):
""" :reference: https://dev.twitter.com/docs/api/1.1/post/lists/members/create
- :allowed_param:'screen_name', 'user_id', 'owner_screen_name', 'owner_id', 'slug', 'list_id'
+ :allowed_param:'screen_name', 'user_id', 'owner_screen_name',
+ 'owner_id', 'slug', 'list_id'
"""
return bind_api(
api=self,
path='/lists/members/create.json',
method='POST',
payload_type='list',
- allowed_param=['screen_name', 'user_id', 'owner_screen_name', 'owner_id', 'slug', 'list_id'],
+ allowed_param=['screen_name', 'user_id', 'owner_screen_name',
+ 'owner_id', 'slug', 'list_id'],
require_auth=True
)
@property
def remove_list_member(self):
""" :reference: https://dev.twitter.com/docs/api/1.1/post/lists/members/destroy
- :allowed_param:'screen_name', 'user_id', 'owner_screen_name', 'owner_id', 'slug', 'list_id'
+ :allowed_param:'screen_name', 'user_id', 'owner_screen_name',
+ 'owner_id', 'slug', 'list_id'
"""
return bind_api(
api=self,
path='/lists/members/destroy.json',
method='POST',
payload_type='list',
- allowed_param=['screen_name', 'user_id', 'owner_screen_name', 'owner_id', 'slug', 'list_id'],
+ allowed_param=['screen_name', 'user_id', 'owner_screen_name',
+ 'owner_id', 'slug', 'list_id'],
require_auth=True
)
@property
def _add_list_members(self):
""" :reference: https://dev.twitter.com/docs/api/1.1/post/lists/members/create_all
- :allowed_param:'screen_name', 'user_id', 'slug', 'lit_id', 'owner_id', 'owner_screen_name'
+ :allowed_param:'screen_name', 'user_id', 'slug', 'lit_id',
+ 'owner_id', 'owner_screen_name'
"""
return bind_api(
api=self,
path='/lists/members/create_all.json',
method='POST',
payload_type='list',
- allowed_param=['screen_name', 'user_id', 'slug', 'lit_id', 'owner_id', 'owner_screen_name'],
+ allowed_param=['screen_name', 'user_id', 'slug', 'lit_id',
+ 'owner_id', 'owner_screen_name'],
require_auth=True
)
-
def remove_list_members(self, screen_name=None, user_id=None, slug=None,
list_id=None, owner_id=None, owner_screen_name=None):
""" Perform bulk remove of list members from user ID or screenname """
@property
def _remove_list_members(self):
""" :reference: https://dev.twitter.com/docs/api/1.1/post/lists/members/destroy_all
- :allowed_param:'screen_name', 'user_id', 'slug', 'lit_id', 'owner_id', 'owner_screen_name'
+ :allowed_param:'screen_name', 'user_id', 'slug', 'lit_id',
+ 'owner_id', 'owner_screen_name'
"""
return bind_api(
api=self,
path='/lists/members/destroy_all.json',
method='POST',
payload_type='list',
- allowed_param=['screen_name', 'user_id', 'slug', 'lit_id', 'owner_id', 'owner_screen_name'],
+ allowed_param=['screen_name', 'user_id', 'slug', 'lit_id',
+ 'owner_id', 'owner_screen_name'],
require_auth=True
)
@property
def list_members(self):
""" :reference: https://dev.twitter.com/docs/api/1.1/get/lists/members
- :allowed_param:'owner_screen_name', 'slug', 'list_id', 'owner_id', 'cursor
+ :allowed_param:'owner_screen_name', 'slug', 'list_id',
+ 'owner_id', 'cursor
"""
return bind_api(
api=self,
path='/lists/members.json',
payload_type='user', payload_list=True,
- allowed_param=['owner_screen_name', 'slug', 'list_id', 'owner_id', 'cursor']
+ allowed_param=['owner_screen_name', 'slug', 'list_id',
+ 'owner_id', 'cursor']
)
@property
def show_list_member(self):
""" :reference: https://dev.twitter.com/docs/api/1.1/get/lists/members/show
- :allowed_param:'list_id', 'slug', 'user_id', 'screen_name', 'owner_screen_name', 'owner_id
+ :allowed_param:'list_id', 'slug', 'user_id', 'screen_name',
+ 'owner_screen_name', 'owner_id
"""
return bind_api(
api=self,
path='/lists/members/show.json',
payload_type='user',
- allowed_param=['list_id', 'slug', 'user_id', 'screen_name', 'owner_screen_name', 'owner_id']
+ allowed_param=['list_id', 'slug', 'user_id', 'screen_name',
+ 'owner_screen_name', 'owner_id']
)
@property
def subscribe_list(self):
""" :reference: https://dev.twitter.com/docs/api/1.1/post/lists/subscribers/create
- :allowed_param:'owner_screen_name', 'slug', 'owner_id', 'list_id'
+ :allowed_param:'owner_screen_name', 'slug', 'owner_id',
+ 'list_id'
"""
return bind_api(
api=self,
path='/lists/subscribers/create.json',
method='POST',
payload_type='list',
- allowed_param=['owner_screen_name', 'slug', 'owner_id', 'list_id'],
+ allowed_param=['owner_screen_name', 'slug', 'owner_id',
+ 'list_id'],
require_auth=True
)
@property
def unsubscribe_list(self):
""" :reference: https://dev.twitter.com/docs/api/1.1/post/lists/subscribers/destroy
- :allowed_param:'owner_screen_name', 'slug', 'owner_id', 'list_id'
+ :allowed_param:'owner_screen_name', 'slug', 'owner_id',
+ 'list_id'
"""
return bind_api(
api=self,
path='/lists/subscribers/destroy.json',
method='POST',
payload_type='list',
- allowed_param=['owner_screen_name', 'slug', 'owner_id', 'list_id'],
+ allowed_param=['owner_screen_name', 'slug', 'owner_id',
+ 'list_id'],
require_auth=True
)
@property
def list_subscribers(self):
""" :reference: https://dev.twitter.com/docs/api/1.1/get/lists/subscribers
- :allowed_param:'owner_screen_name', 'slug', 'owner_id', 'list_id', 'cursor
+ :allowed_param:'owner_screen_name', 'slug', 'owner_id',
+ 'list_id', 'cursor
"""
return bind_api(
api=self,
path='/lists/subscribers.json',
payload_type='user', payload_list=True,
- allowed_param=['owner_screen_name', 'slug', 'owner_id', 'list_id', 'cursor']
+ allowed_param=['owner_screen_name', 'slug', 'owner_id',
+ 'list_id', 'cursor']
)
@property
def show_list_subscriber(self):
""" :reference: https://dev.twitter.com/docs/api/1.1/get/lists/subscribers/show
- :allowed_param:'owner_screen_name', 'slug', 'screen_name', 'owner_id', 'list_id', 'user_id
+ :allowed_param:'owner_screen_name', 'slug', 'screen_name',
+ 'owner_id', 'list_id', 'user_id
"""
return bind_api(
api=self,
path='/lists/subscribers/show.json',
payload_type='user',
- allowed_param=['owner_screen_name', 'slug', 'screen_name', 'owner_id', 'list_id', 'user_id']
+ allowed_param=['owner_screen_name', 'slug', 'screen_name',
+ 'owner_id', 'list_id', 'user_id']
)
@property
@property
def search(self):
""" :reference: https://dev.twitter.com/docs/api/1.1/get/search
- :allowed_param:'q', 'lang', 'locale', 'since_id', 'geocode', 'max_id', 'since', 'until', 'result_type', 'count', 'include_entities', 'from', 'to', 'source']
+ :allowed_param:'q', 'lang', 'locale', 'since_id', 'geocode',
+ 'max_id', 'since', 'until', 'result_type', 'count',
+ 'include_entities', 'from', 'to', 'source']
"""
return bind_api(
api=self,
path='/search/tweets.json',
payload_type='search_results',
- allowed_param=['q', 'lang', 'locale', 'since_id', 'geocode', 'max_id', 'since', 'until', 'result_type',
- 'count', 'include_entities', 'from', 'to', 'source']
+ allowed_param=['q', 'lang', 'locale', 'since_id', 'geocode',
+ 'max_id', 'since', 'until', 'result_type',
+ 'count', 'include_entities', 'from',
+ 'to', 'source']
)
@property
api=self,
path='/geo/reverse_geocode.json',
payload_type='place', payload_list=True,
- allowed_param=['lat', 'long', 'accuracy', 'granularity', 'max_results']
+ allowed_param=['lat', 'long', 'accuracy', 'granularity',
+ 'max_results']
)
@property
@property
def geo_search(self):
""" :reference: https://dev.twitter.com/docs/api/1.1/get/geo/search
- :allowed_param:'lat', 'long', 'query', 'ip', 'granularity', 'accuracy', 'max_results', 'contained_within
+ :allowed_param:'lat', 'long', 'query', 'ip', 'granularity',
+ 'accuracy', 'max_results', 'contained_within
"""
return bind_api(
api=self,
path='/geo/search.json',
payload_type='place', payload_list=True,
- allowed_param=['lat', 'long', 'query', 'ip', 'granularity', 'accuracy', 'max_results', 'contained_within']
+ allowed_param=['lat', 'long', 'query', 'ip', 'granularity',
+ 'accuracy', 'max_results', 'contained_within']
)
@property
filename = filename.encode("utf-8")
filename = filename.encode("utf-8")
- BOUNDARY = 'Tw3ePy'
- body = []
- body.append('--' + BOUNDARY)
+ boundary = 'Tw3ePy'
+ body = list()
+ body.append('--' + boundary)
body.append('Content-Disposition: form-data; name="%s"; filename="%s"' % (form_field, filename))
body.append('Content-Type: %s' % file_type)
body.append('')
body.append(fp.read())
- body.append('--' + BOUNDARY + '--')
+ body.append('--' + boundary + '--')
body.append('')
fp.close()
body = '\r\n'.join(body)
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.
-from tweepy.error import TweepError
from tweepy.utils import parse_datetime, parse_html_value, parse_a_href
return self._since_id
ids = self.ids()
# Since_id is always set to the *greatest* id in the set
- return max(ids) if ids else None
+ return max(ids) if ids else None
def ids(self):
return [item.id for item in self if hasattr(item, 'id')]
+
class Model(object):
def __init__(self, api=None):
@classmethod
def parse_list(cls, api, json_list):
- """Parse a list of JSON objects into a result set of model instances."""
+ """
+ Parse a list of JSON objects into
+ a result set of model instances.
+ """
results = ResultSet()
for obj in json_list:
if obj:
return results
def __repr__(self):
- state = ['%s=%s' % (k, repr(v)) for (k,v) in vars(self).items()]
+ state = ['%s=%s' % (k, repr(v)) for (k, v) in vars(self).items()]
return '%s(%s)' % (self.__class__.__name__, ', '.join(state))
self.following = False
def lists_memberships(self, *args, **kargs):
- return self._api.lists_memberships(user=self.screen_name, *args, **kargs)
+ return self._api.lists_memberships(user=self.screen_name,
+ *args,
+ **kargs)
def lists_subscriptions(self, *args, **kargs):
- return self._api.lists_subscriptions(user=self.screen_name, *args, **kargs)
+ return self._api.lists_subscriptions(user=self.screen_name,
+ *args,
+ **kargs)
def lists(self, *args, **kargs):
- return self._api.lists_all(user=self.screen_name, *args, **kargs)
+ return self._api.lists_all(user=self.screen_name,
+ *args,
+ **kargs)
def followers_ids(self, *args, **kargs):
- return self._api.followers_ids(user_id=self.id, *args, **kargs)
+ return self._api.followers_ids(user_id=self.id,
+ *args,
+ **kargs)
class DirectMessage(Model):
@classmethod
def parse(cls, api, json):
lst = List(api)
- for k,v in json.items():
+ for k, v in json.items():
if k == 'user':
setattr(lst, k, User.parse(api, v))
elif k == 'created_at':
return self._api.destroy_list(self.slug)
def timeline(self, **kargs):
- return self._api.list_timeline(self.user.screen_name, self.slug, **kargs)
+ return self._api.list_timeline(self.user.screen_name,
+ self.slug,
+ **kargs)
def add_member(self, id):
return self._api.add_list_member(self.slug, id)
return self._api.remove_list_member(self.slug, id)
def members(self, **kargs):
- return self._api.list_members(self.user.screen_name, self.slug, **kargs)
+ return self._api.list_members(self.user.screen_name,
+ self.slug,
+ **kargs)
def is_member(self, id):
- return self._api.is_list_member(self.user.screen_name, self.slug, id)
+ return self._api.is_list_member(self.user.screen_name,
+ self.slug,
+ id)
def subscribe(self):
return self._api.subscribe_list(self.user.screen_name, self.slug)
return self._api.unsubscribe_list(self.user.screen_name, self.slug)
def subscribers(self, **kargs):
- return self._api.list_subscribers(self.user.screen_name, self.slug, **kargs)
+ return self._api.list_subscribers(self.user.screen_name,
+ self.slug,
+ **kargs)
def is_subscribed(self, id):
- return self._api.is_subscribed_list(self.user.screen_name, self.slug, id)
+ return self._api.is_subscribed_list(self.user.screen_name,
+ self.slug,
+ id)
+
class Relation(Model):
@classmethod
def parse(cls, api, json):
result = cls(api)
- for k,v in json.items():
+ for k, v in json.items():
if k == 'value' and json['kind'] in ['Tweet', 'LookedupStatus']:
setattr(result, k, Status.parse(api, v))
elif k == 'results':
setattr(result, k, v)
return result
+
class Relationship(Model):
@classmethod
def parse(cls, api, json):
result = cls(api)
- for k,v in json.items():
+ for k, v in json.items():
if k == 'connections':
setattr(result, 'is_following', 'following' in v)
setattr(result, 'is_followed_by', 'followed_by' in v)
setattr(result, k, v)
return result
+
class JSONModel(Model):
@classmethod
results.append(cls.parse(api, obj))
return results
+
class ModelFactory(object):
"""
Used by parsers for creating instances
ids = IDModel
place = Place
bounding_box = BoundingBox
-
self.running = False
self.timeout = options.get("timeout", 300.0)
self.retry_count = options.get("retry_count")
- # values according to https://dev.twitter.com/docs/streaming-apis/connecting#Reconnecting
+ # values according to
+ # https://dev.twitter.com/docs/streaming-apis/connecting#Reconnecting
self.retry_time_start = options.get("retry_time", 5.0)
self.retry_420_start = options.get("retry_420", 60.0)
self.retry_time_cap = options.get("retry_time_cap", 320.0)
resp = None
exception = None
while self.running:
- if self.retry_count is not None and error_counter > self.retry_count:
- # quit if error count greater than retry count
- break
+ if self.retry_count is not None:
+ if error_counter > self.retry_count:
+ # quit if error count greater than retry count
+ break
try:
auth = self.auth.apply_auth()
- resp = self.session.request('POST', url, data=self.body,
- timeout=self.timeout, stream=True, auth=auth)
+ resp = self.session.request('POST',
+ url,
+ data=self.body,
+ timeout=self.timeout,
+ stream=True,
+ auth=auth)
if resp.status_code != 200:
if self.listener.on_error(resp.status_code) is False:
break
error_counter += 1
if resp.status_code == 420:
- self.retry_time = max(self.retry_420_start, self.retry_time)
+ self.retry_time = max(self.retry_420_start,
+ self.retry_time)
sleep(self.retry_time)
- self.retry_time = min(self.retry_time * 2, self.retry_time_cap)
+ self.retry_time = min(self.retry_time * 2,
+ self.retry_time_cap)
else:
error_counter = 0
self.retry_time = self.retry_time_start
self.listener.on_connect()
self._read_loop(resp)
except (Timeout, ssl.SSLError) as exc:
- # This is still necessary, as a SSLError can actually be thrown when using Requests
+ # This is still necessary, as a SSLError can actually be
+ # thrown when using Requests
# If it's not time out treat it like any other exception
- if isinstance(exc, ssl.SSLError) and not (exc.args and 'timed out' in str(exc.args[0])):
- exception = exc
- break
- if self.listener.on_timeout() == False:
+ if isinstance(exc, ssl.SSLError):
+ if not (exc.args and 'timed out' in str(exc.args[0])):
+ exception = exc
+ break
+ if self.listener.on_timeout() is False:
break
if self.running is False:
break
while self.running:
- # Note: keep-alive newlines might be inserted before each length value.
+ # Note: keep-alive newlines might be inserted
+ # before each length value.
# read until we get a digit...
c = '\n'
for c in resp.iter_content():
# read the next twitter status object
if delimited_string.strip().isdigit():
- next_status_obj = resp.raw.read( int(delimited_string) )
+ next_status_obj = resp.raw.read(int(delimited_string))
if self.running:
self._data(next_status_obj)
""" Called when the response has been closed by Twitter """
pass
- def userstream(self, stall_warnings=False, _with=None, replies=None,
- track=None, locations=None, async=False, encoding='utf8'):
+ def userstream(self,
+ stall_warnings=False,
+ _with=None,
+ replies=None,
+ track=None,
+ locations=None,
+ async=False,
+ encoding='utf8'):
self.session.params = {'delimited': 'length'}
if self.running:
raise TweepError('Stream object already connected!')
self.url = '/%s/user.json' % STREAM_VERSION
- self.host='userstream.twitter.com'
+ self.host = 'userstream.twitter.com'
if stall_warnings:
self.session.params['stall_warnings'] = stall_warnings
if _with:
self.host = 'stream.twitter.com'
self._start(async)
- def sitestream(self, follow, stall_warnings=False, with_='user', replies=False, async=False):
+ def sitestream(self, follow, stall_warnings=False,
+ with_='user', replies=False, async=False):
self.parameters = {}
if self.running:
raise TweepError('Stream object already connected!')
if self.running is False:
return
self.running = False
-