def atomic_update(table, query_dict, update_values):
- table.find(query_dict).update(update_values,
+ table.query.filter_by(**query_dict).update(update_values,
synchronize_session=False)
Session.commit()
-def check_media_slug_used(dummy_db, uploader_id, slug, ignore_m_id):
- filt = (MediaEntry.uploader == uploader_id) \
- & (MediaEntry.slug == slug)
+def check_media_slug_used(uploader_id, slug, ignore_m_id):
+ query = MediaEntry.query.filter_by(uploader=uploader_id, slug=slug)
if ignore_m_id is not None:
- filt = filt & (MediaEntry.id != ignore_m_id)
- does_exist = Session.query(MediaEntry.id).filter(filt).first() is not None
+ query = query.filter(MediaEntry.id != ignore_m_id)
+ does_exist = query.first() is not None
return does_exist
Session.commit()
-def check_collection_slug_used(dummy_db, creator_id, slug, ignore_c_id):
+def check_collection_slug_used(creator_id, slug, ignore_c_id):
filt = (Collection.creator == creator_id) \
& (Collection.slug == slug)
if ignore_c_id is not None:
does_exist = Session.query(Collection.id).filter(filt).first() is not None
return does_exist
-
if __name__ == '__main__':
from mediagoblin.db.open import setup_connection_and_db_from_config