526a2a1d89a367ff2bce0b4a7d7b7b2d6efe829e
[mediagoblin.git] / mediagoblin / tests / tools.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17
18 import os
19 import pkg_resources
20 import shutil
21
22 from functools import wraps
23
24 from paste.deploy import loadapp
25 from webtest import TestApp
26
27 from mediagoblin import mg_globals
28 from mediagoblin.db.models import User, MediaEntry, Collection
29 from mediagoblin.tools import testing
30 from mediagoblin.init.config import read_mediagoblin_config
31 from mediagoblin.db.open import setup_connection_and_db_from_config
32 from mediagoblin.db.base import Session
33 from mediagoblin.meddleware import BaseMeddleware
34 from mediagoblin.auth.lib import bcrypt_gen_password_hash
35 from mediagoblin.gmg_commands.dbupdate import run_dbupdate
36 from mediagoblin.init.celery import setup_celery_app
37
38
MEDIAGOBLIN_TEST_DB_NAME = u'__mediagoblin_tests__'

# Locations of the test fixtures shipped inside the mediagoblin.tests
# package: the paste (server) config, the mediagoblin app config and the
# scratch "user_dev" directory.
TEST_SERVER_CONFIG = pkg_resources.resource_filename(
    'mediagoblin.tests', 'test_paste.ini')
TEST_APP_CONFIG = pkg_resources.resource_filename(
    'mediagoblin.tests', 'test_mgoblin_app.ini')
TEST_USER_DEV = pkg_resources.resource_filename(
    'mediagoblin.tests', 'test_user_dev')

# Cache of the most recently built test application, together with the
# (paste_config, mgoblin_config) pair it was requested with.  Both are
# rebound by get_app().
MGOBLIN_APP = None
OLD_MGOBLIN_APP_CONFIGS = (None, None)


# Sub-directories (relative to TEST_USER_DEV) that get_app() creates
# after clearing out TEST_USER_DEV.
USER_DEV_DIRECTORIES_TO_SETUP = [
    'media/public',
    'media/queue',
    'beaker/sessions/data',
    'beaker/sessions/lock']

# Message carried by BadCeleryEnviron when CELERY_CONFIG_MODULE is not
# set to the test module.
BAD_CELERY_MESSAGE = """\
Sorry, you *absolutely* must run nosetests with the
mediagoblin.init.celery.from_tests module. Like so:
$ CELERY_CONFIG_MODULE=mediagoblin.init.celery.from_tests ./bin/nosetests"""
58
59
class BadCeleryEnviron(Exception):
    """Raised when the tests run with the wrong CELERY_CONFIG_MODULE."""
61
62
class TestingMeddleware(BaseMeddleware):
    """
    Meddleware used only by the unit tests.

    It is a hook for running sanity checks on every request and
    response, or for preparing them in a special way.  For example,
    every html response could be validated *after* it was rendered.

    get_app() inserts this at the front of the meddleware list, which
    means requests are handed here first and responses last, so it
    wraps the "normal" app.

    To add a check, either put it directly into the appropriate
    process_request / process_response, or write a new method and
    call it from process_*.
    """

    def process_response(self, request, response):
        """
        Fail if an html response hard-codes the /mgoblin_static/ path.

        Responses whose content type is not "text/html" are ignored.
        Always returns None.
        """
        content_type = getattr(response, 'content_type', None)
        if content_type != "text/html":
            # Every check below is for html only, so get out early.
            return

        # Templates are supposed to go through request.staticdirect()
        # instead of spelling out /mgoblin_static/ themselves.
        # This could probably be implemented more easily as a grep
        # over the shipped templates...
        if "/mgoblin_static/" in response.text:
            raise AssertionError(
                "Response HTML contains reference to /mgoblin_static/ "
                "instead of staticdirect. Request was for: "
                + request.full_path)
100
101
def suicide_if_bad_celery_environ():
    """
    Refuse to continue unless celery is configured for the tests.

    Raises BadCeleryEnviron (carrying BAD_CELERY_MESSAGE) when the
    CELERY_CONFIG_MODULE environment variable is unset or is anything
    other than 'mediagoblin.init.celery.from_tests'.
    """
    configured_module = os.environ.get('CELERY_CONFIG_MODULE')
    if configured_module != 'mediagoblin.init.celery.from_tests':
        raise BadCeleryEnviron(BAD_CELERY_MESSAGE)
106
107
def get_app(paste_config=None, mgoblin_config=None, dump_old_app=True):
    """
    Build (or reuse) the WebTest application used by the test suite.

    Args:
     - paste_config: path to the paste deploy config; falls back to
       TEST_SERVER_CONFIG when not given.
     - mgoblin_config: path to the mediagoblin app config; falls back
       to TEST_APP_CONFIG when not given.
     - dump_old_app: when false, and a previously built app exists that
       was requested with the same pair of configs, that app is
       returned as is.  Otherwise everything is set up from scratch.

    Returns the webtest TestApp wrapping the loaded application.

    Raises BadCeleryEnviron (via suicide_if_bad_celery_environ) if
    CELERY_CONFIG_MODULE does not point at the test module.

    Side effects of a fresh setup: the db Session is rolled back and
    removed, TEST_USER_DEV is deleted and re-created, database
    setup/migrations are run, celery is set up again, and the module
    level MGOBLIN_APP / OLD_MGOBLIN_APP_CONFIGS caches are rebound.
    """
    global MGOBLIN_APP
    global OLD_MGOBLIN_APP_CONFIGS

    paste_config = paste_config or TEST_SERVER_CONFIG
    mgoblin_config = mgoblin_config or TEST_APP_CONFIG

    suicide_if_bad_celery_environ()

    # Make sure we've turned on testing
    testing._activate_testing()

    # Leave this imported as it sets up celery.
    from mediagoblin.init.celery import from_tests  # noqa

    # Just return the old app if that exists and it's okay to set up
    # and return
    #
    # ...Man I can't wait till we get rid of paste configs in tests.
    old_paste, old_mgoblin = OLD_MGOBLIN_APP_CONFIGS

    if MGOBLIN_APP and not dump_old_app \
            and old_paste == paste_config and old_mgoblin == mgoblin_config:
        return MGOBLIN_APP

    Session.rollback()
    Session.remove()

    # Remove and reinstall user_dev directories
    if os.path.exists(TEST_USER_DEV):
        shutil.rmtree(TEST_USER_DEV)

    for directory in USER_DEV_DIRECTORIES_TO_SETUP:
        os.makedirs(os.path.join(TEST_USER_DEV, directory))

    # Get app config.  Use the config that was actually requested: the
    # cache check above compares against it, so loading the default
    # here would silently hand back the wrong application.
    global_config, _ = read_mediagoblin_config(mgoblin_config)
    app_config = global_config['mediagoblin']

    # Run database setup/migrations
    run_dbupdate(app_config, global_config)

    # Load the paste application from the requested paste config
    test_app = loadapp('config:' + paste_config)

    # Re-setup celery
    setup_celery_app(app_config, global_config)

    # Insert the TestingMeddleware, which can do some
    # sanity checks on every request/response.
    # Doing it this way is probably not the cleanest way.
    # We'll fix it, when we have plugins!
    mg_globals.app.meddleware.insert(0, TestingMeddleware(mg_globals.app))

    app = TestApp(test_app)
    MGOBLIN_APP = app

    # Make sure we can see if this app matches the next app if not
    # re-setting-up
    OLD_MGOBLIN_APP_CONFIGS = (paste_config, mgoblin_config)

    return app
172
173
class SetupFreshApp(object):
    """
    Decorator to setup a fresh test application for this function.

    Cleans out test buckets and passes in a new, fresh test_app as the
    first positional argument of the decorated function.

    Args:
     - paste_config: paste deploy config handed to get_app()
     - mgoblin_config: mediagoblin app config handed to get_app()
     - dump_old_app: handed to get_app(); defaults to True
    """
    def __init__(self, paste_config, mgoblin_config, dump_old_app=True):
        self.paste_config = paste_config
        self.mgoblin_config = mgoblin_config
        # wrapper() below reads this attribute, so it has to be stored;
        # leaving it out makes every decorated test fail with an
        # AttributeError.
        self.dump_old_app = dump_old_app

    def __call__(self, func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            test_app = get_app(
                paste_config=self.paste_config,
                mgoblin_config=self.mgoblin_config,
                dump_old_app=self.dump_old_app)
            testing.clear_test_buckets()
            return func(test_app, *args, **kwargs)

        return wrapper
195
# Ready-made decorator that uses the default test configs.
setup_fresh_app = SetupFreshApp(
    paste_config=TEST_SERVER_CONFIG,
    mgoblin_config=TEST_APP_CONFIG)
197
198
def install_fixtures_simple(db, fixtures):
    """
    Very simply install fixtures in the database

    Args:
     - db: object indexable by collection name; each collection it
       returns must offer an insert(document) method
     - fixtures: mapping of collection name to an iterable of documents
       to insert into that collection
    """
    # .items() instead of .iteritems(): the latter only exists on
    # Python 2 dicts, while .items() works on both Python 2 and 3.
    for collection_name, collection_fixtures in fixtures.items():
        collection = db[collection_name]
        for fixture in collection_fixtures:
            collection.insert(fixture)
207
208
def assert_db_meets_expected(db, expected):
    """
    Assert a database contains the things we expect it to.

    Objects are found via 'id', so you should make sure your document
    has an id.

    Args:
     - db: pymongo or mongokit database connection
     - expected: the data we expect.  Formatted like:
         {'collection_name': [
             {'id': 'foo',
              'some_field': 'some_value'},]}

    Raises AssertionError if a document is missing or differs from the
    expected one.
    """
    # .items() instead of .iteritems(): the latter only exists on
    # Python 2 dicts, while .items() works on both Python 2 and 3.
    for collection_name, collection_data in expected.items():
        collection = db[collection_name]
        for expected_document in collection_data:
            document = collection.find_one({'id': expected_document['id']})
            # make sure it exists
            assert document is not None, (
                "No document with id %r in collection %r" % (
                    expected_document['id'], collection_name))
            # make sure it matches
            assert document == expected_document, (
                "Document with id %r in collection %r does not match" % (
                    expected_document['id'], collection_name))
229
230
def fixture_add_user(username=u'chris', password=u'toast',
                     active_user=True):
    """
    Create (or update) a user for tests and return it detached.

    An existing user with the given username is reused, otherwise a new
    User is made.  The email is always set to <username>@example.com.
    The password hash is only (re)generated when password is not None,
    and the user is only marked verified/active when active_user is
    true.  The saved user is loaded again and expunged from the Session
    before it is returned.
    """
    def _find_by_username():
        return User.query.filter_by(username=username).first()

    # Reuse existing user or create a new one
    account = _find_by_username()
    if account is None:
        account = User()

    account.username = username
    account.email = username + u'@example.com'

    if password is not None:
        account.pw_hash = bcrypt_gen_password_hash(password)

    if active_user:
        account.email_verified = True
        account.status = u'active'

    account.save()

    # Reload, then detach from the session
    account = _find_by_username()
    Session.expunge(account)

    return account
254
255
def fixture_media_entry(title=u"Some title", slug=None,
                        uploader=None, save=True, gen_slug=True):
    """
    Build a MediaEntry of media type u'image' for tests.

    When uploader is not given (or falsy), the id of the user returned
    by fixture_add_user() is used.  generate_slug() is called when
    gen_slug is true and save() when save is true.  Returns the entry.
    """
    media = MediaEntry()
    media.title = title
    media.slug = slug

    if not uploader:
        uploader = fixture_add_user().id
    media.uploader = uploader

    media.media_type = u'image'

    if gen_slug:
        media.generate_slug()

    if save:
        media.save()

    return media
270
271
def fixture_add_collection(name=u"My first Collection", user=None):
    """
    Return a Collection titled name and created by user, making it if
    needed.

    When user is None, the user from fixture_add_user() is used.  A
    collection that already exists for that creator and title is
    returned as found.  A newly created one is saved, refreshed and
    expunged from the Session before it is returned.
    """
    if user is None:
        user = fixture_add_user()

    existing = Collection.query.filter_by(
        creator=user.id, title=name).first()
    if existing is not None:
        return existing

    collection = Collection()
    collection.creator = user.id
    collection.title = name
    collection.generate_slug()
    collection.save()

    # Reload, then detach from the session
    Session.refresh(collection)
    Session.expunge(collection)

    return collection