This was a very small update, I'm hoping to rebase after this to solve some
[mediagoblin.git] / mediagoblin / tests / tools.py
index 4b61f259c68397f06fb0ca1da4878793f71591b2..ec17d79184c29b0cd23b1b36866c3537d64e1930 100644 (file)
@@ -1,5 +1,5 @@
 # GNU MediaGoblin -- federated, autonomous media hosting
-# Copyright (C) 2011 Free Software Foundation, Inc
+# Copyright (C) 2011, 2012 MediaGoblin contributors.  See AUTHORS.
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License as published by
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 
+import os
 import pkg_resources
-import os, shutil
+import shutil
+
 
 from paste.deploy import loadapp
 from webtest import TestApp
 
-from mediagoblin import util
+from mediagoblin import mg_globals
+from mediagoblin.db.models import User, MediaEntry, Collection, MediaComment, \
+    CommentSubscription, CommentNotification, Privilege
+from mediagoblin.tools import testing
 from mediagoblin.init.config import read_mediagoblin_config
-from mediagoblin.decorators import _make_safe
-from mediagoblin.db.open import setup_connection_and_db_from_config
+from mediagoblin.db.base import Session
+from mediagoblin.meddleware import BaseMeddleware
+from mediagoblin.auth import gen_password_hash
+from mediagoblin.gmg_commands.dbupdate import run_dbupdate
 
 
 MEDIAGOBLIN_TEST_DB_NAME = u'__mediagoblin_tests__'
@@ -32,94 +39,104 @@ TEST_SERVER_CONFIG = pkg_resources.resource_filename(
     'mediagoblin.tests', 'test_paste.ini')
 TEST_APP_CONFIG = pkg_resources.resource_filename(
     'mediagoblin.tests', 'test_mgoblin_app.ini')
-TEST_USER_DEV = pkg_resources.resource_filename(
-    'mediagoblin.tests', 'test_user_dev')
-MGOBLIN_APP = None
 
-USER_DEV_DIRECTORIES_TO_SETUP = [
-    'media/public', 'media/queue',
-    'beaker/sessions/data', 'beaker/sessions/lock']
 
-BAD_CELERY_MESSAGE = """\
-Sorry, you *absolutely* must run nosetests with the
-mediagoblin.init.celery.from_tests module.  Like so:
-$ CELERY_CONFIG_MODULE=mediagoblin.init.celery.from_tests ./bin/nosetests"""
+USER_DEV_DIRECTORIES_TO_SETUP = ['media/public', 'media/queue']
+
+
+class TestingMeddleware(BaseMeddleware):
+    """
+    Meddleware for the Unit tests
+
+    It might make sense to perform some tests on all
+    requests/responses. Or prepare them in a special
+    manner. For example all html responses could be tested
+    for being valid html *after* being rendered.
+
+    This module is getting inserted at the front of the
+    meddleware list, which means: requests are handed here
+    first, responses last. So this wraps up the "normal"
+    app.
+
+    If you need to add a test, either add it directly to
+    the appropiate process_request or process_response, or
+    create a new method and call it from process_*.
+    """
 
+    def process_response(self, request, response):
+        # All following tests should be for html only!
+        if getattr(response, 'content_type', None) != "text/html":
+            # Get out early
+            return
 
-class BadCeleryEnviron(Exception): pass
+        # If the template contains a reference to
+        # /mgoblin_static/ instead of using
+        # /request.staticdirect(), error out here.
+        # This could probably be implemented as a grep on
+        # the shipped templates easier...
+        if response.text.find("/mgoblin_static/") >= 0:
+            raise AssertionError(
+                "Response HTML contains reference to /mgoblin_static/ "
+                "instead of staticdirect. Request was for: "
+                + request.full_path)
 
+        return
 
-def suicide_if_bad_celery_environ():
-    if not os.environ.get('CELERY_CONFIG_MODULE') == \
-            'mediagoblin.init.celery.from_tests':
-        raise BadCeleryEnviron(BAD_CELERY_MESSAGE)
-    
 
-def get_test_app(dump_old_app=True):
-    suicide_if_bad_celery_environ()
+def get_app(request, paste_config=None, mgoblin_config=None):
+    """Create a MediaGoblin app for testing.
 
-    # Leave this imported as it sets up celery.
-    from mediagoblin.init.celery import from_tests
+    Args:
+     - request: Not an http request, but a pytest fixture request.  We
+       use this to make temporary directories that pytest
+       automatically cleans up as needed.
+     - paste_config: particular paste config used by this application.
+     - mgoblin_config: particular mediagoblin config used by this
+       application.
+    """
+    paste_config = paste_config or TEST_SERVER_CONFIG
+    mgoblin_config = mgoblin_config or TEST_APP_CONFIG
 
-    global MGOBLIN_APP
+    # This is the directory we're copying the paste/mgoblin config stuff into
+    run_dir = request.config._tmpdirhandler.mktemp(
+        'mgoblin_app', numbered=True)
+    user_dev_dir = run_dir.mkdir('user_dev').strpath
 
-    # Just return the old app if that exists and it's okay to set up
-    # and return
-    if MGOBLIN_APP and not dump_old_app:
-        return MGOBLIN_APP
+    new_paste_config = run_dir.join('paste.ini').strpath
+    new_mgoblin_config = run_dir.join('mediagoblin.ini').strpath
+    shutil.copyfile(paste_config, new_paste_config)
+    shutil.copyfile(mgoblin_config, new_mgoblin_config)
 
-    # Remove and reinstall user_dev directories
-    if os.path.exists(TEST_USER_DEV):
-        shutil.rmtree(TEST_USER_DEV)
+    Session.rollback()
+    Session.remove()
 
+    # install user_dev directories
     for directory in USER_DEV_DIRECTORIES_TO_SETUP:
-        full_dir = os.path.join(TEST_USER_DEV, directory)
+        full_dir = os.path.join(user_dev_dir, directory)
         os.makedirs(full_dir)
 
     # Get app config
-    global_config, validation_result = read_mediagoblin_config(TEST_APP_CONFIG)
+    global_config, validation_result = read_mediagoblin_config(new_mgoblin_config)
     app_config = global_config['mediagoblin']
 
-    # Wipe database
-    # @@: For now we're dropping collections, but we could also just
-    # collection.remove() ?
-    connection, db = setup_connection_and_db_from_config(app_config)
-    assert db.name == MEDIAGOBLIN_TEST_DB_NAME
-
-    collections_to_wipe = [
-        collection
-        for collection in db.collection_names()
-        if not collection.startswith('system.')]
-
-    for collection in collections_to_wipe:
-        db.drop_collection(collection)
-
-    # TODO: Drop and recreate indexes
+    # Run database setup/migrations
+    run_dbupdate(app_config, global_config)
 
     # setup app and return
     test_app = loadapp(
-        'config:' + TEST_SERVER_CONFIG)
+        'config:' + new_paste_config)
+
+    # Insert the TestingMeddleware, which can do some
+    # sanity checks on every request/response.
+    # Doing it this way is probably not the cleanest way.
+    # We'll fix it, when we have plugins!
+    mg_globals.app.meddleware.insert(0, TestingMeddleware(mg_globals.app))
 
     app = TestApp(test_app)
-    MGOBLIN_APP = app
 
     return app
 
 
-def setup_fresh_app(func):
-    """
-    Decorator to setup a fresh test application for this function.
-
-    Cleans out test buckets and passes in a new, fresh test_app.
-    """
-    def wrapper(*args, **kwargs):
-        test_app = get_test_app()
-        util.clear_test_buckets()
-        return func(test_app, *args, **kwargs)
-
-    return _make_safe(wrapper, func)
-
-
 def install_fixtures_simple(db, fixtures):
     """
     Very simply install fixtures in the database
@@ -134,19 +151,165 @@ def assert_db_meets_expected(db, expected):
     """
     Assert a database contains the things we expect it to.
 
-    Objects are found via '_id', so you should make sure your document
-    has an _id.
+    Objects are found via 'id', so you should make sure your document
+    has an id.
 
     Args:
      - db: pymongo or mongokit database connection
      - expected: the data we expect.  Formatted like:
          {'collection_name': [
-             {'_id': 'foo',
+             {'id': 'foo',
               'some_field': 'some_value'},]}
     """
     for collection_name, collection_data in expected.iteritems():
         collection = db[collection_name]
         for expected_document in collection_data:
-            document = collection.find_one({'_id': expected_document['_id']})
+            document = collection.query.filter_by(id=expected_document['id']).first()
             assert document is not None  # make sure it exists
             assert document == expected_document  # make sure it matches
+
+
+def fixture_add_user(username=u'chris', password=u'toast',
+                     privileges=[], wants_comment_notification=True):
+    # Reuse existing user or create a new one
+    test_user = User.query.filter_by(username=username).first()
+    if test_user is None:
+        test_user = User()
+    test_user.username = username
+    test_user.email = username + u'@example.com'
+    if password is not None:
+        test_user.pw_hash = gen_password_hash(password)
+    test_user.wants_comment_notification = wants_comment_notification
+    for privilege in privileges:
+        query = Privilege.query.filter(Privilege.privilege_name==privilege)
+        if query.count():
+            test_user.all_privileges.append(query.one())
+
+    test_user.save()
+    # Reload
+    test_user = User.query.filter_by(username=username).first()
+
+    # ... and detach from session:
+    Session.expunge(test_user)
+
+    return test_user
+
+
+def fixture_comment_subscription(entry, notify=True, send_email=None):
+    if send_email is None:
+        uploader = User.query.filter_by(id=entry.uploader).first()
+        send_email = uploader.wants_comment_notification
+
+    cs = CommentSubscription(
+        media_entry_id=entry.id,
+        user_id=entry.uploader,
+        notify=notify,
+        send_email=send_email)
+
+    cs.save()
+
+    cs = CommentSubscription.query.filter_by(id=cs.id).first()
+
+    Session.expunge(cs)
+
+    return cs
+
+
+def fixture_add_comment_notification(entry_id, subject_id, user_id,
+                                     seen=False):
+    cn = CommentNotification(user_id=user_id,
+                             seen=seen,
+                             subject_id=subject_id)
+    cn.save()
+
+    cn = CommentNotification.query.filter_by(id=cn.id).first()
+
+    Session.expunge(cn)
+
+    return cn
+
+
+def fixture_media_entry(title=u"Some title", slug=None,
+                        uploader=None, save=True, gen_slug=True,
+                        state=u'unprocessed', fake_upload=True,
+                        expunge=True):
+    """
+    Add a media entry for testing purposes.
+
+    Caution: if you're adding multiple entries with fake_upload=True,
+    make sure you save between them... otherwise you'll hit an
+    IntegrityError from multiple newly-added-MediaEntries adding
+    FileKeynames at once.  :)
+    """
+    if uploader is None:
+        uploader = fixture_add_user().id
+
+    entry = MediaEntry()
+    entry.title = title
+    entry.slug = slug
+    entry.uploader = uploader
+    entry.media_type = u'image'
+    entry.state = state
+
+    if fake_upload:
+        entry.media_files = {'thumb': ['a', 'b', 'c.jpg'],
+                             'medium': ['d', 'e', 'f.png'],
+                             'original': ['g', 'h', 'i.png']}
+        entry.media_type = u'mediagoblin.media_types.image'
+
+    if gen_slug:
+        entry.generate_slug()
+
+    if save:
+        entry.save()
+
+    if expunge:
+        entry = MediaEntry.query.filter_by(id=entry.id).first()
+
+        Session.expunge(entry)
+
+    return entry
+
+
+def fixture_add_collection(name=u"My first Collection", user=None):
+    if user is None:
+        user = fixture_add_user()
+    coll = Collection.query.filter_by(creator=user.id, title=name).first()
+    if coll is not None:
+        return coll
+    coll = Collection()
+    coll.creator = user.id
+    coll.title = name
+    coll.generate_slug()
+    coll.save()
+
+    # Reload
+    Session.refresh(coll)
+
+    # ... and detach from session:
+    Session.expunge(coll)
+
+    return coll
+
+def fixture_add_comment(author=None, media_entry=None, comment=None):
+    if author is None:
+        author = fixture_add_user().id
+
+    if media_entry is None:
+        media_entry = fixture_media_entry().id
+
+    if comment is None:
+        comment = \
+            'Auto-generated test comment by user #{0} on media #{0}'.format(
+                author, media_entry)
+
+    comment = MediaComment(author=author,
+                      media_entry=media_entry,
+                      content=comment)
+
+    comment.save()
+
+    Session.expunge(comment)
+
+    return comment
+