Using TLS Authentication for your Go Kafka Client
If you want to access a Kafka server that have enabled TLS, you will need to be able to use certificate to connect from your Sarama / Go client. This article outlines the needed steps to configure it properly.
Configuring your Kafka server to support authentication
If you are managing your own Kafka service and would like to enable authentication, you should read this article from Confluent documentation site: Encryption and Authentication using SSL.
Converting Java keystore and truststore
The first steps to easily handle your certificates from Go is to convert them to a set of PEM files.
Here are the commands to extract the Certificate Authority (CA) certificate:
$ keytool -importkeystore -srckeystore kafka.server.truststore.jks -destkeystore server.p12 -deststoretype PKCS12
$ openssl pkcs12 -in server.p12 -nokeys -out server.cer.pem
You can then convert your client keystore to be usable from Go, with similar commands:
$ keytool -importkeystore -srckeystore kafka.server.keystore.jks -destkeystore client.p12 -deststoretype PKCS12
$ openssl pkcs12 -in client.p12 -nokeys -out client.cer.pem
$ openssl pkcs12 -in client.p12 -nodes -nocerts -out client.key.pem
Go Kafka client supporting TLS authentication
To connect to the server and authenticate with TLS, you just need to generate the proper TLSConfig
. Here is the relevant code:
func NewTLSConfig(clientCertFile, clientKeyFile, caCertFile string) (*tls.Config, error) {
tlsConfig := tls.Config{}
// Load client cert
cert, err := tls.LoadX509KeyPair(clientCertFile, clientKeyFile)
if err != nil {
return &tlsConfig, err
}
tlsConfig.Certificates = []tls.Certificate{cert}
// Load CA cert
caCert, err := ioutil.ReadFile(caCertFile)
if err != nil {
return &tlsConfig, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig.RootCAs = caCertPool
tlsConfig.BuildNameToCertificate()
return &tlsConfig, err
}
The code is then extremely simple to connect:
func main() {
tlsConfig, err := NewTLSConfig("bundle/client.cer.pem",
"bundle/client.key.pem",
"bundle/server.cer.pem")
if err != nil {
log.Fatal(err)
}
// This can be used on test server if domain does not match cert:
// tlsConfig.InsecureSkipVerify = true
consumerConfig := sarama.NewConfig()
consumerConfig.Net.TLS.Enable = true
consumerConfig.Net.TLS.Config = tlsConfig
client, err := sarama.NewClient([]string{"localhost:9093"}, consumerConfig)
if err != nil {
log.Fatalf("unable to create kafka client: %q", err)
}
consumer, err := sarama.NewConsumerFromClient(client)
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
consumerLoop(consumer, "mytopic")
}
The consumerLoop
has nothing special regarding TLS authentication. You can just use your standard Sarama code. You can read the full code on Github: base-client.go.