Integrate IoT Device with AWS IoT using Python — Part I: upload data to MQTT topic

FanchenBao
Aug 3, 2020
Figure: Workflow of uploading data from the IoT device to an AWS IoT MQTT topic

Update 08/15/2021

The AWS console UI has changed significantly since this article was published. Instructions have been updated where the new UI deviates too much from the old version; where the deviation is small, the original wording is kept.

Introduction

This is Part I of a series discussing one way to integrate an IoT device with AWS IoT using Python. Other parts of the series are listed below:

This series attempts to build a fairly robust system for the IoT device. Thus, the source code follows proper design, linting, and type checking to the best of my ability. Unfortunately, due to time constraints, unit tests are not currently included.

The IoT device chosen for this series is a mock vehicle detector. It “detects” whether a vehicle has passed in front of its line of sight and increments an internal counter. Note that there is no real vehicle detector: I use it only as an example whose functionality is common to many IoT devices, namely collecting data and uploading it to the cloud periodically. In the source code, the vehicle detector is mocked by a simple function.

Before You Start

It is assumed that you are familiar with Python 3 and know how to write and run a Python program on an IoT device. It is also assumed that you have an AWS account (the free tier is sufficient), are familiar with the AWS console, and know the basics of AWS IoT. It is highly recommended that you download the GitHub repo for this article and read the code along the way. Although the code was developed on a Raspberry Pi 4, it should run on any Unix system.

Clone the GitHub repo and switch to the upload_data tag. More specifically, the git checkout -b command below creates a new branch that starts at the upload_data tag and moves HEAD onto it:

git clone https://github.com/FanchenBao/aws_iot_integration.git
cd aws_iot_integration
git checkout -b new_branch_name upload_data

The directory tree looks like this:

.
├── Pipfile
├── Pipfile.lock
├── config.py
├── credentials
│   └── readme.md
├── main.py
├── pyproject.toml
├── setup.cfg
└── src
    ├── __init__.py
    ├── aws
    │   ├── __init__.py
    │   └── aws_iot_client_wrapper.py
    ├── child_processes
    │   ├── __init__.py
    │   └── child_processes.py
    ├── clients
    │   ├── __init__.py
    │   └── upload.py
    ├── errors
    │   ├── __init__.py
    │   └── network_connection_error.py
    ├── logger
    │   ├── __init__.py
    │   ├── logger_config.py
    │   └── ouput.py
    └── vehicle_detector
        ├── __init__.py
        └── detect_vehicle.py

Use pipenv to set up the virtual environment for the source code. If you don’t have pipenv, refer to its documentation for installation instructions.

Run the command:

pipenv install

This command installs all the regular packages listed in the Pipfile, including AWSIoTPythonSDK and boto3. It may take a few minutes. Once it is done, run pipenv shell to enter the virtual environment.

If you want to extend the source code, I recommend running pipenv install --dev instead, which also installs the development packages that help with linting and type checking. After installing the dev packages, run pre-commit install. This sets up a pre-commit hook that runs linting and type checking each time you commit new changes.

Set up AWS IoT

Before diving into the source code, we need to set up AWS IoT. The tutorials in the AWS IoT documentation, such as this one, are a good place to familiarize yourself with the AWS IoT workflow.

Create a thing type

A thing type can be attached to an IoT thing and provides additional searchable attributes and tags for organizational purposes. To create a thing type, go to AWS IoT Core → Manage → Types → Create. Under Name, input VehicleDetectorUpload. Under Description, input “The upload type for vehicle detector. Vehicle detector in this type handles uploading the live data to the MQTT topic.” Then click Create thing type.

Create a thing group

A thing group is similar to a thing type in its ability to organize IoT things. However, it is more powerful because it has its own ARN and can have policies attached, which then apply to every IoT thing contained within the group. These features allow us to use a thing group to apply common policies to all IoT things that share the same characteristics, without attaching the same policy to each thing individually.

To create a thing group, go to AWS IoT Core → Manage → Thing groups → Create → Create Thing Group. Under Name, input VehicleDetectorUpload. Under Description, input “The upload group for vehicle detector. All devices in this group are for uploading live data to the MQTT topic.” Then click Create thing group.

Create policies

In tutorials, policies are usually made as permissive as possible. Here, however, let’s make them as specific as possible. We need to create two policies: one for each individual IoT thing, allowing the actual IoT device to connect to its IoT thing on AWS; the other for the VehicleDetectorUpload group we have just created.

To create the connect policy, go to AWS IoT Core → Secure → Policies → Create. Under Name, input vehicle_detector_connect_policy. Choose Advanced mode, and paste the following policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "iot:Connect"
            ],
            "Resource": [
                "arn:aws:iot:[region]:[aws_account]:client/${iot:Connection.Thing.ThingName}"
            ]
        }
    ]
}

In the policy, replace [region] with the region of your AWS account (e.g. us-east-1) and [aws_account] with your AWS account number. This policy allows an actual IoT device to connect to AWS IoT only if the thing name the Python program on the device uses as its MQTT client ID matches the name of the IoT thing we are about to create. Then click Create.

To create the VehicleDetectorUpload group policy, go to AWS IoT Core → Secure → Policies → Create. Under Name, input vehicle_detector_upload_group_policy. Choose Advanced mode, and paste the following policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "iot:Publish"
            ],
            "Resource": [
                "arn:aws:iot:[region]:[aws_account]:topic/vehicle_detector/test/raw",
                "arn:aws:iot:[region]:[aws_account]:topic/vehicle_detector/dev/raw",
                "arn:aws:iot:[region]:[aws_account]:topic/vehicle_detector/prod/raw"
            ]
        }
    ]
}

This policy allows all IoT things contained within the VehicleDetectorUpload group to publish messages to three MQTT topics: vehicle_detector/test/raw, vehicle_detector/dev/raw, and vehicle_detector/prod/raw. The topics are separated by environment (test, dev, and prod) to support the software development cycle.
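If you would rather not edit the ARNs by hand, both policy documents can be generated from the region and account number. The following is a minimal sketch: the helper names and the account number 123456789012 are hypothetical, and the output simply mirrors the two policies above. The resulting JSON can be pasted into the console or, if you prefer to script the setup, passed as the policyDocument string to the create_policy call of boto3's IoT client.

```python
import json
from typing import Any, Dict, Sequence

# The three per-environment topics used throughout this article.
UPLOAD_TOPICS = (
    "vehicle_detector/test/raw",
    "vehicle_detector/dev/raw",
    "vehicle_detector/prod/raw",
)


def _policy(action: str, resources: Sequence[str]) -> Dict[str, Any]:
    """Wrap a single Allow statement in the standard policy envelope."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow", "Action": [action], "Resource": list(resources)}
        ],
    }


def connect_policy(region: str, account: str) -> Dict[str, Any]:
    """vehicle_detector_connect_policy with [region] and [aws_account] filled in."""
    # Doubled braces in the f-string emit the literal policy variable
    # ${iot:Connection.Thing.ThingName}, which AWS IoT substitutes when it
    # evaluates the policy.
    resource = f"arn:aws:iot:{region}:{account}:client/${{iot:Connection.Thing.ThingName}}"
    return _policy("iot:Connect", [resource])


def upload_group_policy(
    region: str, account: str, topics: Sequence[str] = UPLOAD_TOPICS
) -> Dict[str, Any]:
    """vehicle_detector_upload_group_policy: publish-only access to each topic."""
    return _policy(
        "iot:Publish",
        [f"arn:aws:iot:{region}:{account}:topic/{topic}" for topic in topics],
    )


if __name__ == "__main__":
    # 123456789012 is a placeholder account number.
    print(json.dumps(connect_policy("us-east-1", "123456789012"), indent=4))
    print(json.dumps(upload_group_policy("us-east-1", "123456789012"), indent=4))
```

Keeping the topic list in one tuple makes it harder for the policy and the device configuration to drift apart when a new environment is added.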

Create IoT thing

We have finally reached the stage of creating the AWS IoT thing, the entity to which the actual IoT device connects in order to transfer data. To create an IoT thing, go to AWS IoT Core → Manage → Things → Create → Create a single thing. Under Name, input vehicle_detector_1_UPLOAD. Under Apply a type to this thing, choose VehicleDetectorUpload from the drop-down. Under Add this thing to a group, click Change and choose VehicleDetectorUpload from the drop-down. Under Set searchable thing attributes, input id for Attribute key and 1 for Value. Click Next.

Click Create certificate.

Download four files: a certificate for this thing, a public key, a private key, and Amazon Root CA 1. These four files are the credentials that must reside on the actual IoT device; they are used to authenticate the device when it attempts to connect to AWS IoT. For details on how asymmetric encryption and certificate authorities work in TLSv1.2 mutual authentication, please refer to this wonderfully explained video (it’s about HTTPS, but the underlying mechanism is similar). Keep all four credential files in a secure place. We will need them later.

Click Attach a policy, choose vehicle_detector_connect_policy, and click Register Thing. This attaches the connect policy to the thing’s certificate and registers the thing on AWS IoT.

Attach group policy

The last step is to attach vehicle_detector_upload_group_policy to the VehicleDetectorUpload thing group. Go to AWS IoT Core → Thing groups → VehicleDetectorUpload → Security → Edit. Under Select a policy to attach to this group, click Select and choose vehicle_detector_upload_group_policy. Click Save.

Summary

In this section, we have set up AWS IoT by creating an IoT thing named vehicle_detector_1_UPLOAD, assigning it the VehicleDetectorUpload thing type, adding it to the VehicleDetectorUpload thing group, creating and downloading the associated credentials, and attaching the vehicle_detector_connect_policy policy to its certificate. In addition, we have created the vehicle_detector_upload_group_policy policy and attached it to the VehicleDetectorUpload thing group.

AWS IoT is now ready to accept connections from the device and messages published to any of the specified MQTT topics.

Set up Credentials

The four credential files downloaded earlier (a certificate, a private key, a public key, and an Amazon Root CA file) must be copied into the credentials/ folder of the Python application. It is crucial that none of these files (except the Amazon Root CA file) ever be exposed to the public; they must reside only on the IoT device. If they are exposed, discard them and create new ones from the AWS IoT console.
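Before the first run, it can help to confirm that the files actually landed in credentials/ and that the private key is not readable by other users on the device. Below is a minimal sketch, assuming a POSIX file system; missing_credentials and is_private_key_exposed are hypothetical helpers, not part of the repo, and the file names you pass in should be the ones from your own download.

```python
import stat
from pathlib import Path
from typing import Iterable, List


def missing_credentials(cred_dir: Path, file_names: Iterable[str]) -> List[str]:
    """Return the expected credential files that are not present in cred_dir."""
    return [name for name in file_names if not (cred_dir / name).is_file()]


def is_private_key_exposed(key_path: Path) -> bool:
    """True if group or other users hold any permission bits on the key file."""
    mode = stat.S_IMODE(key_path.stat().st_mode)
    return bool(mode & (stat.S_IRWXG | stat.S_IRWXO))
```

For example, missing_credentials(Path("credentials"), ["AmazonRootCA1.pem", "[credential_id]-private.pem.key", "[credential_id]-certificate.pem.crt"]) should return an empty list once the files are in place, and chmod 600 on the private key makes is_private_key_exposed return False.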

Set up Python Application Configuration

Source code: config.py

The configuration of the Python program is specified in a .env file and loaded in config.py using the pydantic package. The benefit of pydantic is that it performs type checking when loading the .env file or environment variables. In addition, since the configuration is exposed as a Settings class, we can easily tweak how each value is presented to its consumers (e.g. see how the credential file names are exposed in Settings).
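To show what type-checked loading buys us without pulling in the dependency, here is a stdlib-only sketch that swaps pydantic for a dataclass plus a tiny .env parser. It is not the repo’s config.py: the parser, the coercion rules, and the root_ca_path property (a hypothetical example of exposing a credential file name as a full path) are assumptions. As with pydantic’s settings, real environment variables take precedence over values from the .env file, and a value of the wrong type fails at load time instead of deep inside the program.

```python
import os
from dataclasses import dataclass, fields
from pathlib import Path
from typing import Any, Dict, Mapping, Optional

_TRUE = {"1", "true", "yes", "on"}
_FALSE = {"0", "false", "no", "off"}


def parse_env_text(text: str) -> Dict[str, str]:
    """Parse KEY=value lines, skipping blank lines and # comments."""
    values: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError(f"Malformed .env line: {raw!r}")
        values[key.strip().lower()] = value.strip()
    return values


def _coerce(value: str, typ: Any) -> Any:
    """Convert a raw string to the annotated type, failing loudly on bad input."""
    if typ is bool:
        lowered = value.lower()
        if lowered in _TRUE:
            return True
        if lowered in _FALSE:
            return False
        raise ValueError(f"Not a boolean: {value!r}")
    return typ(value)  # e.g. int("abc") raises ValueError


@dataclass(frozen=True)
class Settings:
    sensor_name: str
    endpoint: str
    port: int
    root_ca: str
    upload_private_key: str
    upload_cert_file: str
    upload_topic: str
    total_iterations: int
    remote_private_key: str = ""
    remote_cert_file: str = ""
    debug: bool = False
    credentials_dir: str = "credentials"

    @property
    def root_ca_path(self) -> Path:
        # Hypothetical: expose the bare file name as a path under credentials/.
        return Path(self.credentials_dir) / self.root_ca

    @classmethod
    def load(cls, env_text: str, environ: Optional[Mapping[str, str]] = None) -> "Settings":
        raw = parse_env_text(env_text)
        # Real environment variables override values from the .env text.
        source = os.environ if environ is None else environ
        raw.update({k.lower(): v for k, v in source.items()})
        kwargs = {f.name: _coerce(raw[f.name], f.type) for f in fields(cls) if f.name in raw}
        return cls(**kwargs)  # a missing required field raises TypeError
```

Loading the template shown below with Settings.load(text) yields settings.port as the integer 8883 and settings.debug as the boolean False, while port=abc would be rejected immediately.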

The .env file must not be committed, as it contains sensitive information. To create your own .env file, copy and paste the following template:

# AWS IoT config
sensor_name=vehicle_detector_1
endpoint=[iot_thing_https_endpoint]
port=8883
root_ca=AmazonRootCA1.pem
upload_private_key=[credential_id]-private.pem.key
upload_cert_file=[credential_id]-certificate.pem.crt
remote_private_key=
remote_cert_file=
upload_topic=vehicle_detector/test/raw
# Main program config
total_iterations=5
debug=False

Notice that sensor_name is set to vehicle_detector_1. This name must match the first part of the AWS IoT thing name we created earlier, vehicle_detector_1_UPLOAD. The remaining _UPLOAD suffix is appended by the Upload client.

The endpoint can be found in the AWS console under AWS IoT Core → Settings → Device data endpoint.

upload_private_key and upload_cert_file are the file names of the corresponding credential files downloaded earlier. We can leave remote_private_key and remote_cert_file blank for now.

upload_topic is the MQTT topic where the actual IoT device will upload the data. It must be one of the three topics specified in the vehicle_detector_upload_group_policy.
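Because a typo in upload_topic otherwise surfaces only at runtime, when AWS IoT rejects the publish, a cheap local check at startup can catch it earlier. A minimal sketch; ALLOWED_UPLOAD_TOPICS and check_upload_topic are hypothetical names, and the set has to be kept in sync with vehicle_detector_upload_group_policy by hand.

```python
from typing import FrozenSet

# Must mirror the Resource list of vehicle_detector_upload_group_policy.
ALLOWED_UPLOAD_TOPICS: FrozenSet[str] = frozenset(
    f"vehicle_detector/{env}/raw" for env in ("test", "dev", "prod")
)


def check_upload_topic(topic: str) -> str:
    """Return topic unchanged if the group policy allows publishing to it."""
    if topic not in ALLOWED_UPLOAD_TOPICS:
        allowed = ", ".join(sorted(ALLOWED_UPLOAD_TOPICS))
        raise ValueError(f"upload_topic {topic!r} is not one of: {allowed}")
    return topic
```

Calling check_upload_topic(settings.upload_topic) once during startup is enough.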

Set up AWS IoT Upload Client

The Upload client is designed by first creating a generic AWS IoT client wrapper class and then having the Upload client inherit from it. The reason for this design is that in Part II of this series, we will create another client called Remote, which uses almost the same setup as Upload. A generic wrapper class from which both Upload and Remote inherit makes the code easier to maintain.

Set up the generic AWS IoT client wrapper

Source code: src/aws/aws_iot_client_wrapper.py

The generic AWS IoT client wrapper is built on the AWSIoTPythonSDK (see its GitHub repository). Being generic, AWSIoTMQTTClientWrapper takes a client_type parameter, which determines whether the client is an Upload or a Remote client. If the client is an Upload client, as is the case in this article, self.thing_name is the concatenation of sensor_name from the .env file and the word UPLOAD, joined by an underscore (e.g. vehicle_detector_1_UPLOAD). Note that self.thing_name must match the AWS IoT thing name created earlier.

The generic wrapper offers two clients: self.myShadowClient and self.myAWSIoTMQTTClient. self.myShadowClient handles connecting the client to, and disconnecting it from, the AWS IoT thing (it can also update the thing shadow, but that is outside the scope of this article), while self.myAWSIoTMQTTClient handles publishing messages to an MQTT topic.
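To make this division of labor concrete, here is a minimal sketch of such a wrapper. It is not the repository’s code. The shadow client is injected through a factory so the class can run without AWS; connecting lazily on the first send is an assumption (consistent with the log output later in this article, where the connection happens right after the first “Publishing data” line); and QoS 1 is a guess. make_sdk_shadow_client shows how a factory might build the real client with AWSIoTPythonSDK’s AWSIoTMQTTShadowClient, configureEndpoint, and configureCredentials; the wrapper then uses connect, disconnect, getMQTTConnection, and publish.

```python
import json
from enum import Enum
from typing import Any, Callable


class ClientType(Enum):
    UPLOAD = "UPLOAD"
    REMOTE = "REMOTE"  # used by the Remote client in Part II


def make_sdk_shadow_client(
    thing_name: str, endpoint: str, port: int, root_ca: str, private_key: str, cert_file: str
) -> Any:
    """Build a real AWSIoTPythonSDK shadow client (imported lazily)."""
    from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTShadowClient

    client = AWSIoTMQTTShadowClient(thing_name)  # thing name doubles as client ID
    client.configureEndpoint(endpoint, port)
    client.configureCredentials(root_ca, private_key, cert_file)
    return client


class AWSIoTMQTTClientWrapper:
    def __init__(
        self,
        sensor_name: str,
        client_type: ClientType,
        shadow_client_factory: Callable[[str], Any],
    ) -> None:
        # e.g. "vehicle_detector_1" + "_" + "UPLOAD"; must equal the AWS IoT thing name.
        self.thing_name = f"{sensor_name}_{client_type.value}"
        self.myShadowClient = shadow_client_factory(self.thing_name)
        # The shadow client exposes its underlying MQTT client for publishing.
        self.myAWSIoTMQTTClient = self.myShadowClient.getMQTTConnection()
        self._connected = False

    def connect(self) -> None:
        if not self._connected:
            self.myShadowClient.connect()
            self._connected = True

    def disconnect(self) -> None:
        if self._connected:
            self.myShadowClient.disconnect()
            self._connected = False

    def send(self, topic: str, payload: Any, qos: int = 1) -> bool:
        """Connect on first use, then publish payload serialized as JSON."""
        self.connect()
        return self.myAWSIoTMQTTClient.publish(topic, json.dumps(payload), qos)
```

With the real SDK, the factory could be functools.partial(make_sdk_shadow_client, endpoint=..., port=..., root_ca=..., private_key=..., cert_file=...), filled from the settings, so the wrapper itself never needs to know where the credentials live.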

Set up the Upload client

Source code: src/clients/upload.py

Setting up the Upload client is simple, because the bulk of the work has been done in AWSIoTMQTTClientWrapper. Upload inherits from AWSIoTMQTTClientWrapper and defines only one method, upload_msg. This method uses the send method defined in AWSIoTMQTTClientWrapper, but offers more flexibility regarding the MQTT topic. By default, the topic is configured in the .env file (accessed via settings.upload_topic). However, if a different topic is needed, it can be passed directly to upload_msg to override the default.
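The topic-override behavior boils down to a default argument. Here is a minimal sketch with the wrapper reduced to a hypothetical StubClientWrapper whose send only records what it would publish, and with the default topic passed in rather than read from settings.upload_topic:

```python
from typing import Any, List, Optional, Tuple


class StubClientWrapper:
    """Hypothetical stand-in for AWSIoTMQTTClientWrapper: records instead of publishing."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, Any]] = []

    def send(self, topic: str, payload: Any) -> None:
        self.sent.append((topic, payload))


class Upload(StubClientWrapper):
    def __init__(self, default_topic: str) -> None:
        super().__init__()
        self.default_topic = default_topic  # settings.upload_topic in the real client

    def upload_msg(self, msg: Any, topic: Optional[str] = None) -> None:
        """Publish msg to topic, falling back to the configured default."""
        self.send(self.default_topic if topic is None else topic, msg)
```

Comparing against None (rather than relying on truthiness) keeps the fallback explicit, so only an omitted topic triggers the default.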

Set up The Mock Vehicle Detection Function

Source code: src/vehicle_detector/detect_vehicle.py

The function detect_vehicle is a mock of the vehicle detector. It keeps an internal counter, num_vehicles, which increments whenever a vehicle is detected. In reality, a vehicle detector would rely on sensors to determine when to increment the counter. In the mock implementation, this step is replaced by a random wait, sleep(randint(1, 5)).

It is worth noting that detect_vehicle puts each newly created data packet on a queue, data_q. Since detect_vehicle runs in a child process, data_q is needed to transfer data from the child process to the main process, where the upload happens.
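Here is a minimal sketch of such a mock, following the conventions described here and in the ChildProcesses section (data goes out through data_q, and the termination event is the last argument). Two details are assumptions rather than the repository’s code: the packet layout is borrowed from the message shown at the end of this article, and term_event.wait() replaces sleep() so the loop can exit immediately instead of finishing its current wait. The wait bounds are parameters so the sketch can be exercised quickly.

```python
import logging
import random
import time
from typing import Any

logger = logging.getLogger(__name__)


def detect_vehicle(data_q: Any, min_wait: float, max_wait: float, term_event: Any) -> None:
    """Mock detector: 'sees' a vehicle after a random wait and queues a packet.

    data_q and term_event are expected to be a multiprocessing.Queue and a
    multiprocessing.Event when run as a child process; queue.Queue and
    threading.Event expose the same methods used here.
    """
    num_vehicles = 0
    while True:
        # wait() returns True as soon as term_event is set, ending the loop early.
        if term_event.wait(random.uniform(min_wait, max_wait)):
            break
        num_vehicles += 1
        logger.info("Vehicle detected! Current number of vehicles: %d", num_vehicles)
        data_q.put(
            {"timestamp": int(time.time() * 1000), "cur_vehicle_count": num_vehicles}
        )
    logger.info("Vehicle detection terminated.")
```

As a child process, it would be started with something like Process(target=detect_vehicle, args=(data_q, 1, 5, term_event)).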

Set up Logging

Source code: src/logger/output.py and src/logger/logger_config.py

This section can be safely skipped if logging to a local file is not needed.

Logging in Python with multiprocessing can be tricky, especially when log records need to be saved to a file. For details, please check my previous article Python3: Logging with Multiprocessing. Briefly, since Python’s logging propagates log records from anywhere in the program up to the root logger, we only need to configure one root logger to handle all output. This handling includes where each log record ends up (the console and/or a local file) and how it is formatted. In our case, this root logger is the output_logger.

In addition, we need to set up another root logger that collects log records from all over the program, including those from child processes, and funnels them, one at a time, to the output_logger. This root logger is the queue_logger, which uses logger_q to collect and transfer log records. Without the queue_logger, multiple processes writing logs to the same local file can cause race conditions.

The output_logger runs in its own child process and must be set up before the queue_logger. Both loggers must be set up before any other functions or processes; otherwise, some log records will be lost. Both loggers are initialized in main.py.
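The underlying pattern is the one from the Python logging cookbook: worker code logs through a logging.handlers.QueueHandler, and a single listener pulls records off the queue and writes them out. Below is a minimal sketch, not the repository’s logger_config.py. The function names mirror the article’s terminology, and the format string is inferred from the log lines shown later in this article. Note that a child started with the spawn start method does not inherit the parent’s logging setup, so it would need to call setup_queue_logger itself.

```python
import logging
import logging.handlers
import queue
from typing import Any, List

# Inferred from the log lines shown in this article.
LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"


def output_logger(logger_q: Any, log_file: str, term_event: Any) -> None:
    """Listener: the only code that writes log records to the console and file."""
    formatter = logging.Formatter(LOG_FORMAT)
    handlers: List[logging.Handler] = [
        logging.StreamHandler(),
        logging.FileHandler(log_file),
    ]
    for handler in handlers:
        handler.setFormatter(formatter)
    try:
        while True:
            try:
                record = logger_q.get(timeout=0.1)
            except queue.Empty:  # multiprocessing.Queue raises queue.Empty too
                if term_event.is_set():
                    break  # queue drained and shutdown requested
                continue
            # Call the handlers directly instead of re-logging via the root
            # logger, so a record can never loop back into logger_q.
            for handler in handlers:
                handler.handle(record)
    finally:
        for handler in handlers:
            handler.close()


def setup_queue_logger(logger_q: Any, level: int = logging.INFO) -> None:
    """Route every record that reaches the root logger into logger_q."""
    root = logging.getLogger()
    root.setLevel(level)
    root.addHandler(logging.handlers.QueueHandler(logger_q))
```

Because the listener only exits once the queue is empty and the termination event is set, records logged just before shutdown are still written.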

Set up A Class to Handle All Child Processes

Source code: src/child_processes/child_processes.py

This section can be skipped if one prefers other ways to manage child processes.

Given that we have at least two child processes (one for the vehicle detection function and the other for the output_logger), and that the procedures to create, start, and terminate them are quite similar, a central class that standardizes the handling of child processes reduces code repetition and improves maintainability.

The class ChildProcesses uses an internal dictionary to record key information about each child process, including the process’s name, the process object, and the associated termination event. Note that ChildProcesses can only handle functions that accept a termination event as their last argument. A termination event is a simple way to end an infinite loop in a child process by calling term_event.set(), as can be seen in the ChildProcesses.terminate() method.
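Here is a minimal sketch of such a class, assuming the dictionary-per-child layout and the termination-event-last convention described above. Method names other than terminate are hypothetical. The process class and event factory are injectable (defaulting to multiprocessing.Process and multiprocessing.Event) only so the sketch can also be exercised with threads; it is not the repository’s implementation.

```python
import multiprocessing as mp
from typing import Any, Callable, Dict, Tuple


class ChildProcesses:
    def __init__(
        self,
        process_cls: Callable[..., Any] = mp.Process,
        event_factory: Callable[[], Any] = mp.Event,
    ) -> None:
        self._process_cls = process_cls
        self._event_factory = event_factory
        # name -> {"name", "process", "term_event"}; insertion order = creation order.
        self._children: Dict[str, Dict[str, Any]] = {}

    def create(self, name: str, target: Callable[..., None], args: Tuple[Any, ...] = ()) -> None:
        """Register a child; target must accept a termination event as its last argument."""
        if name in self._children:
            raise ValueError(f"Child process {name!r} already exists")
        term_event = self._event_factory()
        process = self._process_cls(
            target=target, args=(*args, term_event), name=name, daemon=True
        )
        self._children[name] = {"name": name, "process": process, "term_event": term_event}

    def start(self, name: str) -> None:
        self._children[name]["process"].start()

    def terminate(self, name: str, timeout: float = 5.0) -> None:
        """Ask the child to leave its loop, then wait for it to exit."""
        child = self._children.pop(name)
        child["term_event"].set()
        child["process"].join(timeout)

    def terminate_all(self, timeout: float = 5.0) -> None:
        # Reverse creation order, so a logger created first is stopped last.
        for name in reversed(list(self._children)):
            self.terminate(name, timeout)
```

Appending the termination event inside create is what enforces the “last argument” rule: callers pass only the function’s own arguments, and every child can be stopped the same way.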

Set up The Main Entry Point

Source code: main.py

We have finally arrived at the main entry point of the program, the main function in main.py. It accomplishes the following tasks:

  1. Instantiate child_processes from the ChildProcesses class to handle all child processes in this program.
  2. Initialize data_q and logger_q.
  3. Use child_processes to create and start the output_logger process. Also set up the queue_logger.
  4. Use child_processes to create and start the detect_vehicle process.
  5. Run a session of the detect_vehicle process, which uploads total_iterations (five in the .env template) data packets to AWS IoT.
  6. Use child_processes to terminate the detect_vehicle and output_logger processes when the session ends.
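Steps 5 and 6 are the heart of main. Here is a minimal sketch of that part, assuming the session simply pulls total_iterations packets off data_q and hands each one to an upload function. run_session and its cleanup callback are hypothetical names, and the try/finally is a design choice that guarantees the child processes are terminated even if an upload raises.

```python
from typing import Any, Callable, Optional


def run_session(
    data_q: Any,
    upload_msg: Callable[[Any], None],
    total_iterations: int,
    cleanup: Callable[[], None],
    timeout: Optional[float] = None,
) -> int:
    """Upload total_iterations packets from data_q, then always run cleanup."""
    uploaded = 0
    try:
        for _ in range(total_iterations):
            # Blocks until detect_vehicle produces the next packet
            # (raises queue.Empty if a timeout is given and nothing arrives).
            packet = data_q.get(timeout=timeout)
            upload_msg(packet)
            uploaded += 1
    finally:
        cleanup()  # e.g. terminate detect_vehicle, then output_logger
    return uploaded
```

In main.py this would be called with data_q, the Upload client’s upload_msg, settings.total_iterations, and a callback that terminates the child processes.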

Run The Program And Observe The Logs

In your terminal, run pipenv run python3 main.py. You can also run python3 main.py directly, but then you must make sure that you are in the virtual environment and that the variables from the .env file have been loaded into your current shell’s environment.

The terminal should output logging information like this:

$ pipenv run python3 main.py
Loading .env environment variables…
2020-08-03 11:44:36,921 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - MqttCore initialized
2020-08-03 11:44:36,922 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Client id: vehicle_detector_1_UPLOAD
2020-08-03 11:44:36,923 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Protocol version: MQTTv3.1.1
2020-08-03 11:44:36,923 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Authentication type: TLSv1.2 certificate based Mutual Auth.
2020-08-03 11:44:36,923 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring offline requests queueing: max queue size: 0
2020-08-03 11:44:36,924 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring offline requests queue draining interval: 0.100000 sec
2020-08-03 11:44:36,925 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring endpoint...
2020-08-03 11:44:36,925 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring certificates...
2020-08-03 11:44:36,921 - src.vehicle_detector.detect_vehicle - INFO - Vehicle detected! Current number of vehicles: 1
2020-08-03 11:44:36,927 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring reconnect back off timing...
2020-08-03 11:44:36,927 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Base quiet time: 1.000000 sec
2020-08-03 11:44:36,927 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Max quiet time: 32.000000 sec
2020-08-03 11:44:36,927 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Stable connection time: 20.000000 sec
2020-08-03 11:44:36,928 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring connect/disconnect time out: 10.000000 sec
2020-08-03 11:44:36,928 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring MQTT operation time out: 5.000000 sec
2020-08-03 11:44:36,928 - root - INFO - Publishing data to topic vehicle_detector/test/raw...
2020-08-03 11:44:36,928 - src.aws.aws_iot_client_wrapper - INFO - Connecting vehicle_detector_1_UPLOAD to MQTT client...
2020-08-03 11:44:36,929 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync connect...
2020-08-03 11:44:36,929 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing async connect...
2020-08-03 11:44:36,929 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Keep-alive: 600.000000 sec
2020-08-03 11:44:37,285 - src.aws.aws_iot_client_wrapper - INFO - vehicle_detector_1_UPLOAD ONLINE.
2020-08-03 11:44:37,286 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish...
2020-08-03 11:44:39,928 - src.vehicle_detector.detect_vehicle - INFO - Vehicle detected! Current number of vehicles: 2
2020-08-03 11:44:39,929 - root - INFO - Publishing data to topic vehicle_detector/test/raw...
2020-08-03 11:44:39,930 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish...
2020-08-03 11:44:41,930 - src.vehicle_detector.detect_vehicle - INFO - Vehicle detected! Current number of vehicles: 3
2020-08-03 11:44:41,932 - root - INFO - Publishing data to topic vehicle_detector/test/raw...
2020-08-03 11:44:41,932 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish...
2020-08-03 11:44:45,935 - src.vehicle_detector.detect_vehicle - INFO - Vehicle detected! Current number of vehicles: 4
2020-08-03 11:44:45,936 - root - INFO - Publishing data to topic vehicle_detector/test/raw...
2020-08-03 11:44:45,937 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish...
2020-08-03 11:44:50,941 - src.vehicle_detector.detect_vehicle - INFO - Vehicle detected! Current number of vehicles: 5
2020-08-03 11:44:50,942 - root - INFO - Publishing data to topic vehicle_detector/test/raw...
2020-08-03 11:44:50,942 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish...
2020-08-03 11:44:51,013 - root - INFO - Program ended
2020-08-03 11:44:52,943 - src.vehicle_detector.detect_vehicle - INFO - Vehicle detection terminated.
2020-08-03 11:44:53,947 - root - INFO - Logging terminated.

The same log records are also saved in a file called app.log.

Run The Program And Observe The Uploaded Data in Real Time

In the AWS console, go to AWS IoT Core → Test. Under Subscription topic, input vehicle_detector/test/raw, and then click Subscribe to topic.

Run the program again as described earlier, but this time keep an eye on the AWS console. As the program runs, you should see the uploaded data appear on the screen. Each data message looks like this:

{
    "timestamp": 1596470034182,
    "cur_vehicle_count": 1
}

It contains an epoch timestamp (in milliseconds) and the current vehicle count, both produced by the detect_vehicle function.
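To read the timestamp as a date, split it into whole seconds since the Unix epoch plus a millisecond remainder. A small sketch; message_time_utc is a hypothetical helper, and it uses integer arithmetic so the milliseconds are kept exactly:

```python
import json
from datetime import datetime, timedelta, timezone


def message_time_utc(message: str) -> datetime:
    """Convert the message's epoch-millisecond timestamp to an aware UTC datetime."""
    ms = json.loads(message)["timestamp"]
    seconds, millis = divmod(ms, 1000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(milliseconds=millis)


example = '{"timestamp": 1596470034182, "cur_vehicle_count": 1}'
print(message_time_utc(example).isoformat())  # 2020-08-03T15:53:54.182000+00:00
```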

We have successfully produced data on an actual IoT device and uploaded it to AWS IoT. To process the uploaded data, the next step is to set up a rule that passes each data message from the MQTT topic to a downstream AWS service. However, this is outside the scope of this series.

Summary

In this article, we have shown a non-trivial example of how to set up AWS IoT and a Python program to upload data from an IoT device to an AWS IoT MQTT topic.

In Part II, we will provide a working example of how to perform command-and-response with the IoT device using AWS IoT Jobs. With a remote command-and-response capability, we will gain better control of the IoT device once it’s deployed in the field.
