
Integrate Debezium with AWS Secrets Manager for Retrieving Passwords


Debezium is my all-time favorite CDC tool. It’s an open-source Kafka Connect application that extracts CDC data from MSSQL, MySQL, Oracle, PostgreSQL, MongoDB, and a few more databases. Generally, Kafka Connect uses plain-text passwords, or external secrets to pass the password — but even with an external secret, what ends up in the config is still a plain-text password.

After a long time, I was going to deploy a Debezium connector for a MySQL database. The whole stack is on AWS, and we use AWS Secrets Manager for storing and rotating passwords. I checked a few options to integrate AWS Secrets Manager with Debezium via environment variables, but I couldn’t make them work. It’s very easy if you are deploying via Docker, but I was doing this on an EC2 instance. Then I found an interesting plugin called Secret Provider from Lenses.io. It provides out-of-the-box integration with AWS, Azure, HashiCorp Vault, and more. Here is a small tutorial for integrating AWS Secrets Manager with Debezium using Lenses’s Secret Provider.

Reasons to consider this plugin: #

  1. Integration with cloud providers (GCP is not yet supported)
  2. Passwords are not exposed via the API
  3. Lightweight
  4. Works with an EC2 instance’s IAM role as well as access/secret keys

How does it work? #


In any source or sink connector config, instead of the password we pass a reference to the secret, which looks like an environment variable. For example, if we have a secret named prod_db_password for a MySQL password with the key-value pair {"password": "My-Strong-Pass"}, then we pass the value as ${aws:prod_db_password:password}.

Then, while creating the connector, the provider checks whether an IAM role with the necessary permissions to get the secret is attached (or whether an access key and secret key have been provided), and calls the GetSecretValue API to fetch the password from AWS Secrets Manager.
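To make the mapping concrete, here is how the pieces from the example above line up:

Secret name : prod_db_password
Secret value: {"password": "My-Strong-Pass"}
Connector   : "database.password": "${aws:prod_db_password:password}"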

Demo Time: #

Let’s create a secret for the MySQL user in AWS Secrets Manager (a CLI sketch follows the list).

  • Secret Name: prod/debezium/mysql/service1
  • Key & Value: mysql_pass:mystrong-pass
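If you prefer the CLI, here is a minimal sketch, assuming the AWS CLI is already configured for the right account and region:

aws secretsmanager create-secret \
    --name prod/debezium/mysql/service1 \
    --secret-string '{"mysql_pass": "mystrong-pass"}'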

Create an EC2 IAM role: #

Permissions: #

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetSecretValue",
                "secretsmanager:DescribeSecret"
            ],
            "Resource": "arn:aws:secretsmanager:<<YOUR_SECRET_REGION>>:<<AWS_ACCOUNT_NUMBER>>:secret:prod/debezium/*"
        }
    ]
}
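Wiring this policy to the instance follows the usual IAM steps. A hedged sketch, assuming a role name of debezium-connect-role, the policy above saved as secrets-policy.json, a standard EC2 trust policy in trust.json, and a placeholder instance ID:

# Create the role and attach the inline secrets policy
aws iam create-role --role-name debezium-connect-role \
    --assume-role-policy-document file://trust.json
aws iam put-role-policy --role-name debezium-connect-role \
    --policy-name debezium-secrets --policy-document file://secrets-policy.json
# Wrap the role in an instance profile and attach it to the EC2 instance
aws iam create-instance-profile --instance-profile-name debezium-connect-role
aws iam add-role-to-instance-profile --instance-profile-name debezium-connect-role \
    --role-name debezium-connect-role
aws ec2 associate-iam-instance-profile --instance-id i-0123456789abcdef0 \
    --iam-instance-profile Name=debezium-connect-role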

Install the Secret Provider plugin: #

cd /usr/share/confluent-hub-components
wget https://github.com/lensesio/secret-provider/releases/download/2.1.6/secret-provider-2.1.6-all.jar

Note: /usr/share/confluent-hub-components is my plugin directory; you can download the JAR anywhere and add that path to plugin.path.
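For example, the corresponding entry in the worker properties (the /usr/share/java entry is the usual Confluent default, included here as an assumption about your setup):

plugin.path=/usr/share/java,/usr/share/confluent-hub-components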

Add the plugin configuration to the worker properties:

vi /etc/kafka/connect-distributed.properties

config.providers=aws
config.providers.aws.class=io.lenses.connect.secrets.providers.AWSSecretProvider
config.providers.aws.param.aws.auth.method=default
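# The access/secret key params below are only read when aws.auth.method=credentials;
# with "default" the provider uses the default AWS credential chain
# (e.g. this EC2 instance's IAM role).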
config.providers.aws.param.aws.access.key=$AWS_ACCESS_KEY_ID
config.providers.aws.param.aws.secret.key=$AWS_SECRET_ACCESS_KEY
config.providers.aws.param.aws.region=ap-south-1
config.providers.aws.param.file.dir=/etc/kafka/

Restart the Kafka Connect service.
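A sketch, assuming the worker runs as the Confluent Platform systemd unit (the unit name is an assumption and varies by installation):

sudo systemctl restart confluent-kafka-connect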

Debezium Connector Config: #

It’s not a full config file (file name: mysql.json). A fuller sketch follows the snippet.

...
...
"database.hostname": "xxxxxx.ap-south-1.rds.amazonaws.com",
"database.port": "3306",
"database.user": "root",
"database.password": "${aws:prod/debezium/mysql/service1:mysql_pass}",

Testing: #

Let’s create this connector.

curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d @mysql.json

And the status:

curl -X GET localhost:8083/connectors/mysql-connector-db01/status

{
  "name": "mysql-connector-db01",
  "connector": {
    "state": "RUNNING",
    "worker_id": "172.30.32.13:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "172.30.32.13:8083"
    }
  ],
  "type": "source"
}

Whoo hooo, it worked! I really like this plugin, but one thing is still not clear to me. While fetching a secret, it tries to access a file inside the directory set by config.providers.aws.param.file.dir=/etc/kafka/. If we have a secret like /prod/data/mysql/mypass, then it expects a matching directory structure under /etc/kafka.

mkdir -p /etc/kafka/prod/data/mysql/mypass

I have raised an issue in their GitHub repo; let’s see. But overall, I had no other issues with this plugin.