from django.contrib.auth.models import Group from django.core.exceptions import ValidationError from django.db import models from django.utils.translation import gettext_lazy as _ from log_manager.trace_log import trace_log from .integer_choices_field import IntegerChoicesField class CustomGroupQuerySet(models.QuerySet): """カスタムグループ用マネージャ。""" @trace_log def create(self, **kwargs): """CustomGroup を生成します。 CustomGroup.objects.create(name="New Group") を呼び出すと、自動的に Group も作成されます。 Arguments kwargs: パラメータ Return: 生成したオブジェクト """ if "group" not in kwargs: # group 指定がない場合、name より、Group を作成する。 name = kwargs.pop("name", None) if name: group = Group.objects.create(name=name) kwargs["group"] = group else: raise ValueError(_("The 'name' parameter is required to create a Group.")) # group_type と parent は kwargs から取得(指定がなければデフォルト値) group_type = kwargs.pop("group_type", self.model.GroupType.OTHER) kwargs["group_type"] = group_type instance = self.model(**kwargs) instance.full_clean() instance.save(force_insert=True) return instance def get(self, **kwargs): lookup_kwargs = kwargs.copy() if "name" in lookup_kwargs: lookup_kwargs["group__name"] = lookup_kwargs.pop("name") return super().get(**lookup_kwargs) @trace_log def update(self, **kwargs): """CustomGroup を更新します。 name キーが指定されている場合は、Group.name も更新します。 ※ bulk update であるため、シグナル等は発火しません。 """ if "name" in kwargs: new_name = kwargs.pop("name") for obj in self: obj.group.name = new_name obj.group.save() return super().update(**kwargs) @trace_log def get_or_create(self, defaults=None, **kwargs): """指定された、CustomGroup を取得します。 取得できない場合は、新しく作成します。 """ defaults = defaults or {} lookup_kwargs = kwargs.copy() if "name" in lookup_kwargs: lookup_kwargs["group__name"] = lookup_kwargs.pop("name") try: obj = self.get(**lookup_kwargs) return obj, False except self.model.DoesNotExist: if "name" not in kwargs and "group__name" in lookup_kwargs: kwargs["name"] = lookup_kwargs["group__name"] del kwargs["group__name"] return self.create(**{**kwargs, **defaults}), True @trace_log def update_or_create(self, defaults=None, **kwargs): """指定された、CustomGroup を更新します。""" defaults = defaults or {} obj, created = self.get_or_create(defaults=defaults, **kwargs) if not created: if "name" in defaults: new_name = defaults.pop("name") obj.group.name = new_name obj.group.save() # 残りの defaults で obj を更新する。 for attr, value in defaults.items(): setattr(obj, attr, value) obj.full_clean() obj.save() return obj, created @trace_log def filter(self, *args, **kwargs): """指定された、CustomGroup を取得します。""" if "name" in kwargs: kwargs["group__name"] = kwargs.pop("name") return super().filter(*args, **kwargs) @trace_log def delete(self): """CustomGroup を削除します。 QuerySet.delet() は、bulk 操作で delete() が呼ばれないため、 個々のインスタンスの delete を呼び出すことで、関連 Group の削除も実行します。 """ for obj in self: obj.delete() return super().delete() class CustomGroupManager(models.Manager): @trace_log def get_queryset(self): return CustomGroupQuerySet(self.model, using=self._db) @trace_log def create(self, **kwargs): return self.get_queryset().create(**kwargs) @trace_log def update(self, **kwargs): """CusgomGroup オブジェクトを更新します。 name が指定されている場合は、Group.name も更新します。 一括更新を実行するため、シグナルはトリガーされません。 Arguments: kwargs: 更新用パラメータ Return: 更新後オブジェクト数 """ return self.get_queryset().update(**kwargs) @trace_log def update_or_create(self, defaults=None, **kwargs): return self.get_queryset().update_or_create(defaults=defaults, **kwargs) @trace_log def get_or_create(self, defaults=None, **kwargs): return self.get_queryset().get_or_create(defaults=defaults, **kwargs) @trace_log def filter(self, *args, **kwargs): return self.get_queryset().filter(*args, **kwargs) @trace_log def delete(self): return self.get_queryset().delete() class CustomGroup(models.Model): objects = CustomGroupManager() """ カスタムグループ用マネージャ。 """ class GroupType(models.IntegerChoices): """グループ種別。""" ROOT = 0, _("Root") """ ルート。 """ ORGANIZATION = 1, _("Organization") """ 組織。 """ DEPARTMENT = 2, _("Department") """ 部門。 """ TEAM = 3, _("Team") """ チーム。 """ OTHER = 99, _("Other") """ その他。 """ group = models.OneToOneField( Group, on_delete=models.CASCADE, related_name="info", verbose_name="Group" ) """ グループ """ group_type = IntegerChoicesField( # group_type = models.IntegerField( choices=GroupType.choices, default=GroupType.OTHER, ) """ グループ種別 """ parent = models.ForeignKey( "self", blank=True, null=True, on_delete=models.CASCADE, related_name="children", verbose_name="Parent", ) """ 親グループ """ created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) class Meta: verbose_name = _("Group") """ 本モデルの名称。 """ verbose_name_plural = _("Groups") """ 本モデルの複数形の名称 """ @trace_log def clean(self): """parent が自分自身を参照していないか確認する。""" if self.parent: self._check_parent_loop(self.parent) def save(self, *args, **kwargs): """save の前に、clean によるチェックを実施する。""" self.full_clean() return super().save(*args, **kwargs) def _check_parent_loop(self, parent): """親がさらに上の親としてグループを参照していないかチェックする。""" if parent == self: raise ValidationError(_("A group cannot be its own parent.")) if parent.parent: self._check_parent_loop(parent.parent) def __str__(self) -> str: """ 文字列表現を返します。 Return: 本モデルの文字列表現 """ if self.parent: return f"{str(self.parent)}/{self.group.name}" return self.group.name