# -*- coding: utf-8 -*-
#
# This file is part of INSPIRE.
# Copyright (C) 2014-2017 CERN.
#
# INSPIRE is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# INSPIRE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with INSPIRE. If not, see <http://www.gnu.org/licenses/>.
#
# In applying this license, CERN does not waive the privileges and immunities
# granted to it by virtue of its status as an Intergovernmental Organization
# or submit itself to any jurisdiction.
"""Tasks to check if the incoming record already exist."""
from __future__ import absolute_import, division, print_function
from flask import current_app
from invenio_db import db
from invenio_workflows import workflow_object_class, WorkflowEngine
from invenio_workflows.errors import WorkflowsError
from inspire_matcher.api import match
from inspire_utils.dedupers import dedupe_list
from inspirehep.utils.record import get_arxiv_categories, get_value
from inspirehep.modules.workflows.tasks.actions import mark
from ..utils import with_debug_logging
@with_debug_logging
[docs]def exact_match(obj, eng):
"""Return ``True`` if the record is already present in the system.
Uses the default configuration of the ``inspire-matcher`` to find
duplicates of the current workflow object in the system.
Also sets the ``matches.exact`` property in ``extra_data`` to the list of
control numbers that matched.
Arguments:
obj: a workflow object.
eng: a workflow engine.
Returns:
bool: ``True`` if the workflow object has a duplicate in the system
``False`` otherwise.
"""
exact_match_config = current_app.config['EXACT_MATCH']
matches = dedupe_list(match(obj.data, exact_match_config))
record_ids = [el['_source']['control_number'] for el in matches]
obj.extra_data.setdefault('matches', {})['exact'] = record_ids
return bool(record_ids)
@with_debug_logging
[docs]def fuzzy_match(obj, eng):
"""Return ``True`` if a similar record is found in the system.
Uses a custom configuration for ``inspire-matcher`` to find records
similar to the current workflow object's payload in the system.
Also sets the ``matches.fuzzy`` property in ``extra_data`` to the list of
the brief of first 5 record that matched.
Arguments:
obj: a workflow object.
eng: a workflow engine.
Returns:
bool: ``True`` if the workflow object has a duplicate in the system
``False`` otherwise.
"""
if not current_app.config.get('FEATURE_FLAG_ENABLE_FUZZY_MATCHER'):
return False
fuzzy_match_config = current_app.config['FUZZY_MATCH']
matches = dedupe_list(match(obj.data, fuzzy_match_config))
record_ids = [_get_hep_record_brief(el['_source']) for el in matches]
obj.extra_data.setdefault('matches', {})['fuzzy'] = record_ids[0:5]
return bool(record_ids)
def _get_hep_record_brief(hep_record):
brief = {
'control_number': hep_record['control_number'],
'title': get_value(hep_record, 'titles[0].title'),
}
abstract = get_value(hep_record, 'abstracts[0].value')
if abstract is not None:
brief['abstract'] = abstract
arxiv_eprint = get_value(hep_record, 'arxiv_eprints[0].value')
if arxiv_eprint is not None:
brief['arxiv_eprint'] = arxiv_eprint
number_of_pages = get_value(hep_record, 'number_of_pages')
if number_of_pages is not None:
brief['number_of_pages'] = number_of_pages
earliest_date = get_value(hep_record, 'earliest_date')
if earliest_date is not None:
brief['earliest_date'] = earliest_date
authors = hep_record.get('authors')
if authors is not None:
brief['authors_count'] = len(authors)
author_briefs = []
for author in authors[:3]:
author_briefs.append({'full_name': author['full_name']})
brief['authors'] = author_briefs
public_notes = hep_record.get('public_notes')
if public_notes is not None:
public_notes_value = []
for public_note in public_notes:
public_notes_value.append({'value': public_note['value']})
brief['public_notes'] = public_notes_value
publication_info = hep_record.get('publication_info')
if publication_info is not None:
brief['publication_info'] = publication_info
return brief
@with_debug_logging
[docs]def is_fuzzy_match_approved(obj, eng):
"""Check if a fuzzy match has been approved by a human."""
return obj.extra_data.get('fuzzy_match_approved_id')
@with_debug_logging
@with_debug_logging
[docs]def auto_approve(obj, eng):
"""Check if auto approve the current ingested article.
Arguments:
obj: a workflow object.
eng: a workflow engine.
Return:
bool: True when the record belongs to an arXiv category that is fully
harvested or if the primary category is `physics.data-an`, otherwise
False.
"""
return has_fully_harvested_category(obj.data) or physics_data_an_is_primary_category(obj.data)
[docs]def has_fully_harvested_category(record):
"""Check if the record in `obj.data` has fully harvested categories.
Arguments:
record(dict): the ingested article.
Return:
bool: True when the record belongs to an arXiv category that is fully
harvested, otherwise False.
"""
record_categories = set(get_arxiv_categories(record))
harvested_categories = current_app.config.get('ARXIV_CATEGORIES', {})
return len(
record_categories &
set(
harvested_categories.get('core') +
harvested_categories.get('non-core')
)
) > 0
[docs]def physics_data_an_is_primary_category(record):
record_categories = get_arxiv_categories(record)
if record_categories:
return record_categories[0] == 'physics.data-an'
return False
@with_debug_logging
[docs]def match_non_completed_wf_in_holdingpen(obj, eng):
"""Return ``True`` if a matching wf is processing in the HoldingPen.
Uses a custom configuration of the ``inspire-matcher`` to find duplicates
of the current workflow object in the Holding Pen not in the
COMPLETED state.
Also sets ``holdingpen_matches`` in ``extra_data`` to the list of ids that
matched.
Arguments:
obj: a workflow object.
eng: a workflow engine.
Returns:
bool: ``True`` if the workflow object has a duplicate in the Holding
Pen that is not COMPLETED, ``False`` otherwise.
"""
def _non_completed(base_record, match_result):
return not get_value(match_result, '_source._workflow.status') == 'COMPLETED'
matched_ids = pending_in_holding_pen(obj, _non_completed)
obj.extra_data['holdingpen_matches'] = matched_ids
return bool(matched_ids)
[docs]def raise_if_match_wf_in_error_or_initial(obj, eng):
"""Raise if a matching wf is in ERROR or INITIAL state in the HoldingPen.
Uses a custom configuration of the ``inspire-matcher`` to find duplicates
of the current workflow object in the Holding Pen not in the
that are in ERROR or INITIAL state.
If any match is found, it sets ``error_workflows_matched`` in
``extra_data`` to the list of ids that matched and raise an error.
Arguments:
obj: a workflow object.
eng: a workflow engine.
Returns:
None
"""
def _filter(base_record, match_result):
return get_value(
match_result, '_source._workflow.status'
) in ('ERROR', 'INITIAL')
matched_ids = pending_in_holding_pen(obj, _filter)
if bool(matched_ids):
obj.extra_data['error_workflows_matched'] = matched_ids
raise WorkflowsError(
'Cannot continue processing. Found workflows in ERROR or INITIAL '
'state: {}'.format(matched_ids)
)
[docs]def match_previously_rejected_wf_in_holdingpen(obj, eng):
"""Return ``True`` if matches a COMPLETED and rejected wf in the HoldingPen.
Uses a custom configuration of the ``inspire-matcher`` to find duplicates
of the current workflow object in the Holding Pen in the
COMPLETED state, marked as ``approved = False``.
Also sets ``holdingpen_matches`` in ``extra_data`` to the list of ids that
matched.
Arguments:
obj: a workflow object.
eng: a workflow engine.
Returns:
bool: ``True`` if the workflow object has a duplicate in the Holding
Pen that is not COMPLETED, ``False`` otherwise.
"""
def _rejected_and_completed(base_record, match_result):
return get_value(match_result, '_source._workflow.status') == 'COMPLETED' and \
get_value(match_result, '_source._extra_data.approved') is False
matched_ids = pending_in_holding_pen(obj, _rejected_and_completed)
obj.extra_data['previously_rejected_matches'] = matched_ids
return bool(matched_ids)
@with_debug_logging
[docs]def pending_in_holding_pen(obj, validation_func):
"""Return the list of matching workflows in the holdingpen.
Matches the holdingpen records by their ``arxiv_eprint``, their ``doi``,
and by a custom validator function.
Args:
obj: a workflow object.
validation_func: a function used to filter the matched records.
Returns:
(list): the ids matching the current ``obj`` that satisfy
``validation_func``.
"""
config = {
'algorithm': [
{
'queries': [
{
'path': 'arxiv_eprints.value',
'search_path': 'metadata.arxiv_eprints.value.raw',
'type': 'exact',
},
{
'path': 'dois.value',
'search_path': 'metadata.dois.value.raw',
'type': 'exact',
},
],
'validator': validation_func,
},
],
'doc_type': 'hep',
'index': 'holdingpen-hep',
}
matches = dedupe_list(match(obj.data, config))
return [int(el['_id']) for el in matches if int(el['_id']) != obj.id]
@with_debug_logging
[docs]def delete_self_and_stop_processing(obj, eng):
"""Delete both versions of itself and stops the workflow."""
db.session.delete(obj.model)
eng.skip_token()
@with_debug_logging
[docs]def stop_processing(obj, eng):
"""Stop processing the given workflow.
Stops the given workflow engine. This causes the stop of all the workflows
related to it.
Args:
obj: a workflow object.
eng: a workflow engine.
Returns:
None
"""
eng.stop()
[docs]def has_same_source(extra_data_key):
"""Match a workflow in obj.extra_data[`extra_data_key`] by the source.
Takes a list of workflows from extra_data using as key `extra_data_key`
and goes through them checking if at least one workflow has the same source
of the current workflow object.
Args:
extra_data_key: the key to retrieve a workflow list from the current
workflow object.
Returns:
bool: True if a workflow, whose id is in obj.extra_data[
`extra_data_key`], matches the current workflow by the source.
"""
def _get_wfs_same_source(obj, eng):
current_source = get_value(obj.data, 'acquisition_source.source').lower()
try:
workflows = obj.extra_data[extra_data_key]
except KeyError:
workflows = []
for wf_id in workflows:
wf = workflow_object_class.get(wf_id)
wf_source = get_value(wf.data, 'acquisition_source.source').lower()
if wf_source == current_source:
return True
return False
return _get_wfs_same_source
[docs]def stop_matched_holdingpen_wfs(obj, eng):
"""Stop the matched workflow objects in the holdingpen.
Stops the matched workflows in the holdingpen by replacing their steps with
a new one defined on the fly, containing a ``stop`` step, and executing it.
For traceability reason, these workflows are also marked as
``'stopped-by-wf'``, whose value is the current workflow's id.
In the use case of harvesting twice an article, this function is involved
to stop the first workflow and let the current one being processed,
since it the latest metadata.
Args:
obj: a workflow object.
eng: a workflow engine.
Returns:
None
"""
stopping_steps = [mark('stopped-by-wf', int(obj.id)), stop_processing]
obj.save()
for holdingpen_wf_id in obj.extra_data['holdingpen_matches']:
holdingpen_wf = workflow_object_class.get(holdingpen_wf_id)
holdingpen_wf_eng = WorkflowEngine.from_uuid(holdingpen_wf.id_workflow)
# stop this holdingpen workflow by replacing its steps with a stop step
holdingpen_wf_eng.callbacks.replace(stopping_steps)
holdingpen_wf_eng.process([holdingpen_wf])
@with_debug_logging
[docs]def has_more_than_one_exact_match(obj, eng):
"""Does the record have more than one exact match."""
exact_matches = obj.extra_data['matches']['exact']
return len(set(exact_matches)) > 1