NetMQ Unity Pub/Sub

264 Views Asked by At

I'm running into some issues trying to get NetMQ to work. My code compiles fine but I'm not achieving my desired outcome. Testing both Pub and Sub side from one PC.

I have a UI Handler that is linked to four buttons, start/stop pub/sub. Would love if someone could shed some light into the issue.

Desired outcome: To be able to transfer test.wav from "Sending Folder" to "Receiving Folder"

Publisher Logic

using System.Threading;
using NetMQ;
using NetMQ.Sockets;
using System.IO;

public class Publisher
{
    private readonly Thread _publisherThread;
    private readonly Thread _subscriberThread;

    public Publisher()
    {
        _publisherThread = new Thread(PublisherWork);
        _publisherThread.Start();

        _subscriberThread = new Thread(SubscriberWork);
        _subscriberThread.Start();
    }

    public void Start()
    {
        using (var pubSocket = new PublisherSocket())
        {
            pubSocket.Bind("tcp://*:5557");

            string filePath = "C:/Users/XXXXX/Desktop/0MQ Demo/Sending Folder/test.wav";
            byte[] fileBytes = File.ReadAllBytes(filePath);

            pubSocket.SendMoreFrame("File").SendFrame(fileBytes);
        }
    }

    private void PublisherWork()
    {
        using (var pubSocket = new PublisherSocket())
        {
            pubSocket.Bind("tcp://*:5557"); // [C] Port

            string filePath = "C:/Users/XXXXX/Desktop/0MQ Demo/Sending Folder/test.wav"; // [C] Filepath
            byte[] fileBytes = File.ReadAllBytes(filePath);

            while (true)
            {
                pubSocket.SendMoreFrame("File").SendFrame(fileBytes); // [C] Topic
                Thread.Sleep(1000);
            }
        }
    }

    private void SubscriberWork()
    {
        using (var subSocket = new SubscriberSocket())
        {
            subSocket.Connect("tcp://localhost:5557"); // [C] Port
            subSocket.Subscribe("File"); // [C] Topic

            while (true)
            {
                var topic = subSocket.ReceiveFrameString();
                var fileBytes = subSocket.ReceiveFrameBytes();

                SaveWavFile(fileBytes);
            }
        }
    }

    private void SaveWavFile(byte[] fileBytes)
    {
        string filePath = "C:/Users/XXXXX/Desktop/0MQ Demo/Receiving Folder/test.wav";
        File.WriteAllBytes(filePath, fileBytes);
    }
}

Subscriber Logic

using System.Threading;
using NetMQ;
using NetMQ.Sockets;
using UnityEngine;
using UnityEngine.UI;
using System.IO;

public class Subscriber : MonoBehaviour
{
    public Button startButton;
    public Button stopButton;

    private Thread _publisherThread;
    private Thread _subscriberThread;

    private bool _isRunning;

    private void Start()
    {
        startButton.onClick.AddListener(StartThreads);
        stopButton.onClick.AddListener(StopThreads);
    }

    private void StartThreads()
    {
        _isRunning = true;

        _publisherThread = new Thread(PublisherWork);
        _publisherThread.Start();

        _subscriberThread = new Thread(SubscriberWork);
        _subscriberThread.Start();
    }

    private void StopThreads()
    {
        _isRunning = false;

        _publisherThread.Join();
        _subscriberThread.Join();
    }

    private void PublisherWork()
    {
        using (var pubSocket = new PublisherSocket())
        {
            pubSocket.Bind("tcp://*:5557"); // [C] Port

            string filePath = "C:/Users/XXXXX/Desktop/0MQ Demo/Sending Folder/test.wav"; // [C] Filepath
            byte[] fileBytes = File.ReadAllBytes(filePath);

            while (_isRunning)
            {
                pubSocket.SendMoreFrame("File").SendFrame(fileBytes); // [C] Topic
                Thread.Sleep(1000);
            }
        }
        NetMQConfig.Cleanup();
    }

    private void SubscriberWork()
    {
        using (var subSocket = new SubscriberSocket())
        {
            subSocket.Connect("tcp://localhost:5557"); // [C] Port
            subSocket.Subscribe("File"); // [C] Topic

            while (_isRunning)
            {
                var topic = subSocket.ReceiveFrameString();
                var fileBytes = subSocket.ReceiveFrameBytes();

                SaveWavFile(fileBytes);
            }
        }
        NetMQConfig.Cleanup();
    }

    private void SaveWavFile(byte[] fileBytes)
    {
        string filePath = "C:/Users/XXXXX/Desktop/0MQ Demo/Receiving Folder/test.wav"; // [C] 
        File.WriteAllBytes(filePath, fileBytes);
    }
}

UI Handler

using System.Threading;
using NetMQ;
using NetMQ.Sockets;
using System.IO;
using UnityEngine;
using UnityEngine.UI;

public class UIHandler : MonoBehaviour
{
    public Button startSubscriberButton;
    public Button stopSubscriberButton;
    public Button startPublisherButton;
    public Button stopPublisherButton;
    public Text pubStartStatus;
    public Text pubStopStatus;
    public Text subStartStatus;
    public Text subStopStatus;


    private Publisher _publisher;

    private readonly Thread _publisherThread;
    private readonly Thread _subscriberThread;
    private bool _publisherRunning = false;
    private bool _subscriberRunning = false;

    public UIHandler()
    {
    // Initializing the threads
    _publisherThread = new Thread(PublisherWork);
    _subscriberThread = new Thread(SubscriberWork);
    }


    private void Start()
    {
    _publisher = new Publisher();
    startPublisherButton.onClick.AddListener(StartPublisher);
    stopPublisherButton.onClick.AddListener(StopPublisher);
    startSubscriberButton.onClick.AddListener(StartSubscriber);
    stopSubscriberButton.onClick.AddListener(StopSubscriber);
    }
    public void StartPublisher()
    {
    // Starting the publisher thread
    _publisherThread.Start();
    _publisherRunning = true;
    pubStartStatus.text = "pubStartStatus: Started";
    }

    public void StopPublisher()
    {
    // Stopping the publisher thread
    _publisherRunning = false;
    _publisherThread.Join();
    pubStopStatus.text = "pubStartStatus: Stopped";
    }

    public void StartSubscriber()
    {
    // Starting the subscriber thread
    _subscriberThread.Start();
    _subscriberRunning = true;
    subStartStatus.text = "subStartStatus: Stopped";
    }

    public void StopSubscriber()
    {
    // Stopping the subscriber thread
    _subscriberRunning = false;
    _subscriberThread.Join();
    subStopStatus.text = "subStartStatus: Stopped";
    }

    private void PublisherWork()
    {
    // Creating a publisher socket and binding it to the specified address
    using (var pubSocket = new PublisherSocket())
    {
        pubSocket.Bind("tcp://*:5557");

        // Reading the file bytes from the specified location
        string filePath = "C:/Users/XXXXX/Desktop/0MQ Demo/Sending Folder/test.wav";
        byte[] fileBytes = File.ReadAllBytes(filePath);

        // Publishing the file bytes continuously
        while (_publisherRunning)
        {
            pubSocket.SendMoreFrame("File").SendFrame(fileBytes);
            Thread.Sleep(1000);
        }
    }
    NetMQConfig.Cleanup();
    }

    private void SubscriberWork()
    {
    // Creating a subscriber socket and connecting it to the specified address
    using (var subSocket = new SubscriberSocket())
    {
        subSocket.Connect("tcp://localhost:5557");
        subSocket.Subscribe("File");

        // Receiving and saving the file bytes continuously
        while (_subscriberRunning)
        {
            var topic = subSocket.ReceiveFrameString();
            var fileBytes = subSocket.ReceiveFrameBytes();

            SaveWavFile(fileBytes);
        }
    }
    NetMQConfig.Cleanup();
    }

    private void SaveWavFile(byte[] fileBytes)
    {
        // Saving the received file bytes to the specified location
        string filePath = "C:/Users/XXXXX/Desktop/0MQ Demo/Receiving Folder/test.wav";
        File.WriteAllBytes(filePath, fileBytes);
    }
}

0

There are 0 best solutions below