replace webob.Response with werkzeug Response
[mediagoblin.git] / mediagoblin / tests / tools.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17
18 import os
19 import pkg_resources
20 import shutil
21
22 from functools import wraps
23
24 from paste.deploy import loadapp
25 from webtest import TestApp
26
27 from mediagoblin import mg_globals
28 from mediagoblin.tools import testing
29 from mediagoblin.init.config import read_mediagoblin_config
30 from mediagoblin.db.open import setup_connection_and_db_from_config
31 from mediagoblin.db.sql.base import Session
32 from mediagoblin.meddleware import BaseMeddleware
33 from mediagoblin.auth.lib import bcrypt_gen_password_hash
34 from mediagoblin.gmg_commands.dbupdate import run_dbupdate
35 from mediagoblin.init.celery import setup_celery_app
36
37
MEDIAGOBLIN_TEST_DB_NAME = u'__mediagoblin_tests__'


def _tests_resource(name):
    """Return the filesystem path of ``name`` inside ``mediagoblin.tests``."""
    return pkg_resources.resource_filename('mediagoblin.tests', name)


# Paste server config, MediaGoblin app config and the scratch directory,
# all resolved relative to the mediagoblin.tests package.
TEST_SERVER_CONFIG = _tests_resource('test_paste.ini')
TEST_APP_CONFIG = _tests_resource('test_mgoblin_app.ini')
TEST_USER_DEV = _tests_resource('test_user_dev')

# Cache slot for the TestApp built by get_test_app(); None until then.
MGOBLIN_APP = None

# Sub-directories of TEST_USER_DEV that get_test_app() recreates.
USER_DEV_DIRECTORIES_TO_SETUP = [
    'media/public',
    'media/queue',
    'beaker/sessions/data',
    'beaker/sessions/lock',
]

# Message carried by BadCeleryEnviron when the environment is wrong.
BAD_CELERY_MESSAGE = """\
Sorry, you *absolutely* must run nosetests with the
mediagoblin.init.celery.from_tests module. Like so:
$ CELERY_CONFIG_MODULE=mediagoblin.init.celery.from_tests ./bin/nosetests"""
55
56
class BadCeleryEnviron(Exception):
    """Raised when CELERY_CONFIG_MODULE does not select the test settings."""
58
59
class TestingMeddleware(BaseMeddleware):
    """
    Meddleware for the Unit tests

    It might make sense to perform some tests on all
    requests/responses. Or prepare them in a special
    manner. For example all html responses could be tested
    for being valid html *after* being rendered.

    This module is getting inserted at the front of the
    meddleware list, which means: requests are handed here
    first, responses last. So this wraps up the "normal"
    app.

    If you need to add a test, either add it directly to
    the appropriate process_request or process_response, or
    create a new method and call it from process_*.
    """

    def process_response(self, request, response):
        """
        Run sanity checks on an outgoing HTML response.

        Non-HTML responses are ignored. For HTML responses an
        AssertionError is raised if the body contains a hard-coded
        "/mgoblin_static/" reference.

        Always returns None.
        """
        # All following tests should be for html only!
        # Compare the bare media type only: the Content-Type value may
        # carry parameters (e.g. "text/html; charset=utf-8"), which an
        # exact string comparison would never match.
        content_type = getattr(response, 'content_type', None) or ''
        media_type = content_type.split(';', 1)[0].strip().lower()
        if media_type != "text/html":
            # Get out early
            return

        # Different response classes expose the body under different
        # attributes; take the first one that is available.
        body = None
        for attribute in ('text', 'data', 'body'):
            body = getattr(response, attribute, None)
            if body is not None:
                break
        if body is None:
            body = u''
        if isinstance(body, bytes):
            body = body.decode('utf-8', 'replace')

        # If the template contains a reference to
        # /mgoblin_static/ instead of using
        # /request.staticdirect(), error out here.
        # This could probably be implemented as a grep on
        # the shipped templates easier...
        if "/mgoblin_static/" in body:
            raise AssertionError(
                "Response HTML contains reference to /mgoblin_static/ "
                "instead of staticdirect. Request was for: "
                + request.full_path)

        return
97
98
def suicide_if_bad_celery_environ():
    """
    Refuse to continue unless the test celery settings are selected.

    Raises:
     - BadCeleryEnviron: if the CELERY_CONFIG_MODULE environment variable
       is missing or is anything other than
       'mediagoblin.init.celery.from_tests'.
    """
    configured_module = os.environ.get('CELERY_CONFIG_MODULE')
    if configured_module != 'mediagoblin.init.celery.from_tests':
        raise BadCeleryEnviron(BAD_CELERY_MESSAGE)
103
104
def _recreate_user_dev_directories():
    """Wipe TEST_USER_DEV and recreate the sub-directories listed for it."""
    if os.path.exists(TEST_USER_DEV):
        shutil.rmtree(TEST_USER_DEV)

    for relative_dir in USER_DEV_DIRECTORIES_TO_SETUP:
        os.makedirs(os.path.join(TEST_USER_DEV, relative_dir))


def get_test_app(dump_old_app=True):
    """
    Build (or reuse) the webtest TestApp used by the test suite.

    Args:
     - dump_old_app: when False and an app was already built, that
       cached app is returned as is. Otherwise a new one is set up.

    Setting up a new app rolls back and removes the SQL session,
    recreates the user_dev directories, runs the database update,
    loads the paste app, re-initialises celery and puts a
    TestingMeddleware at the front of the meddleware list. The result
    is stored in MGOBLIN_APP and returned.

    Raises BadCeleryEnviron (via suicide_if_bad_celery_environ) when
    the celery environment is not the test one.
    """
    global MGOBLIN_APP

    suicide_if_bad_celery_environ()

    # Make sure we've turned on testing
    testing._activate_testing()

    # Leave this imported as it sets up celery.
    from mediagoblin.init.celery import from_tests

    # Hand back the cached app when there is one and the caller did not
    # ask for it to be dumped.
    reuse_allowed = not dump_old_app
    if MGOBLIN_APP and reuse_allowed:
        return MGOBLIN_APP

    Session.rollback()
    Session.remove()

    # Remove and reinstall user_dev directories
    _recreate_user_dev_directories()

    # Get app config
    global_config, _validation_result = read_mediagoblin_config(
        TEST_APP_CONFIG)
    app_config = global_config['mediagoblin']

    # Run database setup/migrations
    run_dbupdate(app_config, global_config)

    # Load the paste application
    paste_app = loadapp('config:' + TEST_SERVER_CONFIG)

    # Re-setup celery
    setup_celery_app(app_config, global_config)

    # Insert the TestingMeddleware, which can do some
    # sanity checks on every request/response.
    # Doing it this way is probably not the cleanest way.
    # We'll fix it, when we have plugins!
    sanity_meddleware = TestingMeddleware(mg_globals.app)
    mg_globals.app.meddleware.insert(0, sanity_meddleware)

    MGOBLIN_APP = TestApp(paste_app)
    return MGOBLIN_APP
156
157
def setup_fresh_app(func):
    """
    Decorator handing the wrapped function a freshly built test app.

    On every call a new app is obtained from get_test_app(), the test
    buckets are cleared, and the app is passed to ``func`` as its first
    positional argument, ahead of the caller's own arguments. The
    wrapped function's return value is passed through.
    """
    @wraps(func)
    def with_fresh_app(*args, **kwargs):
        fresh_app = get_test_app()
        testing.clear_test_buckets()
        return func(fresh_app, *args, **kwargs)

    return with_fresh_app
171
172
def install_fixtures_simple(db, fixtures):
    """
    Very simply install fixtures in the database

    Args:
     - db: object indexable by collection name; each collection must
       provide an ``insert(document)`` method.
     - fixtures: dict mapping a collection name to an iterable of
       documents to insert into that collection.
    """
    # .items() rather than .iteritems(): the latter exists only on
    # Python 2 dicts, while .items() works on both major versions.
    for collection_name, collection_fixtures in fixtures.items():
        collection = db[collection_name]
        for fixture in collection_fixtures:
            collection.insert(fixture)
181
182
def assert_db_meets_expected(db, expected):
    """
    Assert a database contains the things we expect it to.

    Objects are found via 'id', so you should make sure your document
    has an id.

    Args:
     - db: object indexable by collection name whose collections
       provide ``find_one(query)`` (e.g. a pymongo or mongokit
       database connection)
     - expected: the data we expect.  Formatted like:
         {'collection_name': [
             {'id': 'foo',
              'some_field': 'some_value'},]}

    Raises:
     - AssertionError: if an expected document is missing or differs
       from the stored one.
    """
    # .items() rather than .iteritems(): the latter exists only on
    # Python 2 dicts, while .items() works on both major versions.
    for collection_name, collection_data in expected.items():
        collection = db[collection_name]
        for expected_document in collection_data:
            document = collection.find_one({'id': expected_document['id']})
            # make sure it exists
            assert document is not None, (
                "No document with id %r in collection %r"
                % (expected_document['id'], collection_name))
            # make sure it matches
            assert document == expected_document, (
                "Document with id %r in collection %r differs from "
                "the expected one"
                % (expected_document['id'], collection_name))
203
204
def fixture_add_user(username=u'chris', password='toast',
                     active_user=True):
    """
    Create, save and return a user for use in tests.

    Args:
     - username: username of the new user; the email is derived from it
       as ``<username>@example.com``.
     - password: plain-text password to hash and store. When None, no
       password hash is set.
     - active_user: when true, the user is marked as email-verified
       with status u'active'.

    Returns the user as reloaded from the database by username and
    expunged from the SQL session.
    """
    from mediagoblin.db.sql.base import Session

    new_user = mg_globals.database.User()
    new_user.username = username
    new_user.email = username + u'@example.com'

    if password is not None:
        new_user.pw_hash = bcrypt_gen_password_hash(password)

    if active_user:
        new_user.email_verified = True
        new_user.status = u'active'

    new_user.save()

    # Reload
    reloaded_user = mg_globals.database.User.find_one({'username': username})

    # ... and detach from session:
    Session.expunge(reloaded_user)

    return reloaded_user