X-Git-Url: https://vcs.fsf.org/?a=blobdiff_plain;ds=sidebyside;f=mediagoblin%2Ftests%2Ftools.py;h=cc4a7addf66c8c10c6b3aeb67a89703aae882002;hb=61e39d90e58b8cea5c837d6da5731ab42127c642;hp=01813e96ea4887e4882a1d2a87e755fd1fa2e993;hpb=e4113ad5b4226a7398ea0b629c2eae43b2f9c797;p=mediagoblin.git diff --git a/mediagoblin/tests/tools.py b/mediagoblin/tests/tools.py index 01813e96..cc4a7add 100644 --- a/mediagoblin/tests/tools.py +++ b/mediagoblin/tests/tools.py @@ -1,5 +1,5 @@ # GNU MediaGoblin -- federated, autonomous media hosting -# Copyright (C) 2011 MediaGoblin contributors. See AUTHORS. +# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -15,18 +15,25 @@ # along with this program. If not, see . +import os import pkg_resources -import os, shutil +import shutil + +from functools import wraps from paste.deploy import loadapp from webtest import TestApp from mediagoblin import mg_globals +from mediagoblin.db.models import User, MediaEntry, Collection from mediagoblin.tools import testing from mediagoblin.init.config import read_mediagoblin_config -from mediagoblin.decorators import _make_safe from mediagoblin.db.open import setup_connection_and_db_from_config +from mediagoblin.db.base import Session from mediagoblin.meddleware import BaseMeddleware +from mediagoblin.auth.lib import bcrypt_gen_password_hash +from mediagoblin.gmg_commands.dbupdate import run_dbupdate +from mediagoblin.init.celery import setup_celery_app MEDIAGOBLIN_TEST_DB_NAME = u'__mediagoblin_tests__' @@ -54,7 +61,7 @@ class BadCeleryEnviron(Exception): pass class TestingMeddleware(BaseMeddleware): """ Meddleware for the Unit tests - + It might make sense to perform some tests on all requests/responses. Or prepare them in a special manner. For example all html responses could be tested @@ -72,7 +79,7 @@ class TestingMeddleware(BaseMeddleware): def process_response(self, request, response): # All following tests should be for html only! - if response.content_type != "text/html": + if getattr(response, 'content_type', None) != "text/html": # Get out early return @@ -94,9 +101,9 @@ def suicide_if_bad_celery_environ(): if not os.environ.get('CELERY_CONFIG_MODULE') == \ 'mediagoblin.init.celery.from_tests': raise BadCeleryEnviron(BAD_CELERY_MESSAGE) - -def get_test_app(dump_old_app=True): + +def get_app(dump_old_app=True): suicide_if_bad_celery_environ() # Make sure we've turned on testing @@ -112,6 +119,9 @@ def get_test_app(dump_old_app=True): if MGOBLIN_APP and not dump_old_app: return MGOBLIN_APP + Session.rollback() + Session.remove() + # Remove and reinstall user_dev directories if os.path.exists(TEST_USER_DEV): shutil.rmtree(TEST_USER_DEV) @@ -124,26 +134,16 @@ def get_test_app(dump_old_app=True): global_config, validation_result = read_mediagoblin_config(TEST_APP_CONFIG) app_config = global_config['mediagoblin'] - # Wipe database - # @@: For now we're dropping collections, but we could also just - # collection.remove() ? - connection, db = setup_connection_and_db_from_config(app_config) - assert db.name == MEDIAGOBLIN_TEST_DB_NAME - - collections_to_wipe = [ - collection - for collection in db.collection_names() - if not collection.startswith('system.')] - - for collection in collections_to_wipe: - db.drop_collection(collection) - - # TODO: Drop and recreate indexes + # Run database setup/migrations + run_dbupdate(app_config, global_config) # setup app and return test_app = loadapp( 'config:' + TEST_SERVER_CONFIG) + # Re-setup celery + setup_celery_app(app_config, global_config) + # Insert the TestingMeddleware, which can do some # sanity checks on every request/response. # Doing it this way is probably not the cleanest way. @@ -162,12 +162,13 @@ def setup_fresh_app(func): Cleans out test buckets and passes in a new, fresh test_app. """ + @wraps(func) def wrapper(*args, **kwargs): - test_app = get_test_app() + test_app = get_app() testing.clear_test_buckets() return func(test_app, *args, **kwargs) - return _make_safe(wrapper, func) + return wrapper def install_fixtures_simple(db, fixtures): @@ -184,19 +185,81 @@ def assert_db_meets_expected(db, expected): """ Assert a database contains the things we expect it to. - Objects are found via '_id', so you should make sure your document - has an _id. + Objects are found via 'id', so you should make sure your document + has an id. Args: - db: pymongo or mongokit database connection - expected: the data we expect. Formatted like: {'collection_name': [ - {'_id': 'foo', + {'id': 'foo', 'some_field': 'some_value'},]} """ for collection_name, collection_data in expected.iteritems(): collection = db[collection_name] for expected_document in collection_data: - document = collection.find_one({'_id': expected_document['_id']}) + document = collection.find_one({'id': expected_document['id']}) assert document is not None # make sure it exists assert document == expected_document # make sure it matches + + +def fixture_add_user(username=u'chris', password=u'toast', + active_user=True): + # Reuse existing user or create a new one + test_user = User.query.filter_by(username=username).first() + if test_user is None: + test_user = User() + test_user.username = username + test_user.email = username + u'@example.com' + if password is not None: + test_user.pw_hash = bcrypt_gen_password_hash(password) + if active_user: + test_user.email_verified = True + test_user.status = u'active' + + test_user.save() + + # Reload + test_user = User.query.filter_by(username=username).first() + + # ... and detach from session: + Session.expunge(test_user) + + return test_user + + +def fixture_media_entry(title=u"Some title", slug=None, + uploader=None, save=True, gen_slug=True): + entry = MediaEntry() + entry.title = title + entry.slug = slug + entry.uploader = uploader or fixture_add_user().id + entry.media_type = u'image' + + if gen_slug: + entry.generate_slug() + if save: + entry.save() + + return entry + + +def fixture_add_collection(name=u"My first Collection", user=None): + if user is None: + user = fixture_add_user() + coll = Collection.query.filter_by(creator=user.id, title=name).first() + if coll is not None: + return coll + coll = Collection() + coll.creator = user.id + coll.title = name + coll.generate_slug() + coll.save() + + # Reload + Session.refresh(coll) + + # ... and detach from session: + Session.expunge(coll) + + return coll