Spring Integration Kotlin DSL aggregate configuration


I'm trying to aggregate messages into a list so that I can use the AmqpOutboundEndpoint.multiSend option. I followed this solution, but I use the Kotlin DSL instead of XML. Here is the code sample:

@Configuration
@EnableIntegration
class SampleConfiguration {

  @Bean
  fun sampleFlow(amqpTemplate: AmqpTemplate): StandardIntegrationFlow {
    return IntegrationFlow
      .from("inputChannel")
      .aggregate(Consumer<AggregatorSpec> {
        it.releaseExpression("size() == 100")
          .groupTimeout(1000)
          .sendPartialResultOnExpiry(true)
          .correlationExpression("T(Thread).currentThread().id")
          .poller { p: PollerFactory -> p.fixedRate(1000).maxMessagesPerPoll(100) }
      })
      .handle(Amqp.outboundAdapter(amqpTemplate).exchangeName("sampleExchange").multiSend(true))
      .get()
  }
}

I'm getting a compilation error:

Overload resolution ambiguity: 
public open fun aggregate(aggregator: Consumer<AggregatorSpec!>?): IntegrationFlowBuilder defined in org.springframework.integration.dsl.IntegrationFlowBuilder
public open fun aggregate(aggregatorProcessor: Any): IntegrationFlowBuilder defined in org.springframework.integration.dsl.IntegrationFlowBuilder

I couldn't find a way to overcome this problem. How can I configure an aggregator using the Kotlin DSL? I couldn't find any examples of working code on the Internet.

Spring Integration version: 6.2.1
Kotlin version: 1.9.22

Answer by Artem Bilan (accepted):

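What you have so far is not the Kotlin DSL, but rather the Java API used from Kotlin code. There can indeed be compatibility problems between the two languages, especially when lambdas are involved. Here the compiler cannot choose between the Consumer<AggregatorSpec> and the Any (Java Object) overloads of aggregate().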

And that's why we developed a dedicated Kotlin DSL for Spring Integration a while ago: https://docs.spring.io/spring-integration/reference/kotlin-dsl.html

So something like this should work for you:

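    import org.springframework.amqp.core.AmqpTemplate
    import org.springframework.context.annotation.Bean
    import org.springframework.integration.amqp.dsl.Amqp
    // Top-level builder function from the Spring Integration Kotlin DSL
    import org.springframework.integration.dsl.integrationFlow

    // Declare inside the same @Configuration / @EnableIntegration class as before
    @Bean
    fun sampleFlow(amqpTemplate: AmqpTemplate) =
            integrationFlow("inputChannel") {
                // The lambda receiver is the AggregatorSpec, so no `it.` prefix is needed
                aggregate {
                    releaseExpression("size() == 100")
                    groupTimeout(1000)
                    sendPartialResultOnExpiry(true)
                    correlationExpression("T(Thread).currentThread().id")
                    poller { it.fixedRate(1000).maxMessagesPerPoll(100) }
                }
                handle(Amqp.outboundAdapter(amqpTemplate)
                        .exchangeName("sampleExchange")
                        .multiSend(true))
            }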
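
To illustrate why the aggregate block above needs neither an explicit Consumer nor an `it.` prefix, here is a minimal, framework-free sketch of the two calling styles. Everything in it (DemoSpec, configureJavaStyle, configureDslStyle) is a hypothetical stand-in invented for this example and not part of Spring Integration; it only assumes that a Kotlin DSL of this kind is built on lambdas with a receiver.

```kotlin
import java.util.function.Consumer

// Hypothetical stand-in for a spec class such as AggregatorSpec; not a Spring type.
class DemoSpec {
    var releaseAt = 0
        private set
    var timeoutMs = 0L
        private set

    // Fluent setters returning `this`, in the style of a Java builder API.
    fun releaseSize(size: Int): DemoSpec { releaseAt = size; return this }
    fun groupTimeout(ms: Long): DemoSpec { timeoutMs = ms; return this }
}

// Java-style entry point: the spec is handed to the caller as an explicit argument.
fun configureJavaStyle(configurer: Consumer<DemoSpec>): DemoSpec =
    DemoSpec().also { configurer.accept(it) }

// Kotlin-DSL-style entry point: the spec is the lambda's receiver (`this`).
fun configureDslStyle(block: DemoSpec.() -> Unit): DemoSpec =
    DemoSpec().apply(block)

fun main() {
    // Java API from Kotlin: explicit SAM constructor and `it.` chaining.
    val javaStyle = configureJavaStyle(Consumer<DemoSpec> {
        it.releaseSize(100).groupTimeout(1000)
    })

    // DSL style: members of DemoSpec are called directly inside the block.
    val dslStyle = configureDslStyle {
        releaseSize(100)
        groupTimeout(1000)
    }

    // Both styles should end up with the same settings.
    println(javaStyle.releaseAt == dslStyle.releaseAt)
    println(javaStyle.timeoutMs == dslStyle.timeoutMs)
}
```

Because the DSL-style function accepts only a Kotlin function type, there is no competing overload taking Any, so a trailing lambda resolves unambiguously.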