Improve performance of a Python script run in parallel for multiple products


I have a Python script I'm running to retrieve data from an external API.

The script takes an arbitrary ID number and a product name as inputs, prints the data returned by the API, and saves it to a file. Throughput varies by product, but data comes back at roughly one row per second (slower at certain times of day).

I can run the script in parallel for multiple products, but I notice the CPU load creeps up the longer the programs run (with little load from other software); I've seen the load go as high as 4 on an 8-core machine. I'm currently running Ubuntu 22.04 on an i7-8650 ThinkPad.
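For reference, I launch the parallel runs with a small wrapper roughly like this (the script name, client IDs and symbols here are placeholders, not my real values):

# Launcher sketch: one process per product; placeholder script name and arguments.
import subprocess
import sys

PRODUCTS = [(101, 'ESU3'), (102, 'NQU3'), (103, 'CLU3')]  # (client id, local symbol)

procs = [
    subprocess.Popen([sys.executable, 'depth_logger.py', str(cl), sym])
    for cl, sym in PRODUCTS
]
for p in procs:
    p.wait()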

Can anyone shed some light on how to improve the performance of the script below? I imagine the write to disk (e.g. df.to_csv(...)) is the primary bottleneck, but I'm not sure how to change it so that the data is written to file as it streams in continuously.

from client import *
from wrapper import *
from contract import *
import time
import pandas as pd
import datetime
import threading
import sys
import os

# command-line arguments: client ID and product (local) symbol
cl = sys.argv[1]
sym = sys.argv[2]

# growing DataFrame that accumulates every depth update
df = pd.DataFrame()


class TestApp(EClient, EWrapper):

    def __init__(self):
        EClient.__init__(self, wrapper=self)

    def reqIds(self, numIds: int):
        return super().reqIds(numIds)
        
    def updateMktDepth(self, reqId: TickerId, position: int, operation: int, side: int, price: float, size: int):
        global df
        super().updateMktDepth(reqId, position, operation, side, price, size)
        
        time = datetime.datetime.now()
        cols = ['Time', 'Symbol', 'Position', 'Operation', 'Side', 'Price', 'Size']
        data = [time, sym, position, operation, side, price, size]
        print(f'{time}: ReqId: {reqId} Sym: {sym} Position: {position} Operation: {operation} Side: {side} Price: {price} Size: {size}')

        # append the new row to the in-memory DataFrame...
        d2 = pd.DataFrame(data, cols).T
        df = pd.concat([df, d2])

        # ...and rewrite the entire CSV from scratch on every update
        df.to_csv(f'~/data/{sym}_l2.csv')




def main():
    try:        
        
        app = TestApp()
        app.connect("127.0.0.1", 7496, cl)
        print('Connection successful')

        t = threading.Thread(name="API_worker", target=app.run)
        t.start()
        print("API worker thread started")

      
        c = Contract()
        c.localSymbol = sym
        c.secType = 'FUT'
        c.exchange = 'CME'
        c.currency = 'USD'     
        time.sleep(1)

        # clean up prior to call: archive any existing output file
        f = f'/home/chris/data/{sym}_l2'
        ts = datetime.datetime.now().strftime('%Y%m%d%H%M')

        if os.path.isfile(f + '.csv'):
            os.rename(f + '.csv', f'{f}_{ts}.csv')

        # request 20 levels of market depth for the contract
        app.reqMktDepth(cl, c, 20, 0, [])
        

    except KeyboardInterrupt:
        print('Keyboard interrupt, processing ended')
        app.disconnect()
       

if __name__ == "__main__":
    main()
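To illustrate why I suspect the full-file rewrite, here is a standalone sketch (row count, values and /tmp paths are arbitrary) comparing the current pattern with a plain incremental append:

# Standalone sketch (not the live script): compares the cost of the two write
# patterns. Row count, values and paths are arbitrary.
import csv
import datetime
import time
import pandas as pd

COLS = ['Time', 'Symbol', 'Position', 'Operation', 'Side', 'Price', 'Size']
N_ROWS = 2000  # arbitrary

def rewrite_whole_file(path):
    # current pattern: grow a DataFrame and rewrite the entire CSV on every row
    df = pd.DataFrame()
    start = time.perf_counter()
    for i in range(N_ROWS):
        row = [datetime.datetime.now(), 'SYM', i % 20, 1, 0, 100.0 + i, 5]
        df = pd.concat([df, pd.DataFrame([row], columns=COLS)])
        df.to_csv(path)  # cost grows with the number of rows accumulated so far
    return time.perf_counter() - start

def append_per_row(path):
    # alternative pattern: append one CSV line per row, constant cost per row
    start = time.perf_counter()
    with open(path, 'w', newline='') as fh:
        wr = csv.writer(fh)
        wr.writerow(COLS)
        for i in range(N_ROWS):
            row = [datetime.datetime.now(), 'SYM', i % 20, 1, 0, 100.0 + i, 5]
            wr.writerow(row)
    return time.perf_counter() - start

print('rewrite:', rewrite_whole_file('/tmp/rewrite.csv'))
print('append :', append_per_row('/tmp/append.csv'))

The cost per row of the first function should grow with the number of rows already accumulated, which would be consistent with the CPU load creeping up over time.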



        

Edit:

Amended version per comment below:

import csv  # needed for csv.writer below, in addition to the imports above

class TestApp(EClient, EWrapper):

    def __init__(self):
        EClient.__init__(self, wrapper=self)

    def reqIds(self, numIds: int):
        return super().reqIds(numIds)

    def contractDetails(self, reqId: int, contractDetails: ContractDetails):
        super().contractDetails(reqId, contractDetails)
        print(f"contract details: {contractDetails}")

        #return super().contractDetails(reqId, contractDetails)

    def contractDetailsEnd(self, reqId: int):
        print("End of contract details")
        self.disconnect()
        
    def updateMktDepth(self, reqId: TickerId, position: int, operation: int, side: int, price: float, size: int):
        
        super().updateMktDepth(reqId, position, operation, side, price, size)
                
        time = datetime.datetime.now()
        data = [time, sym, position, operation, side, price, size]
        print(f'{time}: ReqId: {reqId} Sym: {sym} Position: {position} Operation: {operation} Side: {side} Price: {price} Size: {size}')

        # append a single row per update instead of rewriting the whole file
        with open(f'/home/chris/data/{sym}_l2.csv', mode='a', newline='') as file:
            wr = csv.writer(file)
            wr.writerow(data)



def main():

    try:
        app = TestApp()
        app.connect("127.0.0.1", 7496, cl)
        print('Connection successful')

        t = threading.Thread(name=f'API_worker_{sym}', target=app.run)
        t.start()
        print("API worker thread started")

        c = Contract()
        c.localSymbol = sym
        c.secType = 'FUT'
        c.exchange = 'CME'
        c.currency = 'USD'
        time.sleep(1)

        # clean up prior to call: archive any existing output file
        f = f'/home/chris/data/{sym}_l2'
        ts = datetime.datetime.now().strftime('%Y%m%d%H%M')
        cols = ['Time', 'Symbol', 'Position', 'Operation', 'Side', 'Price', 'Size']

        if os.path.isfile(f + '.csv'):
            os.rename(f + '.csv', f'{f}_{ts}.csv')

        # start a fresh file containing only the header row
        with open(f + '.csv', 'w', newline='') as file:
            wr = csv.writer(file)
            wr.writerow(cols)

        app.reqMktDepth(cl, c, 20, 0, [])

    except KeyboardInterrupt:
        print('Keyboard interrupt, processing ended')
        app.disconnect()
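A further change I'm considering (untested; the class and parameter names below are my own, not part of the IB API) is keeping the file handle open and flushing rows in batches instead of reopening the file on every tick:

# Rough sketch of a buffered writer: rows are held in memory and flushed
# every `batch_size` rows to a file handle that stays open.
import csv
import os
import threading

class BufferedCsvWriter:
    def __init__(self, path, cols, batch_size=50):
        self._lock = threading.Lock()
        self._buffer = []
        self._batch_size = batch_size
        new_file = not os.path.isfile(path) or os.path.getsize(path) == 0
        self._fh = open(path, 'a', newline='')
        self._writer = csv.writer(self._fh)
        if new_file:
            self._writer.writerow(cols)  # write the header only once

    def write_row(self, row):
        with self._lock:
            self._buffer.append(row)
            if len(self._buffer) >= self._batch_size:
                self._flush_locked()

    def _flush_locked(self):
        self._writer.writerows(self._buffer)
        self._buffer.clear()
        self._fh.flush()

    def close(self):
        with self._lock:
            self._flush_locked()
        self._fh.close()

updateMktDepth would then just call write_row(data), with close() called when the app disconnects.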