Change codebase to query or create correct User model
[mediagoblin.git] / mediagoblin / tests / tools.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17
18 import os
19 import pkg_resources
20 import shutil
21
22 import six
23
24 from paste.deploy import loadapp
25 from webtest import TestApp
26
27 from mediagoblin import mg_globals
28 from mediagoblin.db.models import User, LocalUser, MediaEntry, Collection, MediaComment, \
29 CommentSubscription, CommentNotification, Privilege, CommentReport, Client, \
30 RequestToken, AccessToken, Activity, Generator
31 from mediagoblin.tools import testing
32 from mediagoblin.init.config import read_mediagoblin_config
33 from mediagoblin.db.base import Session
34 from mediagoblin.meddleware import BaseMeddleware
35 from mediagoblin.auth import gen_password_hash
36 from mediagoblin.gmg_commands.dbupdate import run_dbupdate
37 from mediagoblin.tools.crypto import random_string
38
39 from datetime import datetime
40
41
42 MEDIAGOBLIN_TEST_DB_NAME = u'__mediagoblin_tests__'
43 TEST_SERVER_CONFIG = pkg_resources.resource_filename(
44 'mediagoblin.tests', 'test_paste.ini')
45 TEST_APP_CONFIG = pkg_resources.resource_filename(
46 'mediagoblin.tests', 'test_mgoblin_app.ini')
47
48
49 USER_DEV_DIRECTORIES_TO_SETUP = ['media/public', 'media/queue']
50
51
52 class TestingMeddleware(BaseMeddleware):
53 """
54 Meddleware for the Unit tests
55
56 It might make sense to perform some tests on all
57 requests/responses. Or prepare them in a special
58 manner. For example all html responses could be tested
59 for being valid html *after* being rendered.
60
61 This module is getting inserted at the front of the
62 meddleware list, which means: requests are handed here
63 first, responses last. So this wraps up the "normal"
64 app.
65
66 If you need to add a test, either add it directly to
67 the appropiate process_request or process_response, or
68 create a new method and call it from process_*.
69 """
70
71 def process_response(self, request, response):
72 # All following tests should be for html only!
73 if getattr(response, 'content_type', None) != "text/html":
74 # Get out early
75 return
76
77 # If the template contains a reference to
78 # /mgoblin_static/ instead of using
79 # /request.staticdirect(), error out here.
80 # This could probably be implemented as a grep on
81 # the shipped templates easier...
82 if response.text.find("/mgoblin_static/") >= 0:
83 raise AssertionError(
84 "Response HTML contains reference to /mgoblin_static/ "
85 "instead of staticdirect. Request was for: "
86 + request.full_path)
87
88 return
89
90
91 def get_app(request, paste_config=None, mgoblin_config=None):
92 """Create a MediaGoblin app for testing.
93
94 Args:
95 - request: Not an http request, but a pytest fixture request. We
96 use this to make temporary directories that pytest
97 automatically cleans up as needed.
98 - paste_config: particular paste config used by this application.
99 - mgoblin_config: particular mediagoblin config used by this
100 application.
101 """
102 paste_config = paste_config or TEST_SERVER_CONFIG
103 mgoblin_config = mgoblin_config or TEST_APP_CONFIG
104
105 # This is the directory we're copying the paste/mgoblin config stuff into
106 run_dir = request.config._tmpdirhandler.mktemp(
107 'mgoblin_app', numbered=True)
108 user_dev_dir = run_dir.mkdir('user_dev').strpath
109
110 new_paste_config = run_dir.join('paste.ini').strpath
111 new_mgoblin_config = run_dir.join('mediagoblin.ini').strpath
112 shutil.copyfile(paste_config, new_paste_config)
113 shutil.copyfile(mgoblin_config, new_mgoblin_config)
114
115 Session.rollback()
116 Session.remove()
117
118 # install user_dev directories
119 for directory in USER_DEV_DIRECTORIES_TO_SETUP:
120 full_dir = os.path.join(user_dev_dir, directory)
121 os.makedirs(full_dir)
122
123 # Get app config
124 global_config, validation_result = read_mediagoblin_config(new_mgoblin_config)
125 app_config = global_config['mediagoblin']
126
127 # Run database setup/migrations
128 run_dbupdate(app_config, global_config)
129
130 # setup app and return
131 test_app = loadapp(
132 'config:' + new_paste_config)
133
134 # Insert the TestingMeddleware, which can do some
135 # sanity checks on every request/response.
136 # Doing it this way is probably not the cleanest way.
137 # We'll fix it, when we have plugins!
138 mg_globals.app.meddleware.insert(0, TestingMeddleware(mg_globals.app))
139
140 app = TestApp(test_app)
141 return app
142
143
144 def install_fixtures_simple(db, fixtures):
145 """
146 Very simply install fixtures in the database
147 """
148 for collection_name, collection_fixtures in six.iteritems(fixtures):
149 collection = db[collection_name]
150 for fixture in collection_fixtures:
151 collection.insert(fixture)
152
153
154 def assert_db_meets_expected(db, expected):
155 """
156 Assert a database contains the things we expect it to.
157
158 Objects are found via 'id', so you should make sure your document
159 has an id.
160
161 Args:
162 - db: pymongo or mongokit database connection
163 - expected: the data we expect. Formatted like:
164 {'collection_name': [
165 {'id': 'foo',
166 'some_field': 'some_value'},]}
167 """
168 for collection_name, collection_data in six.iteritems(expected):
169 collection = db[collection_name]
170 for expected_document in collection_data:
171 document = collection.query.filter_by(id=expected_document['id']).first()
172 assert document is not None # make sure it exists
173 assert document == expected_document # make sure it matches
174
175
176 def fixture_add_user(username=u'chris', password=u'toast',
177 privileges=[], wants_comment_notification=True):
178 # Reuse existing user or create a new one
179 test_user = User.query.filter(LocalUser.username==username).first()
180 if test_user is None:
181 test_user = LocalUser()
182 test_user.username = username
183 test_user.email = username + u'@example.com'
184 if password is not None:
185 test_user.pw_hash = gen_password_hash(password)
186 test_user.wants_comment_notification = wants_comment_notification
187 for privilege in privileges:
188 query = Privilege.query.filter(Privilege.privilege_name==privilege)
189 if query.count():
190 test_user.all_privileges.append(query.one())
191
192 test_user.save()
193 # Reload
194 test_user = User.query.filter(LocalUser.username==username).first()
195
196 # ... and detach from session:
197 Session.expunge(test_user)
198
199 return test_user
200
201
202 def fixture_comment_subscription(entry, notify=True, send_email=None):
203 if send_email is None:
204 uploader = User.query.filter_by(id=entry.uploader).first()
205 send_email = uploader.wants_comment_notification
206
207 cs = CommentSubscription(
208 media_entry_id=entry.id,
209 user_id=entry.uploader,
210 notify=notify,
211 send_email=send_email)
212
213 cs.save()
214
215 cs = CommentSubscription.query.filter_by(id=cs.id).first()
216
217 Session.expunge(cs)
218
219 return cs
220
221
222 def fixture_add_comment_notification(entry_id, subject_id, user_id,
223 seen=False):
224 cn = CommentNotification(user_id=user_id,
225 seen=seen,
226 subject_id=subject_id)
227 cn.save()
228
229 cn = CommentNotification.query.filter_by(id=cn.id).first()
230
231 Session.expunge(cn)
232
233 return cn
234
235
236 def fixture_media_entry(title=u"Some title", slug=None,
237 uploader=None, save=True, gen_slug=True,
238 state=u'unprocessed', fake_upload=True,
239 expunge=True):
240 """
241 Add a media entry for testing purposes.
242
243 Caution: if you're adding multiple entries with fake_upload=True,
244 make sure you save between them... otherwise you'll hit an
245 IntegrityError from multiple newly-added-MediaEntries adding
246 FileKeynames at once. :)
247 """
248 if uploader is None:
249 uploader = fixture_add_user().id
250
251 entry = MediaEntry()
252 entry.title = title
253 entry.slug = slug
254 entry.uploader = uploader
255 entry.media_type = u'image'
256 entry.state = state
257
258 if fake_upload:
259 entry.media_files = {'thumb': ['a', 'b', 'c.jpg'],
260 'medium': ['d', 'e', 'f.png'],
261 'original': ['g', 'h', 'i.png']}
262 entry.media_type = u'mediagoblin.media_types.image'
263
264 if gen_slug:
265 entry.generate_slug()
266
267 if save:
268 entry.save()
269
270 if expunge:
271 entry = MediaEntry.query.filter_by(id=entry.id).first()
272
273 Session.expunge(entry)
274
275 return entry
276
277
278 def fixture_add_collection(name=u"My first Collection", user=None):
279 if user is None:
280 user = fixture_add_user()
281 coll = Collection.query.filter_by(creator=user.id, title=name).first()
282 if coll is not None:
283 return coll
284 coll = Collection()
285 coll.creator = user.id
286 coll.title = name
287 coll.generate_slug()
288 coll.save()
289
290 # Reload
291 Session.refresh(coll)
292
293 # ... and detach from session:
294 Session.expunge(coll)
295
296 return coll
297
298 def fixture_add_comment(author=None, media_entry=None, comment=None):
299 if author is None:
300 author = fixture_add_user().id
301
302 if media_entry is None:
303 media_entry = fixture_media_entry().id
304
305 if comment is None:
306 comment = \
307 'Auto-generated test comment by user #{0} on media #{0}'.format(
308 author, media_entry)
309
310 comment = MediaComment(author=author,
311 media_entry=media_entry,
312 content=comment)
313
314 comment.save()
315
316 Session.expunge(comment)
317
318 return comment
319
320 def fixture_add_comment_report(comment=None, reported_user=None,
321 reporter=None, created=None, report_content=None):
322 if comment is None:
323 comment = fixture_add_comment()
324
325 if reported_user is None:
326 reported_user = fixture_add_user()
327
328 if reporter is None:
329 reporter = fixture_add_user()
330
331 if created is None:
332 created=datetime.now()
333
334 if report_content is None:
335 report_content = \
336 'Auto-generated test report'
337
338 comment_report = CommentReport(comment=comment,
339 reported_user = reported_user,
340 reporter = reporter,
341 created = created,
342 report_content=report_content)
343
344 comment_report.save()
345
346 Session.expunge(comment_report)
347
348 return comment_report
349
350 def fixture_add_activity(obj, verb="post", target=None, generator=None, actor=None):
351 if generator is None:
352 generator = Generator(
353 name="GNU MediaGoblin",
354 object_type="service"
355 )
356 generator.save()
357
358 if actor is None:
359 actor = fixture_add_user()
360
361 activity = Activity(
362 verb=verb,
363 actor=actor.id,
364 generator=generator.id,
365 )
366
367 activity.set_object(obj)
368
369 if target is not None:
370 activity.set_target(target)
371
372 activity.save()
373 return activity