Fix errors in collection views
[mediagoblin.git] / mediagoblin / tests / tools.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17
18 import os
19 import pkg_resources
20 import shutil
21
22 from functools import wraps
23
24 from paste.deploy import loadapp
25 from webtest import TestApp
26
27 from mediagoblin import mg_globals
28 from mediagoblin.db.models import User, MediaEntry, Collection
29 from mediagoblin.tools import testing
30 from mediagoblin.init.config import read_mediagoblin_config
31 from mediagoblin.db.open import setup_connection_and_db_from_config
32 from mediagoblin.db.base import Session
33 from mediagoblin.meddleware import BaseMeddleware
34 from mediagoblin.auth.lib import bcrypt_gen_password_hash
35 from mediagoblin.gmg_commands.dbupdate import run_dbupdate
36 from mediagoblin.init.celery import setup_celery_app
37
38
39 MEDIAGOBLIN_TEST_DB_NAME = u'__mediagoblin_tests__'
40 TEST_SERVER_CONFIG = pkg_resources.resource_filename(
41 'mediagoblin.tests', 'test_paste.ini')
42 TEST_APP_CONFIG = pkg_resources.resource_filename(
43 'mediagoblin.tests', 'test_mgoblin_app.ini')
44 TEST_USER_DEV = pkg_resources.resource_filename(
45 'mediagoblin.tests', 'test_user_dev')
46 MGOBLIN_APP = None
47
48 USER_DEV_DIRECTORIES_TO_SETUP = [
49 'media/public', 'media/queue',
50 'beaker/sessions/data', 'beaker/sessions/lock']
51
52 BAD_CELERY_MESSAGE = """\
53 Sorry, you *absolutely* must run nosetests with the
54 mediagoblin.init.celery.from_tests module. Like so:
55 $ CELERY_CONFIG_MODULE=mediagoblin.init.celery.from_tests ./bin/nosetests"""
56
57
58 class BadCeleryEnviron(Exception): pass
59
60
61 class TestingMeddleware(BaseMeddleware):
62 """
63 Meddleware for the Unit tests
64
65 It might make sense to perform some tests on all
66 requests/responses. Or prepare them in a special
67 manner. For example all html responses could be tested
68 for being valid html *after* being rendered.
69
70 This module is getting inserted at the front of the
71 meddleware list, which means: requests are handed here
72 first, responses last. So this wraps up the "normal"
73 app.
74
75 If you need to add a test, either add it directly to
76 the appropiate process_request or process_response, or
77 create a new method and call it from process_*.
78 """
79
80 def process_response(self, request, response):
81 # All following tests should be for html only!
82 if getattr(response, 'content_type', None) != "text/html":
83 # Get out early
84 return
85
86 # If the template contains a reference to
87 # /mgoblin_static/ instead of using
88 # /request.staticdirect(), error out here.
89 # This could probably be implemented as a grep on
90 # the shipped templates easier...
91 if response.text.find("/mgoblin_static/") >= 0:
92 raise AssertionError(
93 "Response HTML contains reference to /mgoblin_static/ "
94 "instead of staticdirect. Request was for: "
95 + request.full_path)
96
97 return
98
99
100 def suicide_if_bad_celery_environ():
101 if not os.environ.get('CELERY_CONFIG_MODULE') == \
102 'mediagoblin.init.celery.from_tests':
103 raise BadCeleryEnviron(BAD_CELERY_MESSAGE)
104
105
106 def get_app(dump_old_app=True):
107 suicide_if_bad_celery_environ()
108
109 # Make sure we've turned on testing
110 testing._activate_testing()
111
112 # Leave this imported as it sets up celery.
113 from mediagoblin.init.celery import from_tests
114
115 global MGOBLIN_APP
116
117 # Just return the old app if that exists and it's okay to set up
118 # and return
119 if MGOBLIN_APP and not dump_old_app:
120 return MGOBLIN_APP
121
122 Session.rollback()
123 Session.remove()
124
125 # Remove and reinstall user_dev directories
126 if os.path.exists(TEST_USER_DEV):
127 shutil.rmtree(TEST_USER_DEV)
128
129 for directory in USER_DEV_DIRECTORIES_TO_SETUP:
130 full_dir = os.path.join(TEST_USER_DEV, directory)
131 os.makedirs(full_dir)
132
133 # Get app config
134 global_config, validation_result = read_mediagoblin_config(TEST_APP_CONFIG)
135 app_config = global_config['mediagoblin']
136
137 # Run database setup/migrations
138 run_dbupdate(app_config, global_config)
139
140 # setup app and return
141 test_app = loadapp(
142 'config:' + TEST_SERVER_CONFIG)
143
144 # Re-setup celery
145 setup_celery_app(app_config, global_config)
146
147 # Insert the TestingMeddleware, which can do some
148 # sanity checks on every request/response.
149 # Doing it this way is probably not the cleanest way.
150 # We'll fix it, when we have plugins!
151 mg_globals.app.meddleware.insert(0, TestingMeddleware(mg_globals.app))
152
153 app = TestApp(test_app)
154 MGOBLIN_APP = app
155
156 return app
157
158
159 def setup_fresh_app(func):
160 """
161 Decorator to setup a fresh test application for this function.
162
163 Cleans out test buckets and passes in a new, fresh test_app.
164 """
165 @wraps(func)
166 def wrapper(*args, **kwargs):
167 test_app = get_app()
168 testing.clear_test_buckets()
169 return func(test_app, *args, **kwargs)
170
171 return wrapper
172
173
174 def install_fixtures_simple(db, fixtures):
175 """
176 Very simply install fixtures in the database
177 """
178 for collection_name, collection_fixtures in fixtures.iteritems():
179 collection = db[collection_name]
180 for fixture in collection_fixtures:
181 collection.insert(fixture)
182
183
184 def assert_db_meets_expected(db, expected):
185 """
186 Assert a database contains the things we expect it to.
187
188 Objects are found via 'id', so you should make sure your document
189 has an id.
190
191 Args:
192 - db: pymongo or mongokit database connection
193 - expected: the data we expect. Formatted like:
194 {'collection_name': [
195 {'id': 'foo',
196 'some_field': 'some_value'},]}
197 """
198 for collection_name, collection_data in expected.iteritems():
199 collection = db[collection_name]
200 for expected_document in collection_data:
201 document = collection.find_one({'id': expected_document['id']})
202 assert document is not None # make sure it exists
203 assert document == expected_document # make sure it matches
204
205
206 def fixture_add_user(username=u'chris', password=u'toast',
207 active_user=True):
208 # Reuse existing user or create a new one
209 test_user = User.query.filter_by(username=username).first()
210 if test_user is None:
211 test_user = User()
212 test_user.username = username
213 test_user.email = username + u'@example.com'
214 if password is not None:
215 test_user.pw_hash = bcrypt_gen_password_hash(password)
216 if active_user:
217 test_user.email_verified = True
218 test_user.status = u'active'
219
220 test_user.save()
221
222 # Reload
223 test_user = User.query.filter_by(username=username).first()
224
225 # ... and detach from session:
226 Session.expunge(test_user)
227
228 return test_user
229
230
231 def fixture_media_entry(title=u"Some title", slug=None,
232 uploader=None, save=True, gen_slug=True):
233 entry = MediaEntry()
234 entry.title = title
235 entry.slug = slug
236 entry.uploader = uploader or fixture_add_user().id
237 entry.media_type = u'image'
238
239 if gen_slug:
240 entry.generate_slug()
241 if save:
242 entry.save()
243
244 return entry
245
246
247 def fixture_add_collection(name=u"My first Collection", user=None):
248 if user is None:
249 user = fixture_add_user()
250 coll = Collection.query.filter_by(creator=user.id, title=name).first()
251 if coll is not None:
252 return coll
253 coll = Collection()
254 coll.creator = user.id
255 coll.title = name
256 coll.generate_slug()
257 coll.save()
258
259 # Reload
260 Session.refresh(coll)
261
262 # ... and detach from session:
263 Session.expunge(coll)
264
265 return coll