I have a problem with my writer. I see in tcpdump that it send calls to kafka even I don't send messages to it. May be somebode already resolve this?
This is code for connection, send and close:
package kafka
import (
"chrome-grabber/config"
"chrome-grabber/grabber"
logger "chrome-grabber/log"
"context"
"github.com/mailru/easyjson"
"github.com/segmentio/kafka-go"
"net"
)
var KafkaMessenger = Messenger{}
var IsConnectedToKafka bool = true
func getKafkaProducer(brokers []string, topic string) *kafka.Writer {
transport := &kafka.DefaultTransport
return &kafka.Writer{
Addr: kafka.TCP(brokers...),
Topic: topic,
Balancer: &kafka.LeastBytes{},
BatchSize: 16384,
Transport: *transport,
Logger: kafka.LoggerFunc(logger.Debug),
ErrorLogger: kafka.LoggerFunc(logger.Debug),
}
}
func sendKafkaMessage(ctx context.Context, kafkaWriter *kafka.Writer, message *grabber.KafkaMessage) error {
marshalJsonEntry, err := easyjson.Marshal(message)
if err != nil {
IsConnectedToKafka = false
logger.Error("Could not marshal message: %v", err)
return err
}
msg := kafka.Message{
Key: []byte(message.Key),
Value: marshalJsonEntry,
Time: message.Time,
}
logger.Info("Message to output: %s", msg)
return kafkaWriter.WriteMessages(ctx, msg)
}
type ChannelMessage struct {
Message *grabber.KafkaMessage
Topic string
}
type Messenger struct {
requestProducer *kafka.Writer
logProducer *kafka.Writer
transport *kafka.Transport
ctx context.Context
}
func Init(ctx context.Context) {
logger.Info("init kafka")
grabberConfig := config.LoadConfig()
KafkaMessenger.ctx = ctx
transport := &kafka.Transport{
Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
d := &net.Dialer{}
return d.DialContext(ctx, "tcp", address)
},
}
KafkaMessenger.requestProducer = getKafkaProducer(grabberConfig.KafkaBrokers, grabberConfig.KafkaNetworkRequestTopic)
KafkaMessenger.logProducer = getKafkaProducer(grabberConfig.KafkaBrokers, grabberConfig.KafkaBrowserLogTopic)
KafkaMessenger.transport = transport
logger.Error("Status: %v", KafkaMessenger.requestProducer.Stats().Errors)
}
func CloseConnection(ctx context.Context) {
logger.Debug("Close connection to kafka")
KafkaMessenger.ctx = ctx
err := KafkaMessenger.requestProducer.Close()
if err != nil {
logger.Error("close requestProducer connection error: %v", err)
return
}
err = KafkaMessenger.logProducer.Close()
if err != nil {
logger.Error("close logProducer connection error: %v", err)
return
}
KafkaMessenger.transport.CloseIdleConnections()
}
func CheckConnection() error {
logger.Info("Start to check connection")
grabberConfig := config.LoadConfig()
conn, err := kafka.Dial("tcp", grabberConfig.KafkaBrokers[0])
if err != nil {
logger.Error("failed connect to kafka:", err)
IsConnectedToKafka = false
} else {
logger.Info("Close dial")
IsConnectedToKafka = true
conn.Close()
}
return err
}
func (m *Messenger) Send(msg *ChannelMessage, grabberConfig *grabber.Config) {
logger.Info("Start sending")
var ctx = m.ctx
switch msg.Topic {
case grabberConfig.KafkaBrowserLogTopic:
err := sendKafkaMessage(ctx, m.logProducer, msg.Message)
if err != nil {
IsConnectedToKafka = false
logger.Error("Could not write message to browser log topic: %v", err)
m.logProducer = getKafkaProducer(grabberConfig.KafkaBrokers, grabberConfig.KafkaBrowserLogTopic)
}
m.transport.CloseIdleConnections()
case grabberConfig.KafkaNetworkRequestTopic:
err := sendKafkaMessage(ctx, m.requestProducer, msg.Message)
if err != nil {
IsConnectedToKafka = false
//go CheckKafka()
logger.Error("Could not write message to request topic: %v", err)
m.requestProducer = getKafkaProducer(grabberConfig.KafkaBrokers, grabberConfig.KafkaNetworkRequestTopic)
}
m.transport.CloseIdleConnections()
}
}
**And this code is for chrome grabber, which intercepts events and sends them to kafka. I am trying to implement a circuit breaker and stop sending any traffic to kafka. But it is still present even if I don't send messages and close channels **
package browser
import (
"chrome-grabber/config"
"chrome-grabber/grabber"
grabberHttp "chrome-grabber/http"
"chrome-grabber/kafka"
logger "chrome-grabber/log"
"chrome-grabber/utils"
"context"
"encoding/json"
"fmt"
cdpLog "github.com/chromedp/cdproto/log"
"github.com/chromedp/cdproto/network"
"github.com/chromedp/cdproto/target"
"github.com/chromedp/chromedp"
"net"
"net/http"
"os"
"time"
)
// канал по которому определяется, что пора закончить сбор информации
var closeBrowserConnection = make(chan bool)
var Enable bool
func getCurrentPage(ctx context.Context, targetId string) *grabber.KafkaMessagePageInfo {
targets, err := chromedp.Targets(ctx)
if err != nil {
logger.Error(fmt.Sprintf("%v", err))
return nil
}
for _, browserTarget := range targets {
if browserTarget.Type == "page" && browserTarget.TargetID.String() == targetId {
return grabber.GetKafkaMessagePageInfo(browserTarget.URL)
}
}
logger.Debug("Could not find page browserTarget with id: %v", targetId)
return nil
}
func checkChromeContext(ctx context.Context) bool {
logger.Debug("check chrome context")
var title = ""
err := chromedp.Run(ctx, chromedp.Title(&title))
if err != nil {
logger.Error("error with chrome context: %v", err)
} else {
logger.Debug("context ok with title: %s", title)
}
return err == nil
}
func CheckPortOpen(host string, port int) bool {
timeout := time.Second
conn, err := net.DialTimeout("tcp", net.JoinHostPort(host, fmt.Sprintf("%d", port)), timeout)
if conn != nil {
defer conn.Close()
}
if err != nil {
logger.Debug("%v", err)
}
return err == nil && conn != nil
}
func getRemoteDebugContext(port int) (string, string, string) {
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/json", port))
if err != nil {
logger.PanicError(err)
return "", "", ""
}
var result []map[string]interface{}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
logger.PanicError(err)
return "", "", ""
}
for _, item := range result {
if "page" == item["type"] {
return item["webSocketDebuggerUrl"].(string), item["id"].(string), item["title"].(string)
}
}
return "", "", ""
}
func tryConnectToBrowser(ctx context.Context, port int, chromeChannel chan context.Context) {
logger.Debug("trying connect to Browser")
config.SetGrabberRunning()
ConnectToBrowser(ctx, port, chromeChannel)
config.SetGrabberNotRunning()
}
func CheckBrowserPortEnabled(ctx context.Context) {
var host = "localhost"
var chromeChannel = make(chan context.Context)
var chromeContext context.Context
interval := time.Duration(config.LoadConfig().PortCheckInterval)
ticker := time.NewTicker(interval * time.Second)
isConnectedToKafka := config.LoadConfig().CheckKafkaConnectionFlag
isGrabberStatusAvailable := config.LoadConfig().CheckKafkaConnectionFlag
kafkaInterval := config.LoadConfig().KafkaCheckInterval
var i, z = 0, 0
var count = 0
hostname, _ := os.Hostname() //получаем имя хоста
//проверяем статус сервиса доступности граббера
answer, _ := grabberHttp.Ping(config.LoadConfig().UrlToCheckService)
logger.Error("! Is available sending messages by grabber_status service: %v for %v!", answer, hostname)
for {
<-ticker.C
var port = config.LoadConfig().BrowserPort
isPortOpen := CheckPortOpen(host, port)
//isConnectedToKafka = kafka.IsConnectedToKafka
isRunning := config.IsGrabberRunning()
isConnectedToKafka = kafka.IsConnectedToKafka
//опрашивает эндпоинт каждые три минуты и блокирует отправку сообщений в случае получения false
if z >= 3 {
answer, err := grabberHttp.Ping(config.LoadConfig().UrlToCheckService)
if !answer.GrabberEnable || err != nil {
logger.Error("! The grabber_status service returned false or error, sending messages is unavailable for %v!", hostname)
isGrabberStatusAvailable = false
kafka.CloseConnection(ctx)
} else {
logger.Error("! The grabber_status service returned true, sending messages available for %v!", hostname)
isGrabberStatusAvailable = true
}
z = 0
}
z++
//проверка связи с кафкой если попытка отправить сообщение вызывает ошибку то запускается проверка связи с кафкой
if !isConnectedToKafka {
i += int(interval) // после каждой попытки увеличивается на интервал в настройках
if i >= kafkaInterval { // после трех попыток величина интервала 1 час и дальше увеличение интервала не происходит
i = 0
count++
if count >= 3 {
kafkaInterval = 3600
}
isConnectedToKafka = kafka.CheckConnection() == nil
}
} else {
count = 0
kafkaInterval = config.LoadConfig().KafkaCheckInterval
}
Enable = isConnectedToKafka && isGrabberStatusAvailable
logger.Debug("Enable: %v", Enable)
logger.Debug("debug port open: %v\tgrabber running: %v\t Enable: %v", isPortOpen, isRunning, Enable)
logger.Debug("isConnectedToKafka: %v\tisGrabberStatusAvailable: %v", isConnectedToKafka, isGrabberStatusAvailable)
if isRunning && !isPortOpen {
closeBrowserConnection <- true
config.SetGrabberNotRunning()
//коннектимся к браузеру только если есть доступ к кафке isConnectedToKafka = true
} else if !isRunning && isPortOpen && isConnectedToKafka && isGrabberStatusAvailable {
logger.Debug(" go tryConnectToBrowser")
go tryConnectToBrowser(ctx, port, chromeChannel)
chromeContext = <-chromeChannel
}
if config.IsGrabberRunning() {
if !checkChromeContext(chromeContext) {
closeBrowserConnection <- true
}
}
}
}
func ConnectToBrowser(ctx context.Context, port int, chromeChannel chan context.Context) {
debugUrl, targetId, title := getRemoteDebugContext(port)
if len(debugUrl) == 0 {
logger.Panic("Could not start browser log grabber")
}
// create allocator context for use with creating a browser context later
allocatorContext, cancel := chromedp.NewRemoteAllocator(ctx, debugUrl)
defer cancel()
// create context
ctxt, cancel := chromedp.NewContext(allocatorContext)
defer cancel()
c := chromedp.FromContext(ctxt)
chromedp.WithTargetID(target.ID(targetId))(c)
if err := chromedp.Run(ctxt, cdpLog.Enable()); err != nil {
logger.Panic("Failed enable Log Domain: %v", err)
}
if err := chromedp.Run(ctxt, network.Enable()); err != nil {
logger.Panic("Failed enable Network Domain: %v", err)
}
grabberConfig := config.LoadConfig()
//слушаем события браузера
chromedp.ListenTarget(ctxt, func(ev interface{}) {
switch ev := ev.(type) {
case *network.EventRequestWillBeSent:
//сохраняем время старта запроса
utils.Store(ev)
if Enable {
//logger.Debug("ENABLE")
go kafka.KafkaMessenger.Send(&kafka.ChannelMessage{
Message: grabber.CreateRequestMessage(grabberConfig.Uid, ev, getCurrentPage(ctxt, targetId)),
Topic: grabberConfig.KafkaNetworkRequestTopic,
}, grabberConfig)
}
case *network.EventLoadingFinished:
startNs := utils.FixDuration(ev.RequestID)
if Enable {
go kafka.KafkaMessenger.Send(&kafka.ChannelMessage{
Message: grabber.CreateCompleteRequestMessage(grabberConfig.Uid, ev, startNs),
Topic: grabberConfig.KafkaNetworkRequestTopic,
}, grabberConfig)
}
case *network.EventLoadingFailed:
startNs := utils.FixDuration(ev.RequestID)
if Enable {
go kafka.KafkaMessenger.Send(&kafka.ChannelMessage{
Message: grabber.CreateFailedRequestMessage(grabberConfig.Uid, ev, startNs),
Topic: grabberConfig.KafkaNetworkRequestTopic,
}, grabberConfig)
}
case *network.EventResponseReceived:
utils.StoreThisShit(ev)
if Enable {
go kafka.KafkaMessenger.Send(&kafka.ChannelMessage{
Message: grabber.CreateResponseReceivedMessage(grabberConfig.Uid, ev),
Topic: grabberConfig.KafkaNetworkRequestTopic,
}, grabberConfig)
}
case *cdpLog.EventEntryAdded:
if ev.Entry.Level == "error" && Enable {
go kafka.KafkaMessenger.Send(&kafka.ChannelMessage{
Message: grabber.CreateBrowserLogMessage(grabberConfig.Uid, ev, getCurrentPage(ctxt, targetId)),
Topic: grabberConfig.KafkaBrowserLogTopic,
}, grabberConfig)
}
}
})
logger.Info("Connected to %s\nWaits close Browser", title)
chromeChannel <- ctxt
<-closeBrowserConnection
}
I am tried to implement a circuit breaker and stop sending any traffic to kafka.