Merge branch 'merge-pyconfigure'
[mediagoblin.git] / mediagoblin / tests / tools.py
index 18d4ec0ca9172b16c61b6eb3c82be531467510ff..98361adc18eddf940882e1d06356171a561c9547 100644 (file)
@@ -19,21 +19,19 @@ import os
 import pkg_resources
 import shutil
 
-from functools import wraps
 
 from paste.deploy import loadapp
 from webtest import TestApp
 
 from mediagoblin import mg_globals
-from mediagoblin.db.models import User, Collection
+from mediagoblin.db.models import User, MediaEntry, Collection, MediaComment, \
+    CommentSubscription, CommentNotification
 from mediagoblin.tools import testing
 from mediagoblin.init.config import read_mediagoblin_config
-from mediagoblin.db.open import setup_connection_and_db_from_config
 from mediagoblin.db.base import Session
 from mediagoblin.meddleware import BaseMeddleware
-from mediagoblin.auth.lib import bcrypt_gen_password_hash
+from mediagoblin.auth import gen_password_hash
 from mediagoblin.gmg_commands.dbupdate import run_dbupdate
-from mediagoblin.init.celery import setup_celery_app
 
 
 MEDIAGOBLIN_TEST_DB_NAME = u'__mediagoblin_tests__'
@@ -41,21 +39,9 @@ TEST_SERVER_CONFIG = pkg_resources.resource_filename(
     'mediagoblin.tests', 'test_paste.ini')
 TEST_APP_CONFIG = pkg_resources.resource_filename(
     'mediagoblin.tests', 'test_mgoblin_app.ini')
-TEST_USER_DEV = pkg_resources.resource_filename(
-    'mediagoblin.tests', 'test_user_dev')
-MGOBLIN_APP = None
 
-USER_DEV_DIRECTORIES_TO_SETUP = [
-    'media/public', 'media/queue',
-    'beaker/sessions/data', 'beaker/sessions/lock']
 
-BAD_CELERY_MESSAGE = """\
-Sorry, you *absolutely* must run nosetests with the
-mediagoblin.init.celery.from_tests module.  Like so:
-$ CELERY_CONFIG_MODULE=mediagoblin.init.celery.from_tests ./bin/nosetests"""
-
-
-class BadCeleryEnviron(Exception): pass
+USER_DEV_DIRECTORIES_TO_SETUP = ['media/public', 'media/queue']
 
 
 class TestingMeddleware(BaseMeddleware):
@@ -97,41 +83,40 @@ class TestingMeddleware(BaseMeddleware):
         return
 
 
-def suicide_if_bad_celery_environ():
-    if not os.environ.get('CELERY_CONFIG_MODULE') == \
-            'mediagoblin.init.celery.from_tests':
-        raise BadCeleryEnviron(BAD_CELERY_MESSAGE)
-
+def get_app(request, paste_config=None, mgoblin_config=None):
+    """Create a MediaGoblin app for testing.
 
-def get_app(dump_old_app=True):
-    suicide_if_bad_celery_environ()
-
-    # Make sure we've turned on testing
-    testing._activate_testing()
-
-    # Leave this imported as it sets up celery.
-    from mediagoblin.init.celery import from_tests
+    Args:
+     - request: Not an http request, but a pytest fixture request.  We
+       use this to make temporary directories that pytest
+       automatically cleans up as needed.
+     - paste_config: particular paste config used by this application.
+     - mgoblin_config: particular mediagoblin config used by this
+       application.
+    """
+    paste_config = paste_config or TEST_SERVER_CONFIG
+    mgoblin_config = mgoblin_config or TEST_APP_CONFIG
 
-    global MGOBLIN_APP
+    # This is the directory we're copying the paste/mgoblin config stuff into
+    run_dir = request.config._tmpdirhandler.mktemp(
+        'mgoblin_app', numbered=True)
+    user_dev_dir = run_dir.mkdir('user_dev').strpath
 
-    # Just return the old app if that exists and it's okay to set up
-    # and return
-    if MGOBLIN_APP and not dump_old_app:
-        return MGOBLIN_APP
+    new_paste_config = run_dir.join('paste.ini').strpath
+    new_mgoblin_config = run_dir.join('mediagoblin.ini').strpath
+    shutil.copyfile(paste_config, new_paste_config)
+    shutil.copyfile(mgoblin_config, new_mgoblin_config)
 
     Session.rollback()
     Session.remove()
 
-    # Remove and reinstall user_dev directories
-    if os.path.exists(TEST_USER_DEV):
-        shutil.rmtree(TEST_USER_DEV)
-
+    # install user_dev directories
     for directory in USER_DEV_DIRECTORIES_TO_SETUP:
-        full_dir = os.path.join(TEST_USER_DEV, directory)
+        full_dir = os.path.join(user_dev_dir, directory)
         os.makedirs(full_dir)
 
     # Get app config
-    global_config, validation_result = read_mediagoblin_config(TEST_APP_CONFIG)
+    global_config, validation_result = read_mediagoblin_config(new_mgoblin_config)
     app_config = global_config['mediagoblin']
 
     # Run database setup/migrations
@@ -139,10 +124,7 @@ def get_app(dump_old_app=True):
 
     # setup app and return
     test_app = loadapp(
-        'config:' + TEST_SERVER_CONFIG)
-
-    # Re-setup celery
-    setup_celery_app(app_config, global_config)
+        'config:' + new_paste_config)
 
     # Insert the TestingMeddleware, which can do some
     # sanity checks on every request/response.
@@ -151,26 +133,10 @@ def get_app(dump_old_app=True):
     mg_globals.app.meddleware.insert(0, TestingMeddleware(mg_globals.app))
 
     app = TestApp(test_app)
-    MGOBLIN_APP = app
 
     return app
 
 
-def setup_fresh_app(func):
-    """
-    Decorator to setup a fresh test application for this function.
-
-    Cleans out test buckets and passes in a new, fresh test_app.
-    """
-    @wraps(func)
-    def wrapper(*args, **kwargs):
-        test_app = get_app()
-        testing.clear_test_buckets()
-        return func(test_app, *args, **kwargs)
-
-    return wrapper
-
-
 def install_fixtures_simple(db, fixtures):
     """
     Very simply install fixtures in the database
@@ -198,13 +164,13 @@ def assert_db_meets_expected(db, expected):
     for collection_name, collection_data in expected.iteritems():
         collection = db[collection_name]
         for expected_document in collection_data:
-            document = collection.find_one({'id': expected_document['id']})
+            document = collection.query.filter_by(id=expected_document['id']).first()
             assert document is not None  # make sure it exists
             assert document == expected_document  # make sure it matches
 
 
 def fixture_add_user(username=u'chris', password=u'toast',
-                     active_user=True):
+                     active_user=True, wants_comment_notification=True):
     # Reuse existing user or create a new one
     test_user = User.query.filter_by(username=username).first()
     if test_user is None:
@@ -212,11 +178,13 @@ def fixture_add_user(username=u'chris', password=u'toast',
     test_user.username = username
     test_user.email = username + u'@example.com'
     if password is not None:
-        test_user.pw_hash = bcrypt_gen_password_hash(password)
+        test_user.pw_hash = gen_password_hash(password)
     if active_user:
         test_user.email_verified = True
         test_user.status = u'active'
 
+    test_user.wants_comment_notification = wants_comment_notification
+
     test_user.save()
 
     # Reload
@@ -228,6 +196,82 @@ def fixture_add_user(username=u'chris', password=u'toast',
     return test_user
 
 
+def fixture_comment_subscription(entry, notify=True, send_email=None):
+    if send_email is None:
+        uploader = User.query.filter_by(id=entry.uploader).first()
+        send_email = uploader.wants_comment_notification
+
+    cs = CommentSubscription(
+        media_entry_id=entry.id,
+        user_id=entry.uploader,
+        notify=notify,
+        send_email=send_email)
+
+    cs.save()
+
+    cs = CommentSubscription.query.filter_by(id=cs.id).first()
+
+    Session.expunge(cs)
+
+    return cs
+
+
+def fixture_add_comment_notification(entry_id, subject_id, user_id,
+                                     seen=False):
+    cn = CommentNotification(user_id=user_id,
+                             seen=seen,
+                             subject_id=subject_id)
+    cn.save()
+
+    cn = CommentNotification.query.filter_by(id=cn.id).first()
+
+    Session.expunge(cn)
+
+    return cn
+
+
+def fixture_media_entry(title=u"Some title", slug=None,
+                        uploader=None, save=True, gen_slug=True,
+                        state=u'unprocessed', fake_upload=True,
+                        expunge=True):
+    """
+    Add a media entry for testing purposes.
+
+    Caution: if you're adding multiple entries with fake_upload=True,
+    make sure you save between them... otherwise you'll hit an
+    IntegrityError from multiple newly-added-MediaEntries adding
+    FileKeynames at once.  :)
+    """
+    if uploader is None:
+        uploader = fixture_add_user().id
+
+    entry = MediaEntry()
+    entry.title = title
+    entry.slug = slug
+    entry.uploader = uploader
+    entry.media_type = u'image'
+    entry.state = state
+
+    if fake_upload:
+        entry.media_files = {'thumb': ['a', 'b', 'c.jpg'],
+                             'medium': ['d', 'e', 'f.png'],
+                             'original': ['g', 'h', 'i.png']}
+        entry.media_type = u'mediagoblin.media_types.image'
+
+    if gen_slug:
+        entry.generate_slug()
+
+    if save:
+        entry.save()
+
+    if expunge:
+        entry = MediaEntry.query.filter_by(id=entry.id).first()
+
+        Session.expunge(entry)
+
+    return entry
+
+
 def fixture_add_collection(name=u"My first Collection", user=None):
     if user is None:
         user = fixture_add_user()
@@ -247,3 +291,26 @@ def fixture_add_collection(name=u"My first Collection", user=None):
     Session.expunge(coll)
 
     return coll
+
+def fixture_add_comment(author=None, media_entry=None, comment=None):
+    if author is None:
+        author = fixture_add_user().id
+
+    if media_entry is None:
+        media_entry = fixture_media_entry().id
+
+    if comment is None:
+        comment = \
+            'Auto-generated test comment by user #{0} on media #{0}'.format(
+                author, media_entry)
+
+    comment = MediaComment(author=author,
+                      media_entry=media_entry,
+                      content=comment)
+
+    comment.save()
+
+    Session.expunge(comment)
+
+    return comment
+