I want to protobuf message to kafka broker c++ in windows. .proto file is like this :
syntax = "proto3";
message MyMessage {
bytes data = 1;
}
I took cpp out of proto file. Actually I want to set to data field in .proto file from my cpp application. After that I want to send proto message to kafka broker. But message in Kafka broker ,is like "%/���������������". Why my message is printed like this in kafka broker? I'm running on windows nad I am using visual studio 2022. protobuf is compiled using vcpkg. librdkafka also compile using vcpkg. these libraries are linked as dynamic library to my cpp application. A piece of code is below:
#include <rdkafkacpp.h>
#include <iostream>
#include <string>
#include <chrono>
#include <thread>
#include "example.pb.h"
int main()
{
std::string kafka_broker = "kafka-srv:9092";
std::string kafka_topic = "EventLog"; // Replace with your Kafka topic name
std::string errstr;
// Producer configuration
RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf* tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
conf->set("bootstrap.servers", kafka_broker, errstr);
// Create a producer instance
RdKafka::Producer* producer = RdKafka::Producer::create(conf, errstr);
if (!producer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
return 1;
}
// Create a topic instance
RdKafka::Topic* topic = RdKafka::Topic::create(producer, kafka_topic, tconf, errstr);
if (!topic) {
std::cerr << "Failed to create topic: " << errstr << std::endl;
return 1;
}
std::string jsonString = "{\"User\":\"4001\",\"Event\":\"Reachable\"}";
std::vector<uint8_t> byte_array(jsonString.begin(), jsonString.end());
MyMessage myMessage;
myMessage.set_data(byte_array.data(), byte_array.size());
std::string serialized_data = "";
if (myMessage.SerializeToString(&serialized_data)) {
std::cout << "serialization is susccessfull" << std::endl;
RdKafka::ErrorCode resp = producer->produce(topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, const_cast<char*>(serialized_data.c_str()), serialized_data.size(), nullptr, nullptr);
if (resp != RdKafka::ERR_NO_ERROR) {
std::cerr << "Failed to produce message: " << RdKafka::err2str(resp) << std::endl;
return 0;
}
else
std::cout << " Successfully produced." << std::endl;
}
else {
std::cerr << "Failed to serialize protobuf message." << std::endl;
}
producer->flush(5000);
while (1)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
}
return 0;
}