Long-running task (return from the handler function and continue executing code in a goroutine)


I am building a REST API in Go using Echo. I have a handler function Temp that handles a POST request with some form data.

func NewBatchHandler(e *echo.Echo, us domain.BatchUsecase) {
    handler := &BatchHandler{
        BUsecase: us,
    }
    e.POST("/upload", handler.Temp)
}
func (a *BatchHandler) Temp(c echo.Context) error {
    email, organization, file, err := validateFormInput(c)
    if err != nil {
        return c.JSON(http.StatusUnprocessableEntity, err.Error())
    }

    filePath := getFilePath(homeDir)

    err = a.BUsecase.Save(file, homeDir, filePath)
    if err != nil {
        return c.JSON(getStatusCode(err), ResponseError{Message: err.Error()})
    }

    err = a.BUsecase.ValidateFile(filePath)
    if err != nil {
        return c.JSON(getStatusCode(err), ResponseError{Message: err.Error()})
    }

    ctx := c.Request().Context()
    ca := make(chan []domain.Variant, file.Size)
    go func() {
        res, err := a.BUsecase.DoSomethingForEveryLong(ctx, filePath)
        if err != nil {
            logrus.Error("Error while annotating file: ", err)
            return
        }
        ca <- res
    }()

    select {
    case result := <-ca:
        return c.String(http.StatusAccepted, fmt.Sprintf("Successfully annotated the file (%d variants)", len(result)))
    case <-c.Request().Context().Done(): // Check context.
        // If it reaches here, this means that context was canceled (a timeout was reached, etc.).
        return nil
    }
}

I want the Temp function to return with a 202 status code after ValidateFile returns, but I also want to run some long-running functions without making the client wait for them. How can I implement this?

func (a *batchUsecase) DoSomethingForEveryLong(ctx context.Context, filePath string) (res []domain.Variant, err error) {
    ctx, cancel := context.WithTimeout(ctx, a.contextTimeout)
    defer cancel()

    file, err := os.Open(filePath)
    if err != nil {
        return nil, fmt.Errorf("failed to open file: %w", err)
    }
    defer file.Close()

    scanner := bufio.NewScanner(file)

    for scanner.Scan() {
        variantStr := scanner.Text()
        logrus.Info("Preparing query for variant: ", variantStr)
        variant, err := a.batchRepo.Fetch(ctx, variantStr)
        if err != nil {
            return nil, fmt.Errorf("failed to fetch variant: %w", err)
        }
        res = append(res, variant)
    }

    if err := scanner.Err(); err != nil {
        return nil, fmt.Errorf("failed to scan file: %w", err)
    }
    logrus.Info("Annotated file successfully", res)
    return res, nil
}

Here is the DoSomethingForEveryLong function for reference. Am I missing something trivial?

So far I have tried looking into channels and goroutines.

Zeke Lu On

If you don't need to include the result of the long-running task in the response, you can follow the approach shown in the demo below.

A large part of the demo's main func makes sure the process is not terminated immediately after the echo server shuts down, so that the long-running tasks can run to completion.

You don't have to write your code exactly like this; it is only a demo to show the idea.
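Stripped of Echo and signal handling, the queueing idea can be sketched with the standard library alone. This is only an illustration under assumptions: enqueue and drain are hypothetical helper names, not part of the demo. A buffered channel acts as the queue, closing it signals that no more jobs will arrive, and a range loop still processes whatever is left in the buffer. The enqueue helper also shows one way to avoid blocking a handler when the buffer is full.

```go
package main

import "fmt"

// enqueue (hypothetical helper) tries to add a job without blocking. It
// reports false when the buffer is full, so a handler could answer with an
// error status instead of waiting.
func enqueue(ch chan<- string, job string) bool {
	select {
	case ch <- job:
		return true
	default:
		return false
	}
}

// drain (hypothetical helper) processes jobs until ch is closed and empty,
// and returns the jobs it handled, in order.
func drain(ch <-chan string) []string {
	var handled []string
	for job := range ch {
		handled = append(handled, job)
	}
	return handled
}

func main() {
	ch := make(chan string, 2)

	// The third enqueue fails because the buffer holds only two jobs.
	fmt.Println(enqueue(ch, "a.txt"), enqueue(ch, "b.txt"), enqueue(ch, "c.txt"))

	close(ch) // no more jobs will be accepted

	// Jobs buffered before the close are still delivered.
	fmt.Println(drain(ch))
}
```

Whether to block, as a plain channel send does, or to reject the request when the queue is full is a design choice that depends on how much backlog you are willing to hold in memory.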

package main

import (
    "context"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/labstack/echo/v4"
)

type handler struct {
    ch chan string
}

func (h *handler) upload(c echo.Context) error {
    // do some job to initialize filePath
    filePath := "file path to the uploaded file"

    // Send filePath to the chan. Another goroutine reads from the chan and
    // runs the time-consuming task. This send blocks while the buffer is full.
    h.ch <- filePath

    // The handler returns right away, so the client does not have to wait.
    return c.String(http.StatusAccepted, "Upload accepted; the file will be annotated in the background")
}

func doSomethingForEveryLong(filePath string) {
    time.Sleep(10 * time.Second)
}

func main() {
    // Choose the chan size according to your need.
    ch := make(chan string, 10)

    h := &handler{ch: ch}
    e := echo.New()
    e.POST("/upload", h.upload)

    go func() {
        err := e.Start(":1323")
        // Close the chan once e.Start returns.
        close(ch)

        if err != http.ErrServerClosed {
            e.Logger.Fatal(err)
        }
    }()

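    // Keep the stop func instead of discarding it, so the signal
    // registration is released when main returns.
    ctx, stop := signal.NotifyContext(context.Background(),
        os.Interrupt,
        syscall.SIGTERM,
    )
    defer stop()

    // This goroutine waits for a signal and then shuts down the echo server.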
    go func() {
        <-ctx.Done()

        shutDownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel()

        if err := e.Shutdown(shutDownCtx); err != nil {
            e.Logger.Error(err)
        }
    }()

    // Read from the chan and run the time-consuming tasks.
    // The chan is closed when the echo server shuts down.
    for filePath := range ch {
        doSomethingForEveryLong(filePath)
    }
}
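
One more point if you adapt the handler from the question: it passes c.Request().Context() to the goroutine, and net/http cancels that context as soon as the handler returns. A background task that should outlive the request therefore needs a context that is not tied to it. The following is a minimal sketch with the standard library only; longTask and runAfterRequest are hypothetical names, and context.WithoutCancel requires Go 1.21 or newer (on older versions, starting from context.Background() is a common substitute).

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// longTask stands in for the long-running work (hypothetical name). It
// returns ctx.Err() if the context is canceled before the work finishes.
func longTask(ctx context.Context, d time.Duration) error {
	select {
	case <-time.After(d):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// runAfterRequest simulates a handler that starts longTask in a goroutine
// and then returns, which cancels the request context. When detach is true,
// the task gets a context that ignores that cancellation but still has its
// own timeout.
func runAfterRequest(detach bool) error {
	reqCtx, finishRequest := context.WithCancel(context.Background())

	taskCtx := reqCtx
	cancel := func() {}
	if detach {
		// Keeps the values of reqCtx but drops its cancellation (Go 1.21+).
		taskCtx, cancel = context.WithTimeout(context.WithoutCancel(reqCtx), time.Second)
	}

	done := make(chan error, 1)
	go func() {
		defer cancel()
		done <- longTask(taskCtx, 50*time.Millisecond)
	}()

	finishRequest() // simulates the handler returning
	return <-done
}

func main() {
	fmt.Println("request context:", runAfterRequest(false))
	fmt.Println("detached context:", runAfterRequest(true))
}
```

With the request context the task is cut short with a cancellation error, while the detached context lets it finish. In the demo above this problem does not arise, because the worker loop in main never receives the request context.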