AWS Big Data Blog

Securing Apache Kafka is easy and familiar with IAM Access Control for Amazon MSK

This is a guest blog post by AWS Data Hero Stephane Maarek. 

AWS launched IAM Access Control for Amazon MSK, which is a security option offered at no additional cost that simplifies cluster authentication and Apache Kafka API authorization using AWS Identity and Access Management (IAM) roles or user policies to control access. This eliminates the need for administrators to run an unfamiliar system to control access to Apache Kafka on Amazon MSK, and learn intricate details and specific commands to manage Apache Kafka access control lists (ACLs).

This is a game-changer from a security perspective for AWS customers who use Apache Kafka: I recommend Amazon MSK customers use IAM Access Control unless they have a specific need for using mutual TLS or SASL/SCRAM authN/Z.

As a cherry on the cake, IAM Access Control logs events related to Apache Kafka resource changes to Amazon CloudTrail, such as topic creation, adding partitions, and topic configuration modifications, which can be very helpful for adding an audit layer to your Apache Kafka clusters (something you could only obtain otherwise by parsing unstructured Apache Kafka logs).

These new features complete the suite of existing security features for Amazon MSK such as Amazon VPC integration for private connectivity and network isolation, at-rest encryption via AWS Key Management Service (AWS KMS), and encryption in transit via TLS.

Traditionally, Apache Kafka comes with its own ways of managing authentication and authorization. Amazon MSK began with support for mutual TLS authN/Z, and then offered SASL/SCRAM, which are standard Apache Kafka security options. Amazon MSK took a step forward to make authN/Z easier by standardizing security management for MSK clusters and Apache Kafka using IAM.

In this post, we explore how this new feature works in detail (setting up and creating a topic, producer, and consumer), and how to connect Conduktor, a graphical Apache Kafka desktop client, which allows us to quickly test our connectivity to Amazon MSK and ensure our first IAM administrator policy gets applied correctly.

The following diagram illustrates our solution architecture.

Create a MSK cluster using AWS_MSK_IAM

When creating a MSK cluster, you can enable one of several security mechanisms. The IAM-based security mechanism runs on port 9098 on your Apache Kafka brokers, and consists of only one setting to enable in your cluster configuration.

For this post, I made the MSK security group as permissive as possible, to remove any constraints on security groups. In production, make sure you configure your security groups properly.

After you launch your MSK cluster, you can start working with IAM policies to manage Apache Kafka security.

Connect using an admin user

When I work with Apache Kafka, I like to use Conduktor Desktop, a graphical user interface for Apache Kafka, because it allows me to quickly configure secure connections and test APIs.

First, we need to establish a connection into our VPC, for which you have several options.

For this post, I used the Client VPN methodology. For more information, see Getting started with Client VPN or a video walkthrough in the Amazon MSK Master Class.

Let’s set up my IAM user stephane-msk to have a policy that gives myself admin privileges for my MSK cluster.

I created the following customer-managed IAM policy and attached it to my user stephane-msk in AWS:

{
   "Version": "2012-10-17",
   "Statement": [
       {
           "Effect": "Allow",
           "Action": [
               "kafka-cluster:*"
           ],
           "Resource": [
               "arn:aws:kafka:us-east-1:387124123361:*/stephane-cluster/*"
           ]
       }
   ]
}

This should give me control over all the resources within my cluster-level permissions: cluster, topic, group, and transactional ID.

You need to make sure to download access keys and secret access keys for this IAM user and set them as a new profile on your computer:

$ aws configure --profile stephane-msk
AWS Access Key ID [None]: AKIAVUITFK3*******
AWS Secret Access Key [None]: QFO60Ce5YHdav********
Default region name [None]: us-east-1
Default output format [None]: json

I can retrieve the bootstrap servers via the AWS Management Console or the AWS Command Line Interface (AWS CLI). .

Next, I need to specify the connectivity settings to Apache Kafka, which for Java clients is the following:

# ssl.truststore.location=<PATH_TO_TRUST_STORE_FILE>
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName="stephane-msk";
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler 

The ssl.truststore.location is unnecessary if your Java JDK distribution already trusts the TLS certificate of the MSK broker, which should be the case of all standard JDK distributions, because the TLS certificate is public.

I’ve included awsProfileName="stephane-msk" to make sure I point to the credentials of my newly configured profile. For more information, see the README of the Amazon MSK IAM client auth libraries.

Conduktor is now connected to the MSK cluster, and we have full access to the cluster thanks to the managed IAM policy. Note how I didn’t have to “apply” the IAM policy. As soon as it was created and attached to my user, the MSK cluster picked it up. Let’s make sure by creating a topic, producing some data to it, and consuming data from it.

Create a Apache Kafka topic from an EC2 instance

Let’s create an IAM role, for now without any IAM policies. We’ll call it DemoMSKClient.

Create an Amazon Elastic Compute Cloud (Amazon EC2) instance using Amazon Linux 2 and attach the DemoMSKClient role.

Using EC2 Instance Connect, let’s ensure the EC2 instance has network connectivity to your MSK cluster:

$ sudo yum install -y nc
[ec2-user@ip-172-31-58-110 ~]$ nc -vz b-2.stephane-cluster.a19s0g.c2.kafka-beta.us-east-1.amazonaws.com 9098
Ncat: Version 7.50 ( https://nmap.org/ncat )
Ncat: Connected to 172.31.33.225:9098.
Ncat: 0 bytes sent, 0 bytes received in 0.03 seconds.

Next, install the Apache Kafka clients:

#!/bin/bash
sudo su
yum update -y
yum install -y java-11-amazon-corretto-headless
wget https://archive.apache.org/dist/kafka/2.5.0/kafka_2.13-2.5.0.tgz
tar -xzf kafka_2.13-2.5.0.tgz
mv kafka_2.13-2.5.0 /usr/local/
for i in /usr/local/kafka_2.13-2.5.0/bin/*.sh; do
   mv $i ${i%???}
done;
ln -sfn /usr/local/kafka_2.13-2.5.0 /usr/local/kafka
cp /usr/local/kafka_2.13-2.5.0/bin/kafka-run-class /usr/local/kafka_2.13-2.5.0/bin/kafka-run-class.sh
exit
echo 'export PATH=/usr/local/kafka/bin:$PATH' >> /home/ec2-user/.bashrc

Now, let’s download the JAR file to give our clients the extra classes they need to work with Amazon MSK IAM security (make sure to change the version in the following code, in case you’re not downloading v1.0.0).

Create a client configuration file named client-config.properties:

# ssl.truststore.location=<PATH_TO_TRUST_STORE_FILE>
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

The truststore isn’t needed because Amazon Linux 2 with Amazon Corretto 11 trusts the certificate by default.

Additionally, our EC2 instance is configured with a role, so the IAMLoginModule automatically uses that role.

Now let’s create an Apache Kafka topic:

export CLASSPATH=/home/ec2-user/aws-msk-iam-auth-1.0.0-all.jar
kafka-topics --bootstrap-server b-2.stephane-cluster.a19s0g.c2.kafka-beta.us-east-1.amazonaws.com:9098 --create --replication-factor 3 --partitions 3 --topic my-topic --command-config client-config.properties

We can export the CLASSPATH to run the Kafka CLI and make sure the Amazon MSK IAM libraries are included, or we can copy the JAR file directly in the Kafka libs directory.

We get an authentication error, which is by design:

Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: [174b92a5-b2d6-43d9-beef-af4b8a2dd1f6]: Access denied (kafka.admin.TopicCommand$)

Let’s fix this by adding the kafka-cluster:Connect statement for our IAM role:

{
   "Version": "2012-10-17",
   "Statement": [
       {
           "Effect": "Allow",
           "Action": [
               "kafka-cluster:Connect"
           ],
           "Resource": [
               "arn:aws:kafka:us-east-1:123456123456:cluster/stephane-cluster/*"
           ]
       }
   ]
}

Let’s try running the command again:

kafka-topics --bootstrap-server b-2.stephane-cluster.a19s0g.c2.kafka-beta.us-east-1.amazonaws.com:9098 --create --replication-factor 3 --partitions 3 --topic my-topic --command-config client-config.properties

Now we’re connected to the cluster, but we get an authorization error. This is by design because IAM access control assumes least privileges and none exist that allow me to create a topic. Let’s fix it by adding the minimum privilege we need to create that topic:

{
   "Version": "2012-10-17",
   "Statement": [
       {
           "Effect": "Allow",
           "Action": [
               "kafka-cluster:Connect",
               "kafka-cluster:CreateTopic"
           ],
           "Resource": [
               "arn:aws:kafka:us-east-1:123456123456:cluster/stephane-cluster/*",
            "arn:aws:kafka:us-east-1:123456123456:topic/stephane-cluster/*/my-topic"
           ]
       }
   ]
}

Let’s try creating the topic again:

$ kafka-topics --bootstrap-server b-2.stephane-cluster.a19s0g.c2.kafka-beta.us-east-1.amazonaws.com:9098 --create --replication-factor 3 --partitions 3 --topic my-topic --command-config client-config.properties
Created topic my-topic.

Perfect!

Note that if I try to list the topics, I don’t see my topic, because I didn’t add my permissions to my IAM policy. Fine-grained access control at its finest!

$ kafka-topics --bootstrap-server   b-2.stephane-cluster.a19s0g.c2.kafka-beta.us-east-1.amazonaws.com:9098 --list --command-config client-config.properties

Produce from an EC2 instance

Let’s try to produce to the topic we created previously:

$ kafka-console-producer --bootstrap-server b-2.stephane-cluster.a19s0g.c2.kafka-beta.us-east-1.amazonaws.com:9098 --producer.config client-config.properties --topic my-topic
> hello
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [my-topic]

Now we’re connected to the MSK cluster, but we get an authorization error. We can fix it by adding the minimum privilege we need to write to that topic:

{
   "Version": "2012-10-17",
   "Statement": [
       {
           "Effect": "Allow",
           "Action": [
               "kafka-cluster:Connect",
               "kafka-cluster:DescribeTopic",
               "kafka-cluster:WriteData"
           ],
           "Resource": [
               "arn:aws:kafka:us-east-1:123456123456:cluster/stephane-cluster/*",
              "arn:aws:kafka:us-east-1:123456123456:topic/stephane-cluster/*/my-topic"
           ]
       }
   ]
}

Let’s try producing again:

kafka-console-producer --bootstrap-server b-2.stephane-cluster.a19s0g.c2.kafka-beta.us-east-1.amazonaws.com:9098 --producer.config client-config.properties --topic my-topic

Consume from an EC2 instance

We use the same EC2 instance for simplicity’s sake, but you could create a separate EC2 instance with a separate role as an exercise.

Let’s try consuming from the topic, using the group my-group:

kafka-console-consumer --bootstrap-server b-2.stephane-cluster.a19s0g.c2.kafka-beta.us-east-1.amazonaws.com:9098 --topic my-topic --from-beginning --consumer.config client-config.properties --group my-group

We get an authorization exception. We can fix this by adding an IAM policy:

{
   "Version": "2012-10-17",
   "Statement": [
       {
           "Effect": "Allow",
           "Action": [
               "kafka-cluster:Connect",
               "kafka-cluster:DescribeTopic",

               "kafka-cluster:ReadData",
               "kafka-cluster:DescribeGroup",
               "kafka-cluster:AlterGroup"
           ],
           "Resource": [
               "arn:aws:kafka:us-east-1:123456123456:cluster/stephane-cluster/*",
               "arn:aws:kafka:us-east-1:123456123456:topic/stephane-cluster/*/my-topic",
               "arn:aws:kafka:us-east-1:123456123456:group/stephane-cluster/*/my-group"
           ]
       }
   ]
}

Let’s try the command again, which should work:

kafka-console-consumer --bootstrap-server b-2.stephane-cluster.a19s0g.c2.kafka-beta.us-east-1.amazonaws.com:9098 --topic my-topic --from-beginning --consumer.config client-config.properties --group my-group

Java-based code

If you’re using the Java clients (such as Java Producer, Consumer, Kafka Streams, or Kafka Connect), you’re in luck—all you need to do is to reference the Java library using Maven:

<dependency>
    <groupId>software.amazon.msk</groupId>
    <artifactId>aws-msk-iam-auth</artifactId>
    <version>1.0.0</version>		
</dependency>

Then you specify the necessary Kafka properties:

ssl.truststore.location=<PATH_TO_TRUST_STORE_FILE>
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler		

Takeaways

That’s all for this post. Here are my takeaways as I learned how to use this feature:

  • The Apache Kafka ACLs created via IAM aren’t stored in Zookeeper. This is due to an internal implementation of AWS. Therefore, the only place to manage ACLs is in IAM.
  • When IAM Access Control is enabled, no one is allowed to connect to a MSK cluster or take an Apache Kafka action without permission, which means IAM Access Control secures your cluster and Apache Kafka by default.
  • The IAM authorizer is a class that was written by AWS and the code is not available, so this feature is specific to MSK clusters only. This makes the Apache Kafka version used by MSK the same as the open-source version, plus a few additional classes that were coded by AWS but don’t affect the performance or general functioning logic of Apache Kafka.
  • The feature only works out of the box for Java-based clients. If you’re interested in using IAM authentication with Apache Kafka clients written in languages other than Java, see the following GitHub repo. At a high level, this involves implementing a SASL mechanism that uses the AWS SDK to fetch client credentials and sends a very specific JSON authentication payload to the MSK cluster, which then automatically handles access control.
  • Read the documentation to map Apache Kafka permissions to IAM policies. Some of them can have changed names, for example the AlterGroup permission, which allows them to commit offsets into a consumer group (necessary for consuming).
  • Wildcards are supported, for example the IAM resource <topic-arn...>/sometopicprefix*somethinginthemiddle*sometopicsuffix This means you can have even more freedom than the simple wildcard provided within the Apache Kafka SimpleAuthorizer.
  • Network security is done using security groups or using IAM policy conditions. Although normal Apache Kafka ACLs support a host parameter, for MSK, network security is implemented at the security group level or using conditions for IP addresses.
  • You can now audit your Apache Kafka resource API calls via CloudTrail.

Summary

In this post, we demonstrated how easy it is to secure a MSK cluster and Apache Kafka with IAM Access Control. We also showed how to configure Conduktor to connect to our MSK cluster. For more information on IAM Access Control, see the MSK user documentation. Happy learning!


Stéphane Maarek is an AWS Data Hero and the co-founder of Conduktor, a desktop graphical user interface for Apache Kafka. He’s also the course instructor for the Amazon MSK Master Class and the Apache Kafka Series on Udemy, followed by over 100,000 people around the world.