Fix errors in collection views
[mediagoblin.git] / mediagoblin / tests / tools.py
index 7f20f6e7f7c589c670b052d295aacf65a85ec4e1..cc4a7addf66c8c10c6b3aeb67a89703aae882002 100644 (file)
@@ -1,5 +1,5 @@
 # GNU MediaGoblin -- federated, autonomous media hosting
-# Copyright (C) 2011 MediaGoblin contributors.  See AUTHORS.
+# Copyright (C) 2011, 2012 MediaGoblin contributors.  See AUTHORS.
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License as published by
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 
+import os
 import pkg_resources
-import os, shutil
+import shutil
+
+from functools import wraps
 
 from paste.deploy import loadapp
 from webtest import TestApp
 
 from mediagoblin import mg_globals
+from mediagoblin.db.models import User, MediaEntry, Collection
 from mediagoblin.tools import testing
-from mediagoblin.middleware.testing import TestingMiddleware
 from mediagoblin.init.config import read_mediagoblin_config
-from mediagoblin.decorators import _make_safe
 from mediagoblin.db.open import setup_connection_and_db_from_config
+from mediagoblin.db.base import Session
+from mediagoblin.meddleware import BaseMeddleware
+from mediagoblin.auth.lib import bcrypt_gen_password_hash
+from mediagoblin.gmg_commands.dbupdate import run_dbupdate
+from mediagoblin.init.celery import setup_celery_app
 
 
 MEDIAGOBLIN_TEST_DB_NAME = u'__mediagoblin_tests__'
@@ -51,13 +58,52 @@ $ CELERY_CONFIG_MODULE=mediagoblin.init.celery.from_tests ./bin/nosetests"""
 class BadCeleryEnviron(Exception): pass
 
 
+class TestingMeddleware(BaseMeddleware):
+    """
+    Meddleware for the Unit tests
+
+    It might make sense to perform some tests on all
+    requests/responses. Or prepare them in a special
+    manner. For example all html responses could be tested
+    for being valid html *after* being rendered.
+
+    This module is getting inserted at the front of the
+    meddleware list, which means: requests are handed here
+    first, responses last. So this wraps up the "normal"
+    app.
+
+    If you need to add a test, either add it directly to
+    the appropiate process_request or process_response, or
+    create a new method and call it from process_*.
+    """
+
+    def process_response(self, request, response):
+        # All following tests should be for html only!
+        if getattr(response, 'content_type', None) != "text/html":
+            # Get out early
+            return
+
+        # If the template contains a reference to
+        # /mgoblin_static/ instead of using
+        # /request.staticdirect(), error out here.
+        # This could probably be implemented as a grep on
+        # the shipped templates easier...
+        if response.text.find("/mgoblin_static/") >= 0:
+            raise AssertionError(
+                "Response HTML contains reference to /mgoblin_static/ "
+                "instead of staticdirect. Request was for: "
+                + request.full_path)
+
+        return
+
+
 def suicide_if_bad_celery_environ():
     if not os.environ.get('CELERY_CONFIG_MODULE') == \
             'mediagoblin.init.celery.from_tests':
         raise BadCeleryEnviron(BAD_CELERY_MESSAGE)
-    
 
-def get_test_app(dump_old_app=True):
+
+def get_app(dump_old_app=True):
     suicide_if_bad_celery_environ()
 
     # Make sure we've turned on testing
@@ -73,6 +119,9 @@ def get_test_app(dump_old_app=True):
     if MGOBLIN_APP and not dump_old_app:
         return MGOBLIN_APP
 
+    Session.rollback()
+    Session.remove()
+
     # Remove and reinstall user_dev directories
     if os.path.exists(TEST_USER_DEV):
         shutil.rmtree(TEST_USER_DEV)
@@ -85,31 +134,21 @@ def get_test_app(dump_old_app=True):
     global_config, validation_result = read_mediagoblin_config(TEST_APP_CONFIG)
     app_config = global_config['mediagoblin']
 
-    # Wipe database
-    # @@: For now we're dropping collections, but we could also just
-    # collection.remove() ?
-    connection, db = setup_connection_and_db_from_config(app_config)
-    assert db.name == MEDIAGOBLIN_TEST_DB_NAME
-
-    collections_to_wipe = [
-        collection
-        for collection in db.collection_names()
-        if not collection.startswith('system.')]
-
-    for collection in collections_to_wipe:
-        db.drop_collection(collection)
-
-    # TODO: Drop and recreate indexes
+    # Run database setup/migrations
+    run_dbupdate(app_config, global_config)
 
     # setup app and return
     test_app = loadapp(
         'config:' + TEST_SERVER_CONFIG)
 
-    # Insert the TestingMiddleware, which can do some
+    # Re-setup celery
+    setup_celery_app(app_config, global_config)
+
+    # Insert the TestingMeddleware, which can do some
     # sanity checks on every request/response.
     # Doing it this way is probably not the cleanest way.
     # We'll fix it, when we have plugins!
-    mg_globals.app.middleware.insert(0, TestingMiddleware(mg_globals.app))
+    mg_globals.app.meddleware.insert(0, TestingMeddleware(mg_globals.app))
 
     app = TestApp(test_app)
     MGOBLIN_APP = app
@@ -123,12 +162,13 @@ def setup_fresh_app(func):
 
     Cleans out test buckets and passes in a new, fresh test_app.
     """
+    @wraps(func)
     def wrapper(*args, **kwargs):
-        test_app = get_test_app()
+        test_app = get_app()
         testing.clear_test_buckets()
         return func(test_app, *args, **kwargs)
 
-    return _make_safe(wrapper, func)
+    return wrapper
 
 
 def install_fixtures_simple(db, fixtures):
@@ -145,19 +185,81 @@ def assert_db_meets_expected(db, expected):
     """
     Assert a database contains the things we expect it to.
 
-    Objects are found via '_id', so you should make sure your document
-    has an _id.
+    Objects are found via 'id', so you should make sure your document
+    has an id.
 
     Args:
      - db: pymongo or mongokit database connection
      - expected: the data we expect.  Formatted like:
          {'collection_name': [
-             {'_id': 'foo',
+             {'id': 'foo',
               'some_field': 'some_value'},]}
     """
     for collection_name, collection_data in expected.iteritems():
         collection = db[collection_name]
         for expected_document in collection_data:
-            document = collection.find_one({'_id': expected_document['_id']})
+            document = collection.find_one({'id': expected_document['id']})
             assert document is not None  # make sure it exists
             assert document == expected_document  # make sure it matches
+
+
+def fixture_add_user(username=u'chris', password=u'toast',
+                     active_user=True):
+    # Reuse existing user or create a new one
+    test_user = User.query.filter_by(username=username).first()
+    if test_user is None:
+        test_user = User()
+    test_user.username = username
+    test_user.email = username + u'@example.com'
+    if password is not None:
+        test_user.pw_hash = bcrypt_gen_password_hash(password)
+    if active_user:
+        test_user.email_verified = True
+        test_user.status = u'active'
+
+    test_user.save()
+
+    # Reload
+    test_user = User.query.filter_by(username=username).first()
+
+    # ... and detach from session:
+    Session.expunge(test_user)
+
+    return test_user
+
+
+def fixture_media_entry(title=u"Some title", slug=None,
+                        uploader=None, save=True, gen_slug=True):
+    entry = MediaEntry()
+    entry.title = title
+    entry.slug = slug
+    entry.uploader = uploader or fixture_add_user().id
+    entry.media_type = u'image'
+    
+    if gen_slug:
+        entry.generate_slug()
+    if save:
+        entry.save()
+
+    return entry
+
+
+def fixture_add_collection(name=u"My first Collection", user=None):
+    if user is None:
+        user = fixture_add_user()
+    coll = Collection.query.filter_by(creator=user.id, title=name).first()
+    if coll is not None:
+        return coll
+    coll = Collection()
+    coll.creator = user.id
+    coll.title = name
+    coll.generate_slug()
+    coll.save()
+
+    # Reload
+    Session.refresh(coll)
+
+    # ... and detach from session:
+    Session.expunge(coll)
+
+    return coll