Source code for aiida.cmdline.utils.common

# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved.                     #
# This file is part of the AiiDA code.                                    #
#                                                                         #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file        #
# For further information please visit http://www.aiida.net               #
###########################################################################
"""Common utility functions for command line commands."""
# pylint: disable=import-error

import os
import sys

import click
from tabulate import tabulate


[docs]def get_env_with_venv_bin(): """Create a clone of the current running environment with the AIIDA_PATH variable set directory of the config.""" from aiida.manage.configuration import get_config config = get_config() currenv = os.environ.copy() currenv['PATH'] = os.path.dirname(sys.executable) + ':' + currenv['PATH'] currenv['AIIDA_PATH'] = config.dirpath currenv['PYTHONUNBUFFERED'] = 'True' return currenv
[docs]def format_local_time(timestamp, format_str='%Y-%m-%d %H:%M:%S'): """ Format a datetime object or UNIX timestamp in a human readable format :param timestamp: a datetime object or a float representing a UNIX timestamp :param format_str: optional string format to pass to strftime """ from aiida.common import timezone if isinstance(timestamp, float): return timezone.datetime.fromtimestamp(timestamp).strftime(format_str) return timestamp.strftime(format_str)
[docs]def get_node_summary(node): """Return a multi line string with a pretty formatted summary of a Node. :param node: a Node instance :return: a string summary of the node """ from plumpy import ProcessState from aiida.orm import ProcessNode table_headers = ['Property', 'Value'] table = [] if isinstance(node, ProcessNode): table.append(['type', node.process_label]) try: process_state = ProcessState(node.process_state) except (AttributeError, ValueError): pass else: process_state_string = process_state.value.capitalize() if process_state == ProcessState.FINISHED and node.exit_message: table.append(['state', '{} [{}] {}'.format(process_state_string, node.exit_status, node.exit_message)]) elif process_state == ProcessState.FINISHED: table.append(['state', '{} [{}]'.format(process_state_string, node.exit_status)]) elif process_state == ProcessState.EXCEPTED: table.append(['state', '{} <{}>'.format(process_state_string, node.exception)]) else: table.append(['state', process_state_string]) else: table.append(['type', node.__class__.__name__]) table.append(['pk', str(node.pk)]) table.append(['uuid', str(node.uuid)]) table.append(['label', node.label]) table.append(['description', node.description]) table.append(['ctime', node.ctime]) table.append(['mtime', node.mtime]) try: computer = node.computer except AttributeError: pass else: if computer is not None: table.append(['computer', '[{}] {}'.format(node.computer.pk, node.computer.name)]) return tabulate(table, headers=table_headers)
[docs]def get_node_info(node, include_summary=True): """Return a multi line string of information about the given node, such as the incoming and outcoming links. :param include_summary: boolean, if True, also include a summary of node properties :return: a string summary of the node including a description of all its links and log messages """ from aiida.common.links import LinkType from aiida import orm if include_summary: result = get_node_summary(node) else: result = '' nodes_caller = node.get_incoming(link_type=(LinkType.CALL_CALC, LinkType.CALL_WORK)) nodes_called = node.get_outgoing(link_type=(LinkType.CALL_CALC, LinkType.CALL_WORK)) nodes_input = node.get_incoming(link_type=(LinkType.INPUT_CALC, LinkType.INPUT_WORK)) nodes_output = node.get_outgoing(link_type=(LinkType.CREATE, LinkType.RETURN)) if nodes_caller: result += '\n' + format_nested_links(nodes_caller.nested(), headers=['Called by', 'PK', 'Type']) if nodes_input: result += '\n' + format_nested_links(nodes_input.nested(), headers=['Inputs', 'PK', 'Type']) if nodes_output: result += '\n' + format_nested_links(nodes_output.nested(), headers=['Outputs', 'PK', 'Type']) if nodes_called: result += '\n' + format_flat_links(nodes_called.all(), headers=['Called', 'PK', 'Type']) log_messages = orm.Log.objects.get_logs_for(node) if log_messages: table = [] table_headers = ['Log messages'] table.append(['There are {} log messages for this calculation'.format(len(log_messages))]) table.append(["Run 'verdi process report {}' to see them".format(node.pk)]) result += '\n\n{}'.format(tabulate(table, headers=table_headers)) return result
[docs]def get_calcjob_report(calcjob): """ Return a multi line string representation of the log messages and output of a given calcjob :param calcjob: the calcjob node :return: a string representation of the log messages and scheduler output """ from aiida import orm from aiida.common.datastructures import CalcJobState log_messages = orm.Log.objects.get_logs_for(calcjob) scheduler_out = calcjob.get_scheduler_stdout() scheduler_err = calcjob.get_scheduler_stderr() calcjob_state = calcjob.get_state() scheduler_state = calcjob.get_scheduler_state() report = [] if calcjob_state == CalcJobState.WITHSCHEDULER: state_string = '{}, scheduler state: {}'.format( calcjob_state, scheduler_state if scheduler_state else '(unknown)' ) else: state_string = '{}'.format(calcjob_state) label_string = ' [{}]'.format(calcjob.label) if calcjob.label else '' report.append('*** {}{}: {}'.format(calcjob.pk, label_string, state_string)) if scheduler_out is None: report.append('*** Scheduler output: N/A') elif scheduler_out: report.append('*** Scheduler output:\n{}'.format(scheduler_out)) else: report.append('*** (empty scheduler output file)') if scheduler_err is None: report.append('*** Scheduler errors: N/A') elif scheduler_err: report.append('*** Scheduler errors:\n{}'.format(scheduler_err)) else: report.append('*** (empty scheduler errors file)') if log_messages: report.append('*** {} LOG MESSAGES:'.format(len(log_messages))) else: report.append('*** 0 LOG MESSAGES') for log in log_messages: report.append('+-> {} at {}'.format(log.levelname, log.time)) for message in log.message.splitlines(): report.append(' | {}'.format(message)) return '\n'.join(report)
[docs]def get_process_function_report(node): """ Return a multi line string representation of the log messages and output of a given process function node :param node: the node :return: a string representation of the log messages """ from aiida import orm report = [] for log in orm.Log.objects.get_logs_for(node): report.append('{time:%Y-%m-%d %H:%M:%S} [{id}]: {msg}'.format(id=log.id, msg=log.message, time=log.time)) return '\n'.join(report)
[docs]def get_workchain_report(node, levelname, indent_size=4, max_depth=None): """ Return a multi line string representation of the log messages and output of a given workchain :param node: the workchain node :return: a nested string representation of the log messages """ # pylint: disable=too-many-locals import itertools from aiida import orm from aiida.common.log import LOG_LEVELS def get_report_messages(uuid, depth, levelname): """Return list of log messages with given levelname and their depth for a node with a given uuid.""" node_id = orm.load_node(uuid).id filters = {'dbnode_id': node_id} entries = orm.Log.objects.find(filters) entries = [entry for entry in entries if LOG_LEVELS[entry.levelname] >= LOG_LEVELS[levelname]] return [(_, depth) for _ in entries] def get_subtree(uuid, level=0): """ Get a nested tree of work calculation nodes and their nesting level starting from this uuid. The result is a list of uuid of these nodes. """ builder = orm.QueryBuilder() builder.append(cls=orm.WorkChainNode, filters={'uuid': uuid}, tag='workcalculation') builder.append( cls=orm.WorkChainNode, project=['uuid'], # In the future, we should specify here the type of link # for now, CALL links are the only ones allowing calc-calc # (we here really want instead to follow CALL links) with_incoming='workcalculation', tag='subworkchains' ) result = list(itertools.chain(*builder.distinct().all())) # This will return a single flat list of tuples, where the first element # corresponds to the WorkChain pk and the second element is an integer # that represents its level of nesting within the chain return [(uuid, level)] + list(itertools.chain(*[get_subtree(subuuid, level=level + 1) for subuuid in result])) workchain_tree = get_subtree(node.uuid) if max_depth: report_list = [ get_report_messages(uuid, depth, levelname) for uuid, depth in workchain_tree if depth < max_depth ] else: report_list = [get_report_messages(uuid, depth, levelname) for uuid, depth in workchain_tree] reports = list(itertools.chain(*report_list)) reports.sort(key=lambda r: r[0].time) if not reports: return 'No log messages recorded for this entry' log_ids = [entry[0].id for entry in reports] levelnames = [len(entry[0].levelname) for entry in reports] width_id = len(str(max(log_ids))) width_levelname = max(levelnames) report = [] for entry, depth in reports: line = '{time:%Y-%m-%d %H:%M:%S} [{id:<{width_id}} | {levelname:>{width_levelname}}]:{indent} {message}'.format( id=entry.id, levelname=entry.levelname, message=entry.message, time=entry.time, width_id=width_id, width_levelname=width_levelname, indent=' ' * (depth * indent_size) ) report.append(line) return '\n'.join(report)
[docs]def get_num_workers(): #pylint: disable=inconsistent-return-statements """ Get the number of active daemon workers from the circus client """ from aiida.common.exceptions import CircusCallError from aiida.manage.manager import get_manager manager = get_manager() client = manager.get_daemon_client() if client.is_daemon_running: response = client.get_numprocesses() if response['status'] != 'ok': if response['status'] == client.DAEMON_ERROR_TIMEOUT: raise CircusCallError('verdi thought the daemon was alive, but the call to the daemon timed-out') elif response['status'] == client.DAEMON_ERROR_NOT_RUNNING: raise CircusCallError('verdi thought the daemon was running, but really it is not') else: raise CircusCallError try: return response['numprocesses'] except KeyError: raise CircusCallError('Circus did not return the number of daemon processes')
[docs]def check_worker_load(active_slots): """ Check if the percentage usage of the daemon worker slots exceeds a threshold. If it does, print a warning. The purpose of this check is to warn the user if they are close to running out of worker slots which could lead to their processes becoming stuck indefinitely. :param active_slots: the number of currently active worker slots """ from aiida.cmdline.utils import echo from aiida.common.exceptions import CircusCallError from aiida.manage.configuration import get_config warning_threshold = 0.9 # 90% config = get_config() slots_per_worker = config.get_option('daemon.worker_process_slots', config.current_profile.name) try: active_workers = get_num_workers() except CircusCallError: echo.echo_critical('Could not contact Circus to get the number of active workers') if active_workers is not None: available_slots = active_workers * slots_per_worker percent_load = (active_slots / available_slots) if percent_load > warning_threshold: echo.echo('') # New line echo.echo_warning('{:.0f}% of the available daemon worker slots have been used!'.format(percent_load * 100)) echo.echo_warning("Increase the number of workers with 'verdi daemon incr'.\n")