# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
""" Utility functions for import/export of AiiDA entities """
# pylint: disable=inconsistent-return-statements,too-many-branches,too-many-return-statements
# pylint: disable=too-many-nested-blocks,too-many-locals
import urllib.request
import urllib.parse
from html.parser import HTMLParser
from aiida.tools.importexport.common.config import (
NODE_ENTITY_NAME, GROUP_ENTITY_NAME, COMPUTER_ENTITY_NAME, USER_ENTITY_NAME, LOG_ENTITY_NAME, COMMENT_ENTITY_NAME
)
def schema_to_entity_names(class_string):
    """
    Mapping from classes path to entity names (used by the SQLA import/export)

    Both the Django and the SQLAlchemy model paths are recognised, because the
    SQLA import/export code is meant to be usable for Django too.

    :param class_string: fully qualified path of a database model class, or ``None``
    :return: the matching entity name constant, or ``None`` if ``class_string`` is
        ``None`` or is not one of the known model paths
    """
    if class_string is None:
        return None

    # Each entry pairs the (Django, SQLAlchemy) model paths with the entity name they map to.
    model_paths_to_entity_name = (
        (
            ('aiida.backends.djsite.db.models.DbNode', 'aiida.backends.sqlalchemy.models.node.DbNode'),
            NODE_ENTITY_NAME,
        ),
        (
            ('aiida.backends.djsite.db.models.DbGroup', 'aiida.backends.sqlalchemy.models.group.DbGroup'),
            GROUP_ENTITY_NAME,
        ),
        (
            ('aiida.backends.djsite.db.models.DbComputer', 'aiida.backends.sqlalchemy.models.computer.DbComputer'),
            COMPUTER_ENTITY_NAME,
        ),
        (
            ('aiida.backends.djsite.db.models.DbUser', 'aiida.backends.sqlalchemy.models.user.DbUser'),
            USER_ENTITY_NAME,
        ),
        (
            ('aiida.backends.djsite.db.models.DbLog', 'aiida.backends.sqlalchemy.models.log.DbLog'),
            LOG_ENTITY_NAME,
        ),
        (
            ('aiida.backends.djsite.db.models.DbComment', 'aiida.backends.sqlalchemy.models.comment.DbComment'),
            COMMENT_ENTITY_NAME,
        ),
    )

    for model_paths, entity_name in model_paths_to_entity_name:
        if class_string in model_paths:
            return entity_name

    # Unknown class paths map to no entity.
    return None
class HTMLGetLinksParser(HTMLParser):
    """
    HTML parser that collects the ``href`` values of the ``<a>`` tags it is fed.

    If a filter_extension is passed, only links with extension matching
    the given one will be returned.
    """

    # pylint: disable=abstract-method

    def __init__(self, filter_extension=None):
        """
        :param filter_extension: extension (without the leading dot) that a link must
            end with in order to be stored; ``None`` stores every link
        """
        self.filter_extension = filter_extension
        self.links = []
        super().__init__()

    def handle_starttag(self, tag, attrs):
        """
        Store the urls encountered, if they match the request.

        :param tag: name of the tag that was opened
        :param attrs: list of ``(name, value)`` pairs with the attributes of the tag
        """
        if tag != 'a':
            return

        if self.filter_extension is None:
            suffix = None
        else:
            suffix = '.{}'.format(self.filter_extension)

        for key, value in attrs:
            # A valueless attribute such as ``<a href>`` is reported with ``None`` as its
            # value; it carries no URL, so skip it instead of failing on ``None.endswith``.
            if key != 'href' or value is None:
                continue
            if suffix is None or value.endswith(suffix):
                self.links.append(value)

    def get_links(self):
        """
        Return the links that were found during the parsing phase.

        :return: list of the stored ``href`` values, in the order they were encountered
        """
        return self.links
def get_valid_import_links(url):
    """
    Open the given URL, parse the HTML and return a list of valid links where
    the link file has a .aiida extension.

    :param url: URL of the HTML page to scan
    :return: list of the ``.aiida`` links found in the page, each joined with the URL
        of the retrieved page
    """
    # Use the response as a context manager so that it is closed even if reading or
    # decoding the page fails.
    with urllib.request.urlopen(url) as response:
        # Relative links are resolved against the URL reported by the response.
        base_url = response.geturl()
        html = response.read().decode('utf8')

    parser = HTMLGetLinksParser(filter_extension='aiida')
    parser.feed(html)

    return [urllib.parse.urljoin(base_url, link) for link in parser.get_links()]
def export_shard_uuid(uuid):
    """Sharding of the UUID for the import/export

    The UUID is split into three path components: its first two characters, its
    next two characters, and the remainder.

    :param uuid: UUID to be sharded (v4)
    :type uuid: str

    :return: Sharded UUID as a subfolder path
    :rtype: str
    """
    import os

    first_level, second_level, remainder = uuid[:2], uuid[2:4], uuid[4:]
    return os.path.join(first_level, second_level, remainder)