in mediagoblin.db.models
"""
for Model, rows in self.foundations.items():
- print u'\n + Laying foundations for %s table' % (Model.__name__)
+ self.printer(u' + Laying foundations for %s table\n' %
+ (Model.__name__))
for parameters in rows:
new_row = Model(**parameters)
- new_row.save()
+ self.session.add(new_row)
def create_new_migration_record(self):
"""
self.init_tables()
# auto-set at latest migration number
self.create_new_migration_record()
- self.populate_table_foundations()
-
self.printer(u"done.\n")
+ self.populate_table_foundations()
self.set_current_migration()
return u'inited'
class Privilege_v0(declarative_base()):
__tablename__ = 'core__privileges'
id = Column(Integer, nullable=False, primary_key=True, unique=True)
- privilege_name = Column(Unicode, nullable=False)
+ privilege_name = Column(Unicode, nullable=False, unique=True)
class PrivilegeUserAssociation_v0(declarative_base()):
__tablename__ = 'core__privileges_users'
def __repr__(self):
return "<Privilege %s>" % (self.privilege_name)
- def is_admin_or_moderator(self):
- '''
- This method is necessary to check if a user is able to take moderation
- actions.
- '''
-
- return (self.privilege_name==u'admin' or
- self.privilege_name==u'moderator')
class PrivilegeUserAssociation(Base):
'''
<a>{%- trans %}Reported Media Entry{% endtrans -%}</a>
{% endif %}
</td>
- <td>{{ report.report_content[:21] }}{% if report.report_content|count >20 %}...{% endif %}</td>
+ <td>{{ report.report_content[:21] }}
+ {% if report.report_content|count >20 %}...{% endif %}</td>
<td>{%- trans %}Resolve{% endtrans -%}</td>
</tr>
{% endfor %}
<a class="right_align">{{ user.username }}'s report history</a>
<span class=clear></span>
<h2>{{ user.username }}'s Privileges</h2>
+ {% if request.user.has_privilege('admin') and not user_banned and
+ not user.id == request.user.id %}
+ <input type=button class="button_action right_align"
+ value="Ban User" />
+ {% elif request.user.has_privilege('admin') and
+ not user.id == request.user.id %}
+ <input type=button class="button_action right_align"
+ value="UnBan User" />
+ {% endif %}
<form action="{{ request.urlgen('mediagoblin.moderation.give_or_take_away_privilege',
user=user.username) }}"
- method=post >
+ method=post >
<table class="admin_side_panel">
<tr>
<th>{% trans %}Privilege{% endtrans %}</th>
from mediagoblin.tests.tools import fixture_add_user
from .resources import GOOD_JPG, GOOD_PNG, EVIL_FILE, EVIL_JPG, EVIL_PNG, \
BIG_BLUE
+from mediagoblin.db.models import Privilege
_log = logging.getLogger(__name__)
self.db = mg_globals.database
self.user_password = u'4cc355_70k3N'
- self.user = fixture_add_user(u'joapi', self.user_password)
+ self.user = fixture_add_user(u'joapi', self.user_password,
+ privileges=[u'active',u'uploader'])
def login(self, test_app):
test_app.post(
template.clear_test_template_context()
response = test_app.post(
'/auth/register/', {
- 'username': u'happygirl',
- 'password': 'iamsohappy',
- 'email': 'happygrrl@example.org'})
+ 'username': u'angrygirl',
+ 'password': 'iamsoangry',
+ 'email': 'angrygrrl@example.org'})
response.follow()
## Did we redirect to the proper page? Use the right template?
- assert urlparse.urlsplit(response.location)[2] == '/u/happygirl/'
+ assert urlparse.urlsplit(response.location)[2] == '/u/angrygirl/'
assert 'mediagoblin/user_pages/user.html' in template.TEMPLATE_TEST_CONTEXT
## Make sure user is in place
new_user = mg_globals.database.User.query.filter_by(
- username=u'happygirl').first()
+ username=u'angrygirl').first()
assert new_user
assert new_user.status == u'needs_email_verification'
assert new_user.email_verified == False
## Make sure we get email confirmation, and try verifying
assert len(mail.EMAIL_TEST_INBOX) == 1
message = mail.EMAIL_TEST_INBOX.pop()
- assert message['To'] == 'happygrrl@example.org'
+ assert message['To'] == 'angrygrrl@example.org'
email_context = template.TEMPLATE_TEST_CONTEXT[
'mediagoblin/auth/verification_email.txt']
assert email_context['verification_url'] in message.get_payload(decode=True)
# assert context['verification_successful'] == True
# TODO: Would be good to test messages here when we can do so...
new_user = mg_globals.database.User.query.filter_by(
- username=u'happygirl').first()
+ username=u'angrygirl').first()
assert new_user
assert new_user.status == u'needs_email_verification'
assert new_user.email_verified == False
# assert context['verification_successful'] == True
# TODO: Would be good to test messages here when we can do so...
new_user = mg_globals.database.User.query.filter_by(
- username=u'happygirl').first()
+ username=u'angrygirl').first()
assert new_user
assert new_user.status == u'active'
assert new_user.email_verified == True
template.clear_test_template_context()
response = test_app.post(
'/auth/register/', {
- 'username': u'happygirl',
- 'password': 'iamsohappy2',
- 'email': 'happygrrl2@example.org'})
+ 'username': u'angrygirl',
+ 'password': 'iamsoangry2',
+ 'email': 'angrygrrl2@example.org'})
context = template.TEMPLATE_TEST_CONTEXT[
'mediagoblin/auth/register.html']
template.clear_test_template_context()
response = test_app.post(
'/auth/forgot_password/',
- {'username': u'happygirl'})
+ {'username': u'angrygirl'})
response.follow()
## Did we redirect to the proper page? Use the right template?
## Make sure link to change password is sent by email
assert len(mail.EMAIL_TEST_INBOX) == 1
message = mail.EMAIL_TEST_INBOX.pop()
- assert message['To'] == 'happygrrl@example.org'
+ assert message['To'] == 'angrygrrl@example.org'
email_context = template.TEMPLATE_TEST_CONTEXT[
'mediagoblin/auth/fp_verification_email.txt']
#TODO - change the name of verification_url to something forgot-password-ish
template.clear_test_template_context()
response = test_app.post(
'/auth/forgot_password/verify/', {
- 'password': 'iamveryveryhappy',
+ 'password': 'iamveryveryangry',
'token': parsed_get_params['token']})
response.follow()
assert 'mediagoblin/auth/login.html' in template.TEMPLATE_TEST_CONTEXT
template.clear_test_template_context()
response = test_app.post(
'/auth/login/', {
- 'username': u'happygirl',
- 'password': 'iamveryveryhappy'})
+ 'username': u'angrygirl',
+ 'password': 'iamveryveryangry'})
# User should be redirected
response.follow()
Test logging in and logging out
"""
# Make a new user
- test_user = fixture_add_user(active_user=False)
+ test_user = fixture_add_user()
# Get login
# methods, and so it makes sense to test them here.
from mediagoblin.db.base import Session
-from mediagoblin.db.models import MediaEntry
+from mediagoblin.db.models import MediaEntry, User, Privilege
from mediagoblin.tests.tools import fixture_add_user
qbert_entry.generate_slug()
assert qbert_entry.slug is None
+class TestUserHasPrivilege:
+ def _setup(self):
+ self.natalie_user = fixture_add_user(u'natalie')
+ self.aeva_user = fixture_add_user(u'aeva')
+ self.natalie_user.all_privileges += [
+ Privilege.query.filter(
+ Privilege.privilege_name == u'admin').one(),
+ Privilege.query.filter(
+ Privilege.privilege_name == u'moderator').one()]
+ self.aeva_user.all_privileges += [
+ Privilege.query.filter(
+ Privilege.privilege_name == u'moderator').one()]
+
+ def test_privilege_added_correctly(self, test_app):
+ self._setup()
+ admin = Privilege.query.filter(
+ Privilege.privilege_name == u'admin').one()
+ # first make sure the privileges were added successfully
+
+ assert admin in self.natalie_user.all_privileges
+ assert admin not in self.aeva_user.all_privileges
+
+ def test_user_has_privilege_one(self, test_app):
+ self._setup()
+
+ # then test out the user.has_privilege method for one privilege
+ assert not natalie_user.has_privilege(u'commenter')
+ assert aeva_user.has_privilege(u'active')
+
+
+ def test_user_has_privileges_multiple(self, test_app):
+ self._setup()
+
+ # when multiple args are passed to has_privilege, the method returns
+ # True if the user has ANY of the privileges
+ assert natalie_user.has_privilege(u'admin',u'commenter')
+ assert aeva_user.has_privilege(u'moderator',u'active')
+ assert not natalie_user.has_privilege(u'commenter',u'uploader')
+
+
def test_media_data_init(test_app):
Session.rollback()
obj_in_session += 1
print repr(obj)
assert obj_in_session == 0
+
self.pman = pluginapi.PluginManager()
self.user_password = u'4cc355_70k3N'
- self.user = fixture_add_user(u'joauth', self.user_password)
+ self.user = fixture_add_user(u'joauth', self.user_password,
+ privileges=[u'active'])
self.login()
SET1_MODELS = [Creature1, Level1]
+FOUNDATIONS = {Creature1:[{'name':u'goblin','num_legs':2,'is_demon':False},
+ {'name':u'cerberus','num_legs':4,'is_demon':True}]
+ }
+
SET1_MIGRATIONS = {}
#######################################################
session.commit()
-
def create_test_engine():
from sqlalchemy import create_engine
engine = create_engine('sqlite:///:memory:', echo=False)
printer = CollectingPrinter()
migration_manager = MigrationManager(
- u'__main__', SET1_MODELS, SET1_MIGRATIONS, Session(),
+ u'__main__', SET1_MODELS, FOUNDATIONS, SET1_MIGRATIONS, Session(),
printer)
# Check latest migration and database current migration
assert result == u'inited'
# Check output
assert printer.combined_string == (
- "-> Initializing main mediagoblin tables... done.\n")
+ "-> Initializing main mediagoblin tables... done.\n" + \
+ " + Laying foundations for Creature1 table\n" )
# Check version in database
assert migration_manager.latest_migration == 0
assert migration_manager.database_current_migration == 0
+
# Install the initial set
# -----------------------
# Try to "re-migrate" with same manager settings... nothing should happen
migration_manager = MigrationManager(
- u'__main__', SET1_MODELS, SET1_MIGRATIONS, Session(),
- printer)
+ u'__main__', SET1_MODELS, FOUNDATIONS, SET1_MIGRATIONS,
+ Session(), printer)
assert migration_manager.init_or_migrate() == None
# Check version in database
# Now check to see if stuff seems to be in there.
session = Session()
+ # Check the creation of the foundation rows on the creature table
+ creature = session.query(Creature1).filter_by(
+ name=u'goblin').one()
+ assert creature.num_legs == 2
+ assert creature.is_demon == False
+
+ creature = session.query(Creature1).filter_by(
+ name=u'cerberus').one()
+ assert creature.num_legs == 4
+ assert creature.is_demon == True
+
+
+ # Check the creation of the inserted rows on the creature and levels tables
+
creature = session.query(Creature1).filter_by(
name=u'centipede').one()
assert creature.num_legs == 100
# isn't said to be updated yet
printer = CollectingPrinter()
migration_manager = MigrationManager(
- u'__main__', SET3_MODELS, SET3_MIGRATIONS, Session(),
+ u'__main__', SET3_MODELS, FOUNDATIONS, SET3_MIGRATIONS, Session(),
printer)
assert migration_manager.latest_migration == 8
# Make sure version matches expected
migration_manager = MigrationManager(
- u'__main__', SET3_MODELS, SET3_MIGRATIONS, Session(),
+ u'__main__', SET3_MODELS, FOUNDATIONS, SET3_MIGRATIONS, Session(),
printer)
assert migration_manager.latest_migration == 8
assert migration_manager.database_current_migration == 8
# Now check to see if stuff seems to be in there.
session = Session()
+
+
+ # Start with making sure that the foundations did not run again
+ assert session.query(Creature3).filter_by(
+ name=u'goblin').count() == 1
+ assert session.query(Creature3).filter_by(
+ name=u'cerberus').count() == 1
+
+ # Then make sure the models have been migrated correctly
creature = session.query(Creature3).filter_by(
name=u'centipede').one()
assert creature.num_limbs == 100.0
from mediagoblin import mg_globals
from mediagoblin.db.models import User, MediaEntry, Collection, MediaComment, \
- CommentSubscription, CommentNotification
+ CommentSubscription, CommentNotification, Privilege
from mediagoblin.tools import testing
from mediagoblin.init.config import read_mediagoblin_config
from mediagoblin.db.base import Session
def fixture_add_user(username=u'chris', password=u'toast',
- active_user=True, wants_comment_notification=True):
+ privileges=[], wants_comment_notification=True):
# Reuse existing user or create a new one
test_user = User.query.filter_by(username=username).first()
if test_user is None:
test_user.email = username + u'@example.com'
if password is not None:
test_user.pw_hash = gen_password_hash(password)
- if active_user:
- test_user.email_verified = True
- test_user.status = u'active'
-
test_user.wants_comment_notification = wants_comment_notification
+ for privilege in privileges:
+ query = Privilege.query.filter(Privilege.privilege_name==privilege)
+ if query.count():
+ test_user.all_privileges.append(query.one())
test_user.save()
-
# Reload
test_user = User.query.filter_by(username=username).first()