On receiving a request in a FastAPI endpoint, I add an async task to BackgroundTasks, which in turn uses a boto3 client to connect to Kinesis and put the data onto the stream.

The endpoint method is as follows:

@app.post("/endpoint")
async def base_api(req: RequestAPIModel, background_tasks: BackgroundTasks):
    try:
        jsonObj = req.model_dump()
        resp = prepareResponse(jsonObj)
        return resp
    finally:
        background_tasks.add_task(stream_data, jsonObj, resp)

The stream_data method is as follows:

import json
import traceback
from datetime import datetime

import boto3

kinesis_client = boto3.client('kinesis', region_name='us-east-1')

async def stream_data(req, apiResp):
    timestamp = datetime.now()
    dt_stamp = timestamp.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
    dt = timestamp.date().strftime('%Y-%m-%d')
    data = {'request': req, 'response': apiResp, 'timestamp': dt_stamp}
    
    record = json.dumps(data).encode('utf-8')
    
    try:
        response = kinesis_client.put_record(StreamName=delivery_stream_name, Data=record, PartitionKey=dt)
        if response['ResponseMetadata']['HTTPStatusCode'] == 200:
            print(f"Record streamed successfully with stream SequenceNumber {response['SequenceNumber']}")
        else:
            print("Error sending Record")
    except Exception as ex:
        print(traceback.format_exc())
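
One thing worth checking in the code above: boto3 clients are synchronous, so `put_record` blocks for the duration of the network call, and an `async def` background task runs on the event loop. Below is a minimal sketch of moving the blocking call to a worker thread with `asyncio.to_thread` (Python 3.9+). `FakeKinesisClient` and the stream name `example-stream` are hypothetical stand-ins so the snippet runs without AWS credentials; they are not part of the original code.

```python
import asyncio
import json
import time
from datetime import datetime


class FakeKinesisClient:
    """Hypothetical stand-in for boto3.client('kinesis'); blocks like a real network call."""

    def put_record(self, StreamName, Data, PartitionKey):
        time.sleep(0.05)  # simulate blocking network I/O
        return {"ResponseMetadata": {"HTTPStatusCode": 200}, "SequenceNumber": "1"}


kinesis_client = FakeKinesisClient()
delivery_stream_name = "example-stream"  # placeholder value, assumed for illustration


async def stream_data(req, api_resp):
    timestamp = datetime.now()
    dt_stamp = timestamp.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    dt = timestamp.date().strftime("%Y-%m-%d")
    record = json.dumps(
        {"request": req, "response": api_resp, "timestamp": dt_stamp}
    ).encode("utf-8")

    # Run the blocking client call in a worker thread so the event loop
    # stays free to serve other requests while the call is in flight.
    return await asyncio.to_thread(
        kinesis_client.put_record,
        StreamName=delivery_stream_name,
        Data=record,
        PartitionKey=dt,
    )


if __name__ == "__main__":
    result = asyncio.run(stream_data({"a": 1}, {"ok": True}))
    print(result["ResponseMetadata"]["HTTPStatusCode"])
```

An alternative under the same assumption is to declare `stream_data` as a plain `def`: BackgroundTasks runs non-async callables in a thread pool, whereas `async def` tasks are awaited on the event loop.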

CPU utilisation does not spike with the initial configuration of 4 pods on EKS. To increase the number of executors, I increased the number of pods to 8 in the EKS worker group where this API application is hosted, but that did not help either.
