I am working on fairly complex project that involves data transfer from ESP32 to PC via TCP/IP protocol. Both codes (I guess) are not worth to investigate on the spot, however, I will provide it down bellow. The main algorithm is as follows:
ESP32 constantly sampling a signal of interest via ADC12020 converter. When ESP32 receives an interrupt (meaning the signal has reached threshold level) it accumulates and forms 256 byte data packet and sends it to the PC via TCP/IP (ESP32 acting as client). This operation continues untill the ESP32 is powered down.
On the PC side I have wrote a python script that emulates a TCP server. Server is listening for incoming data. As soon as it receives a data packet from the client it assigns a timestamp to it and does minimal data processing steps (some bitwise operations) Then it forms some kind of a data unit (with correct formating, timestamp, number format etc.) and stores it in a 100 MB internal buffer. When the ESP32 ends its operation and disconnects, it indicates that there will be no more incoming data packets and the server should write all the accumulated results to a textfile.
All this is achieved successfully, except for the fact that I only get maximum of around 170 voltage pulses per second registered (judging on the timestamps and the result file data.) while the test conditions easily reach 10000 pulses per second. So 170 pulses/second * 256 bytes (one processed pulse digital volume) = 0.34 Mbit/s. This number looks crazy small considering ESP has standard 10/100 Mbit Ethernet interface (I am using ESP32 WROOVER-E ethernet kit) File write speed could have been a huge bottleneck, but now it is only writing file once in the end. Is it the timestamps assignment and bitwise operations that takes so long or the inherent slowness of python that prevents me from reasonable data transfer speeds? Any advices on how to improve it? Thank you in advance.
P.S My ESP32 is tested and really sending packets at actual voltage pulses rates (10-30k / second)
ESP32 client
#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <stdbool.h>
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/queue.h"
#include "freertos/semphr.h"
#include "driver/gpio.h"
#include "freertos/timers.h"
#include "soc/soc.h"
#include "soc/gpio_struct.h"
#include "soc/gpio_reg.h"
#include "esp32/rom/ets_sys.h"
#include "soc/rtc.h"
#include "esp_system.h"
#include "esp_wifi.h"
#include "esp_event.h"
#include "esp_log.h"
#include "nvs_flash.h"
#include "esp_netif.h"
#include "protocol_examples_common.h"
#include "esp_transport.h"
#include "esp_transport_tcp.h"
#include "esp_transport_socks_proxy.h"
#include <sys/time.h>
#include <errno.h>
#include <arpa/inet.h>
#include <math.h>
#define TARGET_ADDR "169.254.157.27"
#define TARGET_PORT 5190
#ifdef CONFIG_EXAMPLE_ENABLE_PROXY
#define PROXY_ADDR CONFIG_EXAMPLE_PROXY_ADDR
#define PROXY_PORT CONFIG_EXAMPLE_PROXY_PORT
#endif
#define MAX_RETRY_COUNT 5
#define MAX_POLL_TIMEOUT_ERRORS 5
#define INTERRUPT_INPUT 15
#define GPIO_CLOCK 14
#define NUM_GPIO_PINS 12
#define NUM_CYCLES 64
#define GPIO_PIN_12 12
#define GPIO_PIN_13 13
#define GPIO_PIN_2 2
#define GPIO_PIN_4 4
#define GPIO_PIN_16 16
#define GPIO_PIN_17 17
#define GPIO_PIN_32 32
#define GPIO_PIN_33 33
#define GPIO_PIN_34 34
#define GPIO_PIN_35 35
#define GPIO_PIN_36 36
#define GPIO_PIN_39 39
uint64_t get_time_us() {
struct timeval tv;
gettimeofday(&tv, NULL);
return (uint64_t)tv.tv_sec * 1000000 + tv.tv_usec;
}
static const char *TAG = "tcp_transport_client";
esp_transport_handle_t transport = NULL;
static bool is_connected = false;
static int unsuccessful_retries = 0;
static int consecutive_poll_timeout_errors = 0;
static bool received_from_server = false; // Flag to indicate if a message has been received from the server
static const gpio_num_t GPIO_PINS[NUM_GPIO_PINS] = {
GPIO_PIN_12, GPIO_PIN_13, GPIO_PIN_2, GPIO_PIN_4,
GPIO_PIN_16, GPIO_PIN_17, GPIO_PIN_32, GPIO_PIN_33,
GPIO_PIN_34, GPIO_PIN_35, GPIO_PIN_36, GPIO_PIN_39
};
bool trigger = false;
uint32_t data_array[NUM_CYCLES] = {0};
TaskHandle_t tcpTaskHandle;
SemaphoreHandle_t tcpMutex;
SemaphoreHandle_t dataReceivedSemaphore;
QueueHandle_t tcpDataQueue;
TimerHandle_t timerHandle;
int counter = 0; // Global variable to count interrupts
bool global_digit = false; // Global variable to store the least significant integer bit
int received_integer = 0;
bool timer_started = false;
// Function prototypes
void establish_connection();
void disconnect_if_connected();
void Impulse_Count();
void close_connection();
void send_data_over_tcp(uint32_t *data_array, size_t num_elements);
void send_data_task(void *pvParameters);
void receive_data_task(void *pvParameters);
static void IRAM_ATTR gpio_interrupt_handler(void *args);
void timerCallback(TimerHandle_t xTimer) {
gpio_intr_disable(GPIO_NUM_15);
vTaskDelay(100 / portTICK_PERIOD_MS);
ESP_LOGI(TAG, "Timer expired. Disconnecting from server...");
vTaskDelay(5000 / portTICK_PERIOD_MS);
close_connection();
ESP_LOGI(TAG, "Interrupt count: %d", counter);
}
void Impulse_Count(){
vTaskDelay(50 / portTICK_PERIOD_MS);
ESP_LOGI(TAG, "Impulse target was reached. Disconnecting from server...");
vTaskDelay(5000 / portTICK_PERIOD_MS);
close_connection();
ESP_LOGI(TAG, "Interrupt count: %d", counter);
}
float ntohf(float netfloat) {
union {
uint32_t n;
float f;
} tmp;
tmp.n = htonl(*(uint32_t *)&netfloat);
global_digit = (int)tmp.f & 1;
int intPart = (int)tmp.f;
float fracPart = tmp.f - intPart;
if (intPart != 0) {
intPart >>= 1;
} else {
fracPart /= 2.0;
}
// Reconstruct the float
tmp.f = intPart + fracPart;
return tmp.f;
}
void establish_connection() {
if (transport == NULL) {
ESP_LOGE(TAG, "Error: TCP transport not initialized");
return;
}
const int connection_timings[MAX_RETRY_COUNT] = {1, 1, 1, 2, 3, 3, 5, 5, 5, 0}; // 0 indicates no further attempts
for (int i = 0; i < MAX_RETRY_COUNT; i++) {
if (!is_connected) {
vTaskDelay(connection_timings[i] * 1000 / portTICK_PERIOD_MS); // Convert seconds to milliseconds
int err = esp_transport_connect(transport, TARGET_ADDR, TARGET_PORT, 30000); // 30 second timeout
if (err == 0) {
ESP_LOGI(TAG, "Connected successfully");
is_connected = true; // Mark the connection as successful
break; // Exit loop on successful connection
} else {
ESP_LOGE(TAG, "Connection failed: errno %d", errno);
}
}
}
}
void disconnect_if_connected() {
if (is_connected) {
int err = esp_transport_close(transport);
if (err == 0) {
ESP_LOGI(TAG, "Closed the connection successfully");
} else {
ESP_LOGE(TAG, "Error closing the connection: errno %d", errno);
}
is_connected = false; // Mark the connection as unsuccessful
}
}
void close_connection() {
disconnect_if_connected();
}
void send_data_over_tcp(uint32_t *data_array, size_t num_elements) {
if (transport == NULL) {
ESP_LOGE(TAG, "Error: TCP transport not initialized");
return;
}
if (!is_connected || !received_from_server) { // Check if not connected or no message received from server
ESP_LOGE(TAG, "Error: Not connected to server or message not received from server");
return;
}
size_t total_bytes_sent = 0;
size_t remaining_elements = num_elements;
uint8_t *data_ptr = (uint8_t *)data_array;
int retry_count = 0;
bool retry_exceeded = false;
while (remaining_elements > 0 && !retry_exceeded) {
int bytes_written = esp_transport_write(transport, (char *)data_ptr, sizeof(uint32_t) * remaining_elements, 0);
if (bytes_written < 0) {
int error_code = errno;
ESP_LOGE(TAG, "Error occurred during sending: esp_transport_write() returned %d, errno %d", bytes_written, error_code);
if (error_code == ECONNRESET) {
ESP_LOGE(TAG, "Connection reset by peer. Attempting to reconnect...");
is_connected = false;
esp_transport_close(transport);
} else {
ESP_LOGE(TAG, "Unhandled error. Closing the transport.");
esp_transport_close(transport);
}
consecutive_poll_timeout_errors++; // Increment the error counter
// Reset retry variables
vTaskDelay(1000 / portTICK_PERIOD_MS); // Add a small delay before retrying
continue; // Retry without decrementing remaining_elements
}
if (bytes_written == 0) {
ESP_LOGW(TAG, "Zero bytes written to TCP. Retrying...");
vTaskDelay(1000 / portTICK_PERIOD_MS); // Add a small delay before retrying
retry_count++;
if (retry_count >= MAX_RETRY_COUNT) {
ESP_LOGE(TAG, "Maximum retry count exceeded. Disconnecting...");
disconnect_if_connected();
retry_exceeded = true; // Set the flag to true
}
continue; // Retry without decrementing remaining_elements
}
remaining_elements -= bytes_written / sizeof(uint32_t);
data_ptr += bytes_written;
total_bytes_sent += bytes_written;
consecutive_poll_timeout_errors = 0; // Reset the error counter on successful write
ESP_LOGI(TAG, "Sent %d bytes over TCP", bytes_written); // Log only on successful send
if(global_digit && (received_integer == counter)){
//gpio_intr_disable(GPIO_NUM_15);
Impulse_Count();
}
counter++;
//if(((received_integer > counter) && global_digit) || timer_started)
//{
//counter++;
//}
}
}
void receive_data_task(void *pvParameters) {
char recv_buffer[256];
int recv_len;
//bool timer_started = false;
while (1) {
if (is_connected) {
recv_len = esp_transport_read(transport, recv_buffer, sizeof(recv_buffer), 0);
if (recv_len > 0) {
float received_float;
memcpy(&received_float, recv_buffer, sizeof(float));
received_float = ntohf(received_float); // Convert float from network byte order to host byte order
// Adjust integer part
//int intPart = (int)received_float;
//intPart >>= 1; // Reduce by one bit
//received_float = intPart + (received_float - intPart); // Reconstruct the float
received_from_server = true;
//counter = 0;
if(!global_digit){
ESP_LOGI(TAG, "Received measurement time from server: %.3f s", received_float);
uint32_t seconds = (uint32_t)received_float; // Integer part represents seconds
float fraction = received_float - seconds; // Fractional part represents fractions of a second
uint32_t microseconds = fraction * 1000000; // Convert fraction to microseconds
if (!timer_started) {
if (timerHandle != NULL) {
xTimerDelete(timerHandle, 0);
}
timerHandle = xTimerCreate("Timer", pdMS_TO_TICKS(seconds * 1000 + microseconds / 1000), pdFALSE, NULL, timerCallback);
if (timerHandle != NULL) {
xTimerStart(timerHandle, 0);
timer_started = true;
}
}
}
else {
received_integer = (int)received_float; // Directly cast to integer
ESP_LOGI(TAG, "Received impulse target from server: %d ", received_integer);
}
xSemaphoreGive(dataReceivedSemaphore);
}
}
vTaskDelay(100 / portTICK_PERIOD_MS); // Delay before retrying to receive data
}
}
void send_data_task(void *pvParameters) {
uint32_t received_data[NUM_CYCLES];
int retry_delay = 1000; // Initial retry delay in milliseconds
while (1) {
if (xSemaphoreTake(tcpMutex, portMAX_DELAY)) {
if (xQueueReceive(tcpDataQueue, received_data, portMAX_DELAY)) {
int retries = 0;
int retry_delay = 0; // Initial retry delay
while (retries < 5) {
send_data_over_tcp(received_data, NUM_CYCLES);
if (is_connected) {
//ESP_LOGI(TAG, "Sent %d bytes over TCP", sizeof(uint32_t) * NUM_CYCLES);
retry_delay = 0; // Reset retry delay on successful send
break; // Data sent successfully
} else {
// Connection failed, wait and retry with the specified delays
vTaskDelay(retry_delay / portTICK_PERIOD_MS);
retries++;
// Set the next retry delay according to the pattern
if (retries == 1) {
retry_delay = 1000; // 1 second
} else if (retries == 2) {
retry_delay = 2000; // 2 seconds
} else if (retries == 3) {
retry_delay = 3000; // 3 seconds
} else if (retries == 4) {
retry_delay = 5000; // 5 seconds
}
}
}
if (retries == 5) {
ESP_LOGE(TAG, "Failed to send data after 5 retries. Performing a software reset...");
// Log the intentional reset
ESP_LOGW(TAG, "Intentional reset triggered by software (esp_restart())");
esp_restart(); // Perform a software reset
}
} else {
if (is_connected) {
ESP_LOGI(TAG, "Connection lost. Attempting to reconnect...");
esp_transport_close(transport);
is_connected = false;
//last_reconnection_attempt_time = get_time_us(); // Record the time of the last reconnection attempt
}
}
xSemaphoreGive(tcpMutex);
}
}
}
static void IRAM_ATTR gpio_interrupt_handler(void *args) {
BaseType_t xHigherPriorityTaskWoken = pdFALSE;
uint32_t local_data_array[NUM_CYCLES];
for (int i = 0; i < NUM_CYCLES; ++i) {
local_data_array[i] = data_array[i];
}
xQueueSendFromISR(tcpDataQueue, local_data_array, &xHigherPriorityTaskWoken);
//counter++; // Increment the counter each time an interrupt occurs
if (xHigherPriorityTaskWoken == pdTRUE) {
portYIELD_FROM_ISR();
}
}
void app_main() {
vTaskDelay(1000 / portTICK_PERIOD_MS); // Initial delay
esp_log_level_set("transport", ESP_LOG_INFO);
esp_log_level_set("transport_base", ESP_LOG_INFO);
esp_log_level_set("transport_proxy", ESP_LOG_INFO);
ESP_ERROR_CHECK(nvs_flash_init());
ESP_ERROR_CHECK(esp_netif_init());
ESP_ERROR_CHECK(esp_event_loop_create_default());
ESP_ERROR_CHECK(example_connect());
transport = esp_transport_tcp_init();
establish_connection();
#ifdef CONFIG_EXAMPLE_ENABLE_PROXY
esp_transport_handle_t parent = transport;
esp_transport_socks_proxy_config_t proxy_config = {.port = PROXY_PORT, .address = PROXY_ADDR, .version = SOCKS4};
transport = esp_transport_socks_proxy_init(parent, &proxy_config);
#endif
dataReceivedSemaphore = xSemaphoreCreateBinary();
xTaskCreatePinnedToCore(receive_data_task, "TCP_Task", 8192, NULL, 3, &tcpTaskHandle, APP_CPU_NUM); // Increased priority
if (xSemaphoreTake(dataReceivedSemaphore, portMAX_DELAY) == pdTRUE) {
tcpDataQueue = xQueueCreate(1, sizeof(data_array));
xTaskCreatePinnedToCore(send_data_task, "TCP_Task", 8192, NULL, 4, &tcpTaskHandle, APP_CPU_NUM); // Increased priority
tcpMutex = xSemaphoreCreateMutex();
gpio_config_t io_conf_output = {
.pin_bit_mask = (1ULL << GPIO_CLOCK),
.mode = GPIO_MODE_OUTPUT,
};
gpio_config(&io_conf_output);
gpio_config_t io_conf;
for (int i = 0; i < NUM_GPIO_PINS; ++i) {
io_conf = (gpio_config_t){
.pin_bit_mask = (1ULL << GPIO_PINS[i]),
.mode = GPIO_MODE_INPUT,
.intr_type = GPIO_INTR_DISABLE,
.pull_up_en = GPIO_PULLUP_DISABLE,
.pull_down_en = GPIO_PULLDOWN_ENABLE,
};
gpio_config(&io_conf);
}
esp_rom_gpio_pad_select_gpio(INTERRUPT_INPUT);
gpio_set_direction(INTERRUPT_INPUT, GPIO_MODE_INPUT);
gpio_pulldown_en(INTERRUPT_INPUT);
gpio_pullup_dis(INTERRUPT_INPUT);
gpio_set_intr_type(INTERRUPT_INPUT, GPIO_INTR_NEGEDGE);
gpio_install_isr_service(0);
gpio_isr_handler_add(INTERRUPT_INPUT, gpio_interrupt_handler, (void *)INTERRUPT_INPUT);
if(global_digit){
counter = 0;
}
}
uint8_t k = 0;
while (1) {
REG_WRITE(GPIO_OUT_W1TS_REG, (1 << GPIO_CLOCK));
data_array[k] = REG_READ(GPIO_IN_REG);
data_array[k + 1] = REG_READ(GPIO_IN1_REG);
k = (k + 2) % NUM_CYCLES;
REG_WRITE(GPIO_OUT_W1TC_REG, (1 << GPIO_CLOCK));
}
close_connection();
#ifdef CONFIG_EXAMPLE_ENABLE_PROXY
esp_transport_destroy(parent);
#endif
}
Python server
import socket
import struct
import numpy as np
import time
import os
import threading
TARGET_IP = "169.254.157.27"
TARGET_PORT = 5190
BUFFER_SIZE = 256
NUM_TO_PRINT = 32
BUFFER_LIMIT = 100 * 1024 * 1024 # 100 MB
accumulated_buffer = []
timestamps = []
cps_values = []
active_connection = False
total_logs_received = 0
client_connection = None
def process_data_chunk(data):
result_array = (
(((data[0] >> 12) & 1) << 0) |
(((data[0] >> 13) & 1) << 1) |
(((data[0] >> 2) & 1) << 2) |
(((data[0] >> 4) & 1) << 3) |
((((data[0] >> 16) & 1) ^ 1) << 4) |
(((data[0] >> 17) & 1) << 5) |
(((data[1] >> 0) & 1) << 6) |
(((data[1] >> 1) & 1) << 7) |
(((data[1] >> 2) & 1) << 8) |
(((data[1] >> 3) & 1) << 9) |
(((data[1] >> 4) & 1) << 10) |
(((data[1] >> 7) & 1) << 11)
)
return result_array
def process_data(data, timestamp):
result_array = np.array([process_data_chunk(struct.unpack("<II", data[i:i+8])) for i in range(0, len(data), 8)])
max_index = np.argmax(result_array)
lower_indices = np.arange(max_index, max_index + NUM_TO_PRINT // 2) % len(result_array)
higher_indices = np.arange(max_index + NUM_TO_PRINT // 2, max_index + NUM_TO_PRINT) % len(result_array)
lower_elements = result_array[lower_indices]
higher_elements = result_array[higher_indices]
rearranged_buffer = np.concatenate([higher_elements, lower_elements])
max_index_rearranged = np.argmax(rearranged_buffer)
rearranged_buffer = np.roll(rearranged_buffer, NUM_TO_PRINT // 2 - max_index_rearranged)
timestamps.append(timestamp)
return rearranged_buffer
def print_decimal_chunks_to_file_and_terminal(data, file_path, timestamps, cps_values):
global total_logs_received
try:
# Append timestamp to the filename
timestamp_str = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
filename = os.path.splitext(file_path)[0] + " " + timestamp_str + os.path.splitext(file_path)[1]
with open(filename, "a+") as file:
for i in range(0, len(data), NUM_TO_PRINT):
timestamp = timestamps.pop(0)
cps = cps_values.pop(0)
formatted_timestamp = time.strftime('%Y-%m-%d %H:%M:%S.', time.localtime(timestamp)) + f"{timestamp:.6f}"[11:]
formatted_cps = f"{max(0, int(cps) - 1):d}" if cps is not None else "N/A"
file.write(f"{formatted_timestamp} - {formatted_cps} CPS - ")
line_data = data[i:i + NUM_TO_PRINT]
formatted_line = " ".join(f"{num:4d}" for num in line_data)
file.write(formatted_line)
file.write("\n")
total_logs_received += 1 # Increment the total_logs_received
except Exception as write_error:
print(f"Error writing to file: {write_error}")
def count_formatted_logs_in_last_second(timestamps, current_timestamp):
count = 0
for ts in timestamps:
if current_timestamp - ts < 1 and current_timestamp >= ts:
count += 1
return count
def calculate_cps(timestamps):
cps_values = []
for i in range(len(timestamps)):
cps = count_formatted_logs_in_last_second(timestamps[:i+1], timestamps[i])
cps_values.append(cps)
return cps_values
def write_accumulated_data_to_file():
global accumulated_buffer
global timestamps
global cps_values
if accumulated_buffer and timestamps:
try:
cps_values = calculate_cps(timestamps)
print_decimal_chunks_to_file_and_terminal(accumulated_buffer, "Rezultatai.txt", timestamps, cps_values)
accumulated_buffer = []
timestamps = []
cps_values = []
print("Baigta")
except Exception as write_error:
print(f"Error writing to file: {write_error}")
else:
print("No data to write.")
def handle_client(connection, address):
global active_connection
global accumulated_buffer
global timestamps
global cps_values
try:
buffer = bytearray(BUFFER_SIZE)
accumulated_data_size = 0
while active_connection:
try:
total_received = 0
while total_received < BUFFER_SIZE:
additional_data = connection.recv_into(buffer[total_received:])
if additional_data == 0:
print(f"Connection closed by the client ({address}) at {time.strftime('%Y-%m-%d %H:%M:%S.%f')}")
active_connection = False
break
total_received += additional_data
if not active_connection:
break
timestamp = time.time()
data = process_data(buffer, timestamp)
#print(f"Received 256 bytes from {address}")
accumulated_buffer.extend(data)
accumulated_data_size += len(data)
if accumulated_data_size >= BUFFER_LIMIT:
print("Buffer size limit reached. Closing connection and writing data to file.")
write_accumulated_data_to_file()
active_connection = False
break
cps = count_formatted_logs_in_last_second(timestamps, timestamp)
cps_values.append(cps)
except Exception as receive_error:
print(f"Error during data reception: {receive_error}")
break
# Handle unexpected disconnection
if active_connection:
print("Unexpected disconnection detected. Closing connection and writing data to file.")
write_accumulated_data_to_file()
active_connection = False
finally:
connection.close()
print("Connection closed.")
def receive_data():
global active_connection
global client_connection
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
server_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1048576)
server_socket.bind((TARGET_IP, TARGET_PORT))
server_socket.listen()
print(f"Server listening on {TARGET_IP}:{TARGET_PORT}")
while True:
if not active_connection:
client_connection, client_address = server_socket.accept()
print(f"Connected by {client_address}")
active_connection = True
# Request user input to determine the most significant bit
while True:
input_bit = input("Enter 0 if you want to specify the measurement time, 1 if you want to specify the number of impulses: ")
if input_bit in ['0', '1']:
break
else:
print("Invalid choice. Please enter 0 or 1.")
user_input = float(input("Enter the parameter: "))
# Calculate final_float based on input bit
integer_part = int(user_input)
fractional_part = user_input - integer_part
if input_bit == '0':
final_float = 2 * integer_part + fractional_part
else:
final_float = 2 * integer_part + 1 + fractional_part
print("Final float number:", final_float)
# Send user input to the client immediately after receiving it
user_input_packed = struct.pack("!f", final_float)
client_connection.sendall(user_input_packed)
print("Message sent")
handle_client(client_connection, client_address)
except Exception as bind_error:
print(f"Error during socket bind: {bind_error}")
finally:
write_accumulated_data_to_file()
server_socket.close()
if __name__ == "__main__":
try:
receive_data()
except KeyboardInterrupt:
print("Server shutting down gracefully.")