It's 2012 all up in here
[mediagoblin.git] / mediagoblin / tests / tools.py
index 342b54b7657e73265bd0b533ba4c38d74f36035e..7cf355b04ac87fa389ac83c767a74cf2f41156b7 100644 (file)
@@ -1,5 +1,5 @@
 # GNU MediaGoblin -- federated, autonomous media hosting
-# Copyright (C) 2011 Free Software Foundation, Inc
+# Copyright (C) 2011, 2012 MediaGoblin contributors.  See AUTHORS.
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License as published by
 import pkg_resources
 import os, shutil
 
-from paste.deploy import appconfig, loadapp
+from paste.deploy import loadapp
 from webtest import TestApp
 
-from mediagoblin import util
+from mediagoblin import mg_globals
+from mediagoblin.tools import testing
+from mediagoblin.init.config import read_mediagoblin_config
 from mediagoblin.decorators import _make_safe
 from mediagoblin.db.open import setup_connection_and_db_from_config
+from mediagoblin.meddleware import BaseMeddleware
+from mediagoblin.auth.lib import bcrypt_gen_password_hash
 
 
-MEDIAGOBLIN_TEST_DB_NAME = '__mediagoblinunittests__'
+MEDIAGOBLIN_TEST_DB_NAME = u'__mediagoblin_tests__'
+TEST_SERVER_CONFIG = pkg_resources.resource_filename(
+    'mediagoblin.tests', 'test_paste.ini')
 TEST_APP_CONFIG = pkg_resources.resource_filename(
-    'mediagoblin.tests', 'mgoblin_test_app.ini')
+    'mediagoblin.tests', 'test_mgoblin_app.ini')
 TEST_USER_DEV = pkg_resources.resource_filename(
     'mediagoblin.tests', 'test_user_dev')
 MGOBLIN_APP = None
@@ -37,17 +43,70 @@ USER_DEV_DIRECTORIES_TO_SETUP = [
     'media/public', 'media/queue',
     'beaker/sessions/data', 'beaker/sessions/lock']
 
+BAD_CELERY_MESSAGE = """\
+Sorry, you *absolutely* must run nosetests with the
+mediagoblin.init.celery.from_tests module.  Like so:
+$ CELERY_CONFIG_MODULE=mediagoblin.init.celery.from_tests ./bin/nosetests"""
+
 
 class BadCeleryEnviron(Exception): pass
 
 
-def get_test_app(dump_old_app=True):
+class TestingMeddleware(BaseMeddleware):
+    """
+    Meddleware for the Unit tests
+    
+    It might make sense to perform some tests on all
+    requests/responses. Or prepare them in a special
+    manner. For example all html responses could be tested
+    for being valid html *after* being rendered.
+
+    This module is getting inserted at the front of the
+    meddleware list, which means: requests are handed here
+    first, responses last. So this wraps up the "normal"
+    app.
+
+    If you need to add a test, either add it directly to
+    the appropiate process_request or process_response, or
+    create a new method and call it from process_*.
+    """
+
+    def process_response(self, request, response):
+        # All following tests should be for html only!
+        if response.content_type != "text/html":
+            # Get out early
+            return
+
+        # If the template contains a reference to
+        # /mgoblin_static/ instead of using
+        # /request.staticdirect(), error out here.
+        # This could probably be implemented as a grep on
+        # the shipped templates easier...
+        if response.text.find("/mgoblin_static/") >= 0:
+            raise AssertionError(
+                "Response HTML contains reference to /mgoblin_static/ "
+                "instead of staticdirect. Request was for: "
+                + request.full_path)
+
+        return
+
+
+def suicide_if_bad_celery_environ():
     if not os.environ.get('CELERY_CONFIG_MODULE') == \
-            'mediagoblin.celery_setup.from_tests':
-        raise BadCeleryEnviron(
-            u"Sorry, you *absolutely* must run nosetests with the\n"
-            u"mediagoblin.celery_setup.from_tests module.  Like so:\n"
-            u"$ CELERY_CONFIG_MODULE=mediagoblin.celery_setup.from_tests ./bin/nosetests")
+            'mediagoblin.init.celery.from_tests':
+        raise BadCeleryEnviron(BAD_CELERY_MESSAGE)
+    
+
+def get_test_app(dump_old_app=True):
+    suicide_if_bad_celery_environ()
+
+    # Make sure we've turned on testing
+    testing._activate_testing()
+
+    # Leave this imported as it sets up celery.
+    from mediagoblin.init.celery import from_tests
+
+    global MGOBLIN_APP
 
     # Just return the old app if that exists and it's okay to set up
     # and return
@@ -63,16 +122,14 @@ def get_test_app(dump_old_app=True):
         os.makedirs(full_dir)
 
     # Get app config
-    config = appconfig(
-        'config:' + os.path.basename(TEST_APP_CONFIG),
-        relative_to=os.path.dirname(TEST_APP_CONFIG),
-        name='mediagoblin')
+    global_config, validation_result = read_mediagoblin_config(TEST_APP_CONFIG)
+    app_config = global_config['mediagoblin']
 
     # Wipe database
     # @@: For now we're dropping collections, but we could also just
     # collection.remove() ?
-    connection, db = setup_connection_and_db_from_config(
-        config.local_conf)
+    connection, db = setup_connection_and_db_from_config(app_config)
+    assert db.name == MEDIAGOBLIN_TEST_DB_NAME
 
     collections_to_wipe = [
         collection
@@ -82,17 +139,22 @@ def get_test_app(dump_old_app=True):
     for collection in collections_to_wipe:
         db.drop_collection(collection)
 
-    # Don't need these anymore...
-    del(connection)
-    del(db)
-
     # TODO: Drop and recreate indexes
 
     # setup app and return
     test_app = loadapp(
-        'config:' + TEST_APP_CONFIG)
+        'config:' + TEST_SERVER_CONFIG)
+
+    # Insert the TestingMeddleware, which can do some
+    # sanity checks on every request/response.
+    # Doing it this way is probably not the cleanest way.
+    # We'll fix it, when we have plugins!
+    mg_globals.app.meddleware.insert(0, TestingMeddleware(mg_globals.app))
 
-    return TestApp(test_app)
+    app = TestApp(test_app)
+    MGOBLIN_APP = app
+
+    return app
 
 
 def setup_fresh_app(func):
@@ -103,7 +165,55 @@ def setup_fresh_app(func):
     """
     def wrapper(*args, **kwargs):
         test_app = get_test_app()
-        util.clear_test_buckets()
+        testing.clear_test_buckets()
         return func(test_app, *args, **kwargs)
 
     return _make_safe(wrapper, func)
+
+
+def install_fixtures_simple(db, fixtures):
+    """
+    Very simply install fixtures in the database
+    """
+    for collection_name, collection_fixtures in fixtures.iteritems():
+        collection = db[collection_name]
+        for fixture in collection_fixtures:
+            collection.insert(fixture)
+
+
+def assert_db_meets_expected(db, expected):
+    """
+    Assert a database contains the things we expect it to.
+
+    Objects are found via '_id', so you should make sure your document
+    has an _id.
+
+    Args:
+     - db: pymongo or mongokit database connection
+     - expected: the data we expect.  Formatted like:
+         {'collection_name': [
+             {'_id': 'foo',
+              'some_field': 'some_value'},]}
+    """
+    for collection_name, collection_data in expected.iteritems():
+        collection = db[collection_name]
+        for expected_document in collection_data:
+            document = collection.find_one({'_id': expected_document['_id']})
+            assert document is not None  # make sure it exists
+            assert document == expected_document  # make sure it matches
+
+
+def fixture_add_user(username = u'chris', password = 'toast',
+                     active_user = True):
+    test_user = mg_globals.database.User()
+    test_user.username = username
+    test_user.email = username + u'@example.com'
+    if password is not None:
+        test_user.pw_hash = bcrypt_gen_password_hash(password)
+    if active_user:
+        test_user.email_verified = True
+        test_user.status = u'active'
+
+    test_user.save()
+
+    return test_user