Hey im trying to handle file upload with use of celery tasks. However after all tasks complete, file appears to be in storage and not in database when i query /files. Thumbnail (which is generated as second task, but is much faster) is both saved in storage and database. What could be the issue?
views.py
@extend_schema(
description="Upload file",
request=FileSerializer,
responses={201: FileSerializer},
)
def post(self, request, *args, **kwargs):
file = request.data["file"]
total_sum = File.objects.filter(owner=request.user).aggregate(models.Sum("size"))
total_sum = total_sum["size__sum"] if total_sum["size__sum"] else 0
if total_sum + file.size > request.user.storage_limit:
return Response(
{"error": "Storage limit exceeded"}, status=status.HTTP_400_BAD_REQUEST
)
request.data["size"] = file.size
serializer = FileSerializer(data=request.data)
if serializer.is_valid():
serializer.save(owner=request.user, size=file.size)
FileService.upload_file(serializer.data["id"], file.read(), file.content_type)
return Response(serializer.data, status=status.HTTP_202_ACCEPTED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
services.py
import base64
import mimetypes
from common.service import Service
from storage.tasks import UploadHandler, ThumbnailHandler
class FileService(Service): # Service is empty class, just class Service: ...
handlers = [UploadHandler, ThumbnailHandler]
handler = handlers[0]()
for next_handler in handlers[1:]:
handler.set_next(next_handler())
@staticmethod
def get_file_extension(mimetype: str):
return mimetypes.guess_extension(mimetype)
@staticmethod
def upload_file(file_id: str, file: bytes, mimetype: str):
file_bytes = base64.b64encode(file)
extension = FileService.get_file_extension(mimetype)
FileService.handler.handle((file_id, extension, file_bytes))
tasks.py
import base64
from celery import shared_task
from django.conf import settings
from django.core.files.base import ContentFile
from services.grpc_client import Client
from common.chain import AbstractHandler
from storage.models import File
def next_status(file: File):
file.status += 1
file.save()
def update_file_status(func):
def wrapper(self, request):
file_id, *args = request
file_object = File.objects.get(id=file_id)
next_status(file_object)
return func(self, request)
return wrapper
class UploadAbstractHandler(AbstractHandler):
@staticmethod
def get_file(file_id: str):
return File.objects.get(id=file_id)
@staticmethod
def get_file_data(content: bytes, decode=False):
if decode:
content = base64.b64decode(content.decode(encoding="utf-8"))
file_data = ContentFile(content)
return file_data
class UploadHandler(UploadAbstractHandler):
@staticmethod
@shared_task
def handle_file_upload(file_id: str, extension: str, content: bytes):
file = UploadAbstractHandler.get_file(file_id)
file_data = UploadAbstractHandler.get_file_data(content, decode=True)
file.file.save(f"{file.id}.{extension}", file_data)
@update_file_status
def handle(self, request):
file_id, extension, content = request
self.handle_file_upload.delay(file_id, extension, content)
return super().handle(request)
class ThumbnailHandler(UploadAbstractHandler):
@staticmethod
@shared_task
def handle_thumbnail_generation(file_id: str, content: bytes):
file = UploadAbstractHandler.get_file(file_id)
file_data = UploadAbstractHandler.get_file_data(content, decode=True)
try:
value = Client(settings.GRPC_ADDR).generate_thumbnail(file_data.read())
file_data = UploadAbstractHandler.get_file_data(value, decode=False)
file.thumbnail.save(f"{file.id}_thumb.png", file_data)
except Exception as e:
print(e)
@update_file_status
def handle(self, request):
file_id, _, content = request
self.handle_thumbnail_generation.delay(file_id, content)
return super().handle(request)
serializers.py
class FileSerializer(serializers.ModelSerializer):
class Meta:
model = File
fields = [
"id",
"created_at",
"updated_at",
"name",
"group",
"description",
"tags",
"status",
"file",
"size",
"thumbnail",
]
read_only_fields = [
"id",
"created_at",
"updated_at",
"status",
"size",
"thumbnail",
]
def _get_file_ext(self, name: str):
return name.split(".")[-1]
def _get_valid_filename(self, name: str, file_obj):
ext_from_file = self._get_file_ext(file_obj.name)
if not name.lower().endswith(ext_from_file.lower()):
name = f"{name}.{ext_from_file}"
return name
def validate(self, attrs):
for key in attrs:
if attrs[key] == "":
attrs[key] = None
if attrs[key] == [""]:
attrs[key] = []
attrs["name"] = self._get_valid_filename(attrs["name"], attrs["file"])
return super().validate(attrs)
def save(self, **kwargs):
self.validated_data.pop("file")
return super().save(**kwargs)
``` I pop file in serializer to handle file upload in celery task