Fetching data from an API takes a long time because the data is large


I have a function that extracts data from an API, but the API returns about 505,000 rows of data, so the function takes a long time to run.

Here is my function. Is there a better way to optimize it? It slows down my Airflow DAG run.

import requests
import pandas as pd
from datetime import datetime

def extract_people():
    all_data = []
    offset = 0
    limit = 500

    # Continue fetching data until there are no more results left
    while True:
        url = f"https://api.apilayer.com/unogs/search/people?person_type=Director&offset={offset}&limit={limit}"
        headers = {"apikey": '******'}

        response = requests.get(url, headers=headers)
        data = response.json()

        # Check if there are no more results left
        if not data["results"]:
            break

        # Extract relevant fields from each result and append to all_data
        for result in data["results"]:
            all_data.append({
                "netflix_id": result["netflix_id"],
                "full_name": result["full_name"],
                "person_type": result["person_type"],
                "title": result["title"]
            })
        
        # Increment the offset for the next request
        offset += limit
        print(f'loaded {offset} rows')

    # Create a DataFrame from all_data
    df = pd.DataFrame(all_data)

    # Save the DataFrame as a new CSV file with timestamp
    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
    csv_file_name = f'Netflix_people_{timestamp}.csv'
    df.to_csv(csv_file_name, index=False)
    print(f"CSV file {csv_file_name} saved successfully.")

    return df

The function takes a long time to run, and I don't know the best way to optimize the code.

1 Answer

Answered by J W:

I have improved your code, but since I don't have access to the API, I couldn't test it, so there may be errors. Please use it as a reference only.

Methods:

  1. Pagination (explicit offset and limit parameters on each request)

  2. Concurrent requests (a thread pool fetching several pages at once)

  3. Throttling (capping the request rate; a simple sketch is shown after the code)

Code:

import requests
import pandas as pd
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor

def fetch_data(offset, limit):
    url = f"https://api.apilayer.com/unogs/search/people?person_type=Director&offset={offset}&limit={limit}"
    headers = {"apikey": '******'}
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        return response.json()['results']
    else:
        print(f"Error fetching data: {response.status_code}")
        return []

def extract_people():
    all_data = []
    offset = 0
    limit = 500
    batch_size = 10  # number of pages requested concurrently per batch

    with ThreadPoolExecutor(max_workers=batch_size) as executor:
        done = False
        while not done:
            # Submit one batch of page requests; the thread pool runs them concurrently
            futures = [executor.submit(fetch_data, offset + i * limit, limit) for i in range(batch_size)]
            offset += batch_size * limit

            # Collect results in submission order so rows stay in page order
            for task in futures:
                results = task.result()
                if not results:
                    # An empty page means there is no more data after this point
                    done = True
                    continue

                for result in results:
                    all_data.append({
                        "netflix_id": result["netflix_id"],
                        "full_name": result["full_name"],
                        "person_type": result["person_type"],
                        "title": result["title"]
                    })

    df = pd.DataFrame(all_data)
    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
    csv_file_name = f'Netflix_people_{timestamp}.csv'
    df.to_csv(csv_file_name, index=False)
    print(f"CSV file {csv_file_name} saved successfully.")

    return df
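
The code above only limits concurrency through `max_workers`; it does not enforce an actual request rate. If the API has a rate limit, you can pace each batch so the overall rate stays under it. Here is a minimal sketch of that idea, assuming a hypothetical quota of about 10 requests per second (the helper name `fetch_batch_throttled` and the `REQUESTS_PER_SECOND` value are illustrative, not part of the API):

import time

REQUESTS_PER_SECOND = 10  # assumed quota; adjust to the API's real rate limit

def fetch_batch_throttled(executor, offsets, limit):
    # Fetch one batch of pages concurrently, then pause so the overall
    # request rate stays below the assumed quota.
    start = time.monotonic()
    futures = [executor.submit(fetch_data, o, limit) for o in offsets]
    pages = [f.result() for f in futures]

    # Sleep off any remaining time so this batch does not exceed the quota
    min_duration = len(offsets) / REQUESTS_PER_SECOND
    elapsed = time.monotonic() - start
    if elapsed < min_duration:
        time.sleep(min_duration - elapsed)
    return pages

A helper like this could replace the inner batch loop in `extract_people` if the API starts returning 429 (too many requests) responses.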