im making a neural network that will allow me to connect to the coinbase api and trade but ive been having authentication errors so far. how do i fix it here is the code?
import configparser
import http.client
import json
import hashlib
import hmac
import time
from datetime import datetime, timezone
from typing import Tuple, List, Dict, Any
import requests
class API:
config = configparser.ConfigParser()
config.read(r"D:\repos\user\Coinbase_neural_network\config.ini")
api_key = config.get('coinbase', 'API_KEY')
secret_key = config.get('coinbase', 'SECRET_KEY')
@staticmethod
def generate_signature(timestamp: str, method: str, request_path: str, body: str,
secret_key: str = secret_key) -> str:
signature_string = timestamp + method.upper() + request_path + body
hmac_object = hmac.new(secret_key.encode(), signature_string.encode(), hashlib.sha256)
return hmac_object.hexdigest()
@staticmethod
def get_best_bid_ask(api_key: str = api_key, secret_key: str = secret_key) -> str:
payload: str = ''
timestamp = datetime.now(timezone.utc).isoformat()
headers: Dict[str, str] = {
'Content-Type': 'application/json',
'CB-ACCESS-KEY': api_key,
'CB-ACCESS-SIGN': API.generate_signature(secret_key, timestamp, "GET", "/api/v3/brokerage/best_bid_ask",
payload),
'CB-ACCESS-TIMESTAMP': timestamp,
}
with http.client.HTTPSConnection("api.coinbase.com") as conn:
conn.request("GET", "/api/v3/brokerage/best_bid_ask", payload, headers)
res: http.client.HTTPResponse = conn.getresponse()
data: bytes = res.read()
conn.close()
return data.decode("utf-8")
@staticmethod
def get_brokerage_accounts(limit: int, cursor: str, api_key: str = api_key, secret_key: str = secret_key) -> str:
payload: str = ''
timestamp = datetime.now(timezone.utc).isoformat()
headers: Dict[str, str] = {
'Content-Type': 'application/json',
'CB-ACCESS-KEY': api_key,
'CB-ACCESS-SIGN': API.generate_signature(secret_key, timestamp, "GET",
f"/api/v3/brokerage/accounts?limit={limit}&cursor={cursor}",
payload),
'CB-ACCESS-TIMESTAMP': timestamp,
}
with http.client.HTTPSConnection("api.coinbase.com") as conn:
conn.request("GET", f"/api/v3/brokerage/accounts?limit={limit}&cursor={cursor}", payload, headers)
res: http.client.HTTPResponse = conn.getresponse()
data: bytes = res.read()
conn.close()
return data.decode("utf-8")
@staticmethod
def get_product_candles(product_id: str, start: str, end: str, granularity: str, api_key: str = api_key,
secret_key: str = secret_key) -> List[Dict[str, str]]:
timestamp = str(int(time.time()))
url = f"https://api.coinbase.com/api/v3/brokerage/products/{product_id}/candles"
params = {
"start": start,
"end": end,
"granularity": granularity
}
headers = {
"CB-ACCESS-KEY": api_key,
"CB-ACCESS-SIGN": API.generate_signature(secret_key, timestamp, "GET", url, ""),
"CB-ACCESS-TIMESTAMP": timestamp
}
response = requests.get(url, params=params, headers=headers)
response.raise_for_status()
candles: List[List[float]] = response.json()
candles_data: List[Dict[str, str]] = [dict(zip(["start", "low", "high", "open",
"close", "volume"], c)) for c in candles]
print(candles_data)
return candles_data
@staticmethod
def get_market_trades(product_id: str, limit: int, api_key: str = api_key,
secret_key: str = secret_key) -> Tuple[List[Dict[str, str]], str, str]:
payload: str = ''
timestamp = datetime.now(timezone.utc).isoformat()
headers: Dict[str, str] = {
'Content-Type': 'application/json',
'CB-ACCESS-KEY': api_key,
'CB-ACCESS-SIGN':
API.generate_signature(secret_key, timestamp, "GET",
f"/api/v3/brokerage/products/{product_id}/ticker?limit={limit}", payload),
'CB-ACCESS-TIMESTAMP': timestamp,
}
conn = http.client.HTTPSConnection("api.coinbase.com")
conn.request("GET", f"/api/v3/brokerage/products/{product_id}/ticker?limit={limit}", payload, headers)
res: http.client.HTTPResponse = conn.getresponse()
data: bytes = res.read()
trades: Dict[str, Any] = json.loads(data.decode("utf-8"))
trade_data: List[Dict[str, str]] = [dict(zip(["trade_id", "product_id", "price", "size", "time", "side", "bid",
"ask"], t)) for t in trades["trades"]]
best_bid: str = trades["best_bid"]
best_ask: str = trades["best_ask"]
conn.close()
return trade_data, best_bid, best_ask
@staticmethod
def create_buy_market_order(client_order_id: str, product_id: str, side: str, order_configuration: str,
api_key: str = api_key, secret_key: str = secret_key) -> str:
payload: str = json.dumps({
"client_order_id": client_order_id,
"product_id": product_id,
"side": side,
"order_configuration": order_configuration
})
timestamp = datetime.now(timezone.utc).isoformat()
headers: Dict[str, str] = {
'Content-Type': 'application/json',
'CB-ACCESS-KEY': api_key,
'CB-ACCESS-SIGN': API.generate_signature(secret_key, timestamp, "POST", "/api/v3/brokerage/orders",
payload),
'CB-ACCESS-TIMESTAMP': timestamp,
}
with http.client.HTTPSConnection("api.coinbase.com") as conn:
conn.request("POST", "/api/v3/brokerage/orders", payload, headers)
res: http.client.HTTPResponse = conn.getresponse()
data: bytes = res.read()
conn.close()
return data.decode("utf-8")
@staticmethod
def create_sell_market_order(client_order_id: str, product_id: str, side: str, order_configuration: str,
api_key: str = api_key, secret_key: str = secret_key) -> str:
payload: str = json.dumps({
"client_order_id": client_order_id,
"product_id": product_id,
"side": side,
"order_configuration": order_configuration
})
timestamp = datetime.now(timezone.utc).isoformat()
headers: Dict[str, str] = {
'Content-Type': 'application/json',
'CB-ACCESS-KEY': api_key,
'CB-ACCESS-SIGN': API.generate_signature(secret_key, timestamp, "POST", "/api/v3/brokerage/orders",payload),
'CB-ACCESS-TIMESTAMP': timestamp,
}
with http.client.HTTPSConnection("api.coinbase.com") as conn:
conn.request("POST", "/api/v3/brokerage/orders", payload, headers)
res: http.client.HTTPResponse = conn.getresponse()
data: bytes = res.read()
conn.close()
return data.decode("utf-8")
ive tried iso strtime uninx etc and it all comes out wrong. am i acesing it wrong. im trying to get this done a decent amount of time before black friday so i can buy a new laptop for school and pay off some of my loans and need help.