Fluentd Is Not Emitting Events and Fails with "unknown keywords" Errors


We were using td-agent with custom plugins. One input plugin opens a TCP socket and listens for logs coming from rsyslog. These are custom, pipe-separated logs that carry business logic, which is why we developed a custom plugin that receives them, parses them, and builds a JSON record from each line. It emits the event using router.emit. A custom output plugin then receives that JSON object, parses it again, and decides according to business logic which Elasticsearch index the document should be inserted into. It also adds Elasticsearch metadata.
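To make the input side concrete, here is a minimal sketch of the pipe-separated-to-record conversion described above. The field list `FIELDS` and the helper name `parse_pipe_line` are hypothetical and only stand in for the real business logic; in the actual plugin the resulting hash is what gets passed to router.emit.

```ruby
require 'json'

# Hypothetical field order; the real plugin derives this from business logic.
FIELDS = %w[field1 f2 f3 f4].freeze

# Split one pipe-separated line into a Hash keyed by FIELDS.
# Returns an empty Hash when the column count does not match.
def parse_pipe_line(line)
  values = line.chomp.split('|', -1) # -1 keeps trailing empty columns
  return {} unless values.length == FIELDS.length

  FIELDS.zip(values).to_h
end

record = parse_pipe_line("value1|v2|v3|v4\n")
puts record.to_json
```

Returning an empty hash on a malformed line fits the `record.length > 0` guard used before the emit, so bad lines are simply skipped.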

In the old version of td-agent the plugins worked fine, but with the latest version of Fluentd they do not. From the custom plugin's logs I verified that everything before router.emit executes successfully, but the log statements right after router.emit are never executed.

From the logs alone I can verify that the output plugin is not receiving the JSON data at all.

If I start Fluentd in debug mode I can see some enqueueing messages but no dequeueing messages. Fluentd restarts after this. I suspect it is unable to dequeue the buffer.

Please see the logs below and help me with this.

Code that emits the event

# The start method opens a TCP socket using Ruby's Cool.io library.
# Business logic for the conversion (omitted here).
@log.debug "Kcsv: JSON - #{record}"
if record.length > 0
  @log.info("IUnidkhkhfkwehfkhewifhewifhweihfncnndndndndndndndn HELEOELEOLEOLEO")
  router.emit(tag, time, record)
end

Code snippet that receives the emitted object and makes the API call.

require 'net/http'

# Keep a global reference to the plugin logger so helper methods can use it.
$klog = @log

# POST a prepared bulk body to Elasticsearch's /_bulk endpoint.
def handle_bulk_msg(body)
  begin
    $klog.info("Inside http_bulk_msg")
    http = Net::HTTP.new(@host, @port_i)
    request = Net::HTTP::Post.new('/_bulk', {'content-type' => 'application/json; charset=utf-8'})
    request.body = body
    request.basic_auth("admin", "Elas#1234")
    # value raises an HTTP error unless the response status is 2xx.
    http.request(request).value
  rescue Timeout::Error, Errno::EINVAL, Errno::ECONNRESET, EOFError, Net::HTTPBadResponse, Net::HTTPHeaderSyntaxError, Net::ProtocolError => e
    $klog.error e.backtrace.join("\n")
  end
end
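
For context, Elasticsearch's `_bulk` endpoint expects a newline-delimited body: one action line followed by one document line per record, ending with a newline. The following is only a sketch of how such a `body` could be assembled before calling handle_bulk_msg. The helper names `build_bulk_body` and `index_for`, the `logs-` prefix, and the `type` field are hypothetical placeholders for the real index-selection logic.

```ruby
require 'json'

# Hypothetical routing rule: pick an index name from a field in the record.
def index_for(record)
  "logs-#{record.fetch('type', 'default')}"
end

# Build an NDJSON body for POST /_bulk: an action line, then the document.
def build_bulk_body(records)
  lines = records.flat_map do |record|
    [{ index: { _index: index_for(record) } }.to_json, record.to_json]
  end
  lines.join("\n") + "\n" # the bulk API requires a terminating newline
end

body = build_bulk_body([{ 'type' => 'audit', 'f2' => 'v2' }])
puts body
```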

Have a look at the logs below. The input plugin writes to tdagent_kcsv.log, Fluentd writes to fluentd.log, and the output plugin writes to tdagent_elasticsearch.log. The output below comes from tailing all of these files together, so the entries appear in the order in which they occurred.

==> ../tdagent/tdagent_kcsv.log <==
D, [2023-06-06T14:09:34.112400 #35453] DEBUG -- : Kcsv: JSON - {"field1"=>"value1", "f2"=>"v2", "f3"=>"v3", ...., "f4"=>"v4"}
I, [2023-06-06T14:09:34.112459 #35453]  INFO -- : IUnidkhkhfkwehfkhewifhewifhweihfncnndndndndndndndn HELEOELEOLEOLEO

==> fluentd.log <==
2023-06-06 14:09:34 +0530 [trace]: #0 fluent/log.rb:317:trace: writing events into buffer instance=1560 metadata_size=1

==> ../tdagent/tdagent_kcsv.log <==
I, [2023-06-06T14:09:34.113302 #35453]  INFO -- : closed fluent socket object_id=2620

==> fluentd.log <==
2023-06-06 14:09:34 +0530 [trace]: #0 fluent/log.rb:317:trace: writing events into buffer instance=1560 metadata_size=1
2023-06-06 14:09:34 +0530 [trace]: #0 fluent/log.rb:317:trace: enqueueing all chunks in buffer instance=1560
2023-06-06 14:09:34 +0530 [trace]: #0 fluent/log.rb:317:trace: writing events into buffer instance=1560 metadata_size=1
2023-06-06 14:09:40 +0530 [trace]: #0 fluent/log.rb:317:trace: enqueueing all chunks in buffer instance=1560
2023-06-06 14:09:40 +0530 [trace]: #0 fluent/log.rb:317:trace: writing events into buffer instance=1560 metadata_size=1
2023-06-06 14:09:45 +0530 [trace]: #0 fluent/log.rb:317:trace: enqueueing all chunks in buffer instance=1560
2023-06-06 14:09:45 +0530 [trace]: #0 fluent/log.rb:317:trace: writing events into buffer instance=1560 metadata_size=1

From the logs we can verify that the input plugin converts the log into JSON as desired. But when we emit the event, it does not seem to reach the output: none of the output plugin's log messages appear in its log file, and nothing is inserted into the Elasticsearch indices. The output plugin is initialized and registered; we verified that with a debug statement in its initialize method.

We need the community's help to debug this issue.

Also, the Fluentd workers die repeatedly and start again after a few messages like the ones below.

2023-06-06 14:09:45 +0530 [trace]: #0 fluent/log.rb:317:trace: enqueueing all chunks in buffer instance=1560
2023-06-06 14:09:45 +0530 [trace]: #0 fluent/log.rb:317:trace: writing events into buffer instance=1560 metadata_size=1

From the logs I can see that Fluentd adds data to the buffer queue but does not dequeue it; it restarts before dequeueing. Sometimes, when it does dequeue, it ends up with the error below and restarts. I believe it ends with some thread-related error. In the last few lines of the logs below, it restarts and, strangely, registers only the llelasticsearch plugin (the name of our custom plugin) again.

==> fluentd.log <==
2023-06-06 14:04:50 +0530 [trace]: #0 fluent/log.rb:317:trace: writing events into buffer instance=1560 metadata_size=1
2023-06-06 14:04:50 +0530 [trace]: #0 fluent/log.rb:317:trace: writing events into buffer instance=1560 metadata_size=1
2023-06-06 14:04:50 +0530 [trace]: #0 fluent/log.rb:317:trace: enqueueing all chunks in buffer instance=1560
2023-06-06 14:04:50 +0530 [trace]: #0 fluent/log.rb:317:trace: writing events into buffer instance=1560 metadata_size=1
2023-06-06 14:04:56 +0530 [trace]: #0 fluent/log.rb:317:trace: enqueueing all chunks in buffer instance=1560
2023-06-06 14:04:56 +0530 [trace]: #0 fluent/log.rb:317:trace: writing events into buffer instance=1560 metadata_size=1
2023-06-06 14:04:56 +0530 [trace]: #0 fluent/log.rb:317:trace: enqueueing chunk instance=1560 metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=nil, variables=nil, seq=0>
2023-06-06 14:04:56 +0530 [trace]: #0 fluent/log.rb:317:trace: writing events into buffer instance=1560 metadata_size=1
2023-06-06 14:04:56 +0530 [trace]: #0 fluent/log.rb:317:trace: dequeueing a chunk instance=1560
2023-06-06 14:04:56 +0530 [trace]: #0 fluent/log.rb:317:trace: writing events into buffer instance=1560 metadata_size=1
2023-06-06 14:04:56 +0530 [trace]: #0 fluent/log.rb:317:trace: chunk dequeued instance=1560 metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=nil, variables=nil, seq=0>
2023-06-06 14:04:56 +0530 [trace]: #0 fluent/log.rb:317:trace: writing events into buffer instance=1560 metadata_size=1
2023-06-06 14:04:56 +0530 [error]: #0 fluent/log.rb:402:error: unexpected error error_class=ArgumentError error="unknown keywords: :plugin, :title, :thread, :error"
2023-06-06 14:04:56 +0530 [trace]: #0 fluent/log.rb:317:trace: writing events into buffer instance=1560 metadata_size=1
  2023-06-06 14:04:56 +0530 [error]: #0 fluent/supervisor.rb:962:main_process: /ruby/lib/ruby/2.7.0/logger.rb:536:in `warn'
  2023-06-06 14:04:56 +0530 [error]: #0 fluent/supervisor.rb:962:main_process: /fluentd/lib/fluent/plugin_helper/thread.rb:90:in `ensure in block in thread_create'
  2023-06-06 14:04:56 +0530 [error]: #0 fluent/supervisor.rb:962:main_process: /fluentd/lib/fluent/plugin_helper/thread.rb:90:in `block in thread_create'
2023-06-06 14:04:56 +0530 [error]: #0 fluent/log.rb:402:error: unexpected error error_class=ArgumentError error="unknown keywords: :plugin, :title, :thread, :error"
2023-06-06 14:04:56 +0530 [trace]: #0 fluent/log.rb:317:trace: writing events into buffer instance=1560 metadata_size=1
  2023-06-06 14:04:56 +0530 [error]: #0 fluent/supervisor.rb:961:main_process: /ruby/lib/ruby/2.7.0/logger.rb:536:in `warn'
  2023-06-06 14:04:56 +0530 [error]: #0 fluent/supervisor.rb:961:main_process: /fluentd/lib/fluent/plugin_helper/thread.rb:90:in `ensure in block in thread_create'
  2023-06-06 14:04:56 +0530 [error]: #0 fluent/supervisor.rb:961:main_process: /fluentd/lib/fluent/plugin_helper/thread.rb:90:in `block in thread_create'
2023-06-06 14:04:56 +0530 [error]: fluent/log.rb:402:error: Worker 0 exited unexpectedly with status 1
2023-06-06 14:04:57 +0530 [info]: #0 fluent/log.rb:360:info: init worker0 logger path="/logs/fluentd/fluentd.log" rotate_age=nil rotate_size=nil
2023-06-06 14:04:57 +0530 [warn]: #0 fluent/log.rb:381:warn: 'type' is deprecated parameter name. use '@type' instead.
2023-06-06 14:04:57 +0530 [info]: fluent/log.rb:360:info: adding match pattern="*.**" type="llelasticsearch"
2023-06-06 14:04:57 +0530 [trace]: #0 fluent/log.rb:317:trace: registered output plugin 'llelasticsearch'
2023-06-06 14:04:57 +0530 [warn]: #0 fluent/log.rb:381:warn: 'type' is deprecated parameter name. use '@type' instead.
2023-06-06 14:04:57 +0530 [trace]: #0 fluent/log.rb:317:trace: registered metrics plugin 'local'
2023-06-06 14:04:57 +0530 [info]: #0 fluent/log.rb:360:info: 'flush_interval' is configured at out side of <buffer>. 'flush_mode' is set to 'interval' to keep existing behaviour
2023-06-06 14:04:57 +0530 [trace]: #0 fluent/log.rb:317:trace: registered buffer plugin 'memory'
2023-06-06 14:04:57 +0530 [info]: #0 fluent/log.rb:360:info: InsideElastic Configurationnhskdfhskfhsdkhfksdhfksdhfkshfkd
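
A note on the ArgumentError in the log above: the backtrace goes from fluent/plugin_helper/thread.rb into Ruby's standard logger.rb, which suggests that `warn` was called on a plain Ruby `Logger` with extra keyword-style arguments. The sketch below only reproduces that class of failure outside Fluentd. That the plugin's log object is a stdlib `Logger` is an assumption (the plugin source that sets it up is not shown here), the message and field values are illustrative, and the exact error text depends on the Ruby version.

```ruby
require 'logger'
require 'stringio'

# Assumption: a plain stdlib Logger stands in for the plugin's log object.
plain_logger = Logger.new(StringIO.new)

# Call warn with a message plus extra key/value pairs, as the backtrace suggests.
# Logger#warn accepts only an optional progname and a block, so this raises.
def warn_with_fields(logger)
  logger.warn('thread exited', plugin: 'llelasticsearch', title: 'flush')
  :ok
rescue ArgumentError => e
  e
end

result = warn_with_fields(plain_logger)
puts result.class
```

If this assumption holds, one thing to check is whether the custom plugins overwrite `@log` with their own `Logger` instead of using the logger Fluentd provides.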

Any help is highly appreciated. Thanks in advance. Let me know if more information is required.
