Having Issues With Thread Pool Executor

367 Views Asked by At

i made a python bot that receive cmds from users and perfom the task . i am taking multiple ids ( as example ) and sending them to an api and result is storing and then sending the results to the user .

i am not using for loops cause its too slow i am using thread pool executor . for every user i gave max_workers = 5 .

when 1-2 people trying that commands its working perfectly . but when 50-80 users trying that commands they always getting "Checking..." and then no further improvement .

( the api i am sending request is perfectly fine it has no issue . it return the results in 1-1.5s even if you are making 10k requets at a time . )

this is my code . have a look . when someone send cmd it executed like this

from pyrogram import Client, filters
import  requests
import concurrent.futures
import threading
import asyncio
import concurrent.futures
def do_work(id):
     while True:
        result = requests.get(f"example.com/api.php?id={id}")
        if result.status_code == 200:
            return result.text
        else:
            continue
@Client.on_message(filters.command("getresult", [".", "/"]))

def multi(Client, message):
    t1 = threading.Thread(target=bcall, args=(Client, message))
    t1.start()

def bcall(Client, message):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(thread(Client, message))
    loop.close()

async def thread(Client, message):
    ids = message.text.split("\n")
    ALL_RESULT = ""
    await message.reply_text("Checking...")
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        futures = []
        for i in ids:
            future = executor.submit(do_work , i)
            futures.append(future)
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            ALL_RESULT += result + "\n"

    await message.reply_text(ALL_RESULT)
    await message.reply_text(f"Done Checking {len(ids)} IDS")

( while loop ends in 2-3 sec . also i am using max_retry = 2) to the while loop then it breaks ) and when 50-80 user send cmd they always stuck at "Checking..." message . maybe the threads created are in waiting or something i dont know but i checked my cpu usage and its only using 25-30% . i am frustrated please any one help me fix this .

1

There are 1 best solutions below

0
Nusab19 On

Your approach was too complicated. Here's a fully asynchronous way to achieve the same result. Explanation is given after the code.

from pyrogram import Client, filters
import httpx
import asyncio

# Bot Token
key = "6332785134:AAHIeQIMZwbnPqYk6cViTFKZjQ3E79u9QT0"
api_id = "14349382"
api_hash = "27a19c661ab3d393a068a55d55446078"


APP = Client("Test", api_id=api_id, api_hash=api_hash, bot_token=key, in_memory=1)


testApi = "https://official-joke-api.appspot.com/jokes/programming/random"

async def do_work(id):
     ses = httpx.AsyncClient(timeout=10)
     errorCount = 1
     
     while True:
        if errorCount>=3:break
        try:
            result = await ses.get(testApi)
            
            if result.status_code == 200:
                return result.text
        
        except httpx.TimeoutException:
            print("Request Time Out")
        except:pass
         
        errorCount += 1


@APP.on_message(filters.command("getresult", [".", "/"]))
async def getResults(_, message):
    await message.reply("Checking...")
    # starting from second item.
    # because first item is the /getresult command
    ids = message.text.split()[1:]
    
    works = [do_work(i) for i in ids ]
    allResults = ""
    per = 10  # how many id to retrive at once
    
    while works:
        a = works[:per]
        a = await asyncio.gather(*a)
        for i in a:
            
            if i != None: # if do_work returned the result
                allResults+=i + "\n"
            
            else: # if do_work's loop broke. so None was returned.
                allResults += "Failed\n"
        
        works = works[per:]
    
    await message.reply(allResults)
    await message.reply(f"Done checking {len(ids)} ID")


APP.run()

Note: Bot token is revoked. So no worries.

instead of using requests, I used httpx. So I can make asynchronous http requests.

And we kept a count of errors in the while loop. So, the loop will not continue for more than 3 times.

And in the getResults function, I used asyncio.gather to retrieve the results concurrently. As you were using max_workers=10, so I also implemented it by the per variable and a loop. So, the CPU will have less pressure. You can modify the per variable as your need.

Let me know if this solved your problem.