Context
We have Flink jobs running on a Flink 1.15.2 cluster. Each job consists of:
- One or more KafkaSources, from which we create changelog streams with a primary key.
- An SQL query to combine and transform those streams.
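For readers unfamiliar with this setup, here is a minimal sketch of one such job, assuming the changelog stream is created with `fromChangelogStream` in upsert mode (the question does not say which mode is used). The broker address, topic, group id, the `"id,field1,field2"` record format and the view name `source_table` are all hypothetical. It needs `flink-connector-kafka`, `flink-table-api-java-bridge`, the table planner and `flink-clients` on the classpath; the pipeline is only built, not executed, so no broker is contacted.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class ChangelogSourceSketch {

    // Builds (without executing) the pipeline: KafkaSource -> upsert changelog stream -> SQL query.
    public static Table buildQuery() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Hypothetical broker, topic and group id; nothing connects until the job is executed.
        KafkaSource<String> kafka = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("source-topic")
                .setGroupId("sketch")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Hypothetical record format "id,field1,field2": each record upserts the row for its id.
        DataStream<Row> rows = env
                .fromSource(kafka, WatermarkStrategy.noWatermarks(), "kafka-source")
                .map(value -> {
                    String[] parts = value.split(",", 3);
                    return Row.ofKind(RowKind.UPDATE_AFTER, parts[0], parts[1], parts[2]);
                })
                .returns(Types.ROW_NAMED(
                        new String[] {"id", "field1", "field2"},
                        Types.STRING, Types.STRING, Types.STRING));

        // Changelog stream with a primary key; upsert mode is an assumption.
        Table changelog = tEnv.fromChangelogStream(
                rows,
                Schema.newBuilder().primaryKey("id").build(),
                ChangelogMode.upsert());
        tEnv.createTemporaryView("source_table", changelog);

        // The kind of projection that fails to restore in 1.18.
        return tEnv.sqlQuery("SELECT field1, field2 FROM source_table");
    }

    public static void main(String[] args) {
        buildQuery().printSchema();
    }
}
```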
We are now trying to upgrade to Flink 1.18. We already did a similar upgrade in the past (from 1.13 to 1.15), using savepoints to save the state and resume the jobs on the new version.
Description of the problem
When we try to restore the jobs from their savepoints in 1.18, some of them fail with:
java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint s3p://flink/savepoints/savepoint-301fcc-88035400813b. Cannot map checkpoint/savepoint state for operator a815f069e4ea0e95abe7af3d3eeff8f0 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
After investigating, it seems to happen only when the query selects specific fields: SELECT * works, but SELECT a, b FROM ... does not. We compared the plans of a basic query of the form SELECT field1, field2 FROM source in 1.15.2 and 1.18.0:
In 1.15.2:
In 1.18.0:
The only difference is that the Calc moved from after the ChangelogNormalize to before it. We believe this comes from this change.
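To compare the plans across versions without Kafka, the failing query shape can be reproduced with a small in-memory upsert stream and printed with `explainSql`. This is a minimal sketch adapted from the `fromChangelogStream` examples in the Flink documentation; the values and the view name `source_table` are hypothetical, and the columns are the default `f0`/`f1`/`f2` names derived from the `Row` elements. Running the same class against 1.15.2 and 1.18.0 dependencies should show where the Calc sits relative to the ChangelogNormalize in each version.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class PlanComparisonSketch {

    // Returns the explained plan of a projection over an upsert changelog stream.
    public static String explainProjection() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // In-memory upsert stream: f0 is the key, f1/f2 are payload columns (hypothetical values).
        DataStream<Row> rows = env.fromElements(
                Row.ofKind(RowKind.INSERT, "k1", "a", 1),
                Row.ofKind(RowKind.INSERT, "k2", "b", 2),
                Row.ofKind(RowKind.UPDATE_AFTER, "k1", "c", 3));

        tEnv.createTemporaryView("source_table", tEnv.fromChangelogStream(
                rows,
                Schema.newBuilder().primaryKey("f0").build(),
                ChangelogMode.upsert()));

        // Same shape as the failing query: select a subset of the columns.
        return tEnv.explainSql("SELECT f1, f2 FROM source_table");
    }

    public static void main(String[] args) {
        System.out.println(explainProjection());
    }
}
```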
Questions
Is this a bug that we should report?
If not, is there anything we can do to migrate our jobs and keep their state? For example, an option to keep the previous behaviour, or a way to adapt our savepoints before restoring the jobs in 1.18?


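This is not a bug, but restoring state across such a plan change is not supported directly in open source Flink. When a job is restored, the state in the savepoint has to be mapped to operators with the same IDs in the new job. For SQL jobs those operators are generated by the planner, so if a newer version produces a different plan (here, the Calc moving before the ChangelogNormalize), some state can no longer be mapped, which is what the error message reports.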
This is also documented at https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/upgrading/#application-state-compatibility
Work on supporting version upgrades for Table API & SQL programs has been done in FLIP-190, with more information available at https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336489
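FLIP-190 is exposed (as an experimental feature since 1.15) through compiled plans: the statement is compiled once into a JSON plan, and later versions execute that stored plan instead of re-optimizing the SQL text, so the operator layout is kept. Below is a minimal sketch using `compilePlanSql`, `CompiledPlan.writeToFile` and `loadPlan`, with hypothetical `datagen`/`blackhole` tables standing in for real ones. Two caveats: as far as we know, compiled plans only cover pipelines defined entirely in SQL/Table API, so jobs that build their inputs with the DataStream API (as described in the question) may need their sources moved to SQL connectors first; and we have not verified whether savepoints of jobs that were not started from a compiled plan can be restored this way.

```java
import java.nio.file.Files;
import java.nio.file.Path;

import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.PlanReference;
import org.apache.flink.table.api.TableEnvironment;

public class CompiledPlanSketch {

    // Compiles an INSERT statement into a JSON plan file, then loads it back.
    public static CompiledPlan compileAndReload(Path planFile) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical tables using the built-in datagen/blackhole connectors, for illustration only.
        tEnv.executeSql("CREATE TABLE source_table (id STRING, field1 STRING, field2 INT) "
                + "WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE sink_table (field1 STRING, field2 INT) "
                + "WITH ('connector' = 'blackhole')");

        // Compile once (on the old version) and keep the JSON file with the job.
        CompiledPlan plan = tEnv.compilePlanSql(
                "INSERT INTO sink_table SELECT field1, field2 FROM source_table");
        plan.writeToFile(planFile.toString()); // fails if the file already exists

        // After upgrading, load the stored plan instead of re-planning the SQL text.
        return tEnv.loadPlan(PlanReference.fromFile(planFile.toString()));
    }

    public static void main(String[] args) throws Exception {
        Path planFile = Files.createTempDirectory("plan").resolve("plan.json");
        CompiledPlan reloaded = compileAndReload(planFile);
        System.out.println(reloaded.asJsonString());
        // reloaded.execute() would submit the job; omitted because datagen is unbounded.
    }
}
```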