class GMGTableBase(object):
query = Session.query_property()
- @classmethod
- def find(cls, query_dict):
- return cls.query.filter_by(**query_dict)
-
- @classmethod
- def find_one(cls, query_dict):
- return cls.query.filter_by(**query_dict).first()
-
- @classmethod
- def one(cls, query_dict):
- return cls.find(query_dict).one()
-
def get(self, key):
return getattr(self, key)
def atomic_update(table, query_dict, update_values):
- table.find(query_dict).update(update_values,
+ table.query.filter_by(**query_dict).update(update_values,
synchronize_session=False)
Session.commit()
"""
@wraps(controller)
def wrapper(request, *args, **kwargs):
- creator_id = request.db.User.find_one(
- {'username': request.matchdict['user']}).id
+ creator_id = request.db.User.query.filter_by(
+ username=request.matchdict['user']).first().id
if not (request.user.is_admin or
request.user.id == creator_id):
raise Forbidden()
"""
@wraps(controller)
def wrapper(request, *args, **kwargs):
- user = request.db.User.find_one(
- {'username': request.matchdict['user']})
+ user = request.db.User.query.filter_by(
+ username=request.matchdict['user']).first()
if not user:
return render_404(request)
- collection = request.db.Collection.find_one(
- {'slug': request.matchdict['collection'],
- 'creator': user.id})
+ collection = request.db.Collection.query.filter_by(
+ slug=request.matchdict['collection'],
+ creator=user.id).first()
# Still no collection? Okay, 404.
if not collection:
"""
@wraps(controller)
def wrapper(request, *args, **kwargs):
- user = request.db.User.find_one(
- {'username': request.matchdict['user']})
+ user = request.db.User.query.filter_by(
+ username=request.matchdict['user']).first()
if not user:
return render_404(request)
- collection_item = request.db.CollectionItem.find_one(
- {'id': request.matchdict['collection_item'] })
+ collection_item = request.db.CollectionItem.query.filter_by(
+ id=request.matchdict['collection_item']).first()
# Still no collection item? Okay, 404.
if not collection_item:
form.slug.data, collection.id)
# Make sure there isn't already a Collection with this title
- existing_collection = request.db.Collection.find_one({
- 'creator': request.user.id,
- 'title':form.title.data})
+ existing_collection = request.db.Collection.query.filter_by(
+ creator=request.user.id,
+ title=form.title.data).first()
if existing_collection and existing_collection.id != collection.id:
messages.add_message(
# TODO: Add import of queue files
queue_cache = BasicFileStorage(args._cache_path['queue'])
- for entry in db.MediaEntry.find():
+ for entry in db.MediaEntry.query.filter_by():
for name, path in entry.media_files.items():
_log.info('Importing: {0} - {1}'.format(
entry.title.encode('ascii', 'replace'),
# TODO: Add export of queue files
queue_cache = BasicFileStorage(args._cache_path['queue'])
- for entry in db.MediaEntry.find():
+ for entry in db.MediaEntry.query.filter_by():
for name, path in entry.media_files.items():
_log.info(u'Exporting {0} - {1}'.format(
entry.title,
db = mg_globals.database
users_with_username = \
- db.User.find({
- 'username': args.username.lower(),
- }).count()
+ db.User.query.filter_by(
+ username=args.username.lower()
+ ).count()
if users_with_username:
print u'Sorry, a user with that name already exists.'
db = mg_globals.database
- user = db.User.one({'username': unicode(args.username.lower())})
+ user = db.User.query.filter_by(
+ username=unicode(args.username.lower())).one()
if user:
user.is_admin = True
user.save()
db = mg_globals.database
- user = db.User.one({'username': unicode(args.username.lower())})
+ user = db.User.query.filter_by(
+ username=unicode(args.username.lower())).one()
if user:
user.pw_hash = auth.gen_password_hash(args.password)
user.save()
collection.generate_slug()
# Make sure this user isn't duplicating an existing collection
- existing_collection = request.db.Collection.find_one({
- 'creator': request.user.id,
- 'title':collection.title})
+ existing_collection = request.db.Collection.query.filter_by(
+ creator=request.user.id,
+ title=collection.title).first()
if existing_collection:
add_message(request, messages.ERROR,
assert 'mediagoblin/user_pages/user.html' in template.TEMPLATE_TEST_CONTEXT
## Make sure user is in place
- new_user = mg_globals.database.User.find_one(
- {'username': u'happygirl'})
+ new_user = mg_globals.database.User.query.filter_by(
+ username=u'happygirl').first()
assert new_user
assert new_user.status == u'needs_email_verification'
assert new_user.email_verified == False
# assert context['verification_successful'] == True
# TODO: Would be good to test messages here when we can do so...
- new_user = mg_globals.database.User.find_one(
- {'username': u'happygirl'})
+ new_user = mg_globals.database.User.query.filter_by(
+ username=u'happygirl').first()
assert new_user
assert new_user.status == u'needs_email_verification'
assert new_user.email_verified == False
'mediagoblin/user_pages/user.html']
# assert context['verification_successful'] == True
# TODO: Would be good to test messages here when we can do so...
- new_user = mg_globals.database.User.find_one(
- {'username': u'happygirl'})
+ new_user = mg_globals.database.User.query.filter_by(
+ username=u'happygirl').first()
assert new_user
assert new_user.status == u'active'
assert new_user.email_verified == True
assert urlparse.urlsplit(res.location)[2] == '/'
# Email shouldn't be saved
- email_in_db = mg_globals.database.User.find_one(
- {'email': 'new@example.com'})
+ email_in_db = mg_globals.database.User.query.filter_by(
+ email='new@example.com').first()
email = User.query.filter_by(username='chris').first().email
assert email_in_db is None
assert email == 'chris@example.com'
openid_plugin_app.get('/auth/logout')
# Get user and detach from session
- test_user = mg_globals.database.User.find_one({
- 'username': u'chris'})
+ test_user = mg_globals.database.User.query.filter_by(
+ username=u'chris').first()
Session.expunge(test_user)
# Log back in
assert 'mediagoblin/edit/edit_account.html' in template.TEMPLATE_TEST_CONTEXT
# OpenID Added?
- new_openid = mg_globals.database.OpenIDUserURL.find_one(
- {'openid_url': u'http://add.myopenid.com'})
+ new_openid = mg_globals.database.OpenIDUserURL.query.filter_by(
+ openid_url=u'http://add.myopenid.com').first()
assert new_openid
_test_add()
assert 'mediagoblin/edit/edit_account.html' in template.TEMPLATE_TEST_CONTEXT
# OpenID deleted?
- new_openid = mg_globals.database.OpenIDUserURL.find_one(
- {'openid_url': u'http://add.myopenid.com'})
+ new_openid = mg_globals.database.OpenIDUserURL.query.filter_by(
+ openid_url=u'http://add.myopenid.com').first()
assert not new_openid
_test_delete(self, test_user)
return {'upload_files': [('file', filename)]}
def check_comments(self, request, media_id, count):
- comments = request.db.MediaComment.find({'media_entry': media_id})
+ comments = request.db.MediaComment.query.filter_by(media_entry=media_id)
assert count == len(list(comments))
def test_missing_fields(self):
assert 'mediagoblin/user_pages/user.html' in context
def check_media(self, request, find_data, count=None):
- media = MediaEntry.find(find_data)
+ media = MediaEntry.query.filter_by(**find_data)
if count is not None:
assert media.count() == count
if count == 0:
request = context['request']
- media = request.db.MediaEntry.find_one({
- u'title': u'UNIQUE_TITLE_PLS_DONT_CREATE_OTHER_MEDIA_WITH_THIS_TITLE'})
+ media = request.db.MediaEntry.query.filter_by(
+ title=u'UNIQUE_TITLE_PLS_DONT_CREATE_OTHER_MEDIA_WITH_THIS_TITLE').first()
assert media.media_type == 'mediagoblin.media_types.image'
response, context = self.do_post({'title': title}, do_follow=True,
**self.upload_data(filename))
self.check_url(response, '/u/{0}/'.format(self.test_user.username))
- entry = mg_globals.database.MediaEntry.find_one({'title': title})
+ entry = mg_globals.database.MediaEntry.query.filter_by(title=title).first()
assert entry.state == 'failed'
assert entry.fail_error == u'mediagoblin.processing:BadMediaFail'
for collection_name, collection_data in expected.iteritems():
collection = db[collection_name]
for expected_document in collection_data:
- document = collection.find_one({'id': expected_document['id']})
+ document = collection.query.filter_by(id=expected_document['id']).first()
assert document is not None # make sure it exists
assert document == expected_document # make sure it matches