I have a Java Apache Beam pipeline that I'm trying to convert into a PyFlink pipeline.
The Beam jobs run on a Flink cluster on EMR on EKS (Kubernetes), managed by the Flink Kubernetes Operator.
I'd prefer to work in Python, and I figure I might as well run Flink directly instead of running Beam translated to Flink via the FlinkRunner.
The job takes a few standard parameters, mainly the path of a gzip-compressed JSON input file, and sends its contents to an Elasticsearch 6 index. The index seems to be created automatically based on the input (I'm unclear exactly how).
The input file contains many JSON objects, one per line, each representing a sale event, with several fields and subfields per event.
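For reference, a decompressed line looks roughly like this (same redacted field names as the indexed document shown further down):

{"data_field1": "361", "data_field2": "some_string", "data_field3": "some_date", "data_field7": true, ...}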
public class IndexToEs {

    private static final Logger LOG = LoggerFactory.getLogger(IndexToEs.class);

    public interface IndexToEsOptions extends DataflowPipelineOptions {

        @Description("Path of the gzip index file to read from")
        String getInputIndexFile();
        void setInputIndexFile(String value);

        @Description("Index name to index with")
        @Default.String("")
        String getIndexName();
        void setIndexName(String value);

        @Description("Index template name")
        @Default.String("")
        String getIndexTemplate();
        void setIndexTemplate(String value);

        @Description("URI for es")
        @Default.String("********")
        String getEsUri();
        void setEsUri(String value);
    }

    public static void main(String[] args) {
        System.out.println("Registering Pipeline options Factory");
        PipelineOptionsFactory.register(IndexToEsOptions.class);

        LOG.info("Defining options variable");
        // Options are parsed from args[1..]; the first program argument is skipped
        IndexToEsOptions options = PipelineOptionsFactory
                .fromArgs(Arrays.copyOfRange(args, 1, args.length))
                .withValidation()
                .as(IndexToEsOptions.class);
        options.as(S3Options.class).setAwsCredentialsProvider(new DefaultAWSCredentialsProviderChain());
        options.as(S3Options.class).setAwsRegion("us-east-1");

        Pipeline p = Pipeline.create(options);
        // Read the input file line by line and write each line to Elasticsearch
        p.apply(TextIO.read().from(options.getInputIndexFile()))
                .apply(ElasticsearchIO.write().withConnectionConfiguration(
                        ElasticsearchIO.ConnectionConfiguration.create(
                                new String[]{options.getEsUri()},
                                options.getIndexName(),
                                options.getIndexTemplate())
                                .withConnectTimeout(240))
                        .withMaxBatchSizeBytes(15 * 1024 * 1024));
        p.run();
    }
}
This manages to hit Elasticsearch and create a nice index; a document in it looks something like this (heavily abridged for brevity and privacy):
{
  "_index" : "test-testindexname",
  "_type" : "test-testindextype",
  "_id" : "some_id",
  "_score" : 1.0,
  "_source" : {
    "data_field1" : "361",
    "data_field2" : "some_string",
    "data_field3" : "some_date",
    "data_field4" : "test_string",
    "data_field5" : "google",
    "data_field6" : "(organic)",
    "data_field7" : true,
    ...
  }
}
Following the available examples, I have this PyFlink code that tries to do the same thing: read in a JSON file and send it to an Elasticsearch 6 index.
from pyflink.common import WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors import DeliveryGuarantee
from pyflink.datastream.connectors.elasticsearch import (
    Elasticsearch6SinkBuilder, ElasticsearchEmitter, FlushBackoffType)
from pyflink.datastream.connectors.file_system import FileSource, StreamFormat


def file_to_es(
        input_path,
        index_name,
        es_host=None,
        connector_jar_filepath='file:///usr/local/flink-sql-connector-elasticsearch6-3.0.1-1.16.jar'):
    print("Building Stream Environment\n")
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.BATCH)

    if es_host is None:
        print("Setting ES host to default\n")
        es_host = '*****'

    print(f"Connector Jar Filepath: {connector_jar_filepath}")
    env.add_jars(connector_jar_filepath)

    print(f"Building file source from input_path: {input_path}")
    ds = env.from_source(
        source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),
                                                   input_path)
                         .process_static_file_set().build(),
        watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
        source_name="file_source"
    )

    # set_bulk_flush_max_actions(1) instructs the sink to emit after every element;
    # otherwise elements would be buffered
    print("Building es sink")
    es_sink = Elasticsearch6SinkBuilder() \
        .set_emitter(ElasticsearchEmitter.static_index(index_name)) \
        .set_hosts([es_host]) \
        .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
        .set_bulk_flush_max_actions(1) \
        .set_bulk_flush_max_size_mb(2) \
        .set_bulk_flush_interval(1000) \
        .set_bulk_flush_backoff_strategy(FlushBackoffType.CONSTANT, 3, 3000) \
        .set_connection_request_timeout(30000) \
        .set_connection_timeout(31000) \
        .set_socket_timeout(32000) \
        .build()

    ds.sink_to(es_sink).name('es6 sink')

    # submit for execution
    env.execute()
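For completeness, I call it from a small wrapper along these lines (argument names are my own for illustration, loosely mirroring the Beam options above; real paths and host are redacted):

import argparse

if __name__ == '__main__':
    # Illustrative wrapper around file_to_es; argument names are mine,
    # not from an existing script.
    parser = argparse.ArgumentParser()
    parser.add_argument('--input-index-file', required=True,
                        help='Path of the gzip index file to read from')
    parser.add_argument('--index-name', required=True, help='Index name to index with')
    parser.add_argument('--es-uri', default=None, help='URI for es')
    args = parser.parse_args()

    file_to_es(args.input_index_file, args.index_name, es_host=args.es_uri)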
Unfortunately it results in this error:
java.lang.ClassCastException: class java.lang.String cannot be cast to class java.util.Map (java.lang.String and java.util.Map are in module java.base of loader 'bootstrap')
    at org.apache.flink.connector.elasticsearch.sink.MapElasticsearchEmitter.emit(MapElasticsearchEmitter.java:34) ~[blob_p-9023b6bb5fd8b3aa9fa4825b835eb98973457cd5-cb0ad61268b11c10dd96786e0959649b:3.0.1-1.16]
    at org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.write(ElasticsearchWriter.java:123) ~[blob_p-9023b6bb5fd8b3aa9fa4825b835eb98973457cd5-cb0ad61268b11c10dd96786e0959649b:3.0.1-1.16]
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158) ~[flink-dist-1.17.1-amzn-1.jar:1.17.1-amzn-1]
I'm guessing I'm misunderstanding how Flink handles JSON and/or how the Elasticsearch index gets created. Does anyone have any advice?
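For what it's worth, the stack trace points at MapElasticsearchEmitter, so my current guess is that the emitter created by ElasticsearchEmitter.static_index expects each element to be a dict (a Java Map), rather than the raw string coming out of the text-line file source. Something like the following map step in place of the existing ds.sink_to(es_sink) line might be what's missing, though I haven't verified it, and stringifying values like this would mangle nested objects:

import json

from pyflink.common import Types

# Untested guess: parse each line into a dict so the Map-based emitter receives a Map.
# Types.MAP needs uniform value types, so values are stringified here, which flattens
# nested objects -- probably not the final answer.
dict_stream = ds.map(
    lambda line: {k: str(v) for k, v in json.loads(line).items()},
    output_type=Types.MAP(Types.STRING(), Types.STRING()))
dict_stream.sink_to(es_sink).name('es6 sink')

Is that the right direction, or is there a better way to hand JSON documents to the Elasticsearch 6 sink from PyFlink?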