Source code for inspirehep.modules.records.permissions
# -*- coding: utf-8 -*-
#
# This file is part of INSPIRE.
# Copyright (C) 2014-2017 CERN.
#
# INSPIRE is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# INSPIRE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with INSPIRE. If not, see <http://www.gnu.org/licenses/>.
#
# In applying this license, CERN does not waive the privileges and immunities
# granted to it by virtue of its status as an Intergovernmental Organization
# or submit itself to any jurisdiction.
from __future__ import absolute_import, division, print_function
from flask import current_app, session
from flask_principal import ActionNeed
from flask_security import current_user
from werkzeug.local import LocalProxy
from invenio_access.models import ActionUsers, ActionRoles
from invenio_access.permissions import (
Permission,
ParameterizedActionNeed,
)
from invenio_cache import current_cache
action_view_restricted_collection = ParameterizedActionNeed(
'view-restricted-collection', argument=None
)
action_update_collection = ParameterizedActionNeed(
'update-collection', argument=None
)
all_restricted_collections = LocalProxy(lambda: load_restricted_collections())
user_collections = LocalProxy(lambda: get_user_collections())
[docs]def get_user_collections():
"""Get user restricted collections."""
return session.get('restricted_collections', set())
[docs]def load_user_collections(app, user):
"""Load user restricted collections upon login.
Receiver for flask_login.user_logged_in
"""
user_collections = set(
[a.argument for a in ActionUsers.query.filter_by(
action='view-restricted-collection',
user_id=current_user.get_id()).all()]
)
user_roles = user.roles
for role in user_roles:
user_collections = user_collections | set(
[a.argument for a in ActionRoles.query.filter_by(
action='view-restricted-collection',
role_id=role.id).all()]
)
session['restricted_collections'] = user_collections
[docs]def load_restricted_collections():
restricted_collections = current_cache.get('restricted_collections')
if restricted_collections:
return restricted_collections
else:
restricted_collections = set(
[
a.argument for a in ActionUsers.query.filter_by(
action='view-restricted-collection').all()
]
)
restricted_collections = restricted_collections | set(
[
a.argument for a in ActionRoles.query.filter_by(
action='view-restricted-collection').all()
]
)
if restricted_collections:
current_cache.set(
'restricted_collections',
restricted_collections,
timeout=current_app.config.get(
'INSPIRE_COLLECTIONS_RESTRICTED_CACHE_TIMEOUT', 120)
)
return restricted_collections
[docs]def record_read_permission_factory(record=None):
"""Record permission factory."""
return RecordPermission.create(record=record, action='read')
[docs]def record_update_permission_factory(record=None):
"""Record permission factory."""
return RecordPermission.create(record=record, action='update')
[docs]class RecordPermission(Permission):
"""Record permission.
- Read access given if collection not restricted.
- Update access given to admins and cataloguers.
- All other actions are denied for the moment.
"""
read_actions = ['read']
update_actions = ['update']
def __init__(self, record, func, user):
"""Initialize a file permission object."""
self.record = record
self.func = func
self.user = user or current_user
[docs] def can(self):
"""Determine access."""
return self.func(self.user, self.record)
@classmethod
[docs] def create(cls, record, action, user=None):
"""Create a record permission."""
if action in cls.read_actions:
return cls(record, has_read_permission, user)
elif action in cls.update_actions:
return cls(record, has_update_permission, user)
else:
return cls(record, deny, user)
[docs]def has_read_permission(user, record):
"""Check if user has read access to the record."""
def _cant_view(collection):
return not Permission(
ParameterizedActionNeed(
'view-restricted-collection',
collection)).can()
user_roles = [r.name for r in current_user.roles]
if 'superuser' in user_roles:
return True
if '_collections' in record:
record_collections = set(record['_collections'])
restricted_coll = all_restricted_collections & record_collections
if restricted_coll:
if any(map(_cant_view, restricted_coll)):
return False
# By default we allow access
return True
[docs]def has_update_permission(user, record):
"""Check if user has update access to the record."""
def _cant_update(collection):
return not Permission(
ParameterizedActionNeed(
'update-collection',
collection)).can()
user_roles = [r.name for r in current_user.roles]
if 'superuser' in user_roles:
return True
if '_collections' in record:
record_collections = set(record['_collections'])
if any(map(_cant_update, record_collections)):
return False
return True
return False
[docs]def has_admin_permission(user, record):
"""Check if user has admin access to record."""
# Allow administrators
if Permission(ActionNeed('admin-access')):
return True
#
# Utility functions
#
[docs]def deny(user, record):
"""Deny access."""
return False