Source code for aiida_jutools.util_group

# -*- coding: utf-8 -*-
###############################################################################
# Copyright (c), Forschungszentrum Jülich GmbH, IAS-1/PGI-1, Germany.         #
#                All rights reserved.                                         #
# This file is part of the aiida-jutools package.                             #
# (AiiDA JuDFT tools)                                                         #
#                                                                             #
# The code is hosted on GitHub at https://github.com/judftteam/aiida-jutools. #
# For further information on the license, see the LICENSE.txt file.           #
# For further information please visit http://judft.de/.                      #
#                                                                             #
###############################################################################
"""Tools for working with aiida Group entities."""
import logging
import sys
import typing
from datetime import datetime

import aiida.orm
import aiida.tools.groups
import pytz

# _all__: visible namespace of the module on import. prevents namespace pollution.
# reference: https://stackoverflow.com/a/7424390/8116031.
__all__ = ("verdi_group_list", "move_nodes", "get_nodes", "group_new_nodes", "delete_groups",
           "delete_groups_with_nodes")


class GroupsFromDict:
    TEMPLATE = {
        "INSERT_IN_ALL": {
            "TO_DEPTH": sys.maxsize,
            "INSERT": {
                "extras": {
                    "version": "",
                    "generating_code_urls": {},
                    "comment": [""],
                },
            }
        },  # group
        "TEMPLATE": {
            "description": "",
            "SUBGROUPS": {
            }  # subgroups
        }  # group
    }
    _ignore_labels = ["TEMPLATE", "INSERT_IN_ALL"]
    _insert_label = "INSERT_IN_ALL"

    def get_template_dict(self, with_example_group: bool = True, print_dict: bool = True, indent: int = 4) -> dict:
        """Print a valid example dict with nested groups as input for load_or_create().

        :param with_example_group: add a valid example group structure to template dict
        :param print_dict: pretty print dict as well as returning it
        :param indent: indent for the printed dict
        :return: template dict
        """
        import copy
        import json
        template = copy.deepcopy(GroupsFromDict.TEMPLATE)
        if with_example_group:
            template["my_base_group1"] = {
                "description": "Short description of this group.",
                "SUBGROUPS": {
                    "my_subgroupA": {
                        "description": "Short description of this group.",

                    },  # subgroup
                    "my_subgroupB": {
                        "description": "Short description of this group.",
                    }  # subgroup
                }  # base subgroups
            }  # base group
        if print_dict:
            print(json.dumps(template, indent=indent))
        return template

    def load_or_create(self, group_labeling: dict, overwrite_extras: bool = True) -> list:
        """Given a dict describing a group structure, create or load these groups.

        If group(s), exist, will just be loaded. But extras will be modified according to dict.

        :param group_labeling: group structure. See GroupFromDict.TEMPLATE for valid template.
        :param overwrite_extras: replace if True, add if False
        :type group_labeling: dict
        :return: list of created or loaded groups
        """
        # TODO validate dict structure against template dict
        self._to_insert = group_labeling[GroupsFromDict._insert_label]["INSERT"]
        self._insert_to_depth = group_labeling[GroupsFromDict._insert_label]["TO_DEPTH"]
        self._overwrite_extras = overwrite_extras

        depth = 0
        group_path_str = ""
        dict_of_groups = group_labeling
        groups = []
        self._create_or_load(depth, group_path_str, group_labeling, groups)
        return groups

    def _create_or_load(self, depth: int, group_path_str: str, dict_of_groups: dict, groups: list):
        """Recursively creates groups from possibly nested dict according to GroupFromDict.TEMPLATE.
        """
        base_path = group_path_str
        for group_label, attrs in dict_of_groups.items():
            if group_label in GroupsFromDict._ignore_labels:
                continue
            group_path_str = base_path + group_label
            group_path = aiida.tools.groups.GroupPath(group_path_str)
            group, created = group_path.get_or_create_group()
            group.description = attrs["description"]
            if "extras" in self._to_insert and depth <= self._insert_to_depth:
                if self._overwrite_extras:
                    group.set_extra_many(self._to_insert["extras"])
                else:
                    for k, v in self._to_insert["extras"].items():
                        group.set_extra(k, v)
            # let override by local extras
            if "extras" in attrs:
                if self._overwrite_extras:
                    group.set_extra_many(attrs["extras"])
                else:
                    for k, v in attrs["extras"].items():
                        group.set_extra(k, v)
            if "SUBGROUPS" in attrs:
                self._create_or_load(depth + 1, group_path_str + "/", attrs["SUBGROUPS"], groups)
            groups.append(group)


[docs]def verdi_group_list(projection: typing.List[str] = ['label', 'id', 'type_string'], with_header: bool = True, label_filter: str = None) -> list: """Equivalent to CLI "verdi group list -a" (minus user mail address). :param projection: query projection :param with_header: True: first list in return argument is the projection argument :param label_filter: optional: only include groups with this substring in their label :return: list of lists, one entry per projection value, for each group """ qb = aiida.orm.QueryBuilder() group_list = qb.append(aiida.orm.Group, project=projection).all() if 'label' in projection and label_filter: index_of_label = projection.index('label') group_list = [item for item in group_list if label_filter in item[index_of_label]] group_list.sort(key=lambda item: item[0].lower()) if with_header: group_list.insert(0, projection) if len(projection) == 1: group_list = [singlelist[0] for singlelist in group_list] return group_list
def get_subgroups(group: aiida.orm.Group) -> typing.List[aiida.orm.Group]: """Get all subgroups of a group. In accordance with aiida GroupPath, the group with label "foo/bar" is a valid subgroup of a group with label "foo". :param group: a group with possible subgroups :return: subgroups """ from aiida.orm import Group group_labels = [group.label for group in Group.objects.all()] subgroup_labels = [label for label in group_labels if label.startswith(group.label) and len(label) > len(group.label)] subgroups = [Group.get(label=label) for label in subgroup_labels] return subgroups
[docs]def move_nodes(origin: aiida.orm.Group, destination: aiida.orm.Group): """Move all nodes from one group to another, possibly sub/supergroup. :param origin: origin group :param destination: destination group Note: if the new group does not exit yet, prefer relabling the group with group.label = new_label. """ destination.add_nodes(list(origin.nodes)) origin.remove_nodes(list(origin.nodes))
[docs]def get_nodes(group_label: str): """Get all nodes from given group (or subgroup) by label (path). Deprecated: just use group.nodes, or list(group.nodes). :param group_label: e.g. for a subgroup, "groupA/subgroupB/subgroupC". :return: nodes as generator for efficient iteration (convert via list() to list) """ group = aiida.orm.Group.get(label=group_label) return group.nodes
[docs]def group_new_nodes(new_group_label: str, blacklist: typing.List[aiida.orm.Node] = [aiida.orm.Code, aiida.orm.Computer], right_date: datetime = None, left_date: datetime = None): """Groups new nodes with ctime in timerange (left_date,right_date] into new group If you're working on one project at a time, everytime you finish a project you can use this function to group your nodes. I.e. this is a utility function for a time-linear sequential grouping strategy. Letting the function find the appropriate time range is the standard / recommended usage. If the group already exists and the intended nodes are already added, repeated calls will change nothing. :param new_group_label: label of new group/subgroup :param blacklist: nodes in timerange to exclude from grouping. Normally Code, Computer. :param right_date: if not given (usually as datetime.now()), will take right_date = newest ctime, > left_date, of any node, ungrouped nodes included. :param left_date: if not given, will take left_date=newest ctime of any grouped node :return: the new populated, stored, group, or None if no new nodes found :rtype: Group or None """ timezone = pytz.UTC ## step1: find d7=infdate from all *grouped* nodes # new group to create or add new_path = aiida.tools.groups.GroupPath(path=new_group_label) new_group = new_path.get_or_create_group()[0] # get all groups, exclude new group if present group_labels = verdi_group_list(projection=['label'], with_header=False) try: group_labels.remove(new_group_label) except ValueError: pass groups = [aiida.orm.Group.get(label=label) for label in group_labels] # find tuple (group,node) with largest node.ctime across all groups left_date_computed = datetime(year=1, month=1, day=1, tzinfo=pytz.UTC) for group in groups: for node in group.nodes: left_date_computed = max(left_date_computed, node.ctime) if left_date is not None: left_date = timezone.localize(left_date) if left_date < left_date_computed: print( f"WARNING: left_date {left_date} < computed left date from groups {left_date_computed}, grouping overlap likely.") elif left_date > left_date_computed: print( f"WARNING: left_date {left_date} > computed left date from groups {left_date_computed}, leftover ungrouped nodes likely.") else: left_date = left_date_computed ## step2: find d8=maxdate from all nodes newer than d7 qb = aiida.orm.QueryBuilder() new_nodes = qb.append(aiida.orm.Node, filters={'ctime': {'>': left_date}}).all(flat=True) if not new_nodes: print(f"Info: found no nodes newer than last grouped at date {left_date}. " f"Attempting to delete group '{new_group_label}' if empty.") delete_groups([new_group_label]) return None else: right_date_computed = max([node.ctime for node in new_nodes]) if right_date is not None: right_date = timezone.localize(right_date) if right_date < right_date_computed: print( f"WARNING: right_date {right_date} < computed right date from groups {right_date_computed}, leftover ungrouped nodes likely.") else: right_date = right_date_computed ## step3: query all nodes in daterange (d7,d8], optional: exclude blacklist daterange_filter = { 'and': [ {'ctime': {'>': left_date}}, # newer than {'ctime': {'<=': right_date}} # older than ] } qb = aiida.orm.QueryBuilder() new_nodes = qb.append(aiida.orm.Node, filters=daterange_filter).distinct().all(flat=True) drops = [] for i, node in enumerate(new_nodes): for blacktype in blacklist: if isinstance(node, blacktype): drops.append(i) for drop in drops: new_nodes.pop(drop) ## step4: add new nodes to new group new_group.add_nodes(new_nodes) new_group.store() return new_group
[docs]def delete_groups(group_labels: typing.List[str], skip_nonempty_groups: bool = True, silent: bool = False): """Delete group(s). Does not delete nodes in group(s). Use delete_groups_with_nodes() for that. :param group_labels: list of group labels :param skip_nonempty_groups: True: skip them. False: don't skip. Nodes get removed from group, not deleted. :param silent: True: do not print information. """ for label in group_labels: try: group = aiida.orm.Group.get(label=label) except aiida.common.exceptions.NotExistent: print(f"Warning: group to delete '{label}' does not exist.") else: if group.count() > 0 and skip_nonempty_groups: if not silent: print(f"Info: Skipping non-empty group<{label}>: contains {group.count()} nodes.") else: group.clear() # remove nodes from group aiida.orm.Group.objects.delete(group.pk) if not silent: print(f"Group '{label}' deleted.")
[docs]def delete_groups_with_nodes(group_labels: typing.List[str], dry_run: bool = True, verbosity: int = logging.INFO, leave_groups: bool = False): """Delete all nodes in each group (including repo files), then delete the groups themselves. :param group_labels: list of group labels :param dry_run: perform test run. if output looks good, set to false and repeat. :param verbosity: 20 = logging.INFO (show node count) (default), 10 = DEBUG (show all uuids), all other: silent. :param leave_groups: True: Leave empty groups as is after deleting all nodes in them. """ # get the node delete function used by 'verdi node delete -v -n/-f' # DEVNOTE: deprecation warning: aiida-core v1.6.0: replaced aiida.manage.database.delete.nodes.delete_nodes # aiida.tools.delete_nodes # (reference: https://github.com/aiidateam/aiida-core/blob/develop/CHANGELOG.md#v160---2021-03-15). # get full periodic table pandas dataframe from mendeleev # DEVNOTE: breaking change in mendeleev v0.7.0: replaced get_table with fetch.fetch_table. version = aiida.__version__ version_info = tuple([int(num) for num in version.split(".")]) is_aiida_v160_plus = version_info >= (1, 6, 0) if is_aiida_v160_plus: from aiida.tools import delete_nodes # deprecation warning: verbosity argument replaced with setting logger from aiida.common.log import AIIDA_LOGGER DELETE_LOGGER = AIIDA_LOGGER.getChild('delete') DELETE_LOGGER.setLevel(verbosity) else: from aiida.manage.database.delete.nodes import delete_nodes print("Deleting nodes in groups...") # get the groups of all stated group labels groups = [aiida.orm.Group.get(label=label) for label in group_labels] # get the pks of all nodes in all groups pks = [] for group in groups: pks += [node.pk for node in group.nodes] # delete nodes if is_aiida_v160_plus: delete_nodes(pks=pks, dry_run=dry_run) else: delete_nodes(pks=pks, dry_run=dry_run, force=True, verbosity=verbosity) # now delete groups print(f"Deleting groups: {not leave_groups}...") if leave_groups: if dry_run: print("Dry run: groups unchanged.") else: are_empty = {group.label: group.is_empty for group in groups} print(f"Groups are now empty: {are_empty}") else: if dry_run: print("Dry run: Skipping deleting groups.") else: delete_groups(group_labels=group_labels)
def get_nodes_by_group(group_label: str = None, node_type: aiida.orm.Node = aiida.orm.Node, return_query: bool = False, return_iter: bool = True, node_tag: str = ""): """Get all nodes from given group (or subgroup) by label (path). DEVNOTE: this is how it is done on the aiida cheatsheet, via query. but don't see why not simply use group.nodes instead. :param group_label: e.g. for a subgroup, "groupA/subgroupB/subgroupC". :param node_type: filter out nodes in group of this type :param return_query: True: return QueryBuilder object instead of result :param return_iter: True: return generator, False: return list :param node_tag: if return_query, can add tag to node_type for further query building. :return: nodes """ qb = aiida.orm.QueryBuilder() qb.append(aiida.orm.Group, filters={'label': group_label}, tag='group') qb.append(node_type, with_group='group', tag=node_tag, project='*') # DEVNOTE: equivalent: # qb.append(node_type, tag="nodes", project="*") # qb.append(aiida.orm.Group, with_node="nodes", filters={"label": group_label}) if return_query: return qb if return_iter: return qb.iterall() else: return qb.all(flat=True)