In Django 3/Python, what is the preferred way to remove orphaned records?

I'm using Django 3 and Python 3.8. I have the following model ...

class Coop(models.Model):
    objects = CoopManager()
    name = models.CharField(max_length=250, null=False)
    types = models.ManyToManyField(CoopType, blank=False)
    addresses = models.ManyToManyField(Address)
    enabled = models.BooleanField(default=True, null=False)
    phone = models.ForeignKey(ContactMethod, on_delete=models.CASCADE, null=True, related_name='contact_phone')
    email = models.ForeignKey(ContactMethod, on_delete=models.CASCADE, null=True, related_name='contact_email')
    web_site = models.TextField()

Note the "phone" and "email" foreign key columns. Is there any Django/Python specific way to automatically remove the ContactMethod records once they become orphaned? That is, if I have my model, in which both columns are populated, and then run

coop.phone = None
coop.save(update_fields=['phone'])

Is there anything that will automatically delete the orphaned records? If not, what is the standard way to achieve this? I'm running a MySQL 8 database, but I would prefer to avoid DB-specific solutions.

There are 2 best solutions below

willeM_ Van Onsem On

You can define a Django signal, but Django signals do not always run, especially when you make ORM calls that perform bulk updates or bulk deletions. For example, if you clear the email and phone of all Coops with enabled=False using Coop.objects.filter(enabled=False).update(email=None, phone=None), no post_save signals are sent, so ContactMethods can be orphaned without the signal handler ever being triggered.

Even if you could rely on a signal, it might not be a good idea. Updates can also be made to the database without going through the ORM, for example by a database administrator through the phpMyAdmin frontend. So even if signals covered every ORM call, objects could still be orphaned another way. Furthermore, such a signal would handle one ContactMethod at a time, which is more expensive than deleting ContactMethods in bulk.

You can make a management command [Django-doc] that you can run periodically, for example once a day. You can define such management command in your app:

app/
    __init__.py
    models.py
    management/
        __init__.py
        commands/
            __init__.py
            remove_orphan_contactmethod.py
    urls.py
    views.py

In the app/management/commands/remove_orphan_contactmethod.py, you can then detect and remove these ContactMethod objects:

from django.core.management.base import BaseCommand, CommandError
from app.models import ContactMethod

class Command(BaseCommand):
    help = 'Remove ContactMethods not referenced through email or phone'

    def handle(self, *args, **options):
        # delete() returns a tuple: (total rows deleted, {model label: count})
        _, per_model = ContactMethod.objects.filter(
            contact_phone=None, contact_email=None
        ).delete()
        number = per_model.get('app.ContactMethod', 0)
        self.stdout.write(self.style.SUCCESS(f'Removed {number} ContactMethod object(s)'))

Then you can run the command with:

python3 manage.py remove_orphan_contactmethod

You can, for example, set up a cron job [wiki] or some other recurring task mechanism to run this command at a certain frequency.
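
To make the bulk clean-up more concrete, here is a minimal sketch of what such a filter amounts to at the database level: a single DELETE that removes every contact method no coop points to. It uses Python's built-in sqlite3 module and a simplified, hypothetical schema (the table names coop and contactmethod and the columns phone_id and email_id are assumptions for illustration). The SQL that Django actually generates will differ in its details.

```python
import sqlite3

# Hypothetical, simplified schema: Django's real table and column names
# depend on the app label and the model definitions.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE contactmethod (id INTEGER PRIMARY KEY, value TEXT);
    CREATE TABLE coop (
        id INTEGER PRIMARY KEY,
        phone_id INTEGER REFERENCES contactmethod(id),
        email_id INTEGER REFERENCES contactmethod(id)
    );
    INSERT INTO contactmethod (id, value) VALUES
        (1, '555-0100'), (2, 'a@example.com'), (3, '555-0199');
    INSERT INTO coop (id, phone_id, email_id) VALUES (1, 1, 2), (2, 3, NULL);
""")

# Rough equivalent of: coop.phone = None; coop.save(update_fields=['phone'])
# Contact method 3 is now orphaned, but nothing removes it automatically.
conn.execute("UPDATE coop SET phone_id = NULL WHERE id = 2")

# One statement removes every contact method that no coop references,
# neither as a phone nor as an email.
cursor = conn.execute("""
    DELETE FROM contactmethod
    WHERE NOT EXISTS (SELECT 1 FROM coop WHERE coop.phone_id = contactmethod.id)
      AND NOT EXISTS (SELECT 1 FROM coop WHERE coop.email_id = contactmethod.id)
""")
removed = cursor.rowcount
remaining = [row[0] for row in conn.execute("SELECT id FROM contactmethod ORDER BY id")]
print(f"Removed {removed} orphan(s); remaining ids: {remaining}")
```

Because the orphans are found by looking at the current state of the tables, this approach also catches rows orphaned by bulk updates or by changes made outside the ORM.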

piagetjonathan On

I personally use django-cleanup (https://github.com/un1t/django-cleanup), but since I added it after releasing to production, I had to create a Django command to clean up orphan files in the media folder of my S3 bucket. As it was challenging to find an existing solution, I'm sharing the command here:

import os
import sys

import boto3
from django.apps import apps
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.core.validators import EMPTY_VALUES
from django.db import models


class Command(BaseCommand):
    help = "Manage orphan files in S3"

    def add_arguments(self, parser):
        parser.add_argument(
            "--list_orphans",
            action="store_true",
            help="List all orphan files in the media folder of the S3 bucket",
        )
        parser.add_argument(
            "--list_bucket_files",
            action="store_true",
            help="List all files in the S3 bucket",
        )
        parser.add_argument(
            "--list_used_files",
            action="store_true",
            help="List all files used by an object in the django application",
        )
        parser.add_argument(
            "--list_fields",
            action="store_true",
            help="Show file fields detected in django application",
        )
        parser.add_argument(
            "--delete",
            action="store_true",
            help="Delete all orphan files in the S3 bucket",
        )
        parser.add_argument(
            "--stats",
            action="store_true",
            help="Show statistics of files in the S3 bucket",
        )

    def handle(self, *args, **options):
        custom_args = ["list_orphans", "list_bucket_files", "list_used_files", "list_fields", "delete", "stats"]

        # Check if all custom arguments are False
        if all(options[arg] is False for arg in custom_args):
            self.stdout.write("No custom arguments provided, displaying help text.")
            prog_name = os.path.basename(sys.argv[0])
            # The command name is the name of the module that defines this class
            subcommand = self.__module__.rsplit(".", 1)[-1]
            self.print_help(prog_name, subcommand)
            return

        s3 = get_s3_client()
        aws_storage_bucket_name = get_bucket_name()

        if options["list_orphans"]:
            used_media = get_used_media()
            all_media = list_s3_objects(s3, aws_storage_bucket_name)
            orphan_files = all_media - used_media
            self.stdout.write(f"Number of orphan files: {len(orphan_files)}")
            if orphan_files:
                self.stdout.write("Orphan files:")
                for orphan_file in orphan_files:
                    self.stdout.write(f"- {orphan_file}")

        if options["list_bucket_files"]:
            all_media = list_s3_objects(s3, aws_storage_bucket_name)
            self.stdout.write(f"Total files in bucket: {len(all_media)}")
            self.stdout.write("Files in bucket:")
            for file in all_media:
                self.stdout.write(f"- {file}")

        if options["list_used_files"]:
            used_media = get_used_media()
            self.stdout.write(f"Total used files: {len(used_media)}")
            self.stdout.write("Used files:")
            for used_file in used_media:
                self.stdout.write(f"- {used_file}")

        if options["list_fields"]:
            file_fields = get_file_fields()
            self.stdout.write("File fields detected in django application:")
            for field in file_fields:
                self.stdout.write(f"- {field}")

        if options["delete"]:
            used_media = get_used_media()
            all_media = list_s3_objects(s3, aws_storage_bucket_name)
            orphan_files = all_media - used_media
            if orphan_files:
                self.stdout.write(f"Found {len(orphan_files)} orphan files.")
                user_input = input("Do you want to delete these orphan files? (yes/no): ").strip().lower()
                if user_input == "yes":
                    self.stdout.write(f"Deleting {len(orphan_files)} orphan files...")
                    deleted_files_count = delete_orphan_files(s3, aws_storage_bucket_name, orphan_files)
                    self.stdout.write(self.style.SUCCESS(f"Total files deleted: {deleted_files_count}"))
                else:
                    self.stdout.write(self.style.WARNING("File deletion aborted by the user."))
            else:
                self.stdout.write("No orphan files detected")

        if options["stats"]:
            # Compute the sets here so that --stats also works when it is the only flag
            used_media = get_used_media()
            all_media = list_s3_objects(s3, aws_storage_bucket_name)
            orphan_files = all_media - used_media
            self.stdout.write(f"Total files in bucket: {len(all_media)}")
            self.stdout.write(f"Total used files: {len(used_media)}")
            self.stdout.write(f"Total orphan files: {len(orphan_files)}")


def get_s3_client():
    aws_access_key_id = os.getenv("DJANGO_AWS_ACCESS_KEY_ID")
    aws_secret_access_key = os.getenv("DJANGO_AWS_SECRET_ACCESS_KEY")

    if not aws_access_key_id:
        raise CommandError("DJANGO_AWS_ACCESS_KEY_ID environment variable not set")
    elif not aws_secret_access_key:
        raise CommandError("DJANGO_AWS_SECRET_ACCESS_KEY environment variable not set")

    if settings.SETTINGS_MODULE == "config.settings.local":
        s3 = boto3.client(
            service_name="s3",
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            endpoint_url="http://minio:9000",
        )
    else:
        s3 = boto3.client(
            service_name="s3",
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
        )

    return s3


def get_bucket_name():
    aws_storage_bucket_name = os.getenv("DJANGO_AWS_STORAGE_BUCKET_NAME")
    if not aws_storage_bucket_name:
        raise CommandError("DJANGO_AWS_STORAGE_BUCKET_NAME environment variable not set")
    return aws_storage_bucket_name


def get_file_fields():
    all_models = apps.get_models()
    fields = []

    for model in all_models:
        for field in model._meta.get_fields():
            if isinstance(field, models.FileField) or isinstance(field, models.ImageField):
                fields.append(field)

    return fields


def get_used_media():
    """
    Get media which are still used in models
    """
    media = set()

    for field in get_file_fields():
        is_null = {f"{field.name}__isnull": True}
        is_empty = {f"{field.name}": ""}

        for value in (
            field.model._base_manager.values_list(field.name, flat=True).exclude(**is_empty).exclude(**is_null)
        ):
            if value not in EMPTY_VALUES:
                normalized_value = value if value.startswith("media/") else f"media/{value}"
                media.add(normalized_value)

    return media


def list_s3_objects(s3_client, bucket_name, folder="media/"):
    """
    List all objects in an S3 bucket
    """
    objects = set()
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket_name, Prefix=folder):
        for obj in page.get("Contents", []):
            objects.add(obj["Key"])
    return objects


def delete_orphan_files(s3_client, bucket_name, orphan_files):
    deleted_count = 0

    for file_key in orphan_files:
        try:
            response = s3_client.delete_object(Bucket=bucket_name, Key=file_key)
            if response["ResponseMetadata"]["HTTPStatusCode"] == 204:
                print(f"Successfully deleted: {file_key}")
                deleted_count += 1
            else:
                print(f"Failed to delete: {file_key}, Response: {response}")
        except Exception as e:
            print(f"Error deleting {file_key}: {e}")

    return deleted_count
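
The core of the command is a set difference between the keys found in the bucket and the keys still referenced in the database, after the database values have been normalized to carry the media/ prefix. Here is a minimal standalone sketch of just that step, so it can be tried without S3 or Django. The helper names normalize_key and find_orphans are hypothetical and not part of the command above, and the sample keys are made up.

```python
def normalize_key(value, prefix="media/"):
    """Return the storage key for a file field value, adding the prefix if missing."""
    return value if value.startswith(prefix) else f"{prefix}{value}"


def find_orphans(bucket_keys, db_values, prefix="media/"):
    """Return the bucket keys that no database value refers to."""
    # Skip empty strings and None, mirroring the EMPTY_VALUES check in the command.
    used = {normalize_key(v, prefix) for v in db_values if v}
    return set(bucket_keys) - used


# Made-up sample data: two files are referenced, one is not.
bucket_keys = {"media/a.png", "media/b.png", "media/docs/c.pdf"}
db_values = ["a.png", "media/docs/c.pdf", "", None]

orphans = find_orphans(bucket_keys, db_values)
print(sorted(orphans))
```

Note that the comparison is only as reliable as the normalization: if stored values and bucket keys use different prefixes, referenced files would show up as orphans, so listing the orphans before deleting them is a sensible precaution.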

I'd appreciate any tips or suggestions to enhance it if you have an