How can I send byte array message to kafka broker in c++ on windows?

88 Views Asked by At

I want to protobuf message to kafka broker c++ in windows. .proto file is like this :

syntax = "proto3";
message MyMessage {
    bytes data = 1;

}

I took cpp out of proto file. Actually I want to set to data field in .proto file from my cpp application. After that I want to send proto message to kafka broker. But message in Kafka broker ,is like "%/���������������". Why my message is printed like this in kafka broker? I'm running on windows nad I am using visual studio 2022. protobuf is compiled using vcpkg. librdkafka also compile using vcpkg. these libraries are linked as dynamic library to my cpp application. A piece of code is below:

#include <rdkafkacpp.h>
#include <iostream>
#include <string>
#include <chrono>
#include <thread>
#include "example.pb.h"

int main()
{
    std::string kafka_broker = "kafka-srv:9092";
    std::string kafka_topic = "EventLog";        // Replace with your Kafka topic name

    std::string errstr;

    // Producer configuration
    RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf* tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

    conf->set("bootstrap.servers", kafka_broker, errstr);

    // Create a producer instance
    RdKafka::Producer* producer = RdKafka::Producer::create(conf, errstr);
    if (!producer) {
        std::cerr << "Failed to create producer: " << errstr << std::endl;
        return 1;
    }

    // Create a topic instance
    RdKafka::Topic* topic = RdKafka::Topic::create(producer, kafka_topic, tconf, errstr);
    if (!topic) {
        std::cerr << "Failed to create topic: " << errstr << std::endl;
        return 1;
    }

    std::string jsonString = "{\"User\":\"4001\",\"Event\":\"Reachable\"}";
    std::vector<uint8_t> byte_array(jsonString.begin(), jsonString.end());

    MyMessage myMessage;
    myMessage.set_data(byte_array.data(), byte_array.size());

    std::string serialized_data = "";
    if (myMessage.SerializeToString(&serialized_data)) {
        std::cout << "serialization is susccessfull" << std::endl;
        RdKafka::ErrorCode resp = producer->produce(topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, const_cast<char*>(serialized_data.c_str()), serialized_data.size(), nullptr, nullptr);
        if (resp != RdKafka::ERR_NO_ERROR) {
            std::cerr << "Failed to produce message: " << RdKafka::err2str(resp) << std::endl;
            return 0;
        }
        else
            std::cout << " Successfully produced." << std::endl;
    }
    else {
        std::cerr << "Failed to serialize protobuf message." << std::endl;
    }
    producer->flush(5000);
    while (1)
    {
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }


    return 0;
}
0

There are 0 best solutions below