Merge remote branch 'remotes/aaronw/bug614_verification_crash'
[mediagoblin.git] / mediagoblin / tests / tools.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011 MediaGoblin contributors. See AUTHORS.
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16
17
18 import pkg_resources
19 import os, shutil
20
21 from paste.deploy import loadapp
22 from webtest import TestApp
23
24 from mediagoblin import mg_globals
25 from mediagoblin.tools import testing
26 from mediagoblin.init.config import read_mediagoblin_config
27 from mediagoblin.decorators import _make_safe
28 from mediagoblin.db.open import setup_connection_and_db_from_config
29
30
# Name the test database is required to have; get_test_app() asserts on
# this before dropping any collections.
MEDIAGOBLIN_TEST_DB_NAME = u'__mediagoblin_tests__'

# Paths of the test configuration files and scratch directory, resolved
# relative to the installed mediagoblin.tests package.
TEST_SERVER_CONFIG = pkg_resources.resource_filename(
    'mediagoblin.tests',
    'test_paste.ini')
TEST_APP_CONFIG = pkg_resources.resource_filename(
    'mediagoblin.tests',
    'test_mgoblin_app.ini')
TEST_USER_DEV = pkg_resources.resource_filename(
    'mediagoblin.tests',
    'test_user_dev')

# Most recently built test application (set by get_test_app()).
MGOBLIN_APP = None

# Sub-directories recreated under TEST_USER_DEV for every fresh app.
USER_DEV_DIRECTORIES_TO_SETUP = [
    'media/public',
    'media/queue',
    'beaker/sessions/data',
    'beaker/sessions/lock',
]

# Message carried by BadCeleryEnviron.
BAD_CELERY_MESSAGE = (
    "Sorry, you *absolutely* must run nosetests with the\n"
    "mediagoblin.init.celery.from_tests module. Like so:\n"
    "$ CELERY_CONFIG_MODULE=mediagoblin.init.celery.from_tests ./bin/nosetests")
48
49
class BadCeleryEnviron(Exception):
    """
    Raised when the tests are started without CELERY_CONFIG_MODULE
    pointing at mediagoblin.init.celery.from_tests.
    """
51
52
class TestingMiddleware(object):
    """
    Middleware that exists only for the unit tests.

    It gives the test suite one place to run checks against every
    request and response, or to prepare them in some special way.
    For example, every html response could be validated *after* it
    has been rendered.

    The instance is inserted at the front of the middleware list, so
    requests are handed to it first and responses last; in effect it
    wraps the "normal" app.

    To add a check, either put it directly into the appropriate
    process_request / process_response, or write a new method and
    call it from process_*.
    """

    def __init__(self, mg_app):
        # The application this middleware was attached to.
        self.app = mg_app

    def process_request(self, request):
        """Hook for per-request checks; there are none at the moment."""
        return None

    def process_response(self, request, response):
        """Run the response checks; all of them apply to html only."""
        if response.content_type == "text/html":
            self._check_static_references(request, response)

    def _check_static_references(self, request, response):
        """
        Fail if the rendered html hard-codes /mgoblin_static/.

        Templates are supposed to go through request.staticdirect()
        instead.  (A grep over the shipped templates would probably
        be an easier way to enforce this.)
        """
        if "/mgoblin_static/" in response.text:
            raise AssertionError(
                "Response HTML contains reference to /mgoblin_static/ "
                "instead of staticdirect. Request was for: "
                + request.full_path)
96
97
def suicide_if_bad_celery_environ():
    """
    Refuse to continue unless CELERY_CONFIG_MODULE selects the test setup.

    Raises:
     - BadCeleryEnviron: if the environment variable is unset or is
       anything other than 'mediagoblin.init.celery.from_tests'.
    """
    configured_module = os.environ.get('CELERY_CONFIG_MODULE')
    if configured_module != 'mediagoblin.init.celery.from_tests':
        raise BadCeleryEnviron(BAD_CELERY_MESSAGE)
102
103
def get_test_app(dump_old_app=True):
    """
    Build (or reuse) the WebTest application used by the test suite.

    Unless an existing app is reused, this removes and recreates the
    test_user_dev directories, drops every non-system collection of the
    test database, loads the paste app and puts a TestingMiddleware at
    the front of its middleware list.

    Args:
     - dump_old_app: if False and an app was built earlier, return that
       app untouched instead of building a new one.

    Returns:
      The webtest TestApp, also stored in the module-level MGOBLIN_APP.
    """
    global MGOBLIN_APP

    suicide_if_bad_celery_environ()

    # Make sure we've turned on testing
    testing._activate_testing()

    # Not used by name: leave this imported as it sets up celery.
    from mediagoblin.init.celery import from_tests

    # Reuse the previous app when there is one and the caller allows it.
    keep_old_app = not dump_old_app
    if MGOBLIN_APP and keep_old_app:
        return MGOBLIN_APP

    # Start from an empty user_dev tree.
    if os.path.exists(TEST_USER_DEV):
        shutil.rmtree(TEST_USER_DEV)
    for relative_dir in USER_DEV_DIRECTORIES_TO_SETUP:
        os.makedirs(os.path.join(TEST_USER_DEV, relative_dir))

    # Get app config
    config_and_validation = read_mediagoblin_config(TEST_APP_CONFIG)
    global_config, _validation_result = config_and_validation
    app_config = global_config['mediagoblin']

    # Wipe database
    # @@: For now we're dropping collections, but we could also just
    #   collection.remove() ?
    _connection, db = setup_connection_and_db_from_config(app_config)
    # Guard against wiping anything other than the dedicated test database.
    assert db.name == MEDIAGOBLIN_TEST_DB_NAME

    # Collect the names first, then drop them.
    doomed_collections = []
    for name in db.collection_names():
        if not name.startswith('system.'):
            doomed_collections.append(name)
    for name in doomed_collections:
        db.drop_collection(name)

    # TODO: Drop and recreate indexes

    # setup app and return
    paste_app = loadapp('config:' + TEST_SERVER_CONFIG)

    # Insert the TestingMiddleware, which can do some
    # sanity checks on every request/response.
    # Doing it this way is probably not the cleanest way.
    # We'll fix it, when we have plugins!
    checker = TestingMiddleware(mg_globals.app)
    mg_globals.app.middleware.insert(0, checker)

    MGOBLIN_APP = TestApp(paste_app)
    return MGOBLIN_APP
162
163
def setup_fresh_app(func):
    """
    Decorator giving the wrapped function a freshly built test app.

    On every call a new app is obtained from get_test_app(), the test
    buckets are cleared, and the app is passed to the wrapped function
    as its first positional argument, ahead of the caller's arguments.
    """
    def wrapper(*args, **kwargs):
        fresh_app = get_test_app()
        testing.clear_test_buckets()
        return func(fresh_app, *args, **kwargs)

    decorated = _make_safe(wrapper, func)
    return decorated
176
177
def install_fixtures_simple(db, fixtures):
    """
    Very simply install fixtures in the database

    Args:
     - db: database object; db[collection_name] must give a collection
       that has an insert() method
     - fixtures: the documents to insert.  Formatted like:
         {'collection_name': [
             {'_id': 'foo',
              'some_field': 'some_value'},]}
    """
    # .items() rather than the Python 2-only .iteritems(), so this runs
    # unchanged on both Python 2 and Python 3.
    for collection_name, collection_fixtures in fixtures.items():
        collection = db[collection_name]
        for fixture in collection_fixtures:
            collection.insert(fixture)
186
187
def assert_db_meets_expected(db, expected):
    """
    Assert a database contains the things we expect it to.

    Objects are found via '_id', so you should make sure your document
    has an _id.

    Args:
     - db: pymongo or mongokit database connection
     - expected: the data we expect.  Formatted like:
         {'collection_name': [
             {'_id': 'foo',
              'some_field': 'some_value'},]}

    Raises:
     - AssertionError: if an expected document is missing, or the stored
       document is not equal to the expected one.
    """
    # .items() rather than the Python 2-only .iteritems(), so this runs
    # unchanged on both Python 2 and Python 3.
    for collection_name, collection_data in expected.items():
        collection = db[collection_name]
        for expected_document in collection_data:
            document_id = expected_document['_id']
            document = collection.find_one({'_id': document_id})
            # make sure it exists
            assert document is not None, (
                "No document with _id %r in collection %r"
                % (document_id, collection_name))
            # make sure it matches
            assert document == expected_document, (
                "Document with _id %r in collection %r is %r, expected %r"
                % (document_id, collection_name,
                   document, expected_document))