Elasticsearch RestHighLevelClient bulk operation stops working some time after a service restart

Our Elasticsearch RestHighLevelClient bulk operation works initially but stops working after some time.

This is how our code looks.

public BulkResponse bulkOperation(BulkRequest bulkRequest) {
    try {
        return restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
    } catch (Exception e) {
        // Guard against an empty request so that building the message cannot itself throw.
        String index = bulkRequest.requests().isEmpty() ? "<none>" : bulkRequest.requests().get(0).index();
        LOGGER.error("Exception occurred while indexing bulk request data to elastic search index : " + index, e);
        throw new RuntimeException("Error while bulk request index : " + index, e);
    }
}
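
One thing worth ruling out when there is "no error or failure": `RestHighLevelClient.bulk` reports per-item failures inside the returned `BulkResponse` rather than by throwing, so the catch block above would not see them. Below is a minimal sketch of collecting failed items. `BulkFailureReport` and `describeFailures` are hypothetical names, not part of the client, and the helper is kept generic so that it runs with only the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical helper (not part of the Elasticsearch client): collects a
// description of every failed item from any iterable of per-item results.
final class BulkFailureReport {

    private BulkFailureReport() {
    }

    static <T> List<String> describeFailures(Iterable<T> items,
                                             Predicate<T> isFailed,
                                             Function<T, String> describe) {
        List<String> failures = new ArrayList<>();
        for (T item : items) {
            // Keep only the items that the caller's predicate marks as failed.
            if (isFailed.test(item)) {
                failures.add(describe.apply(item));
            }
        }
        return failures;
    }
}
```

With the real client this could be called as `BulkFailureReport.describeFailures(bulkResponse, BulkItemResponse::isFailed, BulkItemResponse::getFailureMessage)`, since `BulkResponse` is iterable over `BulkItemResponse`. `bulkResponse.hasFailures()` gives a quick yes/no check before iterating.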

Below is how we create the REST client:

private void init(ElasticSearchConfiguration configuration) {
    HttpHost[] elasticSearchHosts = getElasticSearchHosts(configuration);
    RestClientBuilder builder = RestClient.builder(elasticSearchHosts);
    // All timeouts are in milliseconds. In Apache HttpClient's RequestConfig, 0 means
    // an infinite timeout and a negative value means "undefined (system default)".
    builder.setRequestConfigCallback(requestConfigBuilder ->
            requestConfigBuilder
                    // Time allowed to establish the connection to the remote host.
                    .setConnectTimeout(configuration.getConnectionTimeout())
                    // Maximum period of inactivity while waiting for data on the socket.
                    .setSocketTimeout(configuration.getSocketTimeout())
                    // Time to wait for a connection from the connection manager/pool.
                    .setConnectionRequestTimeout(configuration.getConnectionRequestTimeout()));

    // Create the REST high-level client from the low-level client builder above.
    restHighLevelClient = new RestHighLevelClient(builder);

    //createIndices(configuration);
}

public HttpHost[] getElasticSearchHosts(ElasticSearchConfiguration elasticSearchConfiguration) {
    List<HttpHost> hosts = new ArrayList<>();
    String esHosts = elasticSearchConfiguration.getHost();
    if (null != esHosts) {
        HashSet<String> hostsSet = new HashSet<>(Arrays.asList(esHosts.split(",")));
        hostsSet.stream().forEach(s -> {
            hosts.add(HttpHost.create(s));
        });
    }

    HttpHost[] hostsArr = hosts.toArray(new HttpHost[hosts.size()]);
    return hostsArr;
}
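
The host parsing above passes each comma-separated entry to `HttpHost.create` as-is, so stray spaces or a trailing comma in the configured value reach it unchanged, and `HashSet` does not keep the configured order. Below is a minimal sketch of normalizing the string first. `HostListParser` and `parse` are hypothetical names used only for illustration, and the example sticks to plain strings so that it runs with only the JDK.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: normalizes a comma-separated host string before the
// entries are handed to HttpHost.create.
final class HostListParser {

    private HostListParser() {
    }

    static List<String> parse(String csv) {
        // LinkedHashSet removes duplicates while keeping the configured order.
        Set<String> unique = new LinkedHashSet<>();
        if (csv != null) {
            for (String part : csv.split(",")) {
                String host = part.trim();
                // Skip empty entries such as those produced by "a,,b".
                if (!host.isEmpty()) {
                    unique.add(host);
                }
            }
        }
        return new ArrayList<>(unique);
    }
}
```

Each returned string can then be passed to `HttpHost.create` as in the original loop.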

ES server version:

  "version" : {
    "number" : "7.10.2",
    "build_flavor" : "oss",
    "build_type" : "rpm",
    "build_hash" : "747e1cc71def077253878a59143c1f785afa92b9",
    "build_date" : "2021-01-13T00:42:12.435326Z",
    "build_snapshot" : false,
    "lucene_version" : "8.7.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },

Client version: elasticsearch-rest-high-level-client-7.5.1.jar

Our application reads messages from Kafka, converts them into IndexRequests, and saves them in Elasticsearch. Initially all data is indexed correctly, but after some time indexing stops, and no error or failure is reported. Once we reset the Kafka offset and restart the application, it indexes all the data correctly. Any idea why this is happening?
