I am trying to bootstrap an Akka cluster for my Akka Typed actor system, but the seed node fails to join the cluster. I get this error:
Cluster Node [akka://[email protected]:25520] - Joining of seed-nodes [akka://[email protected]:25520] was unsuccessful after configured shutdown-after-unsuccessful-join-seed-nodes [20000 milliseconds]. Running CoordinatedShutdown.
I am trying to use Akka's actor model in my OCR pipeline. I have three actors and a guardian actor, all defined in the actor system. Below is my actor system:
package io.tajji.kycpipeline.actor
import akka.actor.typed.ActorSystem
import akka.actor.typed.javadsl.Behaviors
import akka.cluster.typed.Cluster
import io.tajji.apis.kyc.events.*
import io.tajji.kycpipeline.model.Command.*
import io.tajji.kycpipeline.model.Message
import io.tajji.kycpipeline.model.Message.*
import io.tajji.kycpipeline.service.KYCValidationService
import io.tajji.kycpipeline.service.TextExtractionService
import io.tajji.kycpipeline.service.TextParsingService
import org.axonframework.eventhandling.EventBus
import org.axonframework.eventhandling.GenericEventMessage
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono
/**
 * Spring component that owns the "KYCSystem" actor system and exposes it to the
 * rest of the application.
 *
 * The guardian behavior spawns three child actors (validator, text extractor and
 * details parser) and routes every [Message] it receives:
 *
 * - KYC requests are sent to the validator.
 * - Validated documents are sent to the text extractor.
 * - Extracted details are sent to the parser.
 * - Parsed details and every failure message are published on the Axon [EventBus].
 *
 * Each command handed to a child carries the guardian's own reference
 * (`ctx.self`) as its last argument.
 */
@Component
class KYCSystem(
    private val eventBus: EventBus,
    private val textExtractor: TextExtractionService,
    private val detailsParser: TextParsingService,
    private val kycValidator: KYCValidationService
) {
    private val system: ActorSystem<Message> = ActorSystem.create(
        Behaviors.setup<Message> { ctx ->
            // Child actors are created once, when the guardian starts.
            val validatorRef = ctx.spawn(KYCValidator.create(kycValidator), "DocumentValidatorActor")
            val extractorRef = ctx.spawn(DocumentExtractor.create(textExtractor), "TextExtractorActor")
            val parserRef = ctx.spawn(DocumentParser.create(detailsParser), "DocumentParserActor")

            Behaviors.receiveMessage { msg ->
                when (msg) {
                    // Stage 1: incoming KYC requests go to the validator.
                    is LandlordKYC -> validatorRef.tell(
                        ValidateLandlordDocuments(msg.event.accountId, msg.event, ctx.self)
                    )
                    is ResidentKYC -> validatorRef.tell(
                        ValidateResidentDocuments(msg.event.accountId, msg.event, ctx.self)
                    )

                    // Stage 2: validated documents go to the text extractor.
                    is ValidLandlordDocuments -> extractorRef.tell(
                        ExtractLandlordDetails(
                            msg.accountId,
                            msg.idFront,
                            msg.idBack,
                            msg.pinCertificateData,
                            ctx.self
                        )
                    )
                    is ValidResidentDocuments -> extractorRef.tell(
                        ExtractResidentDetails(msg.accountId, msg.idFront, msg.idBack, ctx.self)
                    )

                    // Stage 3: extracted details go to the parser.
                    is ExtractedLandlordDetails -> parserRef.tell(
                        ParseLandlordDetails(msg.accountId, msg, ctx.self)
                    )
                    is ExtractedResidentDetails -> parserRef.tell(
                        ParseResidentDetails(msg.accountId, msg, ctx.self)
                    )

                    // Failures are published as KYCFailed events.
                    is NationalIdInvalid -> emit(KYCFailed(msg.accountId, msg.reason))
                    is PinCertificateInvalid -> emit(KYCFailed(msg.accountId, msg.reason))

                    // Successful results: publish once the reactive values resolve.
                    is ParsedResidentDetails -> {
                        msg.nationalIDData.subscribe { idData ->
                            emit(ResidentKYCPassed(msg.accountId, idData))
                        }
                    }
                    is ParsedLandlordDetails -> {
                        val landlordAccount = msg.accountId
                        Mono.zip(msg.nationalIDData, msg.taxData).subscribe { pair ->
                            emit(LandlordKYCPassed(landlordAccount, pair.t1, pair.t2))
                        }
                    }

                    is Invalid -> emit(KYCFailed(msg.accountId, msg.failureReason))
                }
                Behaviors.same()
            }
        },
        "KYCSystem"
    )

    // Looks up the Cluster extension for the actor system; the value itself is not used further here.
    private val kycCluster = Cluster.get(system)

    /** Sends a landlord KYC request to the guardian actor. */
    fun processLandlordKYC(event: LandlordKYCRequested) {
        system.tell(LandlordKYC(event))
    }

    /** Sends a resident KYC request to the guardian actor. */
    fun processResidentKYC(event: ResidentKYCRequested) {
        system.tell(ResidentKYC(event))
    }

    /** Wraps [event] in an Axon event message and publishes it on the [eventBus]. */
    private fun emit(event: Any) {
        eventBus.publish(GenericEventMessage.asEventMessage<Any>(event))
    }
}
Below is my cluster config file:
akka {
  cluster {
    # A seed-node address must match the node's own address exactly:
    # actor-system name ("KYCSystem", as passed to ActorSystem.create),
    # hostname and port (see remote.artery.canonical below).
    seed-nodes = ["akka://KYCSystem@127.0.0.1:25520"]
    shutdown-after-unsuccessful-join-seed-nodes = 20s
    seed-node-timeout = 15s
    log-info-verbose = off
    downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
  }
  actor {
    provider = cluster
  }
  remote {
    # The "akka://" scheme used in seed-nodes is the Artery transport, which reads
    # its bind address from artery.canonical.*; classic "netty.tcp" settings do not
    # apply to it. Without these the hostname is auto-detected and may not equal
    # the 127.0.0.1 used in seed-nodes, so the node never recognises itself as the seed.
    artery {
      canonical {
        hostname = "127.0.0.1"
        port = 25520
      }
    }
  }
  coordinated-shutdown {
    exit-jvm = on
  }
}
As per the error log, the ActorSystem you actually started is akka://[email protected]:25520.
You have to configure exactly that same address in the seed-nodes configuration.
The actor system name, the host, and the port all have to match.