# Source code for inspirehep.modules.orcid.domain_models

# -*- coding: utf-8 -*-
#
# This file is part of INSPIRE.
# Copyright (C) 2018 CERN.
#
# INSPIRE is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# INSPIRE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with INSPIRE. If not, see <http://www.gnu.org/licenses/>.
#
# In applying this license, CERN does not waive the privileges and immunities
# granted to it by virtue of its status as an Intergovernmental Organization
# or submit itself to any jurisdiction.

from __future__ import absolute_import, division, print_function

import logging

from flask import current_app
from time_execution import time_execution

from inspire_service_orcid import exceptions as orcid_client_exceptions
from inspire_service_orcid.client import OrcidClient

from inspirehep.utils.lock import distributed_lock
from inspirehep.utils.record_getter import (
    RecordGetterError,
    get_db_record,
)

from . import exceptions, utils
from .cache import OrcidCache
from .converter import OrcidConverter
from .putcode_getter import OrcidPutcodeGetter


logger = logging.getLogger(__name__)


class OrcidPusher(object):
    """Push one Inspire literature record to one ORCID author profile.

    The record is read from the db and converted to ORCID work XML. The work
    is then POSTed (new work), PUT (known putcode) or deleted (deleted
    record). Putcodes are kept in ``OrcidCache`` so that records whose
    content did not change are not pushed again.
    """

    def __init__(self, orcid, recid, oauth_token,
                 do_fail_if_duplicated_identifier=False,
                 record_db_version=None):
        """
        Args:
            orcid: ORCID identifier of the author profile to update.
            recid: recid of the ``lit`` record to push.
            oauth_token: OAuth token passed to ``OrcidClient`` and
                ``OrcidPutcodeGetter``.
            do_fail_if_duplicated_identifier: if True, a duplicated external
                identifier error from ORCID is raised as
                ``DuplicatedExternalIdentifierPusherException`` instead of
                first pushing the clashing work.
            record_db_version: if given, the minimum db ``version_id`` the
                fetched record must have.

        Raises:
            exceptions.RecordNotFoundException: no ``lit`` record with recid.
            exceptions.StaleRecordDBVersionException: the db record version is
                lower than ``record_db_version``.
        """
        self.orcid = orcid
        self.recid = recid
        self.oauth_token = oauth_token
        self.do_fail_if_duplicated_identifier = do_fail_if_duplicated_identifier
        self.record_db_version = record_db_version
        self.inspire_record = self._get_inspire_record()
        self.cache = OrcidCache(orcid, recid)
        # Name of the distributed lock serializing non-idempotent ORCID API
        # calls for this orcid.
        self.lock_name = 'orcid:{}'.format(self.orcid)
        self.client = OrcidClient(self.oauth_token, self.orcid)
        # Created in push(), once the (cached) putcode is known.
        self.converter = None

    @time_execution
    def _get_inspire_record(self):
        """Fetch the ``lit`` record for ``self.recid`` from the db.

        Raises:
            exceptions.RecordNotFoundException: the record does not exist.
            exceptions.StaleRecordDBVersionException: the record db version
                is lower than ``self.record_db_version``.
        """
        try:
            inspire_record = get_db_record('lit', self.recid)
        except RecordGetterError as exc:
            raise exceptions.RecordNotFoundException(
                'recid={} not found for pid_type=lit'.format(self.recid),
                from_exc=exc)

        # If the record_db_version was given, then ensure we are about to push
        # the right record version.
        # This check is related to the fact the orcid push at this moment is
        # triggered by the signal after_record_update (which happens after a
        # InspireRecord.commit()). This is not the actual commit to the db which
        # might happen at a later stage or not at all.
        # Note that connecting to the proper SQLAlchemy signal would also
        # have issues: https://github.com/mitsuhiko/flask-sqlalchemy/issues/645
        if self.record_db_version and \
                inspire_record.model.version_id < self.record_db_version:
            raise exceptions.StaleRecordDBVersionException(
                'Requested push for db version={}, but actual record db'
                ' version={}'.format(self.record_db_version,
                                     inspire_record.model.version_id)
            )
        return inspire_record

    @property
    def _do_force_cache_miss(self):
        """
        Hook to force a cache miss. This can be leveraged in feature tests.
        """
        for note in self.inspire_record.get('_private_notes', []):
            if note.get('value') == 'orcid-push-force-cache-miss':
                logger.info('OrcidPusher force cache miss for recid={} and orcid={}'.format(
                    self.recid, self.orcid))
                return True
        return False

    @property
    def _is_record_deleted(self):
        """True if the work must be deleted from ORCID.

        That is the case when a private note forces the deletion (hook for
        feature tests) or when the record has a truthy ``deleted`` field.
        """
        for note in self.inspire_record.get('_private_notes', []):
            if note.get('value') == 'orcid-push-force-delete':
                logger.info('OrcidPusher force delete for recid={} and orcid={}'.format(
                    self.recid, self.orcid))
                return True
        return self.inspire_record.get('deleted', False)

    @time_execution
    def push(self):
        """Push (or delete) the record in ORCID unless the cache is up to date.

        Returns:
            The putcode of the work in ORCID, or None if the work was deleted.

        Raises:
            exceptions.PutcodeNotFoundInOrcidException: the POST failed
                because the work exists, but its putcode cannot be found.
            exceptions.DuplicatedExternalIdentifierPusherException: another
                work has the same identifier and
                ``do_fail_if_duplicated_identifier`` is set.
            exceptions.InputDataInvalidException: ORCID rejected the request.
        """
        putcode = None
        do_force_cache_miss = self._do_force_cache_miss
        # Evaluate the property once: it may log a "force delete" message.
        is_record_deleted = self._is_record_deleted

        if not do_force_cache_miss:
            putcode = self.cache.read_work_putcode()
            if not is_record_deleted and \
                    not self.cache.has_work_content_changed(self.inspire_record):
                logger.info('OrcidPusher cache hit for recid={} and orcid={}'.format(
                    self.recid, self.orcid))
                return putcode
        logger.info('OrcidPusher cache miss for recid={} and orcid={}'.format(
            self.recid, self.orcid))

        # If the record is deleted, then delete it.
        if is_record_deleted:
            self._delete_work(putcode)
            return None

        self.converter = OrcidConverter(
            record=self.inspire_record,
            url_pattern=current_app.config['LEGACY_RECORD_URL_PATTERN'],
            put_code=putcode,
        )

        try:
            putcode = self._post_or_put_work(putcode)
        except orcid_client_exceptions.WorkAlreadyExistsException:
            # We POSTed the record as new work, but it failed because the work
            # already exists (identified by the external identifiers).
            # This means we do not have the putcode, thus we cache all
            # author's putcodes and PUT the work again.
            putcode = self._cache_all_author_putcodes()
            if not putcode:
                msg = 'No putcode was found in ORCID API for orcid={} and recid={}.'\
                    ' And the POST has previously failed for the same recid because'\
                    ' the work had already existed'.format(self.orcid, self.recid)
                raise exceptions.PutcodeNotFoundInOrcidException(msg)
            putcode = self._post_or_put_work(putcode)
        except orcid_client_exceptions.DuplicatedExternalIdentifierException:
            # We PUT a record changing its identifier, but there is another work
            # in ORCID with the same identifier. We need to find out the recid
            # of the clashing work in ORCID and push a fresh version of that
            # record.
            # This scenario might be triggered by a merge of 2 records in Inspire.
            if self.do_fail_if_duplicated_identifier:
                raise exceptions.DuplicatedExternalIdentifierPusherException
            self._push_work_with_clashing_identifier()
            putcode = self._post_or_put_work(putcode)

        self.cache.write_work_putcode(putcode, self.inspire_record)
        return putcode

    @time_execution
    def _post_or_put_work(self, putcode=None):
        """POST the work if ``putcode`` is falsy, otherwise PUT it.

        Returns:
            The putcode found in the ORCID API response.

        Raises:
            WorkAlreadyExistsException: (POST only) propagated unchanged.
            DuplicatedExternalIdentifierException: (PUT only) propagated
                unchanged.
            exceptions.InputDataInvalidException: any other ORCID client
                JSON error.
        """
        xml_element = self.converter.get_xml(do_add_bibtex_citation=True)
        # ORCID API allows 1 non-idempotent call only for the same orcid at
        # the same time. Using `distributed_lock` to achieve this.
        with distributed_lock(self.lock_name, blocking=True):
            if putcode:
                response = self.client.put_updated_work(xml_element, putcode)
            else:
                response = self.client.post_new_work(xml_element)

        utils.log_service_response(
            logger, response, 'in OrcidPusher for recid={}'.format(self.recid))
        try:
            response.raise_for_result()
            putcode = response['putcode']
        except (orcid_client_exceptions.WorkAlreadyExistsException,
                orcid_client_exceptions.DuplicatedExternalIdentifierException):
            # Handled by push(): re-raise before the generic clause below
            # can wrap them.
            raise
        except orcid_client_exceptions.BaseOrcidClientJsonException as exc:
            raise exceptions.InputDataInvalidException(from_exc=exc)
        return putcode

    @time_execution
    def _cache_all_author_putcodes(self):
        """Cache the putcode of every Inspire work in the author's profile.

        Returns:
            The putcode (as int) of the work matching ``self.recid``, or None
            if ORCID has no such work.

        Raises:
            exceptions.PutcodeNotFoundInCacheAfterCachingAllPutcodes: the
                putcode was found in ORCID but is not readable from the cache.
        """
        logger.info('New OrcidPusher cache all author putcodes for orcid={}'.format(self.orcid))
        putcode_getter = OrcidPutcodeGetter(self.orcid, self.oauth_token)
        # Fully consume the iterator before writing to the cache.
        # Can raise exceptions.InputDataInvalidException.
        putcodes_recids = list(putcode_getter.get_all_inspire_putcodes_and_recids_iter())

        putcode = None
        for fetched_putcode, fetched_recid in putcodes_recids:
            if fetched_recid == str(self.recid):
                putcode = int(fetched_putcode)
            cache = OrcidCache(self.orcid, fetched_recid)
            cache.write_work_putcode(fetched_putcode)

        # Ensure the putcode is actually in cache.
        # Note: this step is not really necessary and it can be skipped, but
        # at this moment it helps isolate a potential issue.
        if putcode and not self.cache.read_work_putcode():
            raise exceptions.PutcodeNotFoundInCacheAfterCachingAllPutcodes(
                'No putcode={} found in cache for recid={} after having'
                ' cached all author putcodes for orcid={}'.format(
                    putcode, self.recid, self.orcid))

        return putcode

    @time_execution
    def _delete_work(self, putcode=None):
        """Delete the work from ORCID and drop its putcode from the cache.

        If ``putcode`` is falsy, it is looked up via
        ``_cache_all_author_putcodes``. If no putcode is found at all,
        nothing is deleted.
        """
        putcode = putcode or self._cache_all_author_putcodes()
        if not putcode:
            # Such recid does not exists (anymore?) in ORCID API.
            return

        # ORCID API allows 1 non-idempotent call only for the same orcid at
        # the same time. Using `distributed_lock` to achieve this.
        with distributed_lock(self.lock_name, blocking=True):
            response = self.client.delete_work(putcode)
        try:
            response.raise_for_result()
        except orcid_client_exceptions.PutcodeNotFoundDeleteException:
            # Such putcode does not exists (anymore?) in orcid.
            pass
        except orcid_client_exceptions.BaseOrcidClientJsonException as exc:
            raise exceptions.InputDataInvalidException(from_exc=exc)

        self.cache.delete_work_putcode()

    @time_execution
    def _push_work_with_clashing_identifier(self):
        """Re-push every Inspire record whose ORCID work clashes with ours.

        The clashing works are found through the external identifiers added
        by the converter. Each one is re-pushed by running the ``orcid_push``
        task synchronously, with ``do_fail_if_duplicated_identifier=True`` so
        that the chain of pushes cannot recurse indefinitely.
        """
        # Local import to avoid import error.
        from inspirehep.modules.orcid import tasks

        def backoff(retry_count):
            # Countdown in seconds: 4 ** (retry_count + 1) minutes, i.e.
            # 4, 16, 64, 256, 1024 minutes for retry_count 0..4.
            return (4 ** (retry_count + 1)) * 60

        putcode_getter = OrcidPutcodeGetter(self.orcid, self.oauth_token)
        ids = self.converter.added_external_identifiers
        for putcode, recid in putcode_getter.get_putcodes_and_recids_by_identifiers_iter(ids):
            if not putcode or not recid:
                continue
            # Execute the orcid_push Celery task synchronously.
            utils.apply_celery_task_with_retry(
                tasks.orcid_push,
                kwargs={
                    'orcid': self.orcid,
                    'rec_id': recid,
                    'oauth_token': self.oauth_token,
                    # Set `do_fail_if_duplicated_identifier` to avoid an
                    # infinite recursive calls chain.
                    'kwargs_to_pusher': dict(
                        do_fail_if_duplicated_identifier=True,
                        record_db_version=self.record_db_version)
                },
                max_retries=5,
                countdown=backoff,
                time_limit=10 * 60,
            )