# -*- coding: utf-8 -*-
#
# This file is part of INSPIRE.
# Copyright (C) 2014-2017 CERN.
#
# INSPIRE is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# INSPIRE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with INSPIRE. If not, see <http://www.gnu.org/licenses/>.
#
# In applying this license, CERN does not waive the privileges and immunities
# granted to it by virtue of its status as an Intergovernmental Organization
# or submit itself to any jurisdiction.
"""Callback blueprint for interaction with legacy."""
from __future__ import absolute_import, division, print_function
import re
from os.path import join
from flask import (
Blueprint,
abort,
current_app,
jsonify,
redirect,
request,
)
from flask.views import MethodView
from flask_login import current_user
from inspire_schemas.api import validate
from inspire_utils.urls import ensure_scheme
from invenio_db import db
from invenio_workflows import (
ObjectStatus,
start,
workflow_object_class,
WorkflowEngine,
)
from invenio_workflows.errors import WorkflowsMissingObject
from jsonschema.exceptions import ValidationError
from inspirehep.modules.records.permissions import RecordPermission
from inspirehep.modules.workflows.errors import (
CallbackError,
CallbackRecordNotFoundError,
CallbackValidationError,
CallbackWorkflowNotInWaitingEditState,
CallbackWorkflowNotFoundError,
CallbackWorkflowNotInMergeState,
CallbackWorkflowNotInValidationError,
)
from inspirehep.modules.workflows.loaders import workflow_loader
from inspirehep.modules.workflows.models import WorkflowsPendingRecord
from inspirehep.modules.workflows.utils import (
get_resolve_validation_callback_url,
get_validation_errors,
)
from inspirehep.utils.record import get_value
from inspirehep.utils.record_getter import get_db_record, RecordGetterError
from inspirehep.utils.tickets import get_rt_link_for_ticket
# Blueprint for the callback endpoints; every route registered on it is
# served under the ``/callback`` prefix.
callback_blueprint = Blueprint(
    'inspire_workflows_callbacks',
    __name__,
    url_prefix='/callback',
    template_folder='templates',
    static_folder='static',
)

# Blueprint for the workflow views; every route registered on it is served
# under the ``/workflows`` prefix.
workflow_blueprint = Blueprint(
    'inspire_workflows',
    __name__,
    url_prefix='/workflows',
    template_folder='templates',
    static_folder='static',
)
@callback_blueprint.errorhandler(CallbackError)
@workflow_blueprint.errorhandler(CallbackError)
def error_handler(error):
    """Turn a ``CallbackError`` into a JSON response.

    Args:
        error (CallbackError): the error raised by one of the views.

    Returns:
        tuple: the JSON rendering of ``error.to_dict()`` and the HTTP status
        taken from ``error.code``.
    """
    return jsonify(error.to_dict()), error.code
def _get_base_url():
    """Return the base URL used to build URLs for remote reference.

    ``LEGACY_ROBOTUPLOAD_URL`` is used when configured, otherwise
    ``SERVER_NAME``; the value is passed through ``ensure_scheme`` before
    being returned.
    """
    config = current_app.config
    # The fallback is looked up unconditionally, so ``SERVER_NAME`` has to be
    # present in the configuration even when the legacy URL is set.
    fallback = config["SERVER_NAME"]
    return ensure_scheme(config.get("LEGACY_ROBOTUPLOAD_URL", fallback))
def _continue_workflow(workflow_id, recid, result=None):
    """Store the callback outcome on a workflow and resume it.

    The record URL, the record id and the callback result are written to the
    workflow object, which is then saved, committed and continued
    asynchronously (``delayed=True``).

    Args:
        workflow_id: id of the workflow object to continue.
        recid: id of the record the workflow refers to.
        result (dict): optional callback payload, stored under
            ``extra_data['callback_result']``; defaults to an empty dict.

    Returns:
        bool: True if succeeded, False if the specified workflow id does not
        exist.
    """
    # Imported locally: URLs always use forward slashes, while
    # ``os.path.join`` follows the path separator of the host OS.
    import posixpath

    result = result if result is not None else {}
    base_url = _get_base_url()

    try:
        workflow_object = workflow_object_class.get(workflow_id)
    except WorkflowsMissingObject:
        current_app.logger.error(
            'No workflow object with the id %s could be found.',
            workflow_id,
        )
        return False

    workflow_object.extra_data['url'] = posixpath.join(
        base_url,
        'record',
        str(recid),
    )
    workflow_object.extra_data['recid'] = recid
    workflow_object.data['control_number'] = recid
    workflow_object.extra_data['callback_result'] = result
    workflow_object.save()
    db.session.commit()
    workflow_object.continue_workflow(delayed=True)

    return True
def _find_and_continue_workflow(workflow_id, recid, result=None):
    """Continue a workflow and report the outcome as a dict.

    Args:
        workflow_id: id of the workflow object to continue.
        recid: id of the record the workflow refers to.
        result (dict): optional callback payload forwarded to
            ``_continue_workflow``.

    Returns:
        dict: ``success`` (bool) and ``message`` (str) keys; ``success`` is
        False when the workflow could not be found.
    """
    if _continue_workflow(workflow_id=workflow_id, recid=recid, result=result):
        return {
            'success': True,
            'message': 'workflow with id %s continued.' % workflow_id,
        }

    current_app.logger.warning('The workflow %s was not found.', workflow_id)
    return {
        'success': False,
        'message': 'workflow with id %s not found.' % workflow_id,
    }
def _put_workflow_in_error_state(workflow_id, error_message, result):
    """Flag a workflow as failed and record why.

    Args:
        workflow_id: id of the workflow object to update.
        error_message (str): text stored under ``extra_data['_error_msg']``.
        result (dict): callback payload stored under
            ``extra_data['callback_result']``.

    Returns:
        dict: ``success`` (bool) and ``message`` (str) keys; ``success`` is
        False when the workflow could not be found.
    """
    try:
        failed_workflow = workflow_object_class.get(workflow_id)
    except WorkflowsMissingObject:
        current_app.logger.error(
            'No workflow object with the id %s could be found.',
            workflow_id,
        )
        not_found_message = 'workflow with id %s not found.' % workflow_id
        return {'success': False, 'message': not_found_message}

    failed_workflow.status = ObjectStatus.ERROR
    failed_workflow.extra_data['callback_result'] = result
    failed_workflow.extra_data['_error_msg'] = error_message
    failed_workflow.save()
    db.session.commit()

    updated_message = 'workflow %s updated with error.' % workflow_id
    return {'success': True, 'message': updated_message}
@callback_blueprint.route('/workflows/webcoll', methods=['POST'])
def webcoll_callback():
    """Handle a callback from webcoll with the record ids processed.

    Expects the request data to contain a list of record ids in the
    recids field. Every pending record matching one of those ids has its
    workflow continued and is then removed from the pending list.

    Returns:
        A JSON object mapping each processed record id to a dict with
        ``success`` and ``message`` keys.

    Example:
        An example of callback::

            $ curl \\
                http://web:5000/callback/workflows/webcoll \\
                -H "Host: localhost:5000" \\
                -F 'recids=1234'
    """
    # ``getlist`` always returns a list with every submitted value (empty when
    # the field is absent). Converting the form with ``dict()`` depends on how
    # the MultiDict maps to a plain dict and may yield a bare string, which
    # ``in_()`` would then treat as a sequence of characters.
    recids = request.form.getlist('recids')
    if not recids:
        # Nothing to match: skip the query instead of issuing an empty IN ().
        return jsonify({})

    pending_records = WorkflowsPendingRecord.query.filter(
        WorkflowsPendingRecord.record_id.in_(recids)
    ).all()

    response = {}
    for pending_record in pending_records:
        recid = int(pending_record.record_id)
        workflow_id = pending_record.workflow_id
        continue_response = _find_and_continue_workflow(
            workflow_id=workflow_id,
            recid=recid,
        )
        if continue_response['success']:
            current_app.logger.debug(
                'Successfully restarted workflow %s',
                workflow_id,
            )
            response[recid] = {
                'success': True,
                'message': 'Successfully restarted workflow %s' % workflow_id,
            }
        else:
            current_app.logger.debug(
                'Error restarting workflow %s: %s',
                workflow_id,
                continue_response['message'],
            )
            response[recid] = {
                'success': False,
                'message': continue_response['message'],
            }

        # The record has been handled either way, so it leaves the pending
        # list regardless of the outcome.
        db.session.delete(pending_record)

    db.session.commit()
    return jsonify(response)
def _robotupload_has_error(result):
recid = int(result.get('recid'))
if not result.get('success'):
message = result.get(
'error_message',
'No error message from robotupload.'
)
elif recid < 0:
message = result.get(
'error_message',
'Failed to create record on robotupload.',
)
else:
return False, ''
return True, message
def _is_an_update(workflow_id):
    """Tell whether the workflow is flagged as an update.

    Args:
        workflow_id: id of the workflow object to inspect.

    Returns:
        bool: truthiness of the ``is-update`` key of the workflow's
        ``extra_data``.
    """
    extra_data = workflow_object_class.get(workflow_id).extra_data
    return True if extra_data.get('is-update') else False
def _parse_robotupload_result(result, workflow_id):
    """Process one robotupload result for the given workflow.

    Reports a failure when the result carries an error or when its record id
    is already on the pending list. Otherwise the record is added to the
    pending list (unless the workflow is an update) and the workflow is
    continued.

    Args:
        result (dict): one entry of the ``results`` list sent by robotupload.
        workflow_id: id of the workflow the result belongs to.

    Returns:
        dict: ``success`` (bool) and ``message`` (str) keys.
    """
    recid = int(result.get('recid'))

    has_error, error_message = _robotupload_has_error(result)
    if has_error:
        return {'success': False, 'message': error_message}

    pending_matches = WorkflowsPendingRecord.query.filter_by(
        record_id=recid,
    ).all()
    if pending_matches:
        current_app.logger.warning(
            'The record %s was already found on the pending list.',
            recid
        )
        return {
            'success': False,
            'message': 'Recid %s already in pending list.' % recid,
        }

    if not _is_an_update(workflow_id):
        db.session.add(
            WorkflowsPendingRecord(workflow_id=workflow_id, record_id=recid)
        )
        db.session.commit()
        current_app.logger.debug(
            'Successfully added recid:workflow %s:%s to pending list.',
            recid,
            workflow_id,
        )

    outcome = _find_and_continue_workflow(
        workflow_id=workflow_id,
        recid=recid,
        result=result,
    )
    if not outcome['success']:
        current_app.logger.debug(
            'Error restarting workflow %s: %s',
            workflow_id,
            outcome['message'],
        )
        return {'success': False, 'message': outcome['message']}

    current_app.logger.debug(
        'Successfully restarted workflow %s',
        workflow_id,
    )
    return {
        'success': True,
        'message': 'Successfully restarted workflow %s' % workflow_id,
    }
@callback_blueprint.route('/workflows/robotupload', methods=['POST'])
def robotupload_callback():
    """Handle callback from robotupload.

    The JSON payload carries the workflow id in ``nonce`` and a list of
    per-record outcomes in ``results``. Each result is processed in turn;
    when one fails, the workflow is put in error state with the failure
    message attached.

    Returns:
        A JSON object mapping each record id to a dict with ``success`` and
        ``message`` keys.

    Examples:
        An example of failed callback that did not get to create a recid (the
        "nonce" is the workflow id)::

            $ curl \\
                http://web:5000/callback/workflows/robotupload \\
                -H "Host: localhost:5000" \\
                -H "Content-Type: application/json" \\
                -d '{
                    "nonce": 1,
                    "results": [
                        {
                            "recid":-1,
                            "error_message": "Record already exists",
                            "success": false
                        }
                    ]
                }'

        One that created the recid, but failed later::

            $ curl \\
                http://web:5000/callback/workflows/robotupload \\
                -H "Host: localhost:5000" \\
                -H "Content-Type: application/json" \\
                -d '{
                    "nonce": 1,
                    "results": [
                        {
                            "recid":1234,
                            "error_message": "Unable to parse pdf.",
                            "success": false
                        }
                    ]
                }'

        A successful one::

            $ curl \\
                http://web:5000/callback/workflows/robotupload \\
                -H "Host: localhost:5000" \\
                -H "Content-Type: application/json" \\
                -d '{
                    "nonce": 1,
                    "results": [
                        {
                            "recid":1234,
                            "error_message": "",
                            "success": true
                        }
                    ]
                }'
    """
    payload = request.get_json()
    workflow_id = payload.get('nonce', '')

    responses = {}
    for result in payload.get('results', []):
        recid = int(result.get('recid'))
        if recid in responses:
            # this should never happen
            current_app.logger.warning('Received duplicated recid: %s', recid)
            continue

        response = _parse_robotupload_result(
            result=result,
            workflow_id=workflow_id,
        )
        if not response['success']:
            error_state = _put_workflow_in_error_state(
                workflow_id=workflow_id,
                error_message='Error in robotupload: %s' % response['message'],
                result=result,
            )
            if not error_state['success']:
                # Make the secondary failure visible to the caller as well.
                response['message'] += (
                    '\nFailed to put the workflow in error state:%s' %
                    error_state['message']
                )

        responses[recid] = response

    return jsonify(responses)
def _validate_workflow_schema(workflow_data):
    """Validate the workflow ``metadata`` against its JSONSchema.

    On failure the validation errors and the callback URL to resolve them are
    attached to the workflow's ``_extra_data`` before raising.

    Args:
        workflow_data (dict): the workflow dict.

    Raises:
        CallbackValidationError: if the workflow ``metadata`` is not valid;
            the (updated) workflow dict is passed to the exception.
    """
    metadata = workflow_data['metadata']
    try:
        validate(metadata)
    except ValidationError:
        extra_data = workflow_data['_extra_data']
        extra_data['validation_errors'] = get_validation_errors(metadata, 'hep')
        extra_data['callback_url'] = get_resolve_validation_callback_url()
        raise CallbackValidationError(workflow_data)
class ResolveMergeResource(MethodView):
    """Resolve merge callback.

    When the workflow needs to resolve conflicts, the workflow stops in
    ``HALTED`` state; this endpoint is called to continue it. If it is called
    while conflicts remain, the workflow is only saved.

    Args:
        workflow_data (dict): the workflow object sent in the request's
            payload.
    """

    def put(self):
        """Handle callback for merge conflicts.

        Raises:
            CallbackWorkflowNotFoundError: if no workflow has the given id.
            CallbackWorkflowNotInMergeState: if the workflow is not ``HALTED``
                or has no ``callback_url`` in its extra data.
        """
        payload = workflow_loader()
        workflow_id = payload['id']

        try:
            workflow = workflow_object_class.get(workflow_id)
        except WorkflowsMissingObject:
            raise CallbackWorkflowNotFoundError(workflow_id)

        not_halted = workflow.status != ObjectStatus.HALTED
        if not_halted or 'callback_url' not in workflow.extra_data:
            raise CallbackWorkflowNotInMergeState(workflow.id)

        conflicts = get_value(payload['_extra_data'], 'conflicts', default=[])
        workflow.data = payload['metadata']
        workflow.extra_data['conflicts'] = conflicts

        if conflicts:
            # Conflicts remain: just save what the user has done so far.
            data = {
                'message': 'Workflow {} has been saved with conflicts.'.format(workflow.id),
            }
            workflow.save()
            db.session.commit()
            return jsonify(data), 200

        # Everything is resolved: clean up and let the workflow go on.
        workflow.status = ObjectStatus.RUNNING
        workflow.extra_data.pop('callback_url', None)
        workflow.extra_data.pop('conflicts', None)
        workflow.save()
        db.session.commit()
        workflow.continue_workflow(delayed=True)

        data = {
            'message': 'Workflow {} is continuing.'.format(workflow.id),
        }
        return jsonify(data), 200
class ResolveValidationResource(MethodView):
    """Resolve validation error callback."""

    def put(self):
        """Handle callback from validation errors.

        When validation errors occur, the workflow stops in ``ERROR`` state;
        this endpoint is called to continue it. The payload is validated
        first, then the stored workflow is looked up and resumed.

        Args:
            workflow_data (dict): the workflow object sent in the
                request's payload.

        Raises:
            CallbackValidationError: if the payload's ``metadata`` is still
                not valid.
            CallbackWorkflowNotFoundError: if no workflow has the given id.
            CallbackWorkflowNotInValidationError: if the workflow is not in
                ``ERROR`` state or has no ``callback_url`` in its extra data.

        Examples:
            An example of successful call::

                $ curl \\
                    http://web:5000/callback/workflows/resolve_validation_errors \\
                    -H "Host: localhost:5000" \\
                    -H "Content-Type: application/json" \\
                    -d '{
                        "_extra_data": {
                            ... extra data content
                        },
                        "id": 910648,
                        "metadata": {
                            "$schema": "https://labs.inspirehep.net/schemas/records/hep.json",
                            ... record content
                        }
                    }'

            The response::

                HTTP 200 OK

                {"message": "Workflow 910648 is continuing."}

            A failed example::

                $ curl \\
                    http://web:5000/callback/workflows/resolve_validation_errors \\
                    -H "Host: localhost:5000" \\
                    -H "Content-Type: application/json" \\
                    -d '{
                        "_extra_data": {
                            ... extra data content
                        },
                        "id": 910648,
                        "metadata": {
                            "$schema": "https://labs.inspirehep.net/schemas/records/hep.json",
                            ... record content
                        }
                    }'

            The error response will contain the workflow that was passed,
            with the new validation errors::

                HTTP 400 Bad request

                {
                    "_extra_data": {
                        "validation_errors": [
                            {
                                "path": ["path", "to", "error"],
                                "message": "required: ['missing_key1', 'missing_key2']"
                            }
                        ],
                        ... rest of extra data content
                    },
                    "id": 910648,
                    "metadata": {
                        "$schema": "https://labs.inspirehep.net/schemas/records/hep.json",
                        ... record content
                    }
                }
        """
        payload = workflow_loader()
        # Validation comes first, so an invalid payload is rejected before the
        # stored workflow is even looked up.
        _validate_workflow_schema(payload)

        workflow_id = payload['id']
        try:
            workflow = workflow_object_class.get(workflow_id)
        except WorkflowsMissingObject:
            raise CallbackWorkflowNotFoundError(workflow_id)

        not_in_error = workflow.status != ObjectStatus.ERROR
        if not_in_error or 'callback_url' not in workflow.extra_data:
            raise CallbackWorkflowNotInValidationError(workflow_id)

        workflow.data = payload['metadata']
        workflow.status = ObjectStatus.RUNNING
        for stale_key in ('callback_url', 'validation_errors'):
            workflow.extra_data.pop(stale_key, None)
        workflow.save()
        db.session.commit()
        workflow.continue_workflow(delayed=True)

        message = 'Workflow {} is continuing.'.format(workflow.id)
        return jsonify({'message': message}), 200
class ResolveEditArticleResource(MethodView):
    """Resolve `edit_article` callback.

    An ``edit_article`` workflow that is in ``WAITING`` state and has a
    ``callback_url`` is continued through this endpoint with the edited
    record.

    Args:
        workflow_data (dict): the workflow object sent in the request's
            payload.
    """

    def put(self):
        """Handle callback for an edited article.

        Returns:
            A JSON object with a ``message`` and a ``redirect_url``: the
            curation ticket link when the payload's extra data carries a
            ``curation_ticket_id``, otherwise the root of the current host.

        Raises:
            CallbackWorkflowNotFoundError: if no workflow has the given id.
            CallbackWorkflowNotInWaitingEditState: if the workflow is not
                ``WAITING`` or has no ``callback_url`` in its extra data.
            CallbackRecordNotFoundError: if the record being edited does not
                exist.
        """
        payload = workflow_loader()
        workflow_id = payload['id']

        try:
            workflow = workflow_object_class.get(workflow_id)
        except WorkflowsMissingObject:
            raise CallbackWorkflowNotFoundError(workflow_id)

        not_waiting = workflow.status != ObjectStatus.WAITING
        if not_waiting or 'callback_url' not in workflow.extra_data:
            raise CallbackWorkflowNotInWaitingEditState(workflow.id)

        recid = payload['metadata'].get('control_number')
        try:
            record = get_db_record('lit', recid)
        except RecordGetterError:
            raise CallbackRecordNotFoundError(recid)

        # Only users allowed to update the record may continue the workflow.
        record_permission = RecordPermission.create(action='update', record=record)
        if not record_permission.can():
            abort(403, record_permission)

        # Read the id before saving and continuing; it is used afterwards to
        # build the response message.
        workflow_id = workflow.id
        workflow.data = payload['metadata']
        workflow.status = ObjectStatus.RUNNING
        workflow.extra_data.pop('callback_url', None)
        workflow.save()
        db.session.commit()
        workflow.continue_workflow(delayed=True)

        ticket_id = payload['_extra_data'].get('curation_ticket_id')
        if not ticket_id:
            redirect_url = '%s://%s/' % (request.scheme, request.host)
        else:
            redirect_url = get_rt_link_for_ticket(ticket_id)

        return jsonify({
            'message': 'Workflow {} is continuing.'.format(workflow_id),
            'redirect_url': redirect_url,
        }), 200
def start_edit_article_workflow(recid):
    """Start an ``edit_article`` workflow for a record and open the editor.

    The workflow is assigned to the current user. When the request comes
    from a ticket page (the referrer starts with the ticket base link
    followed by a number), that ticket id is stored in the workflow as
    ``curation_ticket_id``.

    Args:
        recid: id of the literature record to edit.

    Returns:
        A 302 redirect to ``WORKFLOWS_EDITOR_API_URL`` followed by the new
        workflow id.

    Raises:
        CallbackRecordNotFoundError: if the record does not exist.
    """
    try:
        record = get_db_record('lit', recid)
    except RecordGetterError:
        raise CallbackRecordNotFoundError(recid)

    record_permission = RecordPermission.create(action='update', record=record)
    if not record_permission.can():
        abort(403, record_permission)

    # has to be done before start() since it detaches this session
    user_id = current_user.get_id()

    eng_uuid = start('edit_article', data=record)
    workflow_id = WorkflowEngine.from_uuid(eng_uuid).objects[0].id
    workflow = workflow_object_class.get(workflow_id)
    workflow.id_user = user_id

    if request.referrer:
        # Escape the whole base link so that every regex metacharacter in it
        # ('.', '?', ...) is matched literally, then capture the ticket number.
        ticket_pattern = (
            re.escape(get_rt_link_for_ticket('')) + r'(?P<ticket_id>\d+)'
        )
        ticket_match = re.match(ticket_pattern, request.referrer)
        if ticket_match:
            ticket_id = int(ticket_match.group('ticket_id'))
            workflow.extra_data['curation_ticket_id'] = ticket_id

    workflow.save()
    db.session.commit()

    url = "{}{}".format(current_app.config['WORKFLOWS_EDITOR_API_URL'], workflow_id)
    return redirect(location=url, code=302)
@workflow_blueprint.route('/inspect_merge/<int:holdingpen_id>', methods=['GET'])
def inspect_merge(holdingpen_id):
    """Return the documents involved in a workflow's merge as JSON.

    Args:
        holdingpen_id (int): id of the workflow object to inspect.

    Returns:
        A JSON object with the ``root``, ``head``, ``update`` and ``merged``
        documents. Aborts with a 400 when the workflow has no
        ``merger_head_revision`` in its extra data.
    """
    workflow = workflow_object_class.get(holdingpen_id)
    extra_data = workflow.extra_data

    revision_id = extra_data.get('merger_head_revision')
    if revision_id is None:
        abort(400, 'Cannot inspect merge operation on this workflow')

    root = extra_data.get('merger_original_root')
    update = extra_data['merger_root']
    merged = get_db_record('lit', workflow.data['control_number'])

    # XXX merged.revisions[revision_id] should work if not for the messed up
    # non-consecutive versions in prod
    head_version = merged.model.versions.filter_by(
        version_id=(revision_id + 1)
    ).one()

    return jsonify(
        root=root,
        head=head_version.json,
        update=update,
        merged=merged
    )
# View functions built from the class-based resources.
callback_resolve_validation = ResolveValidationResource.as_view(
    'callback_resolve_validation'
)
callback_resolve_merge_conflicts = ResolveMergeResource.as_view(
    'callback_resolve_merge_conflicts'
)
callback_resolve_edit_article = ResolveEditArticleResource.as_view(
    'callback_resolve_edit_article'
)

# URL rules; paths are relative to the prefix of the blueprint they are
# registered on.
callback_blueprint.add_url_rule(
    '/workflows/resolve_validation_errors',
    view_func=callback_resolve_validation,
)
callback_blueprint.add_url_rule(
    '/workflows/resolve_merge_conflicts',
    view_func=callback_resolve_merge_conflicts,
)
workflow_blueprint.add_url_rule(
    '/edit_article/<recid>',
    view_func=start_edit_article_workflow,
)
callback_blueprint.add_url_rule(
    '/workflows/resolve_edit_article',
    view_func=callback_resolve_edit_article,
)