I have a Python script that downloads and saves AWS CloudWatch logs.
I use aioboto3 and asyncio for concurrency to maximize throughput. However, I am not sure about my approach and would love to get recommendations for improvement and optimization.
I believe AWS throttles these requests at around 30 per second. I found that I get the best results running around 25 requests concurrently; running more than 25 coroutines yields a ThrottlingException.
I set max_attempts to 6 in the retries dict of the client config. Running 25 coroutines seems to do the job; however, when running more coroutines an exception is eventually raised, and I effectively lose data since no further retries are made for that request.
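For reference, that retry configuration on its own looks like this (note: botocore also documents an "adaptive" retry mode that adds client-side rate limiting, which I have not tried):

```python
from botocore.config import Config

# Standard retry mode with up to 6 attempts, as used in the script below.
# botocore also supports mode="adaptive", which layers client-side rate
# limiting on top of retries; I haven't tried it, but it may be relevant here.
config = Config(retries={"max_attempts": 6, "mode": "standard"})
```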
I also use a semaphore to cap concurrent requests, mainly to avoid ThrottlingExceptions, and I process the work in batches of 25 coroutines. Each coroutine runs a one-day query against the target log group.
Why daily? Because I found it is much faster than querying ranges of days, weeks, or months. It also gives me more granular control over when to stop running batches: past a certain point back in time there are no logs at all, so I can stop as soon as an entire batch comes back empty.
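To illustrate that early stop, here is a sketch of the batch loop (assuming fetch_logs returns the query's result rows, as the script below does):

```python
results = await asyncio.gather(*batch_coroutines)
# If every daily query in this batch came back empty, there are no
# older logs, so stop walking further back for this log group.
if not any(results):
    break
```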
I also tried increasing max_pool_connections, but it seems to perform best when set to 10, which is botocore's default anyway, so that seems odd to me.
Lastly, I thought about spawning a process for each log group, but that seems redundant: AWS will still limit my requests to about 30 per second (I think), and since the workload is I/O-bound, the GIL (which allows only one thread to execute Python bytecode at a time) isn't the bottleneck anyway, so this seems to have no benefit.
I would love to get some ideas.
Here's a short demonstration of my script:
```python
import asyncio
from datetime import datetime, timedelta

import aioboto3
from botocore.config import Config
from botocore.exceptions import ClientError


async def fetch_logs(semaphore, session, start_time, end_time, log_group_name):
    async with session.client(
            service_name='logs', region_name='us-east-1',
            config=Config(
                retries={"max_attempts": 6, "mode": "standard"},
                max_pool_connections=10)) as logs_client:
        async with semaphore:
            try:
                start_query_response = await logs_client.start_query(
                    logGroupName=log_group_name,
                    startTime=int(start_time.timestamp()),
                    endTime=int(end_time.timestamp()),
                    queryString="some_query")
                query_id = start_query_response['queryId']
                response = None
                # Simple polling; the query is still in flight while its
                # status is 'Scheduled' or 'Running'.
                while response is None or response['status'] in ('Scheduled', 'Running'):
                    await asyncio.sleep(1)
                    response = await logs_client.get_query_results(queryId=query_id)
                return response['results']
            except ClientError as e:
                print(f"An error occurred: {e}")


async def main():
    session = aioboto3.Session(profile_name="profile_name", region_name="us-east-1")
    log_groups = get_log_groups()  # Get log groups from somewhere
    if log_groups:
        for log_group in log_groups:
            # Define the end date as today and walk backwards one day at a time
            today = datetime.now()
            absolute_start_date = today - timedelta(days=730)  # Go 2 years back
            current_end_date = today
            semaphore = asyncio.Semaphore(25)
            while current_end_date > absolute_start_date:
                batch_coroutines = []
                for _ in range(25):
                    current_start_date = current_end_date - timedelta(days=1)
                    if current_start_date <= absolute_start_date:
                        break
                    batch_coroutines.append(
                        fetch_logs(semaphore, session, current_start_date,
                                   current_end_date, log_group))
                    current_end_date = current_start_date
                await asyncio.gather(*batch_coroutines)


if __name__ == "__main__":
    asyncio.run(main())
```
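(get_log_groups() is just a placeholder here; any function that returns a list of log group names will do, e.g. this hypothetical stub:)

```python
def get_log_groups():
    # Hypothetical stub purely for demonstration; the real script
    # sources its log group names elsewhere.
    return ["/aws/lambda/example-function"]
```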