# GNU MediaGoblin -- federated, autonomous media hosting
-# Copyright (C) 2011 MediaGoblin contributors. See AUTHORS.
+# Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import os
import pkg_resources
-import os, shutil
+import shutil
+
from paste.deploy import loadapp
from webtest import TestApp
from mediagoblin import mg_globals
+from mediagoblin.db.models import User, MediaEntry, Collection, MediaComment, \
+ CommentSubscription, CommentNotification
from mediagoblin.tools import testing
from mediagoblin.init.config import read_mediagoblin_config
-from mediagoblin.decorators import _make_safe
-from mediagoblin.db.open import setup_connection_and_db_from_config
+from mediagoblin.db.base import Session
from mediagoblin.meddleware import BaseMeddleware
-from mediagoblin.auth.lib import bcrypt_gen_password_hash
+from mediagoblin.auth import gen_password_hash
+from mediagoblin.gmg_commands.dbupdate import run_dbupdate
MEDIAGOBLIN_TEST_DB_NAME = u'__mediagoblin_tests__'
'mediagoblin.tests', 'test_paste.ini')
TEST_APP_CONFIG = pkg_resources.resource_filename(
'mediagoblin.tests', 'test_mgoblin_app.ini')
-TEST_USER_DEV = pkg_resources.resource_filename(
- 'mediagoblin.tests', 'test_user_dev')
-MGOBLIN_APP = None
-
-USER_DEV_DIRECTORIES_TO_SETUP = [
- 'media/public', 'media/queue',
- 'beaker/sessions/data', 'beaker/sessions/lock']
-BAD_CELERY_MESSAGE = """\
-Sorry, you *absolutely* must run nosetests with the
-mediagoblin.init.celery.from_tests module. Like so:
-$ CELERY_CONFIG_MODULE=mediagoblin.init.celery.from_tests ./bin/nosetests"""
-
-class BadCeleryEnviron(Exception): pass
+USER_DEV_DIRECTORIES_TO_SETUP = ['media/public', 'media/queue']
class TestingMeddleware(BaseMeddleware):
"""
Meddleware for the Unit tests
-
+
It might make sense to perform some tests on all
requests/responses. Or prepare them in a special
manner. For example all html responses could be tested
def process_response(self, request, response):
# All following tests should be for html only!
- if response.content_type != "text/html":
+ if getattr(response, 'content_type', None) != "text/html":
# Get out early
return
return
-def suicide_if_bad_celery_environ():
- if not os.environ.get('CELERY_CONFIG_MODULE') == \
- 'mediagoblin.init.celery.from_tests':
- raise BadCeleryEnviron(BAD_CELERY_MESSAGE)
-
+def get_app(request, paste_config=None, mgoblin_config=None):
+ """Create a MediaGoblin app for testing.
-def get_test_app(dump_old_app=True):
- suicide_if_bad_celery_environ()
-
- # Make sure we've turned on testing
- testing._activate_testing()
-
- # Leave this imported as it sets up celery.
- from mediagoblin.init.celery import from_tests
+ Args:
+ - request: Not an http request, but a pytest fixture request. We
+ use this to make temporary directories that pytest
+ automatically cleans up as needed.
+ - paste_config: particular paste config used by this application.
+ - mgoblin_config: particular mediagoblin config used by this
+ application.
+ """
+ paste_config = paste_config or TEST_SERVER_CONFIG
+ mgoblin_config = mgoblin_config or TEST_APP_CONFIG
- global MGOBLIN_APP
+ # This is the directory we're copying the paste/mgoblin config stuff into
+ run_dir = request.config._tmpdirhandler.mktemp(
+ 'mgoblin_app', numbered=True)
+ user_dev_dir = run_dir.mkdir('user_dev').strpath
- # Just return the old app if that exists and it's okay to set up
- # and return
- if MGOBLIN_APP and not dump_old_app:
- return MGOBLIN_APP
+ new_paste_config = run_dir.join('paste.ini').strpath
+ new_mgoblin_config = run_dir.join('mediagoblin.ini').strpath
+ shutil.copyfile(paste_config, new_paste_config)
+ shutil.copyfile(mgoblin_config, new_mgoblin_config)
- # Remove and reinstall user_dev directories
- if os.path.exists(TEST_USER_DEV):
- shutil.rmtree(TEST_USER_DEV)
+ Session.rollback()
+ Session.remove()
+ # install user_dev directories
for directory in USER_DEV_DIRECTORIES_TO_SETUP:
- full_dir = os.path.join(TEST_USER_DEV, directory)
+ full_dir = os.path.join(user_dev_dir, directory)
os.makedirs(full_dir)
# Get app config
- global_config, validation_result = read_mediagoblin_config(TEST_APP_CONFIG)
+ global_config, validation_result = read_mediagoblin_config(new_mgoblin_config)
app_config = global_config['mediagoblin']
- # Wipe database
- # @@: For now we're dropping collections, but we could also just
- # collection.remove() ?
- connection, db = setup_connection_and_db_from_config(app_config)
- assert db.name == MEDIAGOBLIN_TEST_DB_NAME
-
- collections_to_wipe = [
- collection
- for collection in db.collection_names()
- if not collection.startswith('system.')]
-
- for collection in collections_to_wipe:
- db.drop_collection(collection)
-
- # TODO: Drop and recreate indexes
+ # Run database setup/migrations
+ run_dbupdate(app_config, global_config)
# setup app and return
test_app = loadapp(
- 'config:' + TEST_SERVER_CONFIG)
+ 'config:' + new_paste_config)
# Insert the TestingMeddleware, which can do some
# sanity checks on every request/response.
mg_globals.app.meddleware.insert(0, TestingMeddleware(mg_globals.app))
app = TestApp(test_app)
- MGOBLIN_APP = app
return app
-def setup_fresh_app(func):
- """
- Decorator to setup a fresh test application for this function.
-
- Cleans out test buckets and passes in a new, fresh test_app.
- """
- def wrapper(*args, **kwargs):
- test_app = get_test_app()
- testing.clear_test_buckets()
- return func(test_app, *args, **kwargs)
-
- return _make_safe(wrapper, func)
-
-
def install_fixtures_simple(db, fixtures):
"""
Very simply install fixtures in the database
"""
Assert a database contains the things we expect it to.
- Objects are found via '_id', so you should make sure your document
- has an _id.
+ Objects are found via 'id', so you should make sure your document
+ has an id.
Args:
- db: pymongo or mongokit database connection
- expected: the data we expect. Formatted like:
{'collection_name': [
- {'_id': 'foo',
+ {'id': 'foo',
'some_field': 'some_value'},]}
"""
for collection_name, collection_data in expected.iteritems():
collection = db[collection_name]
for expected_document in collection_data:
- document = collection.find_one({'_id': expected_document['_id']})
+ document = collection.query.filter_by(id=expected_document['id']).first()
assert document is not None # make sure it exists
assert document == expected_document # make sure it matches
-def fixture_add_user(username = u'chris', password = 'toast',
- active_user = True):
- test_user = mg_globals.database.User()
+def fixture_add_user(username=u'chris', password=u'toast',
+ active_user=True, wants_comment_notification=True):
+ # Reuse existing user or create a new one
+ test_user = User.query.filter_by(username=username).first()
+ if test_user is None:
+ test_user = User()
test_user.username = username
test_user.email = username + u'@example.com'
if password is not None:
- test_user.pw_hash = bcrypt_gen_password_hash(password)
+ test_user.pw_hash = gen_password_hash(password)
if active_user:
test_user.email_verified = True
test_user.status = u'active'
+ test_user.wants_comment_notification = wants_comment_notification
+
test_user.save()
+ # Reload
+ test_user = User.query.filter_by(username=username).first()
+
+ # ... and detach from session:
+ Session.expunge(test_user)
+
return test_user
+
+
+def fixture_comment_subscription(entry, notify=True, send_email=None):
+ if send_email is None:
+ uploader = User.query.filter_by(id=entry.uploader).first()
+ send_email = uploader.wants_comment_notification
+
+ cs = CommentSubscription(
+ media_entry_id=entry.id,
+ user_id=entry.uploader,
+ notify=notify,
+ send_email=send_email)
+
+ cs.save()
+
+ cs = CommentSubscription.query.filter_by(id=cs.id).first()
+
+ Session.expunge(cs)
+
+ return cs
+
+
+def fixture_add_comment_notification(entry_id, subject_id, user_id,
+ seen=False):
+ cn = CommentNotification(user_id=user_id,
+ seen=seen,
+ subject_id=subject_id)
+ cn.save()
+
+ cn = CommentNotification.query.filter_by(id=cn.id).first()
+
+ Session.expunge(cn)
+
+ return cn
+
+
+def fixture_media_entry(title=u"Some title", slug=None,
+ uploader=None, save=True, gen_slug=True,
+ state=u'unprocessed', fake_upload=True,
+ expunge=True):
+ """
+ Add a media entry for testing purposes.
+
+ Caution: if you're adding multiple entries with fake_upload=True,
+ make sure you save between them... otherwise you'll hit an
+ IntegrityError from multiple newly-added-MediaEntries adding
+ FileKeynames at once. :)
+ """
+ if uploader is None:
+ uploader = fixture_add_user().id
+
+ entry = MediaEntry()
+ entry.title = title
+ entry.slug = slug
+ entry.uploader = uploader
+ entry.media_type = u'image'
+ entry.state = state
+
+ if fake_upload:
+ entry.media_files = {'thumb': ['a', 'b', 'c.jpg'],
+ 'medium': ['d', 'e', 'f.png'],
+ 'original': ['g', 'h', 'i.png']}
+ entry.media_type = u'mediagoblin.media_types.image'
+
+ if gen_slug:
+ entry.generate_slug()
+
+ if save:
+ entry.save()
+
+ if expunge:
+ entry = MediaEntry.query.filter_by(id=entry.id).first()
+
+ Session.expunge(entry)
+
+ return entry
+
+
+def fixture_add_collection(name=u"My first Collection", user=None):
+ if user is None:
+ user = fixture_add_user()
+ coll = Collection.query.filter_by(creator=user.id, title=name).first()
+ if coll is not None:
+ return coll
+ coll = Collection()
+ coll.creator = user.id
+ coll.title = name
+ coll.generate_slug()
+ coll.save()
+
+ # Reload
+ Session.refresh(coll)
+
+ # ... and detach from session:
+ Session.expunge(coll)
+
+ return coll
+
+def fixture_add_comment(author=None, media_entry=None, comment=None):
+ if author is None:
+ author = fixture_add_user().id
+
+ if media_entry is None:
+ media_entry = fixture_media_entry().id
+
+ if comment is None:
+ comment = \
+ 'Auto-generated test comment by user #{0} on media #{0}'.format(
+ author, media_entry)
+
+ comment = MediaComment(author=author,
+ media_entry=media_entry,
+ content=comment)
+
+ comment.save()
+
+ Session.expunge(comment)
+
+ return comment
+