Perform Monitoring Tasks for Kafka with Golang

Zhimin Wen
Jan 22, 2024
Image by Monica Volpin from Pixabay

For a long time we have had to rely on the Kafka command-line tools to check topic offsets, list consumer group lags, and so on. That requires a JRE and a Kafka installation on your system in the first place.

Recently, I discovered “franz-go”, a complete Apache Kafka client written in Go. With its pure Go implementation and full coverage of Kafka features, we can create our own administrative tools to meet some of our monitoring requirements.

Let’s use the kadm package for the admin client.

go get github.com/twmb/franz-go/pkg/kadm

SASL Authentication

All my Kafka systems are SASL authenticated. Let’s create a utility function to initialize a Kafka client with SASL.

First, create a tls.Config that trusts the CA cert from the Kafka cluster.


// tlsConfigWithRootCA returns a tls.Config that trusts the system CAs
// plus the CA certificate(s) found in pemFile.
func tlsConfigWithRootCA(pemFile string) (*tls.Config, error) {
	rootCA, err := x509.SystemCertPool()
	if err != nil {
		log.Printf("Failed to load system cert pool: %v", err)
		return nil, err
	}

	// Fall back to an empty pool if the system pool is unavailable.
	if rootCA == nil {
		log.Printf("root ca is nil. Creating new cert pool")
		rootCA = x509.NewCertPool()
	}

	cert, err := os.ReadFile(pemFile)
	if err != nil {
		log.Printf("Failed to read the cert: %v", err)
		return nil, err
	}
	// AppendCertsFromPEM reports false when no certificate could be parsed.
	ok := rootCA.AppendCertsFromPEM(cert)
	if !ok {
		log.Printf("The cert of %s is not added.", pemFile)
		return nil, fmt.Errorf("failed to parse any certificate from %s", pemFile)
	}

	return &tls.Config{
		RootCAs: rootCA,
	}, nil
}
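The pool-building step can be tried without a real cluster. The following is a minimal sketch, not part of the original helper: `selfSignedCAPEM` and `poolFromPEM` are hypothetical names introduced here, the CA is a throwaway certificate generated in memory, and the pool starts empty rather than from the system pool so the result does not depend on the host. It shows that `AppendCertsFromPEM` accepts a valid PEM certificate and reports failure for data that contains none.

```go
package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"crypto/x509"
	"crypto/x509/pkix"
	"encoding/pem"
	"fmt"
	"math/big"
	"time"
)

// selfSignedCAPEM (hypothetical helper) generates a throwaway self-signed
// CA certificate and returns it PEM-encoded.
func selfSignedCAPEM() ([]byte, error) {
	key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	if err != nil {
		return nil, err
	}
	tmpl := &x509.Certificate{
		SerialNumber:          big.NewInt(1),
		Subject:               pkix.Name{CommonName: "demo-kafka-ca"},
		NotBefore:             time.Now().Add(-time.Hour),
		NotAfter:              time.Now().Add(time.Hour),
		IsCA:                  true,
		KeyUsage:              x509.KeyUsageCertSign,
		BasicConstraintsValid: true,
	}
	// Template and parent are the same certificate, so it is self-signed.
	der, err := x509.CreateCertificate(rand.Reader, tmpl, tmpl, &key.PublicKey, key)
	if err != nil {
		return nil, err
	}
	return pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: der}), nil
}

// poolFromPEM (hypothetical helper) builds a fresh pool from PEM bytes and
// turns the boolean result of AppendCertsFromPEM into an error.
func poolFromPEM(pemBytes []byte) (*x509.CertPool, error) {
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(pemBytes) {
		return nil, fmt.Errorf("no certificate could be parsed from the PEM data")
	}
	return pool, nil
}

func main() {
	caPEM, err := selfSignedCAPEM()
	if err != nil {
		panic(err)
	}
	if _, err := poolFromPEM(caPEM); err != nil {
		fmt.Println("unexpected error:", err)
	} else {
		fmt.Println("valid PEM accepted")
	}
	if _, err := poolFromPEM([]byte("not a certificate")); err != nil {
		fmt.Println("garbage rejected:", err)
	}
}
```

Checking the boolean matters because `AppendCertsFromPEM` does not return an error; a wrong file would otherwise only surface later as a TLS handshake failure.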

The client can then be created:

// NewSASL512Client creates a Kafka client that connects over TLS and
// authenticates with SASL/SCRAM-SHA-512. Extra options are appended
// after the TLS and SASL settings.
func NewSASL512Client(caPemFile, saslUser, saslPass string, opts ...kgo.Opt) (*kgo.Client, error) {
	tlsConfig, err := tlsConfigWithRootCA(caPemFile)
	if err != nil {
		return nil, err
	}

	myOpts := []kgo.Opt{
		kgo.DialTLSConfig(tlsConfig),
		kgo.SASL(scram.Auth{
			User: saslUser,
			Pass: saslPass,
		}.AsSha512Mechanism()),
	}
	myOpts = append(myOpts, opts...)
	client, err := kgo.NewClient(myOpts...)
	if err != nil {
		log.Printf("Failed to create kafka client: %v", err)
		return nil, err
	}
	return client, nil
}
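The helper places its own TLS and SASL options first and appends the caller's options afterwards. The following is a generic sketch of that "defaults first, caller options last" pattern using only the standard library. The `settings`, `option`, `withClientID`, `withTLS`, and `newSettings` names are hypothetical stand-ins invented for illustration; they are not franz-go types and make no claim about how franz-go applies its options internally.

```go
package main

import "fmt"

// settings is a hypothetical configuration struct for this illustration.
type settings struct {
	clientID string
	tls      bool
}

// option is a hypothetical functional option that mutates settings.
type option func(*settings)

func withClientID(id string) option { return func(s *settings) { s.clientID = id } }
func withTLS() option               { return func(s *settings) { s.tls = true } }

// newSettings mirrors the shape of the helper in the text: fixed defaults
// go first, then the caller's options are appended and applied in order.
func newSettings(opts ...option) settings {
	all := []option{withTLS(), withClientID("default-id")}
	all = append(all, opts...)

	var s settings
	for _, apply := range all {
		apply(&s)
	}
	return s
}

func main() {
	fmt.Printf("%+v\n", newSettings())
	fmt.Printf("%+v\n", newSettings(withClientID("monitor")))
}
```

In this sketch a caller-supplied option that sets the same field as a default wins because it is applied later, while defaults the caller does not touch stay in place.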

For example, the Kafka admin client can be created as follows:


func get_admin_client() *kadm.Client {
client := try.E1(kutil.NewSASL512Client(
config.String("source.kafka.caCert")…
