MongoDB transaction failed while trying to insert 150k records

Need help fixing a MongoDB transaction issue while inserting 150k records.

We have a car dataset containing 150k records. To reduce the server load, I split the data into 2k chunks before inserting. I am using a MongoDB Atlas free trial to store the data, and the server has a 4-core CPU and 8 GB RAM.
While inserting the chunks I use a MongoDB transaction and a writable stream to reduce the server load, and Socket.IO to pass the data to the backend in chunks.

Frontend code for uploading the data

import { useState, useEffect } from "react";
import { useNavigate } from "react-router-dom";

const batchSize = 1000;
const UploadCarForm = ({ socket }) => {
  // `data` holds the validated rows parsed from the uploaded file (omitted here for brevity)
  const [validData] = useState(data);
  const [progress, setProgress] = useState(0);
  const [isLoading, setLoading] = useState(false);
  const [totalRowsProcessed, setTotalRowsProcessed] = useState(0);
  const navigate = useNavigate();

  useEffect(() => {
    // Listen for 'progress' events from the server
    const handleProgressUpdate = async (update) => {
      // Handle error or completion messages received from the backend
      if (update.error) {
        // Display error message to the user
        showSnackbar({
          type: "error",
          message: update.error,
        });
        setLoading(false);
        // setProgress(0);
        setTotalRowsProcessed(0);
      }

      if (update.complete) {
        const processedRows = totalRowsProcessed + update.totalRows;
        setTotalRowsProcessed(processedRows);
        const progress = Math.floor((processedRows / validData.length) * 100);
        setProgress(progress);

        if (processedRows === validData.length) {
          showSnackbar({
            message: update.message,
          });
          setLoading(false);

          setTimeout(() => {
            navigate("/manage-data");
          }, 2000);
        } else {
          // if loading is continue
          if (isLoading) {
            await new Promise((resolve) => setTimeout(resolve, 100));

            const batchRows = validData.slice(
              processedRows,
              processedRows + batchSize
            );

            socket.emit("upload", {
              rows: batchRows,
              rowIndex: processedRows + batchRows.length,
            });
          }
        }
      }
    };

    socket.on("progress", handleProgressUpdate);

    // Clean up the event listener when the component unmounts
    return () => {
      socket.off("progress", handleProgressUpdate);
    };
  }, [
    totalRowsProcessed,
    showSnackbar,
    socket,
    validData,
    navigate,
    isLoading,
  ]);

  const onSubmit = async () => {
    try {
      setLoading(true);
      setProgress(0);
      setTotalRowsProcessed(0);

      const batchRows = validData.slice(totalRowsProcessed, batchSize);
      socket.emit("upload", {
        rows: batchRows,
        rowIndex: batchRows.length,
      });
    } catch (err) {
      helpers.setStatus({ success: false });
      helpers.setErrors({ submit: err.message });
      helpers.setSubmitting(false);
    }
  }

  return (
    <button onClick={onSubmit} type="submit">
      Upload File
    </button>
  );
};

export default UploadCarForm;
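
For completeness, the socket prop passed into UploadCarForm is a plain socket.io-client connection. A minimal sketch of the assumed setup (the server URL and options below are illustrative, not my exact code):

import { io } from "socket.io-client";

// Assumed setup: connect once at app start and pass the socket down as a prop.
const socket = io("http://localhost:4000", {
  transports: ["websocket"], // assumption: skip long-polling for large uploads
});

export default socket;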

Backend code for inserting the chunked data

import { Writable } from "stream";
import mongoose from "mongoose";
// `Car` is the Mongoose model for the car collection, imported from wherever it is defined in the project.

export async function insertDataWithStreamSocket(rows, socket) {
  const totalRows = rows.length;
  // Writable stream in object mode: each write() receives one chunk (an array of rows)
  const writableStream = new Writable({
    objectMode: true,
    write: async (chunk, encoding, callback) => {
      const session = await mongoose.startSession({
        sessionOptions: {
          transactionLifetimeLimitSeconds: 3600 // 1 hour
        }
      });
      session.startTransaction();

      try {
        const bulkOps = chunk.map(row => {
          const { make, model, grade, state, fromDate, toDate, engine, filter, price } = row;

          return {
            updateOne: {
              filter: { make, model, grade, state, fromDate, toDate },
              update: { engine, filter, price },
              upsert: true
            }
          };
        });

        await Car.bulkWrite(bulkOps, { session });

        await session.commitTransaction();

        callback();
      } catch (error) {
        console.error('Error during transaction:', error);
        await session.abortTransaction();
        callback(error);
      } finally {
        session.endSession();
      }
    }
  });

  writableStream.on('error', error => {
    socket.emit('progress', {
      error: 'Database operation error occurred. Please try again later.'
    });
  });

  // The whole rows array is written to the stream as a single chunk
  writableStream.write(rows);

  writableStream.end(() => {
    console.log('Data insertion completed.');
    socket.emit('progress', {
      progress: 100,
      message: 'Data set uploaded successfully.',
      complete: true,
      totalRows: totalRows
    });
  });
}
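
For context, this function is called from the Socket.IO "upload" handler on the server. Roughly, the wiring looks like the sketch below; the event names match my frontend code, but the rest of the handler is an assumption, not my exact code:

io.on("connection", (socket) => {
  // Each "upload" event carries one batch of rows emitted by the frontend
  socket.on("upload", async ({ rows, rowIndex }) => {
    try {
      await insertDataWithStreamSocket(rows, socket);
    } catch (error) {
      console.error("Error writing data:", error);
      socket.emit("progress", {
        error: "Database operation error occurred. Please try again later.",
      });
    }
  });
});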

MongoDB error

Error writing data: MongoBulkWriteError: operation was interrupted because the transaction exceeded the configured 'transactionLifetimeLimitSeconds'

I have already set transactionLifetimeLimitSeconds: 3600 (1 hour) in the session options, as shown in the backend code above.

Sample record:

{
  "fromDate": "2022-12-31T18:30:00.000Z",
  "grade": "Ascent Sport",
  "make": "Toyota",
  "model": "Yaris",
  "state": "VIC",
  "toDate": "2022-12-31T18:30:00.000Z",
  "engine": "1.5 Petrol FWD Auto",
  "filter": "Y",
  "price": 28607
}
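
For reference, the Car model is a plain Mongoose schema shaped like the sample record above. This is only a sketch with field types inferred from the sample; the real schema may differ:

import mongoose from "mongoose";

const carSchema = new mongoose.Schema({
  make: String,
  model: String,
  grade: String,
  state: String,
  fromDate: Date,
  toDate: Date,
  engine: String,
  filter: String,
  price: Number,
});

// Compound index matching the upsert filter used in bulkWrite (assumption)
carSchema.index({ make: 1, model: 1, grade: 1, state: 1, fromDate: 1, toDate: 1 });

export const Car = mongoose.model("Car", carSchema);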

I expect all the records to be inserted without any failures.
