Background: I am working with an Akka.NET cluster using Akka.Streams, implementing a source-sink pattern. My setup involves two actors, SourceActor and SinkActor, where the SourceActor sends messages to the SinkActor over a stream.
Problem: Messages sent from the SourceActor are not being received by the remote SinkActor. The local stream reference in the SourceActor seems to be sending messages, but these messages are not reaching the remote sink.
Code Snippets:
Source Actor:
// Inside SourceActor
using Akka.Actor;
using Akka.Hosting;
using Akka.Streams;
using Akka.Streams.Dsl;
using Newtonsoft.Json;
using WebAppShared.Messages;
namespace WebAppSource;
public class SourceActor : ReceiveActor {
protected ILogger _logger;
protected IActorRegistry _registry;
private Source<Hi, IActorRef> _streamSource;
private IActorRef _streamRef;
public SourceActor(ILogger<SourceActor> logger, IActorRegistry registry) {
_logger = logger;
_registry = registry;
Become(Ready);
_streamSource = Source.ActorRef<Hi>(bufferSize: 100, OverflowStrategy.DropNew);
// Define the primary sink for processing each event
var sink = Sink.ForEach<Hi>(ProcessEvent);
// I've changed this a few times.
_streamRef =
_streamSource
.To(Sink.ForEach<Hi>(ProcessEvent))
.Run(Context.Materializer());
}
private void Ready() {
_logger.LogDebug("Source is Ready");
Receive<Hi>((msg) => {
_logger.LogDebug("Sink told Source Hi.");
// This Doesn't make it to Remote Ref, but does pop up in "ProcessEvent(Hi evt) locally
Context.System.Scheduler.ScheduleTellOnce(TimeSpan.FromMilliseconds(500), _streamRef, msg, Self);
var sink = StreamRefs.SourceRef<Hi>();
});
ReceiveAsync<RequestStreamRef>(HandleStreamRequest);
}
private void ProcessEvent(Hi evt) {
try {
_logger.LogDebug($"Received Data Event {JsonConvert.SerializeObject(evt)}");
}
catch (Exception ex) {
_logger.LogError(ex.Message);
}
}
private async Task HandleStreamRequest(RequestStreamRef message) {
await _streamSource
.RunWith(StreamRefs.SourceRef<Hi>(), Context.System.Materializer())
.PipeTo(Sender, success: sourceRef => new StreamRefOffered(sourceRef));
}
protected override void Unhandled(object message) {
var theType = message.GetType().FullName;
_logger.LogCritical($"Unhandled at Source: {theType}");
base.Unhandled(message);
}
}
using Akka.Actor;
using Akka.Hosting;
using Akka.Streams;
using WebAppShared.Interfaces;
using WebAppShared.Messages;
namespace WebAppSink;
public class SinkActor : ReceiveActor {
protected ILogger _logger;
protected IActorRegistry _registry;
private bool _subscribed = false;
public SinkActor(ILogger<SinkActor> logger, IActorRegistry registry) {
_logger = logger;
_registry = registry;
Become(Ready);
Context.System.Scheduler.ScheduleTellOnce(TimeSpan.FromSeconds(1), Self, new Hi(), Self);
}
private async Task Subscribe() {
var actor = _registry.Get<IAmTheSource>();
actor.Tell(new RequestStreamRef(), Self);
}
private void Ready() {
_logger.LogDebug("Sink is Ready");
ReceiveAsync<LogsOffer>(async (m) => {
await m.SourceRef.Source.RunForeach(Console.WriteLine, Context.System.Materializer());
});
Receive<Hi>(async (msg) => {
var actor = _registry.Get<IAmTheSource>();
if (!_subscribed) {
await Subscribe();
Context.System.Scheduler.ScheduleTellOnce(TimeSpan.FromSeconds(10), Self, msg, Self);
return;
}
_logger.LogDebug("Source told Sink Hi.");
Context.System.Scheduler.ScheduleTellOnce(TimeSpan.FromSeconds(1), actor, msg, Self);
});
Receive<StreamRefOffered>(HandleSourceRefOffered);
Receive<string>((message) => {
_logger.LogWarning($"What is this? {message}");
});
}
private void HandleSourceRefOffered(StreamRefOffered offer) {
offer
.SourceRef
.Source
.RunForeach(ProcessDataEvent, Context.System.Materializer());
_subscribed = true;
Context.System.Scheduler.ScheduleTellOnce(TimeSpan.FromSeconds(5), Self, new Hi(), Self);
}
private void ProcessDataEvent(Hi dataEvent) {
// THIS NEVER FIRES
_logger.LogInformation($"Sink Received Hi over the Stream.");
}
protected override void Unhandled(object message) {
// NOTHING RECEIVED HERE
var theType = message.GetType().FullName;
_logger.LogCritical($"Unhandled at Sink: {theType}");
base.Unhandled(message);
}
}
These are in two different CSProj files, using a shared class library.
public class StreamRefOffered {
public StreamRefOffered() {
}
public StreamRefOffered(ISourceRef<Hi> sourceRef) {
SourceRef = sourceRef;
}
public ISourceRef<Hi> SourceRef { get; private set; }
}
public class RequestStreamRef {
}
public class Hi {
public string Message { get; set; }
}
Log Files Produced: Source:
[10:29:10 DBG] Start timer [Akka.Cluster.SBR.SplitBrainResolverBase+Tick] with generation [1]
[10:29:11 DBG] Associated [akka.tcp://drew@localhost:5551] -> akka.tcp://drew@localhost:5550
[10:29:11 DBG] Drained buffer with maxWriteCount: 50, fullBackoffCount: 1,smallBackoffCount: 0, noBackoffCount: 0,adaptiveBackoff: 1000
[10:29:11 INF] Cluster Node [akka.tcp://drew@localhost:5551] - Welcome from [akka.tcp://drew@localhost:5550]
[10:29:11 DBG] SBR add Up [Member(address = akka.tcp://drew@localhost:5550, Uid=1144886708 status = Up, role=[sink], upNumber=1, version=1.0.0)]
[10:29:11 DBG] Creating singleton identification timer...
[10:29:11 DBG] SBR reset stable deadline when members/unreachable changed
[10:29:11 DBG] SBR add Joining/WeaklyUp [Member(address = akka.tcp://drew@localhost:5551, Uid=1545331179 status = Joining, role=[source], upNumber=2147483647, version=1.0.0)]
[10:29:11 DBG] Trying to identify singleton at [akka.tcp://drew@localhost:5550/user/sink/singleton]
[10:29:11 INF] Singleton identified at [akka.tcp://drew@localhost:5550/user/sink/singleton]
[10:29:11 DBG] Watching: [akka://drew/user/sinkProxy -> akka.tcp://drew@localhost:5550/user/sink/singleton]
[10:29:11 DBG] Sending buffered messages to current singleton instance
[10:29:11 DBG] Cluster Node [akka.tcp://drew@localhost:5551] - Receiving gossip from [UniqueAddress: (akka.tcp://drew@localhost:5550, 1144886708)]
[10:29:11 DBG] SBR add Up [Member(address = akka.tcp://drew@localhost:5551, Uid=1545331179 status = Up, role=[source], upNumber=2, version=1.0.0)]
[10:29:11 DBG] Creating singleton identification timer...
[10:29:11 DBG] SBR reset stable deadline when members/unreachable changed
[10:29:11 INF] Singleton manager started singleton actor [akka://drew/user/source/singleton]
[10:29:11 DBG] Trying to identify singleton at [akka.tcp://drew@localhost:5551/user/source/singleton]
[10:29:11 INF] ClusterSingletonManager state change [Start -> Oldest] Akka.Cluster.Tools.Singleton.Uninitialized
[10:29:11 DBG] Source is Ready
[10:29:12 INF] Singleton identified at [akka://drew/user/source/singleton]
[10:29:12 DBG] Sending buffered messages to current singleton instance
[10:29:12 DBG] Created SinkRef, pointing to remote Sink receiver: null, local worker: [akka://drew/user/StreamSupervisor-0/$$a#1501495602]
[10:29:12 DBG] Watching: [akka://drew/user/StreamSupervisor-0/$$a -> akka.tcp://drew@localhost:5550/user/StreamSupervisor-0/$$a]
[10:29:12 DBG] Received cumulative demand [32], consumable demand: [32]
[10:29:18 DBG] Sink told Source Hi.
[10:29:19 DBG] Received Data Event {"Message":null}
[10:29:22 DBG] Sink told Source Hi.
[10:29:23 DBG] Received Data Event {"Message":null}
Sink:
[10:29:10 DBG] Start timer [Akka.Cluster.SBR.SplitBrainResolverBase+Tick] with generation [1]
[10:29:10 INF] Cluster Node [1.0.0] - Node [akka.tcp://drew@localhost:5550] is JOINING itself (with roles [sink], version [1.0.0]) and forming a new cluster
[10:29:10 INF] Cluster Node [akka.tcp://drew@localhost:5550] - is the new leader among reachable nodes (more leaders may exist)
[10:29:10 INF] Cluster Node [akka.tcp://drew@localhost:5550] - Leader is moving node [akka.tcp://drew@localhost:5550] to [Up]
[10:29:10 DBG] Creating singleton identification timer...
[10:29:10 DBG] SBR add Up [Member(address = akka.tcp://drew@localhost:5550, Uid=1144886708 status = Up, role=[sink], upNumber=1, version=1.0.0)]
[10:29:10 DBG] SBR reset stable deadline when members/unreachable changed
[10:29:10 INF] This node is now the leader responsible for taking SBR decisions among the reachable nodes (more leaders may exist).
[10:29:10 DBG] Trying to identify singleton at [akka.tcp://drew@localhost:5550/user/sink/singleton]
[10:29:10 INF] Singleton manager started singleton actor [akka://drew/user/sink/singleton]
[10:29:10 INF] ClusterSingletonManager state change [Start -> Oldest] Akka.Cluster.Tools.Singleton.Uninitialized
[10:29:10 DBG] Sink is Ready
[10:29:11 DBG] Associated [akka.tcp://drew@localhost:5550] <- akka.tcp://drew@localhost:5551
[10:29:11 INF] Cluster Node [akka.tcp://drew@localhost:5550] - Received InitJoin message from [[akka.tcp://drew@localhost:5551/system/cluster/core/daemon/joinSeedNodeProcess-1#1924990792]] to [akka.tcp://drew@localhost:5550]
[10:29:11 INF] Cluster Node [akka.tcp://drew@localhost:5550] - Sending InitJoinAck message from node [akka.tcp://drew@localhost:5550] to [[akka.tcp://drew@localhost:5551/system/cluster/core/daemon/joinSeedNodeProcess-1#1924990792]]
[10:29:11 INF] Cluster Node [1.0.0] - Node [akka.tcp://drew@localhost:5551] is JOINING, roles [source], version [1.0.0]
[10:29:11 DBG] SBR add Joining/WeaklyUp [Member(address = akka.tcp://drew@localhost:5551, Uid=1545331179 status = Joining, role=[source], upNumber=2147483647, version=1.0.0)]
[10:29:11 DBG] Cluster Node [akka.tcp://drew@localhost:5550] - Receiving gossip from [UniqueAddress: (akka.tcp://drew@localhost:5551, 1545331179)]
[10:29:11 INF] Cluster Node [akka.tcp://drew@localhost:5550] - Leader is moving node [akka.tcp://drew@localhost:5551] to [Up]
[10:29:11 DBG] SBR add Up [Member(address = akka.tcp://drew@localhost:5551, Uid=1545331179 status = Up, role=[source], upNumber=2, version=1.0.0)]
[10:29:11 DBG] SBR reset stable deadline when members/unreachable changed
[10:29:11 DBG] Creating singleton identification timer...
[10:29:11 DBG] Trying to identify singleton at [akka.tcp://drew@localhost:5551/user/source/singleton]
[10:29:11 DBG] Trying to identify singleton at [akka.tcp://drew@localhost:5550/user/sink/singleton]
[10:29:11 INF] Singleton identified at [akka://drew/user/sink/singleton]
[10:29:11 DBG] Sending buffered messages to current singleton instance
[10:29:11 DBG] Singleton not available, buffering message type [WebAppShared.Messages.RequestStreamRef]
[10:29:11 DBG] Cluster Node [akka.tcp://drew@localhost:5550] - Receiving gossip from [UniqueAddress: (akka.tcp://drew@localhost:5551, 1545331179)]
[10:29:12 DBG] Trying to identify singleton at [akka.tcp://drew@localhost:5551/user/source/singleton]
[10:29:12 INF] Singleton identified at [akka.tcp://drew@localhost:5551/user/source/singleton]
[10:29:12 DBG] Sending buffered messages to current singleton instance
[10:29:12 DBG] Watching: [akka://drew/user/sourceProxy -> akka.tcp://drew@localhost:5551/user/source/singleton]
[10:29:12 DBG] [SourceRef-0] Allocated receiver: [akka://drew/user/StreamSupervisor-0/$$a#903597150]
[10:29:12 DBG] Received first message from [akka.tcp://drew@localhost:5551/user/StreamSupervisor-0/$$a#1501495602], assuming it to be the remote partner for this stage
[10:29:12 DBG] Watching: [akka://drew/user/StreamSupervisor-0/$$a -> akka.tcp://drew@localhost:5551/user/StreamSupervisor-0/$$a]
[10:29:12 DBG] [SourceRef-0] Demanding until [32] (+32)
[10:29:12 DBG] [SourceRef-0] Received handshake Akka.Streams.Implementation.StreamRef.OnSubscribeHandshake from [akka.tcp://drew@localhost:5551/user/StreamSupervisor-0/$$a#1501495602]
[10:29:13 DBG] [SourceRef-0] Scheduled re-delivery of demand until [32]
[10:29:14 DBG] [SourceRef-0] Scheduled re-delivery of demand until [32]
[10:29:15 DBG] [SourceRef-0] Scheduled re-delivery of demand until [32]
[10:29:16 DBG] [SourceRef-0] Scheduled re-delivery of demand until [32]
[10:29:17 DBG] Source told Sink Hi.
[10:29:17 DBG] [SourceRef-0] Scheduled re-delivery of demand until [32]
[10:29:18 DBG] Forwarding message of type [WebAppShared.Messages.Hi] to current singleton instance at [akka.tcp://drew@localhost:5551/user/source/singleton]
[10:29:18 DBG] [SourceRef-0] Scheduled re-delivery of demand until [32]
[10:29:19 DBG] [SourceRef-0] Scheduled re-delivery of demand until [32]
[10:29:20 DBG] [SourceRef-0] Scheduled re-delivery of demand until [32]
[10:29:21 DBG] Source told Sink Hi.
[10:29:21 DBG] [SourceRef-0] Scheduled re-delivery of demand until [32]
[10:29:22 DBG] Forwarding message of type [WebAppShared.Messages.Hi] to current singleton instance at [akka.tcp://drew@localhost:5551/user/source/singleton]
[10:29:22 DBG] [SourceRef-0] Scheduled re-delivery of demand until [32]
[10:29:23 DBG] [SourceRef-0] Scheduled re-delivery of demand until [32]
// THIS CONTINUES FOREVER
::UPDATE::
Using a Broadcast Hub, I got what I was looking for.
using Akka;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;
using WebAppShared.Messages;
namespace WebAppSource {
public class SourceActor : ReceiveActor {
private readonly ILogger _logger;
private IActorRef _streamInlet;
private ISourceRef<Hi> _sourceRef;
private IRunnableGraph<Source<Hi, NotUsed>> _runnableGraph;
public SourceActor(ILogger<SourceActor> logger) {
_logger = logger;
var materializer = Context.Materializer();
// Create a broadcast hub
var (inlet, source) = Source.ActorRef<Hi>(bufferSize: 100, OverflowStrategy.DropNew)
.PreMaterialize(materializer);
_streamInlet = inlet; // Local IActorRef to send messages to the stream
// Use the source to create a broadcast hub
_runnableGraph = source.ToMaterialized(BroadcastHub.Sink<Hi>(), Keep.Right);
Become(Ready);
}
private void Ready() {
_logger.LogDebug($"Source is Ready at {Self.Path.ToStringWithAddress()}");
Receive<Hi>(msg => {
_logger.LogDebug("Received Hi message.");
_streamInlet.Tell(msg); // Send messages to the stream
});
Receive<RequestStreamRef>(_ => {
_logger.LogDebug("Received request for StreamRef.");
CreateSourceRef();
});
}
private void CreateSourceRef() {
// Materialize the broadcast hub to create a source
var source = _runnableGraph.Run(Context.Materializer());
// Convert the source into a sourceRef
source.RunWith(StreamRefs.SourceRef<Hi>(), Context.Materializer())
.PipeTo(Sender, success: sourceRef => new StreamRefOffered(sourceRef));
}
}
}