X-Git-Url: https://vcs.fsf.org/?a=blobdiff_plain;f=mediagoblin%2Fdb%2Futil.py;h=8431361a89fc6dc6d22c962e238816fb9e3e0e60;hb=16ef14ba41f581ed950ae319f363e90540b2cbd7;hp=540a9244d578d53094f72acea45ad28d9163cd5e;hpb=c0516158896e2bdf1592f1331b9ecf1979d4c099;p=mediagoblin.git diff --git a/mediagoblin/db/util.py b/mediagoblin/db/util.py index 540a9244..8431361a 100644 --- a/mediagoblin/db/util.py +++ b/mediagoblin/db/util.py @@ -14,16 +14,63 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -try: - from mediagoblin.db.sql_switch import use_sql -except ImportError: - use_sql = False - -if use_sql: - from mediagoblin.db.sql.fake import ObjectId, InvalidId, DESCENDING - from mediagoblin.db.sql.util import atomic_update, check_media_slug_used, \ - media_entries_for_tag_slug -else: - from mediagoblin.db.mongo.util import \ - ObjectId, InvalidId, DESCENDING, atomic_update, \ - check_media_slug_used, media_entries_for_tag_slug +from mediagoblin.db.base import Session +from mediagoblin.db.models import MediaEntry, Tag, MediaTag, Collection + + +########################## +# Random utility functions +########################## + + +def atomic_update(table, query_dict, update_values): + table.query.filter_by(**query_dict).update(update_values, + synchronize_session=False) + Session.commit() + + +def check_media_slug_used(uploader_id, slug, ignore_m_id): + query = MediaEntry.query.filter_by(uploader=uploader_id, slug=slug) + if ignore_m_id is not None: + query = query.filter(MediaEntry.id != ignore_m_id) + does_exist = query.first() is not None + return does_exist + + +def media_entries_for_tag_slug(dummy_db, tag_slug): + return MediaEntry.query \ + .join(MediaEntry.tags_helper) \ + .join(MediaTag.tag_helper) \ + .filter( + (MediaEntry.state == u'processed') + & (Tag.slug == tag_slug)) + + +def clean_orphan_tags(commit=True): + """Search for unused MediaTags and delete them""" + q1 = Session.query(Tag).outerjoin(MediaTag).filter(MediaTag.id==None) + for t in q1: + Session.delete(t) + # The "let the db do all the work" version: + # q1 = Session.query(Tag.id).outerjoin(MediaTag).filter(MediaTag.id==None) + # q2 = Session.query(Tag).filter(Tag.id.in_(q1)) + # q2.delete(synchronize_session = False) + if commit: + Session.commit() + + +def check_collection_slug_used(creator_id, slug, ignore_c_id): + filt = (Collection.creator == creator_id) \ + & (Collection.slug == slug) + if ignore_c_id is not None: + filt = filt & (Collection.id != ignore_c_id) + does_exist = Session.query(Collection.id).filter(filt).first() is not None + return does_exist + + +if __name__ == '__main__': + from mediagoblin.db.open import setup_connection_and_db_from_config + + db = setup_connection_and_db_from_config({'sql_engine':'sqlite:///mediagoblin.db'}) + + clean_orphan_tags()