Source code for inspirehep.modules.refextract.utils
# -*- coding: utf-8 -*-
#
# This file is part of INSPIRE.
# Copyright (C) 2017 CERN.
#
# INSPIRE is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# INSPIRE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with INSPIRE. If not, see <http://www.gnu.org/licenses/>.
#
# In applying this license, CERN does not waive the privileges and immunities
# granted to it by virtue of its status as an Intergovernmental Organization
# or submit itself to any jurisdiction.
"""Refextract utils."""
from __future__ import absolute_import, division, print_function
import re
import codecs
from tempfile import TemporaryFile
from fs.opener import fsopen
from inspirehep.utils.url import copy_file
RE_ALPHANUMERIC = re.compile('\W+', re.UNICODE)
[docs]class KbWriter(object):
def __init__(self, kb_path):
self.kb_path = kb_path
[docs] def add_entry(self, value, kb_key):
kb_line = self._get_kb_line(
raw_title=value,
kb_key=kb_key,
)
if kb_line:
self.local_file.write(kb_line)
def __enter__(self):
self.local_file = TemporaryFile(prefix='inspire')
return self
def __exit__(self, *exc):
return self._close()
def _close(self):
try:
self.local_file.seek(0)
with fsopen(self.kb_path, mode='wb') as kb_file:
copy_file(self.local_file, kb_file)
finally:
self.local_file.close()
@classmethod
def _get_kb_line(cls, raw_title, kb_key):
encoded_title = None
encode = codecs.getencoder(encoding='utf-8')
normalized_title = cls._normalize(raw_title)
if normalized_title:
encoded_title, _ = encode(
u'{}---{}\n'.format(
normalized_title,
kb_key,
)
)
return encoded_title
@staticmethod
def _normalize(s):
if not s:
return
result = RE_ALPHANUMERIC.sub(' ', s)
result = ' '.join(result.split())
result = result.upper()
if not result:
return
return result