Cannot upgrade Flink SQL job to 1.18 because the Calc and ChangelogNormalize order changed

Context

We have Flink jobs running in a Flink cluster using version 1.15.2. Those jobs are composed of:

  • One or several KafkaSource, from which we create a changelog stream with a primary key.
  • An SQL query to combine and transform those streams (a minimal sketch of this setup follows).
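
For illustration, here is one way such a setup can be expressed in pure SQL (a sketch only; the upsert-kafka connector and the table, topic, and field names are assumptions, not our exact code). Declaring a PRIMARY KEY on a Kafka-backed table is what produces the changelog stream, and the planner materializes it with a ChangelogNormalize operator:

    -- Hypothetical changelog source: the PRIMARY KEY makes Flink treat
    -- the topic as an upsert stream, materialized via ChangelogNormalize.
    CREATE TABLE source (
      id     BIGINT,
      field1 STRING,
      field2 STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'source-topic',
      'properties.bootstrap.servers' = 'kafka:9092',
      'key.format' = 'json',
      'value.format' = 'json'
    );

    -- The kind of projection query that triggers the problem below:
    SELECT field1, field2 FROM source;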

We are now trying to upgrade to Flink 1.18. We already managed to do it in the past (from 1.13 to 1.15), using savepoints to save the state and resume the job in the new version.

Description of the problem

When trying to recreate jobs from their savepoint in 1.18, some of them fail with:

java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint s3p://flink/savepoints/savepoint-301fcc-88035400813b. Cannot map checkpoint/savepoint state for operator a815f069e4ea0e95abe7af3d3eeff8f0 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
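
For reference, a restore with that option would look roughly like this (a sketch; the jar name is hypothetical, the savepoint path is the one from the error above). Note that it silently drops the state of any unmatched operator, which for an upsert source means the materialized ChangelogNormalize state, so it is usually not an acceptable workaround here:

    flink run \
      --fromSavepoint s3p://flink/savepoints/savepoint-301fcc-88035400813b \
      --allowNonRestoredState \
      my-job.jar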

After investigation, it seems to happen only when we select specific fields in the query (so SELECT * works but SELECT a, b FROM ... does not). We compared the plan for a basic query of the form SELECT field1, field2 FROM source in 1.15.2 and 1.18.0 and got the following.

In 1.15.2:

[execution plan screenshot from 1.15.2]

In 1.18.0:

[execution plan screenshot from 1.18.0]

The only difference is that the Calc moved from after the ChangelogNormalize to before the ChangelogNormalize. We believe that it comes from this change.
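
In text form, the difference between the two plans looks roughly like this (reconstructed from the screenshots above; node names abbreviated):

    1.15.2:  KafkaSource -> ChangelogNormalize -> Calc(field1, field2) -> Sink
    1.18.0:  KafkaSource -> Calc(field1, field2) -> ChangelogNormalize -> Sink

Since operator IDs for SQL jobs are derived from the generated topology, reordering these nodes presumably changes the IDs, so the ChangelogNormalize state in the savepoint can no longer be mapped to an operator in the new job.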

Questions

Is this a bug that we should report?

If not, is there anything we can do to migrate our jobs while keeping the state? For example, an option to keep the previous behaviour, or a way to adapt our savepoints before restoring the jobs in 1.18?

2 Answers

Answer from Martijn Visser

This is not a bug; it's just not supported directly in open source Flink.

Due to the declarative nature of Table API & SQL programs, the underlying operator topology and state representation are mostly determined and optimized by the table planner.

Be aware that a change to either the query or the Flink version can lead to state incompatibility. Every new major-minor Flink version (e.g. 1.12 to 1.13) might introduce new optimizer rules or more specialized runtime operators that change the execution plan. However, the community tries to keep patch versions state-compatible (e.g. 1.13.1 to 1.13.2).

This is also documented at https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/upgrading/#application-state-compatibility

There has been work done on FLIP-190 to support version upgrades, with more information available at https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336489

Answer from David Anderson

You could try to compile the plan while running in Flink 1.15.2, and then load that compiled plan into Flink 1.18.1. For an example of this, see the Upgrading Flink recipe in Confluent's Flink Cookbook.
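
A rough sketch of that approach, using the COMPILE PLAN / EXECUTE PLAN statements introduced by FLIP-190 (the file path and table names here are hypothetical):

    -- Run once on the 1.15.2 cluster: persist the optimized plan as JSON.
    COMPILE PLAN 'file:///plans/my-job.json' FOR
    INSERT INTO sink SELECT field1, field2 FROM source;

    -- On the 1.18 cluster: execute the stored plan instead of re-planning,
    -- so the operator topology (and thus the state mapping) stays fixed.
    EXECUTE PLAN 'file:///plans/my-job.json';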

Disclaimer: I wrote this recipe, and I work for Confluent.