# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
# pylint: disable=no-member
"""Django Group entity"""
from collections.abc import Iterable, Iterator, Sized
# pylint: disable=no-name-in-module,import-error
from django.db import transaction
from django.db.models import Q
from aiida.backends.djsite.db import models
from aiida.common.lang import type_check
from aiida.orm.implementation.groups import BackendGroup, BackendGroupCollection
from . import entities
from . import users
from . import utils
__all__ = ('DjangoGroup', 'DjangoGroupCollection')
[docs]class DjangoGroup(entities.DjangoModelEntity[models.DbGroup], BackendGroup): # pylint: disable=abstract-method
"""The Django group object"""
MODEL_CLASS = models.DbGroup
[docs] def __init__(self, backend, label, user, description='', type_string=''):
"""Construct a new Django group"""
type_check(user, users.DjangoUser)
super().__init__(backend)
self._dbmodel = utils.ModelWrapper(
models.DbGroup(label=label, description=description, user=user.dbmodel, type_string=type_string)
)
@property
def label(self):
return self._dbmodel.label
@label.setter
def label(self, label):
"""
Attempt to change the label of the group instance. If the group is already stored
and the another group of the same type already exists with the desired label, a
UniquenessError will be raised
:param label : the new group label
:raises aiida.common.UniquenessError: if another group of same type and label already exists
"""
self._dbmodel.label = label
@property
def description(self):
return self._dbmodel.description
@description.setter
def description(self, value):
self._dbmodel.description = value
@property
def type_string(self):
return self._dbmodel.type_string
@property
def user(self):
return self._backend.users.from_dbmodel(self._dbmodel.user)
@user.setter
def user(self, new_user):
type_check(new_user, users.DjangoUser)
assert new_user.backend == self.backend, 'User from a different backend'
self._dbmodel.user = new_user.dbmodel
@property
def uuid(self):
return str(self._dbmodel.uuid)
[docs] def __int__(self):
if not self.is_stored:
return None
return self._dbnode.pk
[docs] def store(self):
if not self.is_stored:
with transaction.atomic():
if self.user is not None and not self.user.is_stored:
self.user.store()
# We now have to reset the model's user entry because
# django will have assigned the user an ID but this
# is not automatically propagated to us
self._dbmodel.user = self.user.dbmodel
self._dbmodel.save()
# To allow to do directly g = Group(...).store()
return self
[docs] def count(self):
"""Return the number of entities in this group.
:return: integer number of entities contained within the group
"""
return self._dbmodel.dbnodes.count()
[docs] def clear(self):
"""Remove all the nodes from this group."""
self._dbmodel.dbnodes.clear()
@property
def nodes(self):
"""Get an iterator to the nodes in the group"""
class NodesIterator(Iterator, Sized):
"""The nodes iterator"""
def __init__(self, dbnodes, backend):
super().__init__()
self._backend = backend
self._dbnodes = dbnodes
self.generator = self._genfunction()
def _genfunction(self):
# Best to use dbnodes.iterator() so we load entities from the database as we need them
# see: http://blog.etianen.com/blog/2013/06/08/django-querysets/
for node in self._dbnodes.iterator():
yield self._backend.get_backend_entity(node)
def __iter__(self):
return self
def __len__(self):
return len(self._dbnodes)
def __getitem__(self, value):
if isinstance(value, slice):
return [self._backend.get_backend_entity(n) for n in self._dbnodes[value]]
return self._backend.get_backend_entity(self._dbnodes[value])
def __next__(self):
return next(self.generator)
return NodesIterator(self._dbmodel.dbnodes.all(), self._backend)
[docs] def add_nodes(self, nodes, **kwargs):
from .nodes import DjangoNode
super().add_nodes(nodes)
node_pks = []
for node in nodes:
if not isinstance(node, DjangoNode):
raise TypeError('invalid type {}, has to be {}'.format(type(node), DjangoNode))
if not node.is_stored:
raise ValueError('At least one of the provided nodes is unstored, stopping...')
node_pks.append(node.pk)
self._dbmodel.dbnodes.add(*node_pks)
[docs] def remove_nodes(self, nodes):
from .nodes import DjangoNode
super().remove_nodes(nodes)
node_pks = []
for node in nodes:
if not isinstance(node, DjangoNode):
raise TypeError('invalid type {}, has to be {}'.format(type(node), DjangoNode))
if not node.is_stored:
raise ValueError('At least one of the provided nodes is unstored, stopping...')
node_pks.append(node.pk)
self._dbmodel.dbnodes.remove(*node_pks)
[docs]class DjangoGroupCollection(BackendGroupCollection):
"""The Django Group collection"""
ENTITY_CLASS = DjangoGroup
[docs] def query(
self,
label=None,
type_string=None,
pk=None,
uuid=None,
nodes=None,
user=None,
node_attributes=None,
past_days=None,
label_filters=None,
**kwargs
): # pylint: disable=too-many-arguments
# pylint: disable=too-many-branches,too-many-locals
from .nodes import DjangoNode
# Analyze args and kwargs to create the query
queryobject = Q()
if label is not None:
queryobject &= Q(label=label)
if type_string is not None:
queryobject &= Q(type_string=type_string)
if pk is not None:
queryobject &= Q(pk=pk)
if uuid is not None:
queryobject &= Q(uuid=uuid)
if past_days is not None:
queryobject &= Q(time__gte=past_days)
if nodes is not None:
pk_list = []
if not isinstance(nodes, Iterable):
nodes = [nodes]
for node in nodes:
if not isinstance(node, (DjangoNode, models.DbNode)):
raise TypeError(
'At least one of the elements passed as '
'nodes for the query on Group is neither '
'a Node nor a DbNode'
)
pk_list.append(node.pk)
queryobject &= Q(dbnodes__in=pk_list)
if user is not None:
if isinstance(user, str):
queryobject &= Q(user__email=user)
else:
queryobject &= Q(user=user.id)
if label_filters is not None:
label_filters_list = {'name__' + key: value for (key, value) in label_filters.items() if value}
queryobject &= Q(**label_filters_list)
groups_pk = set(models.DbGroup.objects.filter(queryobject, **kwargs).values_list('pk', flat=True))
if node_attributes is not None:
for k, vlist in node_attributes.items():
if isinstance(vlist, str) or not isinstance(vlist, Iterable):
vlist = [vlist]
for value in vlist:
# This will be a dictionary of the type
# {'datatype': 'txt', 'tval': 'xxx') for instance, if
# the passed data is a string
base_query_dict = models.DbAttribute.get_query_dict(value)
# prepend to the key the right django string to SQL-join
# on the right table
query_dict = {'dbnodes__dbattributes__{}'.format(k2): v2 for k2, v2 in base_query_dict.items()}
# I narrow down the list of groups.
# I had to do it in this way, with multiple DB hits and
# not a single, complicated query because in SQLite
# there is a maximum of 64 tables in a join.
# Since typically one requires a small number of filters,
# this should be ok.
groups_pk = groups_pk.intersection(
models.DbGroup.objects.filter(pk__in=groups_pk, dbnodes__dbattributes__key=k,
**query_dict).values_list('pk', flat=True)
)
retlist = []
# Return sorted by pk
for dbgroup in sorted(groups_pk):
retlist.append(DjangoGroup.from_dbmodel(models.DbGroup.objects.get(id=dbgroup), self._backend))
return retlist
[docs] def delete(self, id): # pylint: disable=redefined-builtin
models.DbGroup.objects.filter(id=id).delete()