# -*- coding: utf-8 -*-
#
# This file is part of INSPIRE.
# Copyright (C) 2014-2017 CERN.
#
# INSPIRE is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# INSPIRE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with INSPIRE. If not, see <http://www.gnu.org/licenses/>.
#
# In applying this license, CERN does not waive the privileges and immunities
# granted to it by virtue of its status as an Intergovernmental Organization
# or submit itself to any jurisdiction.
"""Inspire Records"""
from __future__ import absolute_import, division, print_function
from copy import deepcopy
from datetime import datetime
import uuid
import arrow
from elasticsearch.exceptions import NotFoundError
from flask import current_app
from fs.opener import fsopen
from six.moves.urllib.parse import urlparse, unquote
from invenio_records.api import RecordMetadata
from inspire_dojson.utils import get_recid_from_ref, strip_empty_values, absolute_url
from inspire_schemas.api import validate
from inspire_schemas.builders import LiteratureBuilder
from invenio_files_rest.models import Bucket
from invenio_pidstore.errors import PIDDoesNotExistError
from invenio_pidstore.models import PersistentIdentifier
from invenio_records_files.api import Record
from invenio_db import db
from sqlalchemy import Text, or_, not_, cast, type_coerce
from sqlalchemy.dialects.postgresql import JSONB, ARRAY
from sqlalchemy.sql.functions import GenericFunction
from inspirehep.modules.pidstore.minters import inspire_recid_minter
from inspirehep.modules.pidstore.utils import get_pid_type_from_schema, get_endpoint_from_pid_type
from inspirehep.modules.records.utils import get_pid_from_record_uri, populate_earliest_date
from inspirehep.utils.record_getter import (
RecordGetterError,
get_es_record_by_uuid
)
MAX_UNIQUE_KEY_COUNT = 50000
[docs]class referenced_records(GenericFunction):
type = ARRAY(Text)
[docs]class InspireRecord(Record):
"""Record class that fetches records from DataBase."""
@classmethod
[docs] def create(cls, data, id_=None, **kwargs):
"""Override the default ``create``.
To handle also the docmuments and figures retrieval.
Note:
Might create an extra revision in the record if it had to download
any documents or figures.
Keyword Args:
id_(uuid): an optional uuid to assign to the created record object.
files_src_records(List[InspireRecord]): if passed, it will try to
get the files for the documents and figures from the first
record in the list that has it in it's files iterator before
downloading them, for example to merge existing
records.
skip_files(bool): if ``True`` it will skip the files retrieval
described above. Note also that, if not passed, it will fall
back to the value of the ``RECORDS_SKIP_FILES`` configuration
variable.
Examples:
>>> record = {
... '$schema': 'hep.json',
... }
>>> record = InspireRecord.create(record)
>>> record.commit()
"""
files_src_records = kwargs.pop('files_src_records', [])
skip_files = kwargs.pop(
'skip_files', current_app.config.get('RECORDS_SKIP_FILES'))
id_ = id_ or uuid.uuid4()
data = strip_empty_values(data)
with db.session.begin_nested():
cls.mint(id_, data)
record = super(InspireRecord, cls).create(data, id_=id_, **kwargs)
if not skip_files:
record.download_documents_and_figures(
src_records=files_src_records,
)
return record
@classmethod
[docs] def create_or_update(cls, data, **kwargs):
"""Create or update a record.
It will check if there is any record registered with the same
``control_number`` and ``pid_type``. If it's ``True``, it will update
the current record, otherwise it will create a new one.
Keyword Args:
files_src_records(List[InspireRecord]): if passed, it will try to
get the files for the documents and figures from the first
record in the list that has it in it's files iterator before
downloading them, for example to merge existing
records.
skip_files(bool): if ``True`` it will skip the files retrieval
described above. Note also that, if not passed, it will fall
back to the value of the ``RECORDS_SKIP_FILES`` configuration
variable.
Examples:
>>> record = {
... '$schema': 'hep.json',
... }
>>> record = InspireRecord.create_or_update(record)
>>> record.commit()
"""
pid_type = get_pid_type_from_schema(data['$schema'])
control_number = data.get('control_number')
files_src_records = kwargs.pop('files_src_records', [])
skip_files = kwargs.pop(
'skip_files', current_app.config.get('RECORDS_SKIP_FILES'))
try:
pid = PersistentIdentifier.get(pid_type, control_number)
record = super(InspireRecord, cls).get_record(pid.object_uuid)
record.clear()
record.update(data, skip_files=skip_files, **kwargs)
if data.get('legacy_creation_date'):
record.model.created = datetime.strptime(data['legacy_creation_date'], '%Y-%m-%d')
except PIDDoesNotExistError:
record = cls.create(data, skip_files=skip_files, **kwargs)
if data.get('legacy_creation_date'):
record.model.created = datetime.strptime(data['legacy_creation_date'], '%Y-%m-%d')
if data.get('deleted'):
new_recid = get_recid_from_ref(data.get('new_record'))
if not new_recid:
record.delete()
if not skip_files:
record.download_documents_and_figures(
src_records=files_src_records,
)
return record
[docs] def update(self, data, **kwargs):
"""Override the default ``update``.
To handle also the docmuments and figures retrieval.
Keyword Args:
files_src_records(InspireRecord): if passed, it will try to get the
files for the documents and figures from this record's files
iterator before downloading them, for example to merge existing
records.
skip_files(bool): if ``True`` it will skip the files retrieval
described above. Note also that, if not passed, it will fall
back to the value of the ``RECORDS_SKIP_FILES`` configuration
variable.
"""
files_src_records = kwargs.pop('files_src_records', ())
skip_files = kwargs.pop(
'skip_files', current_app.config.get('RECORDS_SKIP_FILES'))
super(InspireRecord, self).update(data, **kwargs)
if not skip_files:
self.download_documents_and_figures(
src_records=files_src_records,
only_new=True,
)
[docs] def merge(self, other):
"""Redirect pidstore of current record to the other InspireRecord.
Args:
other(InspireRecord): The record that self(record) is going to be
redirected.
"""
pids_deleted = PersistentIdentifier.query.filter(
PersistentIdentifier.object_uuid == self.id
).all()
pid_merged = PersistentIdentifier.query.filter(
PersistentIdentifier.object_uuid == other.id
).one()
with db.session.begin_nested():
for pid in pids_deleted:
pid.redirect(pid_merged)
db.session.add(pid)
[docs] def delete(self):
"""Mark as deleted all pidstores for a specific record."""
pids = PersistentIdentifier.query.filter(
PersistentIdentifier.object_uuid == self.id
).all()
with db.session.begin_nested():
for pid in pids:
pid.delete()
db.session.add(pid)
self['deleted'] = True
def _delete(self, *args, **kwargs):
super(InspireRecord, self).delete(*args, **kwargs)
def _create_bucket(self, location=None, storage_class=None):
"""Create file bucket for workflow object."""
if location is None:
location = current_app.config[
'RECORDS_DEFAULT_FILE_LOCATION_NAME'
]
if storage_class is None:
storage_class = current_app.config[
'RECORDS_DEFAULT_STORAGE_CLASS'
]
bucket = Bucket.create(
location=location,
storage_class=storage_class
)
return bucket
[docs] def validate(self):
"""Validate the record, also ensuring format compliance."""
validate(self)
@staticmethod
[docs] def mint(id_, data):
"""Mint the record."""
return inspire_recid_minter(id_, data)
def _resolve_doc_or_fig_url(
self,
doc_or_fig_obj,
src_records=(),
only_new=False,
):
"""Resolves the given document url according to the current record.
It will fill it up with the local url in case it's found in the given
src_records.
Returns:
dict: with the medatada of the given ``doc_or_fig_obj`` updated as
follows:
* In the case it has to be downloaded: the url is set to
something that you can use fsopen on.
* In the case it does not have to be downloaded: the url is
left intact pointing to the /api/files endpoint.
Raises:
Exception: if the url of the given ``doc_or_fig_obj`` is
unresolvable.
"""
doc_or_fig_obj = deepcopy(doc_or_fig_obj)
key = doc_or_fig_obj['key']
src_record_file = next(
(
rec.files[key].file.uri
for rec in src_records
if key in rec.files
),
None,
)
def _should_take_from_src_records(
self,
key,
src_record_file,
only_new,
):
should_take = False
if (
not only_new and
src_record_file
):
should_take = True
elif (
only_new and
src_record_file and
key not in self.files
):
should_take = True
return should_take
def _already_there(self, only_new):
return (
only_new and
key in self.files
)
def _is_internal_document(doc_or_fig_obj):
return doc_or_fig_obj['url'].startswith('/api/files/')
def _get_meaningful_exception(
self,
key,
src_record_file,
only_new,
doc_or_fig_obj,
):
exception = None
if not src_record_file and key not in self.files:
exception = Exception(
"Bad document %s, refers to an already downloaded "
"url, but none found in this record nor in the passed "
"src_records."
% doc_or_fig_obj
)
elif not only_new and not src_record_file:
exception = Exception(
"Can't download %s, as no src_records were passed that "
"contained it, and only_new was set to False."
% doc_or_fig_obj
)
else:
exception = Exception(
"Unexpected document resolution error for document %s"
% doc_or_fig_obj
)
return exception
if not _is_internal_document(doc_or_fig_obj):
return doc_or_fig_obj
if _should_take_from_src_records(self, key, src_record_file, only_new):
doc_or_fig_obj['url'] = src_record_file
return doc_or_fig_obj
if _already_there(self, only_new):
return doc_or_fig_obj
raise _get_meaningful_exception(
self,
key,
src_record_file,
only_new,
doc_or_fig_obj,
)
def _download_to_docs_or_figs(
self,
document=None,
figure=None,
src_records=(),
only_new=False,
):
if not document and not figure:
raise TypeError(
'No document nor figure passed, at least one is needed.'
)
is_document = bool(document)
doc_or_fig_obj = self._resolve_doc_or_fig_url(
doc_or_fig_obj=document or figure,
src_records=src_records,
only_new=only_new,
)
if doc_or_fig_obj['url'].startswith('/api/files/'):
return self.add_document_or_figure(
metadata=doc_or_fig_obj,
key=doc_or_fig_obj['key'],
is_document=is_document,
)
key = doc_or_fig_obj['key']
if key not in self.files:
key = self._get_unique_files_key(base_file_name=key)
url = doc_or_fig_obj['url']
scheme = urlparse(url).scheme
if scheme == 'file':
url = unquote(url)
stream = fsopen(url, mode='rb')
return self.add_document_or_figure(
metadata=doc_or_fig_obj,
key=key,
stream=stream,
is_document=is_document,
)
def _get_unique_files_key(self, base_file_name):
def _strip_old_control_number(base_name):
base_name = base_name.split('_', 1)[-1]
return base_name
def _append_current_control_number(control_number, base_name):
prefix = '%s_' % control_number
if not base_name.startswith(prefix):
base_name = _strip_old_control_number(base_name)
prepended_key = '%s%s' % (prefix, base_name)
else:
prepended_key = base_name
return prepended_key
prepended_key = _append_current_control_number(
self['control_number'],
base_file_name,
)
new_key = prepended_key
count = 1
while new_key in self.files:
new_key = '%s_%s' % (prepended_key, count)
count += 1
# This should never happen, but just in case to abort infinite
# loops we add this safeguard.
if count > MAX_UNIQUE_KEY_COUNT:
raise Exception(
'Unable to find a unique key in the first %s, aborting.'
% MAX_UNIQUE_KEY_COUNT
)
return new_key
def _get_ref(self):
"""Returns full url to this object (as in $ref)"""
pid_value = self.get('control_number')
pid_type = get_pid_type_from_schema(self.get('$schema'))
endpoint = get_endpoint_from_pid_type(pid_type)
return absolute_url(u'/api/{endpoint}/{control_number}'.format(endpoint=endpoint,
control_number=pid_value))
def _get_index_ref(self):
"""Return shorten version of pid_value/pid_type for proper querying with index"""
pid_value = self.get('control_number')
pid_type = get_pid_type_from_schema(self.get('$schema'))
endpoint = get_endpoint_from_pid_type(pid_type)
shorten_endpoint = endpoint[:3]
return "{pid_value}{shorten_endpoint}".format(pid_value=pid_value,
shorten_endpoint=shorten_endpoint)
def _query_citing_records(self, show_duplicates=False):
"""Returns records which cites this one."""
index_ref = self._get_index_ref()
if not index_ref:
raise Exception("There is no index_ref for this object")
citation_query = RecordMetadata.query.with_entities(RecordMetadata.id,
RecordMetadata.json['control_number'])
citation_filter = referenced_records(RecordMetadata.json).contains([index_ref])
filter_deleted_records = or_(not_(type_coerce(RecordMetadata.json, JSONB).has_key('deleted')), # noqa: W601
not_(RecordMetadata.json['deleted'] == cast(True, JSONB)))
only_literature_collection = type_coerce(RecordMetadata.json, JSONB)['_collections'].contains(['Literature'])
citations = citation_query.filter(citation_filter,
filter_deleted_records,
only_literature_collection)
if not show_duplicates:
# It just hides duplicates, and still can show citations
# which do not have proper PID in PID store
# Duplicated data should be removed with the CLI command
citations = citations.distinct(RecordMetadata.json['control_number'])
return citations
@property
def get_citing_records_query(self):
return self._query_citing_records()
[docs] def get_citations_count(self, show_duplicates=False):
"""Returns citations count for this record."""
count = self._query_citing_records(show_duplicates).count()
return count
[docs] def dumps(self):
"""Returns a dict 'representation' of the record.
Note: this is not suitable to create a new record from it, as the
representation will include some extra fields that should not be
present in the record's json, see the 'to_dict' method instead.
"""
base_dict = super(InspireRecord, self).dumps()
populate_earliest_date(base_dict)
return base_dict
[docs] def to_dict(self):
"""Gets a deep copy of the record's json."""
return deepcopy(dict(self))
[docs] def get_modified_references(self):
"""Return the ids of the references diff between the latest and the
previous version.
The diff includes references added or deleted. Changes in a
reference's content won't be detected.
Also, it detects if record was deleted/un-deleted compared to the
previous version and, in such cases, returns the full list of
references.
References not linked to any record will be ignored.
Note: record should be committed to DB in order to correctly get the
previous version.
Returns:
Set[Tuple[str, int]]: pids of references changed from the previous
version.
"""
def _get_ids_from_refs(references):
return set([
get_pid_from_record_uri(ref['record']['$ref'])
for ref in references
if 'record' in ref
])
try:
prev_version = self.model.versions.filter_by(
version_id=self.model.version_id).one().previous.json
except AttributeError:
prev_version = {}
changed_deleted_status = self.get('deleted', False) ^ prev_version.get('deleted', False)
if changed_deleted_status:
return _get_ids_from_refs(self.get('references', []))
ids_latest = _get_ids_from_refs(self.get('references', []))
ids_oldest = _get_ids_from_refs(prev_version.get('references', []))
return set.symmetric_difference(ids_latest, ids_oldest)
[docs]class ESRecord(InspireRecord):
"""Record class that fetches records from ElasticSearch."""
@classmethod
[docs] def get_record(cls, object_uuid, with_deleted=False):
"""Get record instance from ElasticSearch."""
try:
return cls(get_es_record_by_uuid(object_uuid))
except RecordGetterError as e:
if isinstance(e.cause, NotFoundError):
# Raise this error so the interface will render a 404 page
# rather than a 500
raise PIDDoesNotExistError('es_record', object_uuid)
else:
raise
@property
def updated(self):
"""Get last updated timestamp."""
if self.get('_updated'):
return arrow.get(self['_updated']).naive
else:
return datetime.utcnow()