I have a simple pipeline that reads from Pub/Sub within a fixed window, parses the messages, and groups them by a specific property. However, if I map after the groupBy, my function doesn't seem to get executed.
Am I missing something?
sc.pubsubSubscription[String](s"projects/$project/subscriptions/$subscription")
.withFixedWindows(Duration.standardSeconds(windowSeconds))
.map(parseMessage)
.groupBy(_.ip_address)
.map(entry => log.info(s"${entry._1} was repeated ${entry._2.size} times"))
I was able to reproduce the issue with the DirectRunner and a simple pipeline that reads from Pub/Sub, uses the first word of the message as the key, applies the GroupByKey, and then logs the entries. It seems the GBK step waits for all the data to arrive and, as it's an unbounded source, does not emit any result. What worked for me is to define a windowing strategy with triggering such as:
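A minimal sketch of one such strategy, assuming Scio's withFixedWindows accepts a WindowOptions argument and using Beam's AfterWatermark and AfterProcessingTime triggers. The 10-second window, the 2-second early firing delay, and the helper name countByFirstWord are illustrative assumptions, not values from the pipeline above; messages stands in for the SCollection read from Pub/Sub.

```scala
import com.spotify.scio.values.{SCollection, WindowOptions}
import org.apache.beam.sdk.transforms.windowing.{AfterProcessingTime, AfterWatermark}
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior
import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode
import org.joda.time.Duration

object TriggeredGroupBy {
  // Hypothetical helper: `messages` is the unbounded SCollection read from Pub/Sub.
  def countByFirstWord(messages: SCollection[String]): SCollection[(String, Int)] =
    messages
      .withFixedWindows(
        Duration.standardSeconds(10), // illustrative window size
        options = WindowOptions(
          // Fire when the watermark passes the end of the window, and also
          // emit early panes 2 seconds after the first element of a pane arrives.
          trigger = AfterWatermark
            .pastEndOfWindow()
            .withEarlyFirings(
              AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(2))),
          // Each fired pane includes the elements of earlier panes in the same window.
          accumulationMode = AccumulationMode.ACCUMULATING_FIRED_PANES,
          // Drop data that arrives after the watermark has passed the window end.
          allowedLateness = Duration.ZERO,
          closingBehavior = ClosingBehavior.FIRE_IF_NON_EMPTY
        )
      )
      // Key each message by its first word, as in the reproduction described above.
      .groupBy(_.split(" ").head)
      .map { case (key, values) => (key, values.size) }
}
```

With early firings, the grouped results should reach the downstream map without waiting on the watermark alone. Because the panes accumulate, the same key can be emitted more than once per window with a growing count; AccumulationMode.DISCARDING_FIRED_PANES would emit only the new elements in each pane instead.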