OS: vxWorks
I'm using the select() system call to enforce a timeout on serial port reads.
I am working on an IMU with data rate of 1000 Hz (IMU sends data every 1 ms).
The baud rate is 921600.
Every packet is 36 bytes long, which I calculate takes 388-400 us to transmit.
From this I concluded that if no new bytes are received for 400 us, the packet can be considered completely transferred.
The problem, as stated in the title, is that select() always returns 0 and no data is read.
I have to use the select() system call because, in vxWorks, epoll() is only supported for file descriptors of INET type.
Code :
// Reader-loop excerpt: waits on the serial descriptor with select(), appends whatever
// read() returns to this->data, and treats a select() timeout with buffered data as
// "packet complete" (the data is queued, handed to the callback, then cleared).
int return_value;
fd_set select_fd;
struct timeval tv;
//Run continuously till reader thread is to be executed
this->data.clear();
while (not this->exitReaderThread.load())
{
try
{
this->readBuffer.fill(0);
// The descriptor set and the timeout are rebuilt on every pass before calling select().
FD_ZERO(&select_fd);
FD_SET(this->fd, &select_fd);
tv.tv_sec = 0;
// NOTE(review): a 400 us timeout may be finer than the system tick and could be rounded
// (possibly down to a pure poll) - confirm against the vxWorks select() documentation.
tv.tv_usec = 400;
return_value = select(this->fd+1, &select_fd, nullptr, nullptr, &tv);
std::cerr << "Select Return Value : " << return_value << std::endl;
if(return_value == -1) //Error
{
perror("SerialPort select()");
//TODO Log Error
continue;
}
else if(return_value > 0) //Data Available
{
// NOTE(review): FD_ISSET is only specified to yield non-zero for a member descriptor;
// comparing against `true` (1) can fail if the macro returns the raw mask bit - confirm.
if(FD_ISSET(this->fd, &select_fd) == true)
{
ssize_t returnValue = ::read(this->fd, this->readBuffer.data(), this->readBuffer.size());
if(returnValue == -1)
{
//TODO Log Error
// NOTE(review): this is the read path, yet the message says "sending".
std::cout << "Error sending data on serial port " << this->portNumber <<". Error : " << errorToString(errno) << std::endl;
continue;
}
// Accumulate exactly the bytes returned by this read() call.
this->data.append((char *)this->readBuffer.data(), returnValue);
std::cerr << "Data Receive in progress\n";
}
}
else if(return_value == 0) //No Data Available
{
// Timeout with nothing buffered: keep waiting.
if(this->data.empty())
{
continue;
}
// Timeout after some bytes were buffered: publish the accumulated packet.
std::cerr << "Data Receive Complete\n";
this->push_receive_data_in_queue("", this->portNumber, this->data);
this->mCallbackSerialPort(this->portNumber, this->data);
this->data.clear();
}
}
}
Code to open and configure serial port
// Open-and-configure excerpt: opens the serial device read/write, then adjusts the
// termios flags for data bits, stop bits, parity and speed before applying them.
this->fd = ::open("/usb2ttyS/0", O_RDWR);
if(this->fd == -1)
{
std::cout << "Error open serial port. Error : " << errorToString(errno) << std::endl;
return false;
}
struct termios tty;
// NOTE(review): the results stored in returnValue below are never inspected.
returnValue = tcgetattr(this->fd, &tty);
// Clear input translation / software flow-control flags.
tty.c_iflag &= ~(IGNBRK | BRKINT | PARMRK | ISTRIP
| INLCR | IGNCR | ICRNL | IXON | IXOFF);
// Disable output post-processing.
tty.c_oflag &= ~OPOST;
// Disable echo, canonical mode, signal characters and extended processing.
tty.c_lflag &= ~(ECHO | ECHONL | ICANON | ISIG | IEXTEN);
// Reset character size and parity before applying the requested values.
tty.c_cflag &= ~(CSIZE | PARENB);
switch(this->dataBits)
{
case 5:
tty.c_cflag |= CS5;
break;
case 6:
tty.c_cflag |= CS6;
break;
case 7:
tty.c_cflag |= CS7;
break;
case 8 :
default:
tty.c_cflag |= CS8;
break;
}
switch(this->stopBits)
{
case 2:
tty.c_cflag |= CSTOPB;
break;
case 1:
default:
tty.c_cflag &= ~CSTOPB;
break;
}
switch(this->parity)
{
case 'o':
// NOTE(review): only PARENB is set here; PARODD is not, so this may select even parity - confirm.
tty.c_cflag |= PARENB; //enable parity
break;
case 'e':
tty.c_cflag |= PARENB; //enable parity
tty.c_cflag &= ~PARODD; //disable odd parity, therefore-> even parity
break;
case 'n':
default:
tty.c_cflag &= ~PARENB; // No-Parity
break;
}
// NOTE(review): the speed is passed as a plain number; some platforms expect a B* constant - confirm for the target.
returnValue = cfsetispeed(&tty, 921600);
returnValue = cfsetospeed(&tty, 921600);
// Apply the settings immediately.
returnValue = tcsetattr(this->fd, TCSANOW, &tty);
Complete Source Code
common.hpp
#pragma once
#include <cerrno>
#include <string>
#include <clocale>
#include <cstring>
#include <iostream>
/// Produces the textual description of a system error as a std::string.
/// The parameter type is error_t when built for Linux and errno_t otherwise (vxWorks).
#ifdef __linux__
std::string errorToString(error_t errorNumber);
#else //For vxWorks
std::string errorToString(errno_t errorNumber);
#endif
common.cpp
#include "common.hpp"
/// @brief Converts an errno-style error code into its textual description.
/// @param errorNumber Error code to describe (callers in this code base pass errno).
/// @return The message associated with @p errorNumber; empty if no message is available.
#ifdef __linux__
std::string errorToString(error_t errorNumber)
#else
std::string errorToString(errno_t errorNumber)
#endif
{
#ifdef __linux__
    // Describe the code that was passed in, not whatever errno holds right now:
    // errno may already have been overwritten by the time this function runs.
    // uselocale((locale_t)0) only queries the calling thread's locale.
    const char* const message = ::strerror_l(errorNumber, ::uselocale((locale_t)0));
    return (message != nullptr) ? std::string(message) : std::string();
#else //For vxWorks
    constexpr std::size_t errorStringLength {1024};
    // Zero-initialised so the result is a valid (empty) string even if the lookup fails.
    char errorDataCharArray[errorStringLength] = {};
    strerror_s(errorDataCharArray, errorStringLength, errorNumber);
    return std::string(errorDataCharArray);
#endif
}
communication_interface.hpp
#pragma once
#include <array>
#include <queue>
#include <mutex>
#include <thread>
#include <atomic>
#include <string>
#include <cstddef>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <optional>
#include <functional>
/// One received message together with the address information it arrived with.
struct receive_data
{
    std::string   source_ip;    ///< Sender address (the serial reader passes an empty string).
    std::uint16_t source_port;  ///< Sender port (the serial reader passes its port number).
    std::string   data;         ///< Raw payload bytes.
};

/// Alias kept so existing code can keep using the `_t` spelling.
using receive_data_t = receive_data;
/// @brief Abstract base class for communication channels.
///
/// Holds the descriptor, I/O buffers, reader-thread state, user callbacks and a
/// mutex-protected queue of received messages. Derived classes implement the pure
/// virtual open/close/read/write operations and the reader() loop.
class CommunicationInterface
{
protected:
    static const std::uint16_t READ_BUFFER_LENGTH {1024};
    static const std::uint16_t WRITE_BUFFER_LENGTH {1024};
    std::array<std::uint8_t, READ_BUFFER_LENGTH> readBuffer;
    std::array<std::uint8_t, WRITE_BUFFER_LENGTH> writeBuffer;
    int fd {-1};                                ///< Descriptor of the channel; -1 while closed.
    std::uint16_t portNumber {6666};
    std::atomic_bool exitReaderThread {false};  ///< Set to true to ask reader() to leave its loop.
    std::thread readerThread;
    /// Receive loop executed on readerThread.
    virtual void reader(void) = 0;

public:
    using callback_ethernet_t = std::function<void(std::string ip, std::uint16_t portNumber, std::string data)>;
    using callback_serial_port_t = std::function<void(std::uint8_t portNumber, std::string data)>;

    CommunicationInterface() = default;

    /// Virtual so that destroying a derived object through a base pointer runs the derived
    /// destructor. A std::thread that is still joinable when destroyed calls std::terminate,
    /// so derived classes must stop and join readerThread before destruction reaches here.
    virtual ~CommunicationInterface() = default;

    virtual bool open(void) = 0;
    virtual bool open(std::uint16_t portNumber) = 0;
    virtual bool close(void) = 0;
    virtual bool read(void) = 0;
    virtual bool write(const std::string& ipAddress, std::uint16_t portNumber, const std::string& data) = 0;
    virtual bool write(const std::string& data) = 0;

    bool registerEthernetCallback(callback_ethernet_t callback);
    bool registerSerialPortCallback(callback_serial_port_t callback);

    /// @brief Appends a received message to the queue.
    /// @return true on success, false if a std::exception was caught.
    inline bool push_receive_data_in_queue(const std::string& source_ip, std::uint16_t source_port, const std::string& data)
    {
        try
        {
            std::lock_guard<std::mutex> lock (this->receive_data_queue_mutex);
            // Plain aggregate initialisation: designated initialisers are a C++20 feature.
            this->receive_data_queue.push(receive_data_t{source_ip, source_port, data});
            return true;
        }
        catch(const std::exception& e)
        {
            std::cerr << e.what() << std::endl;
            //TODO Log error to logging module
            return false;
        }
    }

    /// @brief Removes and returns the oldest queued message.
    /// @return The message, or std::nullopt if the queue is empty or an exception was caught.
    inline std::optional<receive_data_t> get_received_data_and_pop(void)
    {
        try
        {
            std::lock_guard<std::mutex> lock (this->receive_data_queue_mutex);
            if(this->receive_data_queue.empty())
            {
                //TODO Log warning
                std::cout << "get_received_data_and_pop requested but queue empty\n";
                return std::nullopt;
            }
            // Copy the front element out before pop() destroys it.
            std::optional<receive_data_t> receiveData {this->receive_data_queue.front()};
            this->receive_data_queue.pop();
            return receiveData;
        }
        catch(const std::exception& e)
        {
            std::cerr << e.what() << std::endl;
            //TODO Log error to logging module
            return std::nullopt;
        }
    }

    /// @brief Returns a copy of the oldest queued message without removing it.
    /// @return The message, or std::nullopt if the queue is empty or an exception was caught.
    inline std::optional<receive_data_t> get_received_data(void)
    {
        try
        {
            std::lock_guard<std::mutex> lock (this->receive_data_queue_mutex);
            if(this->receive_data_queue.empty())
            {
                //TODO Log warning
                std::cout << "get_received_data requested but queue empty\n";
                return std::nullopt;
            }
            return std::optional<receive_data_t>{this->receive_data_queue.front()};
        }
        catch(const std::exception& e)
        {
            std::cerr << e.what() << std::endl;
            //TODO Log error to logging module
            return std::nullopt;
        }
    }

    /// @brief Reports whether at least one message is queued.
    /// @return true if the queue is non-empty; false if it is empty or an exception was caught.
    inline bool is_data_available(void)
    {
        try
        {
            std::lock_guard<std::mutex> lock (this->receive_data_queue_mutex);
            return !this->receive_data_queue.empty();
        }
        catch(const std::exception& e)
        {
            std::cerr << e.what() << std::endl;
            //TODO Log error to logging module
            return false;
        }
    }

protected:
    callback_ethernet_t mCallbackEthernet;
    callback_serial_port_t mCallbackSerialPort;
    std::queue<receive_data_t> receive_data_queue;
    std::mutex receive_data_queue_mutex;        ///< Guards receive_data_queue.
};
communication_interface.cpp
#include "communication_interface.hpp"
/// Stores the callback used for Ethernet data.
/// Returns true once the callback is stored, false if storing it raised a std::exception.
bool CommunicationInterface::registerEthernetCallback(callback_ethernet_t callback)
{
    bool registered = false;
    try
    {
        mCallbackEthernet = callback;
        registered = true;
    }
    catch(const std::exception& error)
    {
        std::cerr << error.what() << '\n';
    }
    return registered;
}
/// Stores the callback used for serial-port data.
/// Returns true once the callback is stored, false if storing it raised a std::exception.
bool CommunicationInterface::registerSerialPortCallback(callback_serial_port_t callback)
{
    bool registered = false;
    try
    {
        mCallbackSerialPort = callback;
        registered = true;
    }
    catch(const std::exception& error)
    {
        std::cerr << error.what() << '\n';
    }
    return registered;
}
serial_port.hpp
#pragma once
#include <thread>
#include <sys/select.h>
#include "posix_timer.hpp"
#include "communication_interface.hpp"
/// Serial-port implementation of CommunicationInterface.
class SerialPort : public CommunicationInterface
{
public:
    SerialPort();
    ~SerialPort();

    // Port lifecycle and I/O (CommunicationInterface overrides).
    bool open() override;
    bool open(std::uint16_t portNumber) override;
    bool close() override;
    bool read() override;
    bool write(const std::string& ipAddress, std::uint16_t portNumber, const std::string& data) override;
    bool write(const std::string& data) override;

    // Line settings. Each setter returns the value that is actually in effect afterwards.
    std::uint32_t getBaudRate() const;
    std::uint32_t setBaudRate(std::uint32_t baudRate);
    std::uint8_t getDataBits() const;
    std::uint8_t setDataBits(std::uint8_t dataBits);
    std::uint8_t getStopBits() const;
    std::uint8_t setStopBits(std::uint8_t stopBits);
    char getParity() const;
    char setParity(char parity);

private:
    /// Receive loop run on the reader thread.
    void reader() override;
    /// Handler registered with `timer` in the constructor.
    void timeOutCallback(const std::any& callback_data);

    // Data members. Their relative order fixes construction/destruction order.
    PosixTimer timer;
    std::string portName;
    std::uint32_t baudRate {115200};
    std::uint8_t dataBits {8};
    std::uint8_t stopBits {1};
    char parity {'n'};
    std::uint64_t timer_interval {200}; //Set the timeout interval in nanoseconds
    PosixTimer messageTimeOutTimer;
    std::string data;                   // Bytes accumulated for the message being received
};
serial_port.cpp
#include "serial_port.hpp"
#include <sys/ioctl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#include "common.hpp"
#include <termios.h>
/// Pre-allocates the receive string and registers timeOutCallback with the timer
/// before creating the timer.
SerialPort::SerialPort()
{
    using std::placeholders::_1;

    data.reserve(4096);
    timer.registerCallback(std::bind(&SerialPort::timeOutCallback, this, _1), timer_interval);
    timer.create();
}
/// Stops the reader thread, destroys the timer and closes the port if it is still open.
SerialPort::~SerialPort()
{
    // Ask the reader loop to exit first so that it has stopped before the descriptor is closed.
    this->exitReaderThread.store(true);
    this->timer.destroy();
    // A std::thread that is still joinable when destroyed calls std::terminate, so join it here.
    // The loop re-checks the exit flag after each select() timeout, so this wait is bounded.
    if(this->readerThread.joinable())
    {
        this->readerThread.join();
    }
    // The descriptor is valid when it is non-negative; -1 marks a closed port.
    if(this->fd >= 0)
    {
        this->close();
    }
}
void SerialPort::reader(void)
{
int return_value;
fd_set select_fd;
struct timeval tv;
//Run continuously till reader thread is to be executed
this->data.clear();
while (not this->exitReaderThread.load())
{
try
{
this->readBuffer.fill(0);
FD_ZERO(&select_fd);
FD_SET(this->fd, &select_fd);
tv.tv_sec = 1;
tv.tv_usec = 0;
return_value = select(this->fd+1, &select_fd, nullptr, nullptr, &tv);
std::cerr << "Select Return Value : " << return_value << std::endl;
if(return_value == -1) //Error
{
perror("SerialPort select()");
//TODO Log Error
continue;
}
else if(return_value > 0) //Data Available
{
if(FD_ISSET(this->fd, &select_fd) == true)
{
ssize_t returnValue = ::read(this->fd, this->readBuffer.data(), this->readBuffer.size());
if(returnValue == -1)
{
//TODO Log Error
std::cout << "Error sending data on serial port " << this->portNumber <<". Error : " << errorToString(errno) << std::endl;
continue;
}
this->data.append((char *)this->readBuffer.data(), returnValue);
std::cerr << "Data Receive in progress\n";
}
}
else if(return_value == 0) //No Data Available
{
if(this->data.empty())
{
continue;
}
std::cerr << "Data Receive Complete\n";
this->push_receive_data_in_queue("", this->portNumber, this->data);
this->mCallbackSerialPort(this->portNumber, this->data);
this->data.clear();
}
}
catch(const std::exception& ex)
{
std::cout << ex.what() << std::endl;
//TODO Log Error
}
}
}
/// @brief Timer handler: publishes the bytes accumulated so far and clears the buffer.
/// @param callback_data Payload supplied by the timer; not used here.
void SerialPort::timeOutCallback(const std::any& callback_data)
{
    static_cast<void>(callback_data);
    std::cout << "Callback Interval : " << this->timer_interval << std::endl;
    this->push_receive_data_in_queue("", this->portNumber, this->data);
    // Calling an empty std::function throws std::bad_function_call, so only invoke a registered callback.
    if(this->mCallbackSerialPort)
    {
        this->mCallbackSerialPort(this->portNumber, this->data);
    }
    this->data.clear();
}
bool SerialPort::open(void)
{
try
{
int returnValue;
std::cout << "Opening Port : " << this->portName.c_str() << std::endl;
this->fd = ::open(this->portName.c_str(), O_RDWR);
if(this->fd == -1)
{
std::cout << "Error open serial port. Error : " << errorToString(errno) << std::endl;
return false;
}
struct termios tty;
returnValue = tcgetattr(this->fd, &tty);
tty.c_iflag &= ~(IGNBRK | BRKINT | PARMRK | ISTRIP
| INLCR | IGNCR | ICRNL | IXON | IXOFF);
tty.c_oflag &= ~OPOST;
tty.c_lflag &= ~(ECHO | ECHONL | ICANON | ISIG | IEXTEN);
tty.c_cflag &= ~(CSIZE | PARENB);
switch(this->dataBits)
{
case 5:
tty.c_cflag |= CS5;
break;
case 6:
tty.c_cflag |= CS6;
break;
case 7:
tty.c_cflag |= CS7;
break;
case 8 :
default:
tty.c_cflag |= CS8;
break;
}
switch(this->stopBits)
{
case 2:
tty.c_cflag |= CSTOPB;
break;
case 1:
default:
tty.c_cflag &= ~CSTOPB;
break;
}
switch(this->parity)
{
case 'o':
tty.c_cflag |= PARENB; //enable parity
break;
case 'e':
tty.c_cflag |= PARENB; //enable parity
tty.c_cflag &= ~PARODD; //disable odd parity, therefore-> even parity
break;
case 'n':
default:
tty.c_cflag &= ~PARENB; // No-Parity
break;
}
returnValue = cfsetispeed(&tty, this->baudRate);
returnValue = cfsetospeed(&tty, this->baudRate);
returnValue = tcsetattr(this->fd, TCSANOW, &tty);
return true;
}
catch(const std::exception& ex)
{
std::cout << ex.what() << std::endl;
return false;
}
}
/// @brief Selects the device for a logical port number (1-4) and opens it.
/// @param portNumber Logical port number; any other value is rejected.
/// @return Result of open() for a valid number; false (with portNumber set to UINT16_MAX) otherwise.
bool SerialPort::open(std::uint16_t portNumber)
{
    //TODO Currently USB-Serial Names are used. Need to be changed to PCIe-Serial Names when hardware is received
    static const char* const deviceNames[] = {
        "/usb2ttyS/0",
        "/usb2ttyS/1",
        "/usb2ttyS/2",
        "/usb2ttyS/3"
    };
    constexpr std::uint16_t deviceCount = sizeof(deviceNames) / sizeof(deviceNames[0]);

    this->portName.clear();

    const bool validPort = (portNumber >= 1) && (portNumber <= deviceCount);
    if(!validPort)
    {
        this->portNumber = UINT16_MAX;
        std::cout << "Invalid port number in SerialPort::open call. Port Number : " << portNumber << std::endl;
        return false;
    }

    // Port numbers are 1-based, the table is 0-based.
    this->portNumber = portNumber;
    this->portName = deviceNames[portNumber - 1];
    return this->open();
}
/// @brief Closes the port descriptor.
/// @return true if ::close succeeded (fd is then reset to -1); false on failure or exception.
bool SerialPort::close(void)
{
    try
    {
        const bool closed = (::close(this->fd) != -1);
        if(closed)
        {
            // Mark the port as closed.
            this->fd = -1;
            return true;
        }
        std::cout << "Error closing serial port " << this->portNumber << ". Error : " << errorToString(errno) << std::endl;
        return false;
    }
    catch(const std::exception& failure)
    {
        std::cout << failure.what() << std::endl;
        return false;
    }
}
/// @brief Starts the background reader thread.
/// @return true if the thread was started; false if a reader thread already exists
///         or the thread could not be created.
bool SerialPort::read(void)
{
    try
    {
        // Assigning to a std::thread that is still joinable calls std::terminate,
        // so refuse to start a second reader.
        if(this->readerThread.joinable())
        {
            std::cout << "Reader thread already started on serial port " << this->portNumber << std::endl;
            return false;
        }
        this->exitReaderThread.store(false); //Set the thread exit event to false
        this->readerThread = std::thread(&SerialPort::reader, this);
        return true;
    }
    catch(const std::exception& ex)
    {
        std::cout << ex.what() << '\n';
        return false;
    }
}
/// Address-based write is not available on a serial port: prints a notice and returns false.
bool SerialPort::write(const std::string& ipAddress, std::uint16_t portNumber, const std::string& data)
{
    // The arguments are intentionally ignored.
    static_cast<void>(ipAddress);
    static_cast<void>(portNumber);
    static_cast<void>(data);

    static const char* const notice =
        "Multiple Parameter (with IP Address) write function can't be called on SerialPort class";
    std::cout << notice << std::endl;
    return false;
}
std::uint32_t SerialPort::getBaudRate(void) const
{
return this->baudRate;
}
/// @brief Sets the baud rate, falling back to 115200 for unsupported values.
///
/// Accepted rates: 1200 (testing), 9600 (meteorological sensor), 115200 (HCP), 921600 (IMU).
/// @return The baud rate in effect after the call.
std::uint32_t SerialPort::setBaudRate(std::uint32_t baudRate)
{
    const bool supported = (baudRate == 1200u)
                        || (baudRate == 9600u)
                        || (baudRate == 115200u)
                        || (baudRate == 921600u);

    //Default to Baud Rate 115200
    this->baudRate = supported ? baudRate : 115200u;
    return this->getBaudRate();
}
std::uint8_t SerialPort::getDataBits(void) const
{
return this->dataBits;
}
/// @brief Sets the number of data bits; only 7 and 8 are accepted, anything else becomes 8.
/// @return The number of data bits in effect after the call.
std::uint8_t SerialPort::setDataBits(std::uint8_t dataBits)
{
    const bool accepted = (dataBits == 7) || (dataBits == 8);
    if(accepted)
    {
        this->dataBits = dataBits;
    }
    else
    {
        this->dataBits = 8; //Default to 8 Data Bits
    }
    return this->getDataBits();
}
std::uint8_t SerialPort::getStopBits(void) const
{
return this->stopBits;
}
/// @brief Sets the number of stop bits; only 1 and 2 are accepted, anything else becomes 1.
/// @return The number of stop bits in effect after the call.
std::uint8_t SerialPort::setStopBits(std::uint8_t stopBits)
{
    const bool accepted = (stopBits == 1) || (stopBits == 2);
    if(accepted)
    {
        this->stopBits = stopBits;
    }
    else
    {
        this->stopBits = 1; //Default to 1 Stop Bit
    }
    return this->getStopBits();
}
char SerialPort::getParity(void) const
{
return this->parity;
}
/// @brief Sets the parity: 'n' (none), 'o' (odd) or 'e' (even); any other value becomes 'n'.
/// @return The parity character in effect after the call.
char SerialPort::setParity(char parity)
{
    const bool accepted = (parity == 'n') || (parity == 'o') || (parity == 'e');
    if(accepted)
    {
        this->parity = parity;
    }
    else
    {
        this->parity = 'n'; //Default to None parity
    }
    return this->getParity();
}
bool SerialPort::write(const std::string& data)
{
try
{
ssize_t returnValue = ::write(this->fd, data.c_str(), data.length());
if(returnValue == -1)
{
//Log error
std::cout << "Error sending data on serial port " << this->portNumber <<". Error : " << errorToString(errno) << std::endl;
return false;
}
return true;
}
catch(const std::exception& ex)
{
std::cout << ex.what() << std::endl;
return false;
}
}
The value of file descriptor (this->fd) is 4.
I have run the program several times and the value of file descriptor is always 4.