The Internet of Things on AWS – Official Blog

Use AWS IoT Core MQTT broker with standard MQTT libraries

Introduction

AWS IoT Core connects Internet of Things (IoT) devices to AWS IoT and other AWS services. Devices and clients can use the MQTT protocol to publish and subscribe to messages. MQTT libraries, such as the AWS IoT Device SDKs, include open-source libraries, developer guides with samples, and porting guides so that you can build innovative IoT products or solutions on your choice of hardware platforms.

Customers ask us how they can use the AWS IoT Core message broker with standard MQTT libraries. The reasons for this request can be wanting to migrate from another MQTT broker to AWS IoT Core while currently using standard MQTT libraries, or they might already be using standard MQTT libraries.

In this post you will learn how you can use standard MQTT libraries for different languages like Python, Node.js, or Java to interact with the AWS IoT Core message broker. The MQTT libraries covered in this post support the MQTT protocol version 5. AWS IoT Core recently launched support for MQTT version 5. To get started and to learn more about MQTT5 for AWS IoT Core, refer to the technical documentation.

Metadata

Time to read: 8 minutes
Learning level: 300
Services used: AWS IoT Core

Prerequisites

To execute the walkthrough in this post, you need to have an AWS account and permissions to provision IoT things.

Walkthrough

For the examples in this post, you will use a device with the name mqtt5. For device authentication you will use an X.509 certificate which is not issued by AWS IoT Core. Create a certificate with openssl and register it with AWS IoT Core without a CA. For ease of use, create an open IoT policy. In general, you should use permissions which follow the principle of least privilege.

Create a device

Use the following commands to create your device.

# assign the thing name to a shell variable 
THING_NAME=mqtt5 
# create the thing in the AWS IoT Core device registry 
aws iot create-thing --thing-name $THING_NAME 
# create a key pair 
openssl req -x509 -newkey rsa:2048 -keyout $THING_NAME.private.key -out $THING_NAME.certificate.pem -sha256 -days 365 -nodes -subj "/CN=$THING_NAME" 
# register the device certificate with AWS IoT Core 
aws iot register-certificate-without-ca --certificate-pem file://$THING_NAME.certificate.pem --status ACTIVE > /tmp/register_certificate.json
CERTIFICATE_ARN=$(jq -r ".certificateArn" /tmp/register_certificate.json)
CERTIFICATE_ID=$(jq -r ".certificateId" /tmp/register_certificate.json) 
# create an IoT policy 
POLICY_NAME=${THING_NAME}_Policy
aws iot create-policy --policy-name $POLICY_NAME \
  --policy-document '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action": "iot:*","Resource":"*"}]}'
# attach the policy to your certificate aws iot attach-policy --policy-name $POLICY_NAME \   
  --target $CERTIFICATE_ARN 
# attach the certificate to your thing
aws iot attach-thing-principal --thing-name $THING_NAME \
  --principal $CERTIFICATE_ARN

IoT endpoint

Assign your AWS IoT Core endpoint to a shell variable. This makes it easier to use your endpoint in the examples in the blog post.

export IOT_ENDPOINT=$(aws iot describe-endpoint --endpoint-type iot:Data-ATS --query 'endpointAddress' --output text)

Root CA certificate

Download the root CA certificate that is used to sign AWS IoT Core’s server certificate. Store the root CA certificate in the file AmazonRootCA1.pem.

HiveMQ MQTT CLI

The MQTT CLI is an open source project backed by HiveMQ. It supports MQTT 3.1.1 and MQTT 5.0. You can use the MQTT CLI to interact with the AWS IoT Core message broker. The HiveMQ MQTT CLI is executed as mqtt.

Subscribe

Subscribe to the topic hivemq/with/aws with MQTT version 5.

mqtt sub -h $IOT_ENDPOINT -p 8883 \
  --cafile AmazonRootCA1.pem \
  --cert mqtt5.certificate.pem \
  --key mqtt5.private.key \
  -d -V 5 -q 0 \
  -t hivemq/with/aws

Publish

Let the subscriber run and publish a message to the topic hivemq/with/aws. You should see the message arrive at the subscriber. The following command publishes one message. Execute the command multiple times to publish any number of messages.

mqtt pub -h $IOT_ENDPOINT -p 8883 \
  --cafile AmazonRootCA1.pem \
  --cert mqtt5.certificate.pem \
  --key mqtt5.private.key \
  -d -V 5 -q 0 \
  -t hivemq/with/aws \
  -m "{\"mqtt5\": \"arrived\", \"client lib\": \"hivemq\", \"date\": \"$(date)\"}"

You will see all messages that you published arrive at the subscriber.

Mosquitto

Eclipse Mosquitto is an open source message broker and provides publish – mosquitto_pub – and subscribe – mosquitto_sub – clients.

Subscribe with mosquitto_sub to a topic

You use mosquitto_sub to subscribe to a topic and mosquitto_pub to publish to the same topic. AWS IoT Core routes the messages from the publisher to the subscriber.

Subscribe

Use the mosquitto_sub client to subscribe to the topic mosquitto/with/aws.

mosquitto_sub --cafile AmazonRootCA1.pem \
  --cert $THING_NAME.certificate.pem \
  --key $THING_NAME.private.key -h $IOT_ENDPOINT -p 8883 \
  -q 0 -t mosquitto/with/aws -i ${THING_NAME}-sub \
  --tls-version tlsv1.2 -d -V mqttv5

Publish

Publish multiple messages with the mosquitto_pub client. To publish multiple messages, create a file containing the messages and mosquitto_pub reads the messages from that file.

Create a file named messages.json with the following content.

{"mqtt5": "arrived", "message": "1"}
{"mqtt5": "arrived", "message": "2"}
{"mqtt5": "arrived", "message": "3"}

Publish a message using a topic alias. A topic alias is an integer number that can be used instead of a topic name. The first publish request introduces a topic alias for a topic. All subsequent publishing requests then use the topic alias instead of the topic name. In the following example, you use the topic alias 2 for the topic mosquitto/with/aws.

cat messages.json |mosquitto_pub --cafile AmazonRootCA1.pem \
  --cert $THING_NAME.certificate.pem \
  --key $THING_NAME.private.key -h $IOT_ENDPOINT -p 8883 \
  -q 0 -t mosquitto/with/aws -i $THING_NAME --tls-version tlsv1.2 \
  -d -V mqttv5 -D publish topic-alias 2 -l

The output from the publish requests should look similar to the following output:

Client mqtt5 sending CONNECT
Client mqtt5 received CONNACK (0)
Client mqtt5 sending PUBLISH (d0, q0, r0, m1, 'mqtt5', ... (36 bytes))
Client mqtt5 sending PUBLISH (d0, q0, r0, m2, '(null)', ... (36 bytes))
Client mqtt5 sending PUBLISH (d0, q0, r0, m3, '(null)', ... (36 bytes))
Client mqtt5 sending DISCONNECT

Only the first publish request includes the topic name. For the subsequent requests you will get '(null)' as topic which means that the topic alias is used.

At the subscriber, you can observe incoming messages from the publisher.

Paho Python Client

The following code snippets demonstrate how you can use AWS IoT Core Eclipse Paho Python Client library. Connect to AWS IoT Core first. Upon a successful connection, you can calculate the topic alias and subscribe to the topic. Then you can publish to a topic.

MQTT Client

Create an MQTT version 5 client with handlers for a successful connection, and when a message arrives, connect to the AWS IoT Core endpoint. In this example, the root CA certificate, device certificate, device key, and endpoint are provided as command line options.

mqttc = mqtt.Client(protocol=mqtt.MQTTv5)
mqttc.tls_set(
    ca_certs=args.ca,
    certfile=args.certificate,
    keyfile=args.key,
    tls_version=2)
mqttc.on_connect = on_connect
mqttc.on_message = on_message
mqttc.connect(args.endpoint, 8883, 60)

Connection handler

Upon connection, the MQTT client will receive a CONNACK from AWS IoT Core that includes the maximum usable topic alias. Based on the maximum usable topic alias, the code generates a random topic alias in the range from 0 to the maximum topic alias.

def on_connect(mqttc, userdata, flags, rc, properties=None):
    global TOPIC_ALIAS_MAX
    LOGGER.info("connected to endpoint %s with result code %s", args.endpoint, rc)
    LOGGER.info("userdata: %s, flags: %s properties: %s", userdata, flags, properties)
    LOGGER.info("topic_alias_maximum: %s", properties.TopicAliasMaximum)
    TOPIC_ALIAS_MAX = properties.TopicAliasMaximum
    mqttc.is_connected = True

    LOGGER.info('subscribing to topic: %s', args.topic)
    mqttc.subscribe(args.topic, qos=0, options=None, properties=None)

topic_alias = random.SystemRandom().randint(0,TOPIC_ALIAS_MAX)

When a message is received by the on_message handler, it will be logged.

def on_message(mqttc, userdata, msg):
    LOGGER.info('received message: topic: %s payload: %s', msg.topic, msg.payload.decode())

To define a topic alias for your topic name, you can publish the first message including the topic alias and topic name. All subsequent messages will be published in a while loop using the topic alias.

properties.TopicAlias = topic_alias
message = json.dumps({"mqttv5": "has arrived", "date_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S'), "topic_alias": topic_alias})
LOGGER.info('publish: topic: %s message: %s', args.topic, message)
mqttc.publish(args.topic, payload=message, qos=0, retain=False, properties=properties)

while True:
    message = json.dumps({"mqttv5": "has arrived", "date_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S'), "topic_alias": topic_alias})
    LOGGER.info('publish: topic_alias: %s message: %s', topic_alias, message)
    mqttc.publish('', payload=message, qos=0, retain=False, properties=properties)
    time.sleep(2)

MQTT.js

In the previous section you learned how to use the Paho Python Client. Paho also provides a JavaScript client that uses WebSockets to connect to the MQTT broker. The MQTT.js client library supports not only websockets, but also TLS connections with certificate based authentication. The following Node.js code snippets assume that you provide the root CA, device private key and certificate, as well as the AWS IoT Core endpoint and topic in the command line. In this example, you will also use a topic alias to publish messages. MQTT Client Build a client to connect to AWS IoT Core with MQTT version 5.

console.log('building client');
const client = mqtt.connect(
    'mqtts://' + argv.e + ':8883',
    {
        key:  fs.readFileSync(argv.k),
        cert: fs.readFileSync(argv.c),
        ca: [ fs.readFileSync(argv.ca) ],
        protocolId: 'MQTT',
        protocolVersion: 5,
    }
);

Connection handler

Upon a successful connection, get the maximum topic alias advertised by AWS IoT Core. Calculate the topic alias with a random function and set the publish properties to use the topic alias. Subscribe to the topic you are publishing to and publish a message using both the topic and topic alias.

client.on('connect', function () {
    topicAliasMax = client.topicAliasSend.numberAllocator.max;
    topicAlias = Math.floor(Math.random() * (topicAliasMax - 0 + 1) + 0);
    console.log('topicAliasMax: ' + topicAliasMax + ' topicAlias: ' + topicAlias);
    console.log('subscribe to: ' + argv.t);
    publishOptions.properties.topicAlias = topicAlias;

    console.log('subscribe: topic: ' + argv.t);
    client.subscribe(argv.t, function (err) {
        if (err) {
            console.log('subscribe error: ' + err);
        } else {
            var message = generateMessage();
            console.log('publish first message to set topicAlias: ' + topicAlias + ' topic: ' + argv.t + ' message: ' + message);
            client.publish(argv.t, message, publishOptions);
        };
    });
});

Receive messages

Messages received will be logged to the console.

client.on('message', (topic, message) => {
  console.log('message received: subscription topic: ' + argv.t + ' topic: ' + topic + ' message: ' + message.toString());
});

Publish messages

Publish messages continuously using the topic alias.

setInterval(function () {
    var message = generateMessage();
    console.log('publish: topic: ' + argv.t + ' message: ' + message);
    client.publish('', message, publishOptions);
}, 5000);

You should see messages arriving at your subscriber, which logs them to the console.

Cleaning Up

Delete the thing that you created, and delete the associated certificate and IoT policy. You can find detailed steps in the How to manage things with the registry documentation.

Conclusion

In this post you have learned how to use AWS IoT Core with standard MQTT libraries, which already include support for MQTT5. Using AWS IoT Device SDKs simplifies and accelerates the development of code running on connected devices by including methods that facilitate the use of AWS IoT features like AWS IoT Greengrass discovery, custom authentication or device shadows. AWS IoT Device SDKs include open-source libraries, developer guides with samples, and porting guides so that you can build innovative IoT products or solutions on your choice of hardware platforms. AWS IoT Device SDKs are freely available as open-source projects.

About the author

Philipp Sacha is a Partner Solutions Architect at Amazon Web Services and works with partners in the manufacturing area. He joined AWS in 2015 and held several roles as Solutions Architect also as a Specialist in the IoT area.