Newer
Older
pydwiki / accounts / models / custom_group.py
from django.contrib.auth.models import Group
from django.core.exceptions import ValidationError
from django.db import models
from django.utils.translation import gettext_lazy as _

from log_manager.trace_log import trace_log

from .integer_choices_field import IntegerChoicesField


class CustomGroupQuerySet(models.QuerySet):
    """カスタムグループ用マネージャ。"""

    @trace_log
    def create(self, **kwargs):
        """CustomGroup を生成します。
        CustomGroup.objects.create(name="New Group") を呼び出すと、自動的に Group も作成されます。

        Arguments
            kwargs: パラメータ

        Return:
            生成したオブジェクト
        """
        if "group" not in kwargs:
            # group 指定がない場合、name より、Group を作成する。
            name = kwargs.pop("name", None)
            if name:
                group = Group.objects.create(name=name)
                kwargs["group"] = group
            else:
                raise ValueError(_("The 'name' parameter is required to create a Group."))

        # group_type と parent は kwargs から取得(指定がなければデフォルト値)
        group_type = kwargs.pop("group_type", self.model.GroupType.OTHER)
        kwargs["group_type"] = group_type

        instance = self.model(**kwargs)
        instance.full_clean()
        instance.save(force_insert=True)
        return instance

    def get(self, **kwargs):
        lookup_kwargs = kwargs.copy()
        if "name" in lookup_kwargs:
            lookup_kwargs["group__name"] = lookup_kwargs.pop("name")
        return super().get(**lookup_kwargs)

    @trace_log
    def update(self, **kwargs):
        """CustomGroup を更新します。
        name キーが指定されている場合は、Group.name も更新します。
        ※ bulk update であるため、シグナル等は発火しません。
        """
        if "name" in kwargs:
            new_name = kwargs.pop("name")
            for obj in self:
                obj.group.name = new_name
                obj.group.save()
        return super().update(**kwargs)

    @trace_log
    def get_or_create(self, defaults=None, **kwargs):
        """指定された、CustomGroup を取得します。
        取得できない場合は、新しく作成します。
        """
        defaults = defaults or {}
        lookup_kwargs = kwargs.copy()

        if "name" in lookup_kwargs:
            lookup_kwargs["group__name"] = lookup_kwargs.pop("name")

        try:
            obj = self.get(**lookup_kwargs)
            return obj, False
        except self.model.DoesNotExist:
            if "name" not in kwargs and "group__name" in lookup_kwargs:
                kwargs["name"] = lookup_kwargs["group__name"]
                del kwargs["group__name"]
            return self.create(**{**kwargs, **defaults}), True

    @trace_log
    def update_or_create(self, defaults=None, **kwargs):
        """指定された、CustomGroup を更新します。"""
        defaults = defaults or {}
        obj, created = self.get_or_create(defaults=defaults, **kwargs)
        if not created:
            if "name" in defaults:
                new_name = defaults.pop("name")
                obj.group.name = new_name
                obj.group.save()
            # 残りの defaults で obj を更新する。
            for attr, value in defaults.items():
                setattr(obj, attr, value)
            obj.full_clean()
            obj.save()
        return obj, created

    @trace_log
    def filter(self, *args, **kwargs):
        """指定された、CustomGroup を取得します。"""
        if "name" in kwargs:
            kwargs["group__name"] = kwargs.pop("name")
        return super().filter(*args, **kwargs)

    @trace_log
    def delete(self):
        """CustomGroup を削除します。
        QuerySet.delet() は、bulk 操作で delete() が呼ばれないため、
        個々のインスタンスの delete を呼び出すことで、関連 Group の削除も実行します。
        """
        for obj in self:
            obj.delete()

        return super().delete()


class CustomGroupManager(models.Manager):
    @trace_log
    def get_queryset(self):
        return CustomGroupQuerySet(self.model, using=self._db)

    @trace_log
    def create(self, **kwargs):
        return self.get_queryset().create(**kwargs)

    @trace_log
    def update(self, **kwargs):
        """CusgomGroup オブジェクトを更新します。
        name が指定されている場合は、Group.name も更新します。
        一括更新を実行するため、シグナルはトリガーされません。

        Arguments:
            kwargs: 更新用パラメータ

        Return:
            更新後オブジェクト数
        """
        return self.get_queryset().update(**kwargs)

    @trace_log
    def update_or_create(self, defaults=None, **kwargs):
        return self.get_queryset().update_or_create(defaults=defaults, **kwargs)

    @trace_log
    def get_or_create(self, defaults=None, **kwargs):
        return self.get_queryset().get_or_create(defaults=defaults, **kwargs)

    @trace_log
    def filter(self, *args, **kwargs):
        return self.get_queryset().filter(*args, **kwargs)

    @trace_log
    def delete(self):
        return self.get_queryset().delete()


class CustomGroup(models.Model):

    objects = CustomGroupManager()
    """ カスタムグループ用マネージャ。 """

    class GroupType(models.IntegerChoices):
        """グループ種別。"""

        ROOT = 0, _("Root")
        """ ルート。 """

        ORGANIZATION = 1, _("Organization")
        """ 組織。 """

        DEPARTMENT = 2, _("Department")
        """ 部門。 """

        TEAM = 3, _("Team")
        """ チーム。 """

        OTHER = 99, _("Other")
        """ その他。 """

    group = models.OneToOneField(
        Group, on_delete=models.CASCADE, related_name="info", verbose_name="Group"
    )
    """ グループ """

    group_type = IntegerChoicesField(
        # group_type = models.IntegerField(
        choices=GroupType.choices,
        default=GroupType.OTHER,
    )
    """ グループ種別 """

    parent = models.ForeignKey(
        "self",
        blank=True,
        null=True,
        on_delete=models.CASCADE,
        related_name="children",
        verbose_name="Parent",
    )
    """ 親グループ """

    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        verbose_name = _("Group")
        """ 本モデルの名称。 """

        verbose_name_plural = _("Groups")
        """ 本モデルの複数形の名称 """

    @trace_log
    def clean(self):
        """parent が自分自身を参照していないか確認する。"""
        if self.parent:
            self._check_parent_loop(self.parent)

    def save(self, *args, **kwargs):
        """save の前に、clean によるチェックを実施する。"""
        self.full_clean()
        return super().save(*args, **kwargs)

    def _check_parent_loop(self, parent):
        """親がさらに上の親としてグループを参照していないかチェックする。"""
        if parent == self:
            raise ValidationError(_("A group cannot be its own parent."))
        if parent.parent:
            self._check_parent_loop(parent.parent)

    def __str__(self) -> str:
        """
        文字列表現を返します。

        Return:
            本モデルの文字列表現
        """
        if self.parent:
            return f"{str(self.parent)}/{self.group.name}"
        return self.group.name