On receiving a request in a FastAPI endpoint, I add an async task to BackgroundTasks, which in turn uses a boto3 client to connect to Kinesis and put the data onto the stream.
The endpoint method is as follows:
    @app.post("/endpoint")
    async def base_api(req: RequestAPIModel, background_tasks: BackgroundTasks):
        try:
            jsonObj = req.model_dump()
            resp = prepareResponse(jsonObj)
            return resp
        finally:
            background_tasks.add_task(stream_data, jsonObj, resp)
The stream_data method is as follows:
    kinesis_client = boto3.client('kinesis', region_name='us-east-1')

    async def stream_data(req, apiResp):
        timestamp = datetime.now()
        dt_stamp = timestamp.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
        dt = timestamp.date().strftime('%Y-%m-%d')
        data = {'request': req, 'response': apiResp, 'timestamp': dt_stamp}
        record = json.dumps(data).encode('utf-8')
        try:
            response = kinesis_client.put_record(StreamName=delivery_stream_name, Data=record, PartitionKey=dt)
            if response['ResponseMetadata']['HTTPStatusCode'] == 200:
                print(f"Record streamed successfully with stream SequenceNumber {response['SequenceNumber']}")
            else:
                print("Error sending Record")
        except Exception as ex:
            print(traceback.format_exc())
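For illustration only, here is a minimal sketch of one variant that might be worth comparing against. boto3's put_record is a synchronous call, so inside an `async def` background task it runs on the event loop thread until the network round trip finishes. The sketch hands that call to a worker thread with `asyncio.to_thread` (Python 3.9+). The names `build_record` and `stream_data_in_thread` are hypothetical, and passing `client` and `stream_name` as parameters is an assumption made so the snippet is self-contained; it is not how the code above is wired, and it has not been measured against this workload.

```python
import asyncio
import json
from datetime import datetime


def build_record(req, api_resp, now=None):
    """Serialize the request/response pair the same way stream_data does."""
    now = now or datetime.now()
    # Millisecond-precision timestamp, e.g. '2024-01-02 03:04:05.678'
    dt_stamp = now.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
    data = {'request': req, 'response': api_resp, 'timestamp': dt_stamp}
    # Same partition key as the original code: the current date
    partition_key = now.strftime('%Y-%m-%d')
    return json.dumps(data).encode('utf-8'), partition_key


async def stream_data_in_thread(client, stream_name, req, api_resp):
    """Hypothetical variant: run the blocking put_record call in a worker thread."""
    record, partition_key = build_record(req, api_resp)
    # asyncio.to_thread keeps the event loop free while the client waits on the network
    return await asyncio.to_thread(
        client.put_record,
        StreamName=stream_name,
        Data=record,
        PartitionKey=partition_key,
    )
```

With the module-level kinesis_client and delivery_stream_name from the code above, this could be registered as `background_tasks.add_task(stream_data_in_thread, kinesis_client, delivery_stream_name, jsonObj, resp)`. Declaring the task as a plain `def` instead is another option, since Starlette runs non-async background tasks in its thread pool.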
CPU utilisation does not spike with the initial configuration of 4 pods on EKS. To increase the number of executors, I raised the pod count to 8 on the EKS worker group where this API application is hosted, but that did not help either.