| 1 | # GNU MediaGoblin -- federated, autonomous media hosting |
| 2 | # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. |
| 3 | # |
| 4 | # This program is free software: you can redistribute it and/or modify |
| 5 | # it under the terms of the GNU Affero General Public License as published by |
| 6 | # the Free Software Foundation, either version 3 of the License, or |
| 7 | # (at your option) any later version. |
| 8 | # |
| 9 | # This program is distributed in the hope that it will be useful, |
| 10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 12 | # GNU Affero General Public License for more details. |
| 13 | # |
| 14 | # You should have received a copy of the GNU Affero General Public License |
| 15 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
| 16 | |
| 17 | |
| 18 | import sys |
| 19 | import os |
| 20 | import pkg_resources |
| 21 | import shutil |
| 22 | |
| 23 | from functools import wraps |
| 24 | |
| 25 | from paste.deploy import loadapp |
| 26 | from webtest import TestApp |
| 27 | |
| 28 | from mediagoblin import mg_globals |
| 29 | from mediagoblin.db.models import User, MediaEntry, Collection |
| 30 | from mediagoblin.tools import testing |
| 31 | from mediagoblin.init.config import read_mediagoblin_config |
| 32 | from mediagoblin.db.base import Session |
| 33 | from mediagoblin.meddleware import BaseMeddleware |
| 34 | from mediagoblin.auth.lib import bcrypt_gen_password_hash |
| 35 | from mediagoblin.gmg_commands.dbupdate import run_dbupdate |
| 36 | |
| 37 | |
# Database name constant exposed for the tests (not referenced elsewhere
# in this module).
MEDIAGOBLIN_TEST_DB_NAME = u'__mediagoblin_tests__'

# Default paste and mediagoblin config files, both resolved from the
# ``mediagoblin.tests`` package.  get_app() falls back to these when the
# caller does not supply its own config paths.
TEST_SERVER_CONFIG = pkg_resources.resource_filename(
    'mediagoblin.tests',
    'test_paste.ini',
)
TEST_APP_CONFIG = pkg_resources.resource_filename(
    'mediagoblin.tests',
    'test_mgoblin_app.ini',
)


# Sub-directories that get_app() creates below the temporary ``user_dev``
# directory of every test application.
USER_DEV_DIRECTORIES_TO_SETUP = [
    'media/public',
    'media/queue',
]
| 46 | |
| 47 | |
class TestingMeddleware(BaseMeddleware):
    """
    Meddleware that is only active while the unit tests run.

    It gives the test suite one central place to check (or prepare)
    every request and response.  For example, all html responses could
    be validated *after* they have been rendered.

    get_app() inserts an instance at the front of the application's
    meddleware list, which means requests are handed here first and
    responses last, so it wraps the "normal" app.

    To add a new check, either put it directly into the appropriate
    process_request / process_response, or write a new method and call
    it from process_*.
    """

    def process_response(self, request, response):
        """
        Run sanity checks on an outgoing response.

        Only responses whose ``content_type`` is exactly "text/html" are
        inspected; everything else passes through untouched.

        Raises:
         - AssertionError: if the rendered html contains a hard-coded
           "/mgoblin_static/" reference.
        """
        content_type = getattr(response, 'content_type', None)

        # All checks below are meant for html only.
        if content_type == "text/html":
            # Templates are expected to build static URLs through
            # request.staticdirect(); a literal /mgoblin_static/ in the
            # output means one of them did not.  (A grep over the
            # shipped templates would probably be an easier way to
            # enforce this.)
            if "/mgoblin_static/" in response.text:
                raise AssertionError(
                    "Response HTML contains reference to /mgoblin_static/ "
                    "instead of staticdirect. Request was for: "
                    + request.full_path)

        return None
| 85 | |
| 86 | |
def get_app(request, paste_config=None, mgoblin_config=None):
    """Build a MediaGoblin application wrapped for testing.

    Args:
     - request: a pytest fixture request (NOT an http request).  It is
       used to create the temporary run directory, which pytest cleans
       up automatically as needed.
     - paste_config: path of the paste config to use; falls back to
       TEST_SERVER_CONFIG when not given.
     - mgoblin_config: path of the mediagoblin config to use; falls
       back to TEST_APP_CONFIG when not given.

    Returns:
      A webtest ``TestApp`` wrapping the loaded application.
    """
    if not paste_config:
        paste_config = TEST_SERVER_CONFIG
    if not mgoblin_config:
        mgoblin_config = TEST_APP_CONFIG

    # Fresh, numbered directory that receives copies of both config
    # files plus the user_dev tree.
    tmp_root = request.config._tmpdirhandler.mktemp(
        'mgoblin_app', numbered=True)
    user_dev_root = tmp_root.mkdir('user_dev').strpath

    paste_copy = tmp_root.join('paste.ini').strpath
    mgoblin_copy = tmp_root.join('mediagoblin.ini').strpath
    for source, target in ((paste_config, paste_copy),
                           (mgoblin_config, mgoblin_copy)):
        shutil.copyfile(source, target)

    # Roll back and discard the current scoped session before the
    # database is set up for this app.
    Session.rollback()
    Session.remove()

    # Create the user_dev sub-directories.
    for relative_dir in USER_DEV_DIRECTORIES_TO_SETUP:
        os.makedirs(os.path.join(user_dev_root, relative_dir))

    # Read the copied app config; the validation result is not used.
    global_config, _validation = read_mediagoblin_config(mgoblin_copy)

    # Run database setup/migrations.
    run_dbupdate(global_config['mediagoblin'], global_config)

    # Load the WSGI application from the copied paste config.
    wsgi_app = loadapp('config:' + paste_copy)

    # Put the TestingMeddleware in front of all other meddleware so it
    # can sanity check every request/response.  Poking at mg_globals
    # like this is probably not the cleanest way; to be revisited once
    # there are plugins.
    mg_globals.app.meddleware.insert(0, TestingMeddleware(mg_globals.app))

    return TestApp(wsgi_app)
| 139 | |
| 140 | |
def install_fixtures_simple(db, fixtures):
    """
    Very simply install fixtures in the database.

    Args:
     - db: database object indexable by collection name; each
       ``db[name]`` must provide an ``insert(document)`` method.
     - fixtures: dict mapping a collection name to an iterable of the
       documents to insert into that collection, e.g.:
         {'collection_name': [
            {'id': 'foo',
             'some_field': 'some_value'},]}
    """
    # items() instead of the Python 2-only iteritems(), so this helper
    # works on both Python 2 and Python 3.
    for collection_name, collection_fixtures in fixtures.items():
        collection = db[collection_name]
        for fixture in collection_fixtures:
            collection.insert(fixture)
| 149 | |
| 150 | |
def assert_db_meets_expected(db, expected):
    """
    Assert a database contains the things we expect it to.

    Objects are found via 'id', so you should make sure your document
    has an id.

    Args:
     - db: pymongo or mongokit database connection
     - expected: the data we expect.  Formatted like:
         {'collection_name': [
            {'id': 'foo',
             'some_field': 'some_value'},]}

    Raises:
     - AssertionError: if an expected document is missing from its
       collection or differs from what is stored.
    """
    # items() instead of the Python 2-only iteritems(), so this helper
    # works on both Python 2 and Python 3.
    for collection_name, collection_data in expected.items():
        collection = db[collection_name]
        for expected_document in collection_data:
            document = collection.find_one({'id': expected_document['id']})
            # make sure it exists
            assert document is not None, (
                "No document with id %r found in collection %r"
                % (expected_document['id'], collection_name))
            # make sure it matches
            assert document == expected_document, (
                "Document with id %r in collection %r does not match "
                "the expected data"
                % (expected_document['id'], collection_name))
| 171 | |
| 172 | |
def fixture_add_user(username=u'chris', password=u'toast',
                     active_user=True):
    """
    Return a saved test user that is detached from the session.

    A user with the given username is reused if one already exists;
    otherwise a new one is created with the email
    ``<username>@example.com``.  In both cases the password hash and
    the activation fields are (re)applied before saving.

    Args:
     - username: name to look up or create.
     - password: plain text password to hash and store; ``None`` leaves
       the password hash untouched.
     - active_user: when true, mark the email as verified and set the
       status to u'active'.
    """
    def _lookup():
        # Build a fresh query on every call.
        return User.query.filter_by(username=username).first()

    user = _lookup()
    if user is None:
        # No such user yet, so start a new record.
        user = User()
        user.username = username
        user.email = username + u'@example.com'

    if password is not None:
        user.pw_hash = bcrypt_gen_password_hash(password)

    if active_user:
        user.email_verified = True
        user.status = u'active'

    user.save()

    # Fetch the stored row again ...
    user = _lookup()

    # ... and detach it from the session before handing it out.
    Session.expunge(user)

    return user
| 196 | |
| 197 | |
def fixture_media_entry(title=u"Some title", slug=None,
                        uploader=None, save=True, gen_slug=True):
    """
    Create a MediaEntry of media type u'image' for use in tests.

    Args:
     - title: title assigned to the entry.
     - slug: slug assigned to the entry before any slug generation.
     - uploader: value stored as the entry's uploader; when falsy, the
       id of the user returned by fixture_add_user() is used.
     - save: when true, call ``save()`` on the entry.
     - gen_slug: when true, call ``generate_slug()`` on the entry.

    Returns:
      The MediaEntry instance.
    """
    media = MediaEntry()
    media.title = title
    media.slug = slug

    if uploader:
        media.uploader = uploader
    else:
        # Fall back to the default test user.
        media.uploader = fixture_add_user().id

    media.media_type = u'image'

    if gen_slug:
        media.generate_slug()

    if save:
        media.save()

    return media
| 212 | |
| 213 | |
def fixture_add_collection(name=u"My first Collection", user=None):
    """
    Return a collection titled ``name`` that was created by ``user``.

    Args:
     - name: title of the collection.
     - user: creator of the collection; when ``None`` the user returned
       by fixture_add_user() is used.

    If a matching collection already exists it is returned as found,
    without the refresh/expunge steps below.  Otherwise a new
    collection is created, saved, refreshed and detached from the
    session before being returned.
    """
    owner = fixture_add_user() if user is None else user

    existing = Collection.query.filter_by(
        creator=owner.id, title=name).first()
    if existing is not None:
        return existing

    collection = Collection()
    collection.creator = owner.id
    collection.title = name
    collection.generate_slug()
    collection.save()

    # Reload the saved state ...
    Session.refresh(collection)

    # ... and detach the object from the session.
    Session.expunge(collection)

    return collection
| 233 | |