Commit | Line | Data |
---|---|---|
c5678c1a | 1 | # GNU MediaGoblin -- federated, autonomous media hosting |
cf29e8a8 | 2 | # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. |
c5678c1a CAW |
3 | # |
4 | # This program is free software: you can redistribute it and/or modify | |
5 | # it under the terms of the GNU Affero General Public License as published by | |
6 | # the Free Software Foundation, either version 3 of the License, or | |
7 | # (at your option) any later version. | |
8 | # | |
9 | # This program is distributed in the hope that it will be useful, | |
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
12 | # GNU Affero General Public License for more details. | |
13 | # | |
14 | # You should have received a copy of the GNU Affero General Public License | |
15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
16 | ||
17 | ||
1e03504e | 18 | import os |
c5678c1a | 19 | import pkg_resources |
1e03504e JK |
20 | import shutil |
21 | ||
c5678c1a | 22 | |
623bee73 | 23 | from paste.deploy import loadapp |
c5678c1a CAW |
24 | from webtest import TestApp |
25 | ||
91b89bde | 26 | from mediagoblin import mg_globals |
2d7b6bde | 27 | from mediagoblin.db.models import User, MediaEntry, Collection, MediaComment, \ |
e1561d04 | 28 | CommentSubscription, CommentNotification, Privilege |
152a3bfa | 29 | from mediagoblin.tools import testing |
421129b6 | 30 | from mediagoblin.init.config import read_mediagoblin_config |
39dc3bf8 | 31 | from mediagoblin.db.base import Session |
56dc1c9d | 32 | from mediagoblin.meddleware import BaseMeddleware |
fa723291 | 33 | from mediagoblin.auth import gen_password_hash |
d693f6bd | 34 | from mediagoblin.gmg_commands.dbupdate import run_dbupdate |
c5678c1a CAW |
35 | |
36 | ||
# Database name set aside for test runs (not referenced by the helpers below).
MEDIAGOBLIN_TEST_DB_NAME = u'__mediagoblin_tests__'

# Package whose bundled resources hold the default test configs.
_TESTS_PACKAGE = 'mediagoblin.tests'

# Default paste (server) config and MediaGoblin app config; get_app() falls
# back to these when the caller does not supply its own.
TEST_SERVER_CONFIG = pkg_resources.resource_filename(
    _TESTS_PACKAGE, 'test_paste.ini')
TEST_APP_CONFIG = pkg_resources.resource_filename(
    _TESTS_PACKAGE, 'test_mgoblin_app.ini')


# Sub-directories that get_app() creates inside each run's user_dev directory.
USER_DEV_DIRECTORIES_TO_SETUP = ['media/public', 'media/queue']
c5678c1a | 45 | |
c5678c1a | 46 | |
class TestingMeddleware(BaseMeddleware):
    """
    Meddleware used only by the unit tests.

    Some checks are worth running against every request/response
    pair, or a pair may need special preparation.  For example, all
    html responses could be validated *after* they were rendered.

    get_app() inserts this at the front of the meddleware list, so
    requests are handed here first and responses last; in effect it
    wraps the "normal" app.

    To add a check, either put it directly into the appropriate
    process_request or process_response, or write a new method and
    call it from process_*.
    """

    def process_response(self, request, response):
        """
        Fail if an html response hard-codes the /mgoblin_static/ path.

        Responses whose content_type is not exactly "text/html" (or
        that have no content_type attribute) are not inspected.

        Raises:
         - AssertionError: the response text contains
           "/mgoblin_static/"; templates are expected to go through
           request.staticdirect() instead.
        """
        # Every check in here applies to html responses only.
        is_html = getattr(response, 'content_type', None) == "text/html"

        # This could probably be done more easily as a grep over the
        # shipped templates...
        if is_html and "/mgoblin_static/" in response.text:
            raise AssertionError(
                "Response HTML contains reference to /mgoblin_static/ "
                "instead of staticdirect. Request was for: "
                + request.full_path)

        return None
84 | ||
85 | ||
5c2ece74 CAW |
def get_app(request, paste_config=None, mgoblin_config=None):
    """Build a MediaGoblin application wrapped in a webtest TestApp.

    Args:
     - request: a pytest fixture request (not an http request).  Its
       temp-dir handler provides the run directory, which pytest
       cleans up as needed.
     - paste_config: path of the paste config to use; falls back to
       TEST_SERVER_CONFIG when empty.
     - mgoblin_config: path of the mediagoblin config to use; falls
       back to TEST_APP_CONFIG when empty.

    Returns:
      A webtest TestApp around the loaded application.
    """
    if not paste_config:
        paste_config = TEST_SERVER_CONFIG
    if not mgoblin_config:
        mgoblin_config = TEST_APP_CONFIG

    # Fresh numbered directory that receives copies of both configs.
    run_dir = request.config._tmpdirhandler.mktemp(
        'mgoblin_app', numbered=True)
    user_dev_dir = run_dir.mkdir('user_dev').strpath

    copied_paste_config = run_dir.join('paste.ini').strpath
    copied_mgoblin_config = run_dir.join('mediagoblin.ini').strpath
    for source, target in ((paste_config, copied_paste_config),
                           (mgoblin_config, copied_mgoblin_config)):
        shutil.copyfile(source, target)

    # Drop whatever state the scoped session still holds before the
    # database is set up for this app.
    Session.rollback()
    Session.remove()

    # Lay out the user_dev directories.
    for subdirectory in USER_DEV_DIRECTORIES_TO_SETUP:
        os.makedirs(os.path.join(user_dev_dir, subdirectory))

    # Read the app config; the validation result is not used here.
    global_config, _validation_result = read_mediagoblin_config(
        copied_mgoblin_config)

    # Run database setup/migrations.
    run_dbupdate(global_config['mediagoblin'], global_config)

    # Load the WSGI application from the copied paste config.
    wsgi_app = loadapp('config:' + copied_paste_config)

    # Put TestingMeddleware first so it can run sanity checks on every
    # request/response.  Reaching into mg_globals.app like this is
    # probably not the cleanest way; to be revisited once there are
    # plugins.
    mg_globals.app.meddleware.insert(0, TestingMeddleware(mg_globals.app))

    return TestApp(wsgi_app)
3aa4c668 CAW |
137 | |
138 | ||
85663692 CAW |
def install_fixtures_simple(db, fixtures):
    """
    Very simply install fixtures in the database.

    Args:
     - db: object indexable by collection name; each db[name] must
       provide an insert(document) method.
     - fixtures: dict mapping a collection name to an iterable of the
       documents to insert into that collection.
    """
    # .items() instead of the Python-2-only .iteritems(), so this works
    # on both Python 2 and Python 3.
    for collection_name, collection_fixtures in fixtures.items():
        collection = db[collection_name]
        for fixture in collection_fixtures:
            collection.insert(fixture)
147 | ||
148 | ||
def assert_db_meets_expected(db, expected):
    """
    Assert a database contains the things we expect it to.

    Objects are found via 'id', so you should make sure your document
    has an id.

    Args:
     - db: object indexable by collection name; each db[name] must
       support .query.filter_by(id=...).first()
     - expected: the data we expect.  Formatted like:
         {'collection_name': [
             {'id': 'foo',
              'some_field': 'some_value'},]}

    Raises:
     - AssertionError: an expected document is missing, or the stored
       document does not compare equal to the expected one.
    """
    # .items() instead of the Python-2-only .iteritems(), so this works
    # on both Python 2 and Python 3.
    for collection_name, collection_data in expected.items():
        collection = db[collection_name]
        for expected_document in collection_data:
            document = collection.query.filter_by(
                id=expected_document['id']).first()
            # make sure it exists
            assert document is not None, (
                "no document with id %r in %r"
                % (expected_document['id'], collection_name))
            # make sure it matches
            assert document == expected_document, (
                "document with id %r in %r does not match expectation"
                % (expected_document['id'], collection_name))
9754802d E |
169 | |
170 | ||
def fixture_add_user(username=u'chris', password=u'toast',
                     privileges=(), wants_comment_notification=True):
    """
    Create (or update) a test user and return it detached from the Session.

    Args:
     - username: name of the user; an existing user with this name is
       reused, otherwise a new User is created.  The email is always
       set to username + '@example.com'.
     - password: plain-text password to hash into pw_hash; pass None
       to leave pw_hash untouched.
     - privileges: iterable of privilege names to grant.  Names with
       no matching Privilege row are skipped.  (Defaults to an empty
       tuple rather than a shared mutable list.)
     - wants_comment_notification: value stored on the user.

    Returns:
      The freshly reloaded User, expunged from the Session.
    """
    # Reuse existing user or create a new one
    test_user = User.query.filter_by(username=username).first()
    if test_user is None:
        test_user = User()
    test_user.username = username
    test_user.email = username + u'@example.com'
    if password is not None:
        test_user.pw_hash = gen_password_hash(password)
    test_user.wants_comment_notification = wants_comment_notification
    for privilege in privileges:
        query = Privilege.query.filter(Privilege.privilege_name == privilege)
        # Only grant privileges that actually exist in the database.
        if query.count():
            test_user.all_privileges.append(query.one())

    test_user.save()
    # Reload
    test_user = User.query.filter_by(username=username).first()

    # ... and detach from session:
    Session.expunge(test_user)

    return test_user
cd75b228 E |
195 | |
196 | ||
2d7b6bde JW |
def fixture_comment_subscription(entry, notify=True, send_email=None):
    """
    Subscribe an entry's uploader to comments on that entry.

    Args:
     - entry: media entry; its id and uploader are read.
     - notify: stored on the subscription.
     - send_email: stored on the subscription.  When None, the
       uploader's wants_comment_notification setting is used instead.

    Returns:
      The saved CommentSubscription, reloaded and expunged from the
      Session.
    """
    if send_email is None:
        # Fall back to the uploader's own notification preference.
        owner = User.query.filter_by(id=entry.uploader).first()
        send_email = owner.wants_comment_notification

    subscription_fields = {
        'media_entry_id': entry.id,
        'user_id': entry.uploader,
        'notify': notify,
        'send_email': send_email,
    }
    subscription = CommentSubscription(**subscription_fields)
    subscription.save()

    # Reload, then detach from the session.
    reloaded = CommentSubscription.query.filter_by(
        id=subscription.id).first()
    Session.expunge(reloaded)

    return reloaded
215 | ||
216 | ||
def fixture_add_comment_notification(entry_id, subject_id, user_id,
                                     seen=False):
    """
    Store a CommentNotification and return it detached from the Session.

    Args:
     - entry_id: accepted but not used by this function.
     - subject_id: stored as the notification's subject_id.
     - user_id: stored as the notification's user_id.
     - seen: stored as the notification's seen flag.
    """
    notification = CommentNotification(
        user_id=user_id,
        seen=seen,
        subject_id=subject_id)
    notification.save()

    # Reload, then detach from the session.
    reloaded = CommentNotification.query.filter_by(
        id=notification.id).first()
    Session.expunge(reloaded)

    return reloaded
229 | ||
230 | ||
def fixture_media_entry(title=u"Some title", slug=None,
                        uploader=None, save=True, gen_slug=True,
                        state=u'unprocessed', fake_upload=True,
                        expunge=True):
    """
    Add a media entry for testing purposes.

    Caution: if you're adding multiple entries with fake_upload=True,
    make sure you save between them... otherwise you'll hit an
    IntegrityError from multiple newly-added-MediaEntries adding
    FileKeynames at once. :)

    Args:
     - title, slug, state: stored on the entry as given.
     - uploader: id of the uploading user; when None a user is
       created via fixture_add_user() and its id is used.
     - save: call entry.save() when true.
     - gen_slug: call entry.generate_slug() when true.
     - fake_upload: when true, fill media_files with placeholder
       paths and set the media type to the image media type module.
     - expunge: when true, reload the entry by id and detach the
       reloaded object from the Session.

    Returns:
      The reloaded, detached entry when expunge is true and the
      reload finds a row; otherwise the entry object built here.
    """
    if uploader is None:
        uploader = fixture_add_user().id

    entry = MediaEntry()
    entry.title = title
    entry.slug = slug
    entry.uploader = uploader
    entry.media_type = u'image'
    entry.state = state

    if fake_upload:
        entry.media_files = {'thumb': ['a', 'b', 'c.jpg'],
                             'medium': ['d', 'e', 'f.png'],
                             'original': ['g', 'h', 'i.png']}
        entry.media_type = u'mediagoblin.media_types.image'

    if gen_slug:
        entry.generate_slug()

    if save:
        entry.save()

    if expunge:
        reloaded = MediaEntry.query.filter_by(id=entry.id).first()

        # The reload can come back empty (presumably when the entry was
        # never saved); handing None to Session.expunge would fail, so
        # keep the in-memory entry in that case.
        if reloaded is not None:
            Session.expunge(reloaded)
            entry = reloaded

    return entry
271 | ||
272 | ||
cd75b228 E |
def fixture_add_collection(name=u"My first Collection", user=None):
    """
    Return the collection titled `name` owned by `user`, creating it if needed.

    Args:
     - name: title of the collection.
     - user: creator of the collection; when None a user is obtained
       via fixture_add_user().

    Returns:
      An already existing matching Collection exactly as the query
      returned it, or a newly saved Collection that has been
      refreshed and expunged from the Session.
    """
    owner = fixture_add_user() if user is None else user

    existing = Collection.query.filter_by(
        creator=owner.id, title=name).first()
    if existing is not None:
        return existing

    new_collection = Collection()
    new_collection.creator = owner.id
    new_collection.title = name
    new_collection.generate_slug()
    new_collection.save()

    # Reload, then detach from the session.
    Session.refresh(new_collection)
    Session.expunge(new_collection)

    return new_collection
491029bc | 292 | |
2d7b6bde JW |
def fixture_add_comment(author=None, media_entry=None, comment=None):
    """
    Save a MediaComment for testing and return it detached from the Session.

    Args:
     - author: id of the commenting user; when None a user is created
       via fixture_add_user() and its id is used.
     - media_entry: id of the commented media entry; when None an
       entry is created via fixture_media_entry() and its id is used.
     - comment: text of the comment; when None a text naming the
       author id and the media entry id is generated.
    """
    if author is None:
        author = fixture_add_user().id

    if media_entry is None:
        media_entry = fixture_media_entry().id

    if comment is None:
        # {1} is the media entry id; using {0} twice would repeat the
        # author id and never show the media id.
        comment = \
            'Auto-generated test comment by user #{0} on media #{1}'.format(
                author, media_entry)

    media_comment = MediaComment(author=author,
                                 media_entry=media_entry,
                                 content=comment)

    media_comment.save()

    Session.expunge(media_comment)

    return media_comment
314 |