Merge branch 'devel' into messages
[diaspy.git] / diaspy / streams.py
1 """Docstrings for this module are taken from:
2 https://gist.github.com/MrZYX/01c93096c30dc44caf71
3
4 Documentation for D* JSON API taken from:
5 http://pad.spored.de/ro/r.qWmvhSZg7rk4OQam
6 """
7
8
9 import json
10 import time
11 from diaspy.models import Post, Aspect
12 from diaspy import errors
13
14
15 class Generic():
16 """Object representing generic stream.
17 """
18 _location = 'stream.json'
19
20 def __init__(self, connection, location=''):
21 """
22 :param connection: Connection() object
23 :type connection: diaspy.connection.Connection
24 :param location: location of json (optional)
25 :type location: str
26 """
27 self._connection = connection
28 if location: self._location = location
29 self._stream = []
30 # since epoch
31 self.max_time = int(time.mktime(time.gmtime()))
32 self.fill()
33
34 def __contains__(self, post):
35 """Returns True if stream contains given post.
36 """
37 return post in self._stream
38
39 def __iter__(self):
40 """Provides iterable interface for stream.
41 """
42 return iter(self._stream)
43
44 def __getitem__(self, n):
45 """Returns n-th item in Stream.
46 """
47 return self._stream[n]
48
49 def __len__(self):
50 """Returns length of the Stream.
51 """
52 return len(self._stream)
53
54 def _obtain(self, max_time=0):
55 """Obtains stream from pod.
56 """
57 params = {}
58 if max_time:
59 params['max_time'] = max_time
60 params['_'] = int(time.time() * 1000)
61 request = self._connection.get(self._location, params=params)
62 if request.status_code != 200:
63 raise errors.StreamError('wrong status code: {0}'.format(request.status_code))
64 return [Post(self._connection, post['id']) for post in request.json()]
65
66 def _expand(self, new_stream):
67 """Appends older posts to stream.
68 """
69 ids = [post.id for post in self._stream]
70 stream = self._stream
71 for post in new_stream:
72 if post.id not in ids:
73 stream.append(post)
74 ids.append(post.id)
75 self._stream = stream
76
77 def _update(self, new_stream):
78 """Updates stream with new posts.
79 """
80 ids = [post.id for post in self._stream]
81
82 stream = self._stream
83 for i in range(len(new_stream)):
84 if new_stream[-i].id not in ids:
85 stream = [new_stream[-i]] + stream
86 ids.append(new_stream[-i].id)
87 self._stream = stream
88
89 def clear(self):
90 """Removes all posts from stream.
91 """
92 self._stream = []
93
94 def purge(self):
95 """Removes all unexistent posts from stream.
96 """
97 stream = []
98 for post in self._stream:
99 deleted = False
100 try:
101 post.update()
102 stream.append(post)
103 except Exception:
104 deleted = True
105 finally:
106 if not deleted: stream.append(post)
107 self._stream = stream
108
109 def update(self):
110 """Updates stream.
111 """
112 self._update(self._obtain())
113
114 def fill(self):
115 """Fills the stream with posts.
116 """
117 self._stream = self._obtain()
118
119 def more(self, max_time=0, backtime=84600):
120 """Tries to download more (older ones) Posts from Stream.
121
122 :param backtime: how many seconds substract each time (defaults to one day)
123 :type backtime: int
124 :param max_time: seconds since epoch (optional, diaspy'll figure everything on its own)
125 :type max_time: int
126 """
127 if not max_time: max_time = self.max_time - backtime
128 self.max_time = max_time
129 new_stream = self._obtain(max_time=max_time)
130 self._expand(new_stream)
131
132 def full(self, backtime=84600, retry=42, callback=None):
133 """Fetches full stream - containing all posts.
134 WARNING: this is a **VERY** long running function.
135 Use callback parameter to access information about the stream during its
136 run.
137
138 Default backtime is one day. But sometimes user might not have any activity for longer
139 period (on the beginning I posted once a month or so).
140 The role of retry is to hadle such situations by trying to go further back in time.
141 If a post is found the counter is restored.
142
143 :param backtime: how many seconds to substract each time
144 :type backtime: int
145 :param retry: how many times the functin should look deeper than your last post
146 :type retry: int
147 :param callback: callable taking diaspy.streams.Generic as an argument
148 :returns: integer, lenght of the stream
149 """
150 oldstream = self.copy()
151 self.more()
152 while len(oldstream) < len(self):
153 oldstream = self.copy()
154 if callback is not None: callback(self)
155 self.more(backtime=backtime)
156 if len(oldstream) < len(self): continue
157 # but if no posts were found start retrying...
158 print('retrying... {0}'.format(retry))
159 n = retry
160 while n > 0:
161 print('\t', n, self.max_time)
162 # try to get even more posts...
163 self.more(backtime=backtime)
164 print('\t', len(oldstream), len(self))
165 # check if it was a success...
166 if len(oldstream) < len(self):
167 # and if so restore normal order of execution by
168 # going one loop higher
169 break
170 oldstream = self.copy()
171 # if it was not a success substract one day, keep calm and
172 # try going further rback in time...
173 n -= 1
174 #if len(oldstream) == len(self): break
175 return len(self)
176
177 def copy(self):
178 """Returns copy (list of posts) of current stream.
179 """
180 return [p for p in self._stream]
181
182 def json(self, comments=False):
183 """Returns JSON encoded string containing stream's data.
184
185 :param comments: to include comments or not to include 'em, that is the question this param holds answer to
186 :type comments: bool
187 """
188 stream = [post for post in self._stream]
189 if comments:
190 for i, post in enumerate(stream):
191 post._fetchcomments()
192 comments = [c.data for c in post.comments]
193 post['interactions']['comments'] = comments
194 stream[i] = post
195 stream = [post.data for post in stream]
196 return json.dumps(stream)
197
198
199 class Outer(Generic):
200 """Object used by diaspy.models.User to represent
201 stream of other user.
202 """
203 def _obtain(self, max_time=0):
204 """Obtains stream from pod.
205 """
206 params = {}
207 if max_time: params['max_time'] = max_time
208 request = self._connection.get(self._location, params=params)
209 if request.status_code != 200:
210 raise errors.StreamError('wrong status code: {0}'.format(request.status_code))
211 return [Post(self._connection, post['id']) for post in request.json()]
212
213
214 class Stream(Generic):
215 """The main stream containing the combined posts of the
216 followed users and tags and the community spotlights posts
217 if the user enabled those.
218 """
219 location = 'stream.json'
220
221 def post(self, text='', aspect_ids='public', photos=None, photo=''):
222 """This function sends a post to an aspect.
223 If both `photo` and `photos` are specified `photos` takes precedence.
224
225 :param text: Text to post.
226 :type text: str
227 :param aspect_ids: Aspect ids to send post to.
228 :type aspect_ids: str
229 :param photo: filename of photo to post
230 :type photo: str
231 :param photos: id of photo to post (obtained from _photoupload())
232 :type photos: int
233
234 :returns: diaspy.models.Post -- the Post which has been created
235 """
236 data = {}
237 data['aspect_ids'] = aspect_ids
238 data['status_message'] = {'text': text}
239 if photo: data['photos'] = self._photoupload(photo)
240 if photos: data['photos'] = photos
241
242 request = self._connection.post('status_messages',
243 data=json.dumps(data),
244 headers={'content-type': 'application/json',
245 'accept': 'application/json',
246 'x-csrf-token': repr(self._connection)})
247 if request.status_code != 201:
248 raise Exception('{0}: Post could not be posted.'.format(request.status_code))
249
250 post = Post(self._connection, request.json()['id'])
251 return post
252
253 def _photoupload(self, filename):
254 """Uploads picture to the pod.
255
256 :param filename: path to picture file
257 :type filename: str
258
259 :returns: id of the photo being uploaded
260 """
261 data = open(filename, 'rb')
262 image = data.read()
263 data.close()
264
265 params = {}
266 params['photo[pending]'] = 'true'
267 params['set_profile_image'] = ''
268 params['qqfile'] = filename
269 aspects = self._connection.getUserInfo()['aspects']
270 for i, aspect in enumerate(aspects):
271 params['photo[aspect_ids][{0}]'.format(i)] = aspect['id']
272
273 headers = {'content-type': 'application/octet-stream',
274 'x-csrf-token': repr(self._connection),
275 'x-file-name': filename}
276
277 request = self._connection.post('photos', data=image, params=params, headers=headers)
278 if request.status_code != 200:
279 raise errors.StreamError('photo cannot be uploaded: {0}'.format(request.status_code))
280 return request.json()['data']['photo']['id']
281
282
283 class Activity(Stream):
284 """Stream representing user's activity.
285 """
286 _location = 'activity.json'
287
288 def _delid(self, id):
289 """Deletes post with given id.
290 """
291 post = None
292 for p in self._stream:
293 if p['id'] == id:
294 post = p
295 break
296 if post is not None: post.delete()
297
298 def delete(self, post):
299 """Deletes post from users activity.
300 `post` can be either post id or Post()
301 object which will be identified and deleted.
302 After deleting post the stream will be filled.
303
304 :param post: post identifier
305 :type post: str, diaspy.models.Post
306 """
307 if type(post) == str: self._delid(post)
308 elif type(post) == Post: post.delete()
309 else: raise TypeError('this method accepts str or Post types: {0} given')
310 self.fill()
311
312
313 class Aspects(Generic):
314 """This stream contains the posts filtered by
315 the specified aspect IDs. You can choose the aspect IDs with
316 the parameter `aspect_ids` which value should be
317 a comma seperated list of aspect IDs.
318 If the parameter is ommitted all aspects are assumed.
319 An example call would be `aspects.json?aspect_ids=23,5,42`
320 """
321 _location = 'aspects.json'
322
323 def getAspectID(self, aspect_name):
324 """Returns id of an aspect of given name.
325 Returns -1 if aspect is not found.
326
327 :param aspect_name: aspect name (must be spelled exactly as when created)
328 :type aspect_name: str
329 :returns: int
330 """
331 id = -1
332 aspects = self._connection.getUserInfo()['aspects']
333 for aspect in aspects:
334 if aspect['name'] == aspect_name: id = aspect['id']
335 return id
336
337 def filterByIDs(self, ids):
338 self._location += '?{0}'.format(','.join(ids))
339 self.fill()
340
341 def add(self, aspect_name, visible=0):
342 """This function adds a new aspect.
343 Status code 422 is accepted because it is returned by D* when
344 you try to add aspect already present on your aspect list.
345
346 :returns: Aspect() object of just created aspect
347 """
348 data = {'authenticity_token': self._connection.get_token(),
349 'aspect[name]': aspect_name,
350 'aspect[contacts_visible]': visible}
351
352 request = self._connection.post('aspects', data=data)
353 if request.status_code not in [200, 422]:
354 raise Exception('wrong status code: {0}'.format(request.status_code))
355
356 id = self.getAspectID(aspect_name)
357 return Aspect(self._connection, id)
358
359 def remove(self, aspect_id=-1, name=''):
360 """This method removes an aspect.
361 You can give it either id or name of the aspect.
362 When both are specified, id takes precedence over name.
363
364 Status code 500 is accepted because although the D* will
365 go nuts it will remove the aspect anyway.
366
367 :param aspect_id: id fo aspect to remove
368 :type aspect_id: int
369 :param name: name of aspect to remove
370 :type name: str
371 """
372 if aspect_id == -1 and name: aspect_id = self.getAspectID(name)
373 data = {'_method': 'delete',
374 'authenticity_token': self._connection.get_token()}
375 request = self._connection.post('aspects/{0}'.format(aspect_id), data=data)
376 if request.status_code not in [200, 302, 500]:
377 raise Exception('wrong status code: {0}: cannot remove aspect'.format(request.status_code))
378
379
380 class Commented(Generic):
381 """This stream contains all posts
382 the user has made a comment on.
383 """
384 _location = 'commented.json'
385
386
387 class Liked(Generic):
388 """This stream contains all posts the user liked.
389 """
390 _location = 'liked.json'
391
392
393 class Mentions(Generic):
394 """This stream contains all posts
395 the user is mentioned in.
396 """
397 _location = 'mentions.json'
398
399
400 class FollowedTags(Generic):
401 """This stream contains all posts
402 containing tags the user is following.
403 """
404 _location = 'followed_tags.json'
405
406 def remove(self, tag_id):
407 """Stop following a tag.
408
409 :param tag_id: tag id
410 :type tag_id: int
411 """
412 data = {'authenticity_token': self._connection.get_token()}
413 request = self._connection.delete('tag_followings/{0}'.format(tag_id), data=data)
414 if request.status_code != 404:
415 raise Exception('wrong status code: {0}'.format(request.status_code))
416
417 def add(self, tag_name):
418 """Follow new tag.
419 Error code 403 is accepted because pods respod with it when request
420 is sent to follow a tag that a user already follows.
421
422 :param tag_name: tag name
423 :type tag_name: str
424 :returns: int (response code)
425 """
426 data = {'name': tag_name,
427 'authenticity_token': self._connection.get_token(),
428 }
429 headers = {'content-type': 'application/json',
430 'x-csrf-token': self._connection.get_token(),
431 'accept': 'application/json'
432 }
433
434 request = self._connection.post('tag_followings', data=json.dumps(data), headers=headers)
435
436 if request.status_code not in [201, 403]:
437 raise Exception('wrong error code: {0}'.format(request.status_code))
438 return request.status_code
439
440
441 class Tag(Generic):
442 """This stream contains all posts containing a tag.
443 """
444 def __init__(self, connection, tag):
445 """
446 :param connection: Connection() object
447 :type connection: diaspy.connection.Connection
448 :param tag: tag name
449 :type tag: str
450 """
451 self._connection = connection
452 self._location = 'tags/{0}.json'.format(tag)
453 self.fill()