Data preprocessing: splitting one record into multiple records with AWS Lambda in Amazon Data Firehose


I need to split the data coming from AWS IoT Core by time and deliver it to S3 in real time.
I searched other Stack Overflow pages and read AWS's reference documentation, but I still get an error.

The key requirement is that the data, sampled every second, arrives as a single JSON message once per minute, and it must be stored in S3 divided into 1-second time periods. Below is an example of the data format received from IoT Core.

{
   "car_id": "example1",
   "gps_data": [
      {"timestamp": 100000, "latitude": 1.0000, "longitude": 1.0000},
      {"timestamp": 100001, "latitude": 1.0001, "longitude": 1.0000},
      {"timestamp": 100002, "latitude": 1.0001, "longitude": 1.0001},
      {"timestamp": 100003, "latitude": 1.0002, "longitude": 1.0002}
   ]
}

The data in the above example needs to be transformed as follows.

[
   {"car_id": "example1", "timestamp": 100000, "latitude": 1.0000, "longitude": 1.0000},
   {"car_id": "example1", "timestamp": 100001, "latitude": 1.0001, "longitude": 1.0000},
   {"car_id": "example1", "timestamp": 100002, "latitude": 1.0001, "longitude": 1.0001},
   {"car_id": "example1", "timestamp": 100003, "latitude": 1.0002, "longitude": 1.0002}
]
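A minimal sketch of that flattening step (variable names are illustrative, and the payload here is the example above) could look like this:

```javascript
// Hypothetical sketch: copy car_id into each gps_data entry so that
// every entry becomes one flat, self-contained JSON record.
const payload = {
  car_id: "example1",
  gps_data: [
    { timestamp: 100000, latitude: 1.0000, longitude: 1.0000 },
    { timestamp: 100001, latitude: 1.0001, longitude: 1.0000 }
  ]
};

const json_data = payload.gps_data.map(point => ({
  car_id: payload.car_id,
  ...point
}));

console.log(json_data);
```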

So, I implemented code to preprocess the data as above, and then modified it once more as follows, after reading that the JSON records must be stringified with a newline delimiter.

// Serialize each flattened record as one JSON line (newline-delimited JSON)
let record_data_string = "";
for (let data of json_data) {
    record_data_string += JSON.stringify(data) + "\n";
}
This produces the following record_data_string:

{"car_id": "example1", "timestamp": 100000, "latitude": 1.0000, "longitude": 1.0000}
{"car_id": "example1", "timestamp": 100001, "latitude": 1.0001, "longitude": 1.0000}
{"car_id": "example1", "timestamp": 100002, "latitude": 1.0001, "longitude": 1.0001}
{"car_id": "example1", "timestamp": 100003, "latitude": 1.0002, "longitude": 1.0002}

After that, the transformed string was wrapped in an output record and returned:

let output = [];
// Re-encode the transformed string and mark the record as processed
let output_record = {
  recordId: record.recordId,
  result: 'Ok',
  data: Buffer.from(record_data_string, 'utf8').toString('base64')
};
output.push(output_record);
let response = {
  'records': output
};
return response;

Checking the logs in CloudWatch, I confirmed the following response value:

{
  records: [
    {
      recordId: '123456789123456789000000000',
      result: 'Ok',
      data: '...'
    }
  ]
}

Decoding the data field of the response confirmed that it matched the record_data_string value.
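The round-trip check was done roughly like this (a sketch; the string is a shortened stand-in for the real payload):

```javascript
// Sketch: base64-encode the newline-delimited string the same way the
// Lambda does, then decode it and verify the round trip is lossless.
const record_data_string = '{"car_id":"example1","timestamp":100000}\n';
const encoded = Buffer.from(record_data_string, 'utf8').toString('base64');
const decoded = Buffer.from(encoded, 'base64').toString('utf8');
console.log(decoded === record_data_string); // → true
```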

However, the following error appears in the files under the bucket/errors-metadata-extraction-failed/ S3 path:

{"attemptsMade":1,"arrivalTimestamp":123456789123456789,"errorCode":"DynamicPartitioning.MetadataExtractionFailed","errorMessage":"Non JSON record provided", ...

What additional settings are needed to solve this problem?
