# -*- coding: utf-8 -*-
#
# This file is part of INSPIRE.
# Copyright (C) 2014-2017 CERN.
#
# INSPIRE is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# INSPIRE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with INSPIRE. If not, see <http://www.gnu.org/licenses/>.
#
# In applying this license, CERN does not waive the privileges and immunities
# granted to it by virtue of its status as an Intergovernmental Organization
# or submit itself to any jurisdiction.
"""Manage ORCID OAUTH token migration from INSPIRE legacy instance."""
from __future__ import absolute_import, division, print_function
import re
import traceback
from flask import current_app
from sqlalchemy.exc import SQLAlchemyError
from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded
from redis import StrictRedis
from requests.exceptions import RequestException
from simplejson import loads
from time_execution import time_execution
from invenio_oauthclient.utils import oauth_link_external_id
from invenio_oauthclient.models import RemoteToken, User, RemoteAccount, UserIdentity
from invenio_db import db
from invenio_oauthclient.errors import AlreadyLinkedError
from inspire_utils.logging import getStackTraceLogger
from inspire_utils.record import get_value
from inspirehep.modules.orcid import exceptions as domain_exceptions
from inspirehep.modules.orcid.utils import get_literature_recids_for_orcid
from . import domain_models
# Module-level logger, built through inspire_utils' getStackTraceLogger.
LOGGER = getStackTraceLogger(__name__)
# Template of the placeholder address given to users that come without an
# email; it is formatted with the user's ORCID (see _register_user).
USER_EMAIL_EMPTY_PATTERN = '{}@FAKEEMAILINSPIRE.FAKE'
def legacy_orcid_arrays():
    """Yield the legacy ORCID token entries queued in redis.

    Entries are popped from the ``legacy_orcid_tokens`` list of the redis
    instance configured in ``CACHE_REDIS_URL``, so iterating this generator
    consumes the queue. Iteration ends at the first empty pop.

    Note: the queue is populated by the legacy tasklet:
    inspire/bibtasklets/bst_orcidsync.py

    Yields:
        list: user data in the form of [orcid, token, email, name]
    """
    queue_name = 'legacy_orcid_tokens'
    client = StrictRedis.from_url(current_app.config.get('CACHE_REDIS_URL'))

    while True:
        raw_entry = client.lpop(queue_name)
        if not raw_entry:
            # Nothing (left) in the queue.
            break
        yield loads(raw_entry)
def _link_user_and_token(user, name, orcid, access_token):
    """Create a link between a user and token, if possible.

    The user is first linked to the ORCID identifier (an already existing
    link is accepted as is). Then:

    - if the user has no RemoteToken, one is created with ``allow_push``
      enabled;
    - otherwise the existing token is checked for consistency against the
      user's ORCID identity and the given ``orcid``, and ``allow_push`` is
      switched on when it is not enabled yet.

    Args:
        user (invenio_oauthclient.models.User): an existing user object to connect the token to
        name (string): user's name, stored as ``full_name`` with a newly created token
        orcid (string): user's ORCID identifier
        access_token (string): OAUTH token for the user

    Returns:
        str: the ORCID associated with the token newly created/enabled, or
        ``None`` when the existing token already had ``allow_push`` enabled.

    Raises:
        Exception: if a RemoteToken exists but the user has no ORCID
            UserIdentity, or the identity and the token disagree on the ORCID.
        RemoteTokenOrcidMismatch: if the existing token is associated to an
            ORCID different from ``orcid``.
    """
    try:
        # Link user and ORCID
        oauth_link_external_id(user, {
            'id': orcid,
            'method': 'orcid'
        })
    except AlreadyLinkedError:
        # User already has their ORCID linked
        pass

    # Search for existing tokens.
    # Note: there can be only 1 RemoteToken per given RemoteAccount.
    existing_remote_token = RemoteToken.query.join(RemoteAccount).filter_by(
        user=user).one_or_none()

    # If not existing_remote_token, create one.
    if not existing_remote_token:
        with db.session.begin_nested():
            RemoteToken.create(
                user_id=user.id,
                client_id=get_value(current_app.config, 'ORCID_APP_CREDENTIALS.consumer_key'),
                token=access_token,
                secret=None,
                extra_data={
                    'orcid': orcid,
                    'full_name': name,
                    'allow_push': True,
                }
            )
        return orcid

    # If there is an existing_remote_token:
    #    ensure it is associated to this ORCID and
    #    set allow_push to True.
    #
    # Get the ORCID associated with this token.
    user_identity = UserIdentity.query.filter_by(user=user, method='orcid').one_or_none()

    # Ensure data consistency.
    if not user_identity:
        msg = 'No UserIdentity for user={}, method="orcid", while' \
              ' instead there is a RemoteAccount={} and RemoteToken={}'
        raise Exception(msg.format(
            user, existing_remote_token.remote_account, existing_remote_token))

    # extra_data is a free-form dict: read it with .get() so that an account
    # lacking a key is reported through the checks below (or treated as
    # "push not enabled") instead of failing with a bare KeyError.
    token_orcid = existing_remote_token.remote_account.extra_data.get('orcid')

    if user_identity.id != token_orcid:
        msg = 'UserIdentity={} and RemoteToken={} ORCID mismatch: {} != {}'
        raise Exception(msg.format(
            user_identity, existing_remote_token, user_identity.id,
            token_orcid
        ))
    if token_orcid != orcid:
        raise RemoteTokenOrcidMismatch(user, [token_orcid, orcid])

    # Force the allow_push.
    with db.session.begin_nested():
        account_data = existing_remote_token.remote_account.extra_data
        if not account_data.get('allow_push'):
            account_data['allow_push'] = True
            return orcid
    return None
class RemoteTokenOrcidMismatch(Exception):
    """Error for a RemoteToken that belongs to an ORCID other than the expected one.

    Args:
        user: the user owning the existing RemoteToken (rendered in the message).
        orcids (iterable): the conflicting ORCID identifiers; they are
            rendered in the message joined by ``' != '``.
    """

    def __init__(self, user, orcids):
        # Render every identifier explicitly: a None (or any non-string) value
        # would make str.join raise TypeError and hide the actual mismatch.
        rendered_orcids = ' != '.join('{}'.format(orcid) for orcid in orcids)
        msg = ('A RemoteToken already exists for User={} and it is'
               ' associated to a different ORCID: {}').format(
            user, rendered_orcids
        )
        super(RemoteTokenOrcidMismatch, self).__init__(msg)
def _register_user(name, email, orcid, token):
    """Attach a token to a user, creating the user when there is none.

    The possible starting points are:

    - user exists, has ORCID and token already linked
    - user exists and has their ORCID linked, but no token is associated
    - user exists, but doesn't have the ORCID identifier linked
    - user doesn't exist at all

    Whatever the starting point, the missing parts are created.

    Args:
        name (string): user's name
        email (string): user's email address
        orcid (string): user's ORCID identifier
        token (string): OAUTH authorization token

    Returns:
        str: the ORCID associated with the new user if we created one, or the
        ORCID associated with the user whose ``allow_push`` flag changed state.
    """
    # User.email is a unique field, so a missing address is replaced with a
    # (fake) unique one derived from the ORCID.
    email = email or USER_EMAIL_EMPTY_PATTERN.format(orcid)

    # Reuse an existing user entry whenever one matches.
    user = _find_user_matching(orcid, email)
    if user:
        return _link_user_and_token(user, name, orcid, token)

    # Nobody matched: create the user first, then link it.
    with db.session.begin_nested():
        user = User()
        user.email = email
        user.active = True
        db.session.add(user)
    return _link_user_and_token(user, name, orcid, token)
@shared_task(ignore_result=True, bind=True, time_limit=5 * 60)
@time_execution
def import_legacy_orcid_tokens(self):
    """Celery task to import OAUTH ORCID tokens from legacy.

    Does nothing when ``ORCID_APP_CREDENTIALS.consumer_key`` is not
    configured. Otherwise every entry yielded by ``legacy_orcid_arrays`` is
    registered and, when registration returns an ORCID, an ``orcid_push``
    task is queued on ``orcid_push_legacy_tokens`` for each literature recid
    of that ORCID. The DB session is committed once, after the whole queue
    has been processed.

    Entries that cannot be unpacked into ``orcid, token, email, name`` are
    logged and skipped, and so are entries failing with a SQLAlchemyError.

    Note: bind=True for compatibility with @time_execution.
    """
    if get_value(current_app.config, 'ORCID_APP_CREDENTIALS.consumer_key') is None:
        return

    for user_data in legacy_orcid_arrays():
        try:
            orcid, token, email, name = user_data
        except (TypeError, ValueError):
            # Entries are consumed from the queue while iterating: letting a
            # malformed one abort the task would also discard the work done
            # so far, as the session is only committed at the end.
            # The entry content is not logged because it may hold a token.
            LOGGER.error(
                'Skipping malformed legacy ORCID token entry of type %s',
                type(user_data).__name__
            )
            continue

        try:
            orcid_to_push = _register_user(name, email, orcid, token)
            if orcid_to_push:
                LOGGER.info(
                    'allow_push now enabled on %s, will push all works now',
                    orcid_to_push
                )
                recids = get_literature_recids_for_orcid(orcid_to_push)
                for recid in recids:
                    orcid_push.apply_async(
                        queue='orcid_push_legacy_tokens',
                        kwargs={
                            'orcid': orcid_to_push,
                            'rec_id': recid,
                            'oauth_token': token,
                        },
                    )
        except SQLAlchemyError as ex:
            LOGGER.exception(ex)

    db.session.commit()
# `soft_time_limit` is used to schedule a retry.
# `time_limit` instead kills the worker process (its exception cannot be caught
# in order to schedule a retry).
# Race conditions between the 2 limits are possible, but the orcid push
# operation is idempotent.
@shared_task(bind=True, soft_time_limit=10 * 60, time_limit=11 * 60)
@time_execution
def orcid_push(self, orcid, rec_id, oauth_token, kwargs_to_pusher=None):
    """Celery task to push a record to ORCID.

    The task returns without pushing when the
    ``FEATURE_FLAG_ENABLE_ORCID_PUSH`` flag is off or when ``orcid`` does not
    match ``FEATURE_FLAG_ORCID_PUSH_WHITELIST_REGEX`` (default ``'^$'``).

    Retries are scheduled, with exponential backoff, for:

    - ``RequestException`` and ``SoftTimeLimitExceeded``: up to 3 retries;
    - ``RecordNotFoundException`` and ``StaleRecordDBVersionException``:
      up to 4 retries.

    Any other exception is logged and re-raised.

    Args:
        self (celery.Task): the task
        orcid (String): an orcid identifier.
        rec_id (Int): inspire record's id to push to ORCID.
        oauth_token (String): orcid token.
        kwargs_to_pusher (Dict): extra kwargs to pass to the pusher object.
    """
    if not current_app.config['FEATURE_FLAG_ENABLE_ORCID_PUSH']:
        LOGGER.warning('ORCID push feature flag not enabled')
        return

    if not re.match(current_app.config.get(
            'FEATURE_FLAG_ORCID_PUSH_WHITELIST_REGEX', '^$'), orcid):
        LOGGER.warning('ORCID push not enabled for orcid={}'.format(orcid))
        return

    LOGGER.info('New orcid_push task for recid={} and orcid={}'.format(rec_id, orcid))
    kwargs_to_pusher = kwargs_to_pusher or {}

    try:
        pusher = domain_models.OrcidPusher(orcid, rec_id, oauth_token, **kwargs_to_pusher)
        pusher.push()
        LOGGER.info('Orcid_push task for recid={} and orcid={} successfully completed'.format(rec_id, orcid))
    except (RequestException, SoftTimeLimitExceeded) as exc:
        # Trigger a retry only in case of network-related issues.
        # RequestException is the base class for all request's library
        # exceptions.
        # OrcidPusher knows how to handle all expected HTTP exceptions and thus
        # no retry is triggered in such cases.
        # Other kinds of exceptions (like IOError or anything else due to bugs)
        # does not trigger a retry.

        # Enrich exception message.
        if isinstance(exc, RequestException):
            message = (exc.args[0:1] or ('',))[0]
            if not isinstance(message, (str, type(u''))):
                # The first argument can be a wrapped exception object rather
                # than a string: render it, so that the concatenations below
                # cannot raise TypeError.
                message = '{}'.format(message)
            # Errors raised before any HTTP response was received (connection
            # errors, timeouts) carry no response, and the request can be
            # missing too. Failing here would prevent the retry below from
            # being scheduled.
            response = getattr(exc, 'response', None)
            if response is not None:
                message += '\nResponse={}'.format(response.content)
            request = getattr(exc, 'request', None)
            if request is not None:
                message += '\nRequest={} {}'.format(request.method, request.url)
            exc.args = (message,) + exc.args[1:]

        # If max_retries=3, then self.request.retries is: [0, 1, 2, 3]
        # thus backoff power 4 is (secs): [4*60, 16*60, 64*60]
        backoff = (4 ** (self.request.retries + 1)) * 60

        LOGGER.exception(
            'Orcid_push task for recid={} and orcid={} raised an exception.'
            ' Retrying in {} secs. Exception={}'.format(
                rec_id, orcid, backoff, traceback.format_exc()))
        raise self.retry(max_retries=3, countdown=backoff, exc=exc)
    except (domain_exceptions.RecordNotFoundException, domain_exceptions.StaleRecordDBVersionException) as exc:
        # If max_retries=4, then self.request.retries is: [0, 1, 2, 3, 4]
        # thus backoff power 5 is (secs): [5, 25, 125, 625]
        backoff = 5 ** (self.request.retries + 1)
        raise self.retry(max_retries=4, countdown=backoff, exc=exc)
    except Exception:
        LOGGER.exception(
            'Orcid_push task for recid={} and orcid={} failed raising the'
            ' exception={}'.format(
                rec_id, orcid, traceback.format_exc()))
        raise
def _find_user_matching(orcid, email):
    """Look up a user in our DB by ORCID first, then by email.

    Args:
        orcid (string): ORCID identifier to match against the ``orcid``
            user identities.
        email (string): email address used when no identity matches.

    Returns:
        The user owning the matching ORCID identity if there is one,
        otherwise the result of the lookup by email (``None`` when no user
        has that address).
    """
    identity = UserIdentity.query.filter_by(id=orcid, method='orcid').first()
    if not identity:
        # No identity for this ORCID: fall back to the email address.
        return User.query.filter_by(email=email).one_or_none()
    return identity.user