Merge branch 'merge-pyconfigure'
[mediagoblin.git] / mediagoblin / tests / tools.py
index 794ed940a98bc2b8dad69d11f6c8b607eee20012..98361adc18eddf940882e1d06356171a561c9547 100644 (file)
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 
-import sys
 import os
 import pkg_resources
 import shutil
 
-from functools import wraps
 
 from paste.deploy import loadapp
 from webtest import TestApp
 
 from mediagoblin import mg_globals
-from mediagoblin.db.models import User, MediaEntry, Collection
+from mediagoblin.db.models import User, MediaEntry, Collection, MediaComment, \
+    CommentSubscription, CommentNotification
 from mediagoblin.tools import testing
 from mediagoblin.init.config import read_mediagoblin_config
 from mediagoblin.db.base import Session
 from mediagoblin.meddleware import BaseMeddleware
-from mediagoblin.auth.lib import bcrypt_gen_password_hash
+from mediagoblin.auth import gen_password_hash
 from mediagoblin.gmg_commands.dbupdate import run_dbupdate
 
 
@@ -40,8 +39,6 @@ TEST_SERVER_CONFIG = pkg_resources.resource_filename(
     'mediagoblin.tests', 'test_paste.ini')
 TEST_APP_CONFIG = pkg_resources.resource_filename(
     'mediagoblin.tests', 'test_mgoblin_app.ini')
-TEST_USER_DEV = pkg_resources.resource_filename(
-    'mediagoblin.tests', 'test_user_dev')
 
 
 USER_DEV_DIRECTORIES_TO_SETUP = ['media/public', 'media/queue']
@@ -103,7 +100,7 @@ def get_app(request, paste_config=None, mgoblin_config=None):
     # This is the directory we're copying the paste/mgoblin config stuff into
     run_dir = request.config._tmpdirhandler.mktemp(
         'mgoblin_app', numbered=True)
-    user_dev_dir = run_dir.mkdir('test_user_dev').strpath
+    user_dev_dir = run_dir.mkdir('user_dev').strpath
 
     new_paste_config = run_dir.join('paste.ini').strpath
     new_mgoblin_config = run_dir.join('mediagoblin.ini').strpath
@@ -167,13 +164,13 @@ def assert_db_meets_expected(db, expected):
     for collection_name, collection_data in expected.iteritems():
         collection = db[collection_name]
         for expected_document in collection_data:
-            document = collection.find_one({'id': expected_document['id']})
+            document = collection.query.filter_by(id=expected_document['id']).first()
             assert document is not None  # make sure it exists
             assert document == expected_document  # make sure it matches
 
 
 def fixture_add_user(username=u'chris', password=u'toast',
-                     active_user=True):
+                     active_user=True, wants_comment_notification=True):
     # Reuse existing user or create a new one
     test_user = User.query.filter_by(username=username).first()
     if test_user is None:
@@ -181,11 +178,13 @@ def fixture_add_user(username=u'chris', password=u'toast',
     test_user.username = username
     test_user.email = username + u'@example.com'
     if password is not None:
-        test_user.pw_hash = bcrypt_gen_password_hash(password)
+        test_user.pw_hash = gen_password_hash(password)
     if active_user:
         test_user.email_verified = True
         test_user.status = u'active'
 
+    test_user.wants_comment_notification = wants_comment_notification
+
     test_user.save()
 
     # Reload
@@ -197,19 +196,79 @@ def fixture_add_user(username=u'chris', password=u'toast',
     return test_user
 
 
+def fixture_comment_subscription(entry, notify=True, send_email=None):
+    if send_email is None:
+        uploader = User.query.filter_by(id=entry.uploader).first()
+        send_email = uploader.wants_comment_notification
+
+    cs = CommentSubscription(
+        media_entry_id=entry.id,
+        user_id=entry.uploader,
+        notify=notify,
+        send_email=send_email)
+
+    cs.save()
+
+    cs = CommentSubscription.query.filter_by(id=cs.id).first()
+
+    Session.expunge(cs)
+
+    return cs
+
+
+def fixture_add_comment_notification(entry_id, subject_id, user_id,
+                                     seen=False):
+    cn = CommentNotification(user_id=user_id,
+                             seen=seen,
+                             subject_id=subject_id)
+    cn.save()
+
+    cn = CommentNotification.query.filter_by(id=cn.id).first()
+
+    Session.expunge(cn)
+
+    return cn
+
+
 def fixture_media_entry(title=u"Some title", slug=None,
-                        uploader=None, save=True, gen_slug=True):
+                        uploader=None, save=True, gen_slug=True,
+                        state=u'unprocessed', fake_upload=True,
+                        expunge=True):
+    """
+    Add a media entry for testing purposes.
+
+    Caution: if you're adding multiple entries with fake_upload=True,
+    make sure you save between them... otherwise you'll hit an
+    IntegrityError from multiple newly-added-MediaEntries adding
+    FileKeynames at once.  :)
+    """
+    if uploader is None:
+        uploader = fixture_add_user().id
+
     entry = MediaEntry()
     entry.title = title
     entry.slug = slug
-    entry.uploader = uploader or fixture_add_user().id
+    entry.uploader = uploader
     entry.media_type = u'image'
+    entry.state = state
+
+    if fake_upload:
+        entry.media_files = {'thumb': ['a', 'b', 'c.jpg'],
+                             'medium': ['d', 'e', 'f.png'],
+                             'original': ['g', 'h', 'i.png']}
+        entry.media_type = u'mediagoblin.media_types.image'
 
     if gen_slug:
         entry.generate_slug()
+
     if save:
         entry.save()
 
+    if expunge:
+        entry = MediaEntry.query.filter_by(id=entry.id).first()
+
+        Session.expunge(entry)
+
     return entry
 
 
@@ -232,3 +291,26 @@ def fixture_add_collection(name=u"My first Collection", user=None):
     Session.expunge(coll)
 
     return coll
+
+def fixture_add_comment(author=None, media_entry=None, comment=None):
+    if author is None:
+        author = fixture_add_user().id
+
+    if media_entry is None:
+        media_entry = fixture_media_entry().id
+
+    if comment is None:
+        comment = \
+            'Auto-generated test comment by user #{0} on media #{0}'.format(
+                author, media_entry)
+
+    comment = MediaComment(author=author,
+                      media_entry=media_entry,
+                      content=comment)
+
+    comment.save()
+
+    Session.expunge(comment)
+
+    return comment
+