Subsequent join operations in Kafka Streams topology yield unexpected result

Assume a Kafka Streams topology that performs multiple subsequent KTable-KTable joins and FK joins in order to build aggregate objects.

A simplified version of the participating source entities might look like this:

class Parent {
  Long id;
  Long childCId; // FK -> ChildC, may be null
  // ...
}

class ChildA {
  Long id;
  Long parentId; // FK -> Parent
  // ...
}

class ChildB {
  Long id;
  Long parentId; // FK -> Parent
  // ...
}

class ChildC {
  Long id;
  // ...
}

To represent the Parent -> ChildA and Parent -> ChildB relations (1:n, possibly empty), assume the following intermediate objects used for re-keying:

class GroupedChildA {

  private List<ChildA> children = new ArrayList<>();

  public GroupedChildA add(ChildA child) {
    children.add(child);
    return this;
  }

  public GroupedChildA remove(ChildA child) {
    children.removeIf(c -> Objects.equals(c.id(), child.id()));
    return this;
  }

}

// Same implementation for ChildB

The object used for aggregation might look like this:

class AggregateParent {

  Long id;
  ChildC childC;
  GroupedChildA groupedChildA;
  GroupedChildB groupedChildB;

  // ...

  public AggregateParent(Parent parent, ChildC childC) {
    id = parent.id();
    // ...
    this.childC = childC;
  }

  public AggregateParent withGroupedChildA(GroupedChildA groupedChildA) {
    this.groupedChildA = groupedChildA;
    return this;
  }

  public AggregateParent withGroupedChildB(GroupedChildB groupedChildB) {
    this.groupedChildB = groupedChildB;
    return this;
  }

}

Now in order to build an aggregate object containing all associated entities, a topology might look like this:

KTable<Long, Parent> parent = streamsBuilder.table(...);
KTable<Long, ChildC> childC = streamsBuilder.table(...);

// Re-key for join operation
KTable<Long, GroupedChildA> groupedChildA = streamsBuilder.table(...)
  .groupBy((k, v) -> KeyValue.pair(v.parentId(), v))
  .aggregate(GroupedChildA::new,
    (k, v, a) -> a.add(v),
    (k, v, a) -> a.remove(v));

// Re-key for join operation
KTable<Long, GroupedChildB> groupedChildB = streamsBuilder.table(...)
  .groupBy((k, v) -> KeyValue.pair(v.parentId(), v))
  .aggregate(GroupedChildB::new,
    (k, v, a) -> a.add(v),
    (k, v, a) -> a.remove(v));

parent
  .leftJoin(childC, Parent::childCId, AggregateParent::new) // KTable-KTable FK join
  .leftJoin(groupedChildB, AggregateParent::withGroupedChildB) // KTable-KTable join
  .leftJoin(groupedChildA, AggregateParent::withGroupedChildA) // KTable-KTable join
  .toStream()
  .to(...);

Now when testing this topology by successively pushing objects of the different child types into their respective topics, the resulting aggregate does not reflect the expected state.

For example, when pushing a ChildB object, then a ChildC object, and then another ChildB object (all belonging to the same parent), the final aggregate only holds a GroupedChildB containing one ChildB instead of the expected two. I'm quite new to Kafka and Kafka Streams, so I'm sure I'm missing something here...
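
For context, the behaviour was observed with a TopologyTestDriver test along the following lines. This is only a sketch: the topic names, the serde instances (parentSerde, childBSerde, childCSerde, aggregateSerde), the entity constructors, and the buildTopology() helper are placeholders standing in for the actual setup, which is not shown above.

import java.util.List;
import java.util.Properties;

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;

class AggregateParentTopologyTest {

  void childBThenChildCThenChildB() {
    // buildTopology() is assumed to wire up exactly the topology shown above.
    Topology topology = buildTopology();

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregate-parent-test");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

    try (TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {

      // Topic names and serdes are placeholders for the real configuration.
      TestInputTopic<Long, Parent> parentTopic =
          driver.createInputTopic("parent", new LongSerializer(), parentSerde.serializer());
      TestInputTopic<Long, ChildB> childBTopic =
          driver.createInputTopic("child-b", new LongSerializer(), childBSerde.serializer());
      TestInputTopic<Long, ChildC> childCTopic =
          driver.createInputTopic("child-c", new LongSerializer(), childCSerde.serializer());
      TestOutputTopic<Long, AggregateParent> output =
          driver.createOutputTopic("aggregate-parent", new LongDeserializer(), aggregateSerde.deserializer());

      // Seed the parent (referencing ChildC id 10) first so the left joins have a base row.
      // The entity constructors used here are assumed, not shown in the classes above.
      parentTopic.pipeInput(100L, new Parent(100L, 10L));

      // Push ChildB -> ChildC -> ChildB, all belonging to parent 100.
      childBTopic.pipeInput(1L, new ChildB(1L, 100L));
      childCTopic.pipeInput(10L, new ChildC(10L));
      childBTopic.pipeInput(2L, new ChildB(2L, 100L));

      // Expected: the last emitted AggregateParent holds two ChildB entries;
      // observed: its GroupedChildB only contains the second one.
      List<AggregateParent> aggregates = output.readValuesToList();
      AggregateParent last = aggregates.get(aggregates.size() - 1);
    }
  }
}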

How is a complex topology like the one above (multiple subsequent joins, i.e. joining two or more KTables with various join types into an aggregate) supposed to be built correctly?
