Integrate IoT Device with AWS IoT using Python — Part I: upload data to MQTT topic
This is Part I of a series discussing one way to integrate an IoT device with AWS IoT using Python. Other parts of the series are listed below:
- Part I: upload data to MQTT topic (this article)
- Part II: command-and-response
- Part III: command-and-response using API
- Part IV: remote SSH login with ngrok
- Part V: over-the-air software update
This series attempts to build a fairly robust system for the IoT device. Thus, to the best of my effort, the source code follows proper design, linting, and type checking. Unfortunately, due to time constraints, unit tests are currently not included.
The IoT device chosen for this series is a mock vehicle detector. It “detects” whether a vehicle has passed in front of its line of sight and increments an internal counter. Note that there is no real vehicle detector. I am using it only as an example whose functionality is common among many IoT devices: collect data and upload it to the cloud periodically. In the source code, the functionality of the vehicle detector is mocked by a simple function.
Before You Start
It is assumed that you are familiar with Python 3 and know how to code and run a Python program on an IoT device. It is also assumed that you have an AWS account (free tier is sufficient), are familiar with the AWS console, and know the basics of AWS IoT. It is highly recommended that you download the GitHub repo for this article and read through the code along the way. Although the code was developed on a Raspberry Pi 4, it should run on any Unix system.
Download the GitHub repo and switch to the upload_data branch:
git clone https://github.com/FanchenBao/aws_iot_integration.git
cd aws_iot_integration
git checkout -b upload_data
The directory tree looks like this:
│ └── readme.md
│ ├── __init__.py
│ └── aws_iot_client_wrapper.py
│ ├── __init__.py
│ └── child_processes.py
│ ├── __init__.py
│ └── upload.py
│ ├── __init__.py
│ └── network_connection_error.py
│ ├── __init__.py
│ ├── logger_config.py
│ └── ouput.py
We use pipenv to set up the virtual environment for the source code. If you don’t have pipenv, refer to its documentation regarding how to install it.
Run pipenv install. This command installs all the regular packages listed in the Pipfile, including boto3. It will take a few minutes. Once it is done, run pipenv shell to enter the virtual environment.
If you want to expand the source code, I would recommend you do
pipenv install --dev instead, which installs all the development packages that help with linting and type checking. After installing all the dev packages, run
pre-commit install. This sets up the pre-commit hook that runs linting and type-checking each time you commit new changes.
Set up AWS IoT
Before diving into the source code, we need to set up AWS IoT. The tutorials provided by the AWS IoT documentation, such as this one, are a good place to familiarize yourself with the AWS IoT workflow.
Create a thing type
A thing type can be attached to an IoT thing and provides more searchable attributes and tags for organizational purposes. To create a thing type, go to AWS IoT Core → Manage → Types → Create. Under Name, input VehicleDetectorUpload. Under Description, input “The upload type for vehicle detector. Vehicle detector in this type handles uploading the live data to the MQTT topic.” Then click Create thing type.
Create a thing group
A thing group is similar to a thing type with regard to its ability to organize IoT things. However, it is more powerful because it has its own ARN and can have policies attached, which then apply to every IoT thing contained within the group. These features allow us to use a thing group to apply shared policies to all IoT things with the same characteristics, without having to attach the same policy to each thing individually.
To create a thing group, go to AWS IoT Core → Manage → Thing groups → Create → Create Thing Group. Under Name, input VehicleDetectorUpload. Under Description, input “The upload group for vehicle detector. All devices in this group are for uploading live data to the MQTT topic.” Then click Create thing group.
In tutorials, policies are usually made as permissive as possible for convenience. Here, however, let’s try to make them as specific as possible. We need to create two policies: one for each individual IoT thing, which allows the actual IoT device to connect to the IoT thing on AWS, and the other for the VehicleDetectorUpload group we have just created.
To create the connect policy, go to AWS IoT Core → Secure → Policies → Create. Under Name, input
vehicle_detector_connect_policy . Choose Advanced mode, and paste the following policy:
In the policy, [region] refers to the region (e.g. us-east-1) of your AWS account, and [aws_account] refers to your AWS account number. This policy allows an actual IoT device to connect to AWS IoT provided that the thing name specified in the Python program on the actual IoT device matches the thing name of the IoT thing we are about to create. Then click Create.
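For reference, a policy matching this description could look like the sketch below. It is my own reconstruction under stated assumptions, not necessarily the exact policy used in the repo: it allows iot:Connect only when the MQTT client ID equals the name of the thing that the certificate is attached to, using the AWS IoT policy variable ${iot:Connection.Thing.ThingName}. The function name build_connect_policy and the account number 123456789012 are placeholders introduced for illustration. The printed JSON is what would be pasted into the Advanced mode editor.

```python
import json


def build_connect_policy(region: str, aws_account: str) -> dict:
    """Build a policy that lets a device connect only as its own thing."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "iot:Connect",
                # The client ID used by the device must equal the thing name
                # of the thing its certificate is attached to.
                "Resource": (
                    f"arn:aws:iot:{region}:{aws_account}"
                    ":client/${iot:Connection.Thing.ThingName}"
                ),
            }
        ],
    }


# Placeholder values; substitute your own region and account number.
print(json.dumps(build_connect_policy("us-east-1", "123456789012"), indent=2))
```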
To create the
VehicleDetectorUpload group policy, go to AWS IoT Core → Secure → Policies → Create. Under Name, input
vehicle_detector_upload_group_policy . Choose Advanced mode, and paste the following policy:
This policy allows all the IoT things contained within the VehicleDetectorUpload group to publish messages to three MQTT topics: vehicle_detector/test/raw, vehicle_detector/dev/raw, and vehicle_detector/prod/raw. The topics are separated by environment in order to facilitate the software development cycle.
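As a rough illustration, a group policy with this effect could be generated as follows. This is a sketch under assumptions rather than the repo's exact policy: it grants iot:Publish on one topic per environment, where the test topic is taken from the upload_topic value used later in the .env template. The function name build_upload_group_policy and the account number are placeholders.

```python
import json

# One raw-data topic per environment
ENVIRONMENTS = ("test", "dev", "prod")


def build_upload_group_policy(region: str, aws_account: str) -> dict:
    """Build a policy that allows publishing to the three raw-data topics."""
    resources = [
        f"arn:aws:iot:{region}:{aws_account}:topic/vehicle_detector/{env}/raw"
        for env in ENVIRONMENTS
    ]
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "iot:Publish",
                "Resource": resources,
            }
        ],
    }


# Placeholder values; substitute your own region and account number.
print(json.dumps(build_upload_group_policy("us-east-1", "123456789012"), indent=2))
```

Because the policy lists topics explicitly instead of using a wildcard, a device in the group cannot publish anywhere else.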
Create IoT thing
We have finally reached the stage of creating the AWS IoT thing. It is an entity to which the actual IoT device connects in order to transfer data. To create an IoT thing, go to AWS IoT Core → Manage → Things → Create → Create a single thing. Under Name, input
vehicle_detector_1_UPLOAD . Under Apply a type to this thing, choose
VehicleDetectorUpload from the drop down. Under Add this thing to a group, click Change and choose
VehicleDetectorUpload from the drop down. Under Set searchable thing attributes, input
id for Attribute key and
1 for Value. Click Next.
Click Create certificate.
Click Download for A certificate for this thing, A public key, and A private key. These three files are the credentials that must reside on the actual IoT device. They are used to authenticate the device when it attempts to connect to AWS IoT. Also click Download for A root CA for AWS IoT. On the next webpage, click Amazon Root CA 1. Copy all the content on the webpage and store it in a file called AmazonRootCA1.pem. This file is the root CA certificate provided by Amazon, and it is also required for authentication. For details of how asymmetric encryption and certificate authorities work in TLSv1.2 mutual authentication, please refer to this wonderfully explained video (it’s about HTTPS, but the underlying mechanism is similar). Keep all four credential files in a secure place. We will need them later.
Click Attach a policy, choose vehicle_detector_connect_policy, and click Register Thing. This attaches the connect policy to the thing’s certificate and registers the thing on AWS IoT.
Attach group policy
The last step is to attach the
vehicle_detector_upload_group_policy to the
VehicleDetectorUpload thing group. Go to AWS IoT Core → Thing groups → VehicleDetectorUpload → Security → Edit. Under Select a policy to attach to this group, click Select and choose
vehicle_detector_upload_group_policy . Click Save.
In this section, we have set up the AWS IoT by creating an IoT thing named
vehicle_detector_1_UPLOAD , tagged it with the
VehicleDetectorUpload thing type, added it to the
VehicleDetectorUpload thing group, created and downloaded the associated credentials, and attached the
vehicle_detector_connect_policy policy to the credentials. In addition, we have created the
vehicle_detector_upload_group_policy policy and attached it to the
VehicleDetectorUpload thing group.
AWS IoT is now ready to accept connections from the device and messages published to one of the specified MQTT topics.
Set up Credentials
The four credential files downloaded earlier (a certificate, a private key, a public key, and an Amazon Root CA file) must be transferred to the Python application folder under credentials/. It is crucial that none of these files, except for the Amazon Root CA file, is ever exposed to the public. They must reside only on the IoT device. If they are exposed, discard them and create new ones from the AWS IoT console.
Set up Python Application Configuration
The configuration of the Python program is specified in a .env file and loaded using the pydantic package. The benefit of using pydantic is that it performs type checking when loading .env or environment variables. In addition, since the configuration is manifested as a Settings class, we can easily tweak how the configuration should be exposed to its consumer (e.g. how the credential file names are exposed). The .env file must not be committed, as it contains sensitive information. To create your own .env file, copy and paste the following template:
# AWS IoT config
upload_topic=vehicle_detector/test/raw

# Main program config
Notice that sensor_name is set to vehicle_detector_1. This name must correspond to the first half of the AWS IoT thing name we created earlier, which is vehicle_detector_1_UPLOAD. The last portion of the thing name is supplied by the AWS IoT client wrapper described in the next section. The endpoint can be found on the AWS console by going to AWS IoT Core → Things → vehicle_detector_1_UPLOAD → Interact → HTTPS. Entries such as upload_cert_file are the file names of the corresponding credential files downloaded earlier. We can leave remote_cert_file blank for now. upload_topic is the MQTT topic where the actual IoT device will upload the data. It must be one of the three topics specified in the vehicle_detector_upload_group_policy.
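To make the idea of a typed configuration more concrete, here is a minimal sketch that uses only the standard library. It is a stand-in for illustration, not the pydantic-based Settings class from the repo: pydantic performs the parsing and validation for you, whereas this sketch does a simplified version by hand. The port field is a hypothetical addition whose only purpose is to show type coercion, and the endpoint value in the usage note is a made-up placeholder.

```python
from dataclasses import dataclass, fields
from typing import Dict


def parse_env(text: str) -> Dict[str, str]:
    """Parse KEY=value lines, skipping blank lines and # comments."""
    values: Dict[str, str] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values


@dataclass(frozen=True)
class Settings:
    sensor_name: str
    endpoint: str
    upload_topic: str
    port: int = 8883  # hypothetical field, used here to show type coercion

    @classmethod
    def from_env(cls, text: str) -> "Settings":
        """Build Settings from .env text, coercing values to field types."""
        raw = parse_env(text)
        kwargs = {}
        for field in fields(cls):
            if field.name in raw:
                # field.type is the annotated class (str or int);
                # int("abc") raises ValueError, which surfaces bad config early.
                kwargs[field.name] = field.type(raw[field.name])
        # A missing required field raises TypeError here.
        return cls(**kwargs)
```

Calling Settings.from_env with text that contains sensor_name=vehicle_detector_1, endpoint=example-ats.iot.us-east-1.amazonaws.com, and upload_topic=vehicle_detector/test/raw returns a frozen object whose attributes are read like settings.upload_topic, which mirrors how the configuration is consumed in the rest of the program.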
Set up AWS IoT Upload Client
The design for the Upload client is to first create a generic AWS IoT client wrapper class, and then inherit from the wrapper class in the Upload client. The reason for this design is that in Part II of this series, we will create another client called Remote, which uses almost the same setup as Upload. Thus, it is easier for code maintenance to set up a generic wrapper class from which both the Upload and Remote classes can inherit.
Set up the generic AWS IoT client wrapper
The generic AWS IoT client wrapper is set up using the AWSIoTPythonSDK (GitHub repo link). Being a generic wrapper, AWSIoTMQTTClientWrapper takes a client_type param, which determines whether the client is an Upload or a Remote client. If the client is an Upload, as is the case in this article, self.thing_name is a concatenation of sensor_name from the .env file and the word UPLOAD. Note that self.thing_name must match the AWS IoT thing name created earlier.
The generic AWS IoT client wrapper offers two clients: self.myShadowClient and self.myAWSIoTMQTTClient. self.myShadowClient handles connection and disconnection of the client to the AWS IoT thing (it can also update the thing shadow, but that is outside the scope of this article), while self.myAWSIoTMQTTClient handles publishing messages to an MQTT topic.
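The sketch below shows how these pieces might fit together when using AWSIoTPythonSDK. It is a simplified illustration, not the wrapper class from the repo. The helper names build_thing_name and create_clients are hypothetical, as is the assumption that the client type is passed as a string. The timing values (1, 32, 20, 10, and 5 seconds) are the ones that appear in the program's log output later in this article, and port 8883 is the standard port for MQTT over TLS with certificate authentication.

```python
def build_thing_name(sensor_name: str, client_type: str) -> str:
    """Join the sensor name and the upper-cased client type."""
    return f"{sensor_name}_{client_type.upper()}"


def create_clients(thing_name, endpoint, root_ca, private_key, cert):
    """Create the shadow client and the MQTT client sharing its connection."""
    # Imported lazily so that build_thing_name works without the SDK installed.
    from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTShadowClient

    # The thing name doubles as the MQTT client ID.
    shadow_client = AWSIoTMQTTShadowClient(thing_name)
    shadow_client.configureEndpoint(endpoint, 8883)
    shadow_client.configureCredentials(root_ca, private_key, cert)
    # Base quiet time, max quiet time, stable connection time (seconds)
    shadow_client.configureAutoReconnectBackoffTime(1, 32, 20)
    shadow_client.configureConnectDisconnectTimeout(10)  # seconds
    shadow_client.configureMQTTOperationTimeout(5)  # seconds
    # The plain MQTT client reuses the shadow client's underlying connection,
    # so connecting the shadow client is enough for publishing.
    mqtt_client = shadow_client.getMQTTConnection()
    return shadow_client, mqtt_client
```

With these two objects, shadow_client.connect() and shadow_client.disconnect() manage the connection, while mqtt_client.publish(topic, payload, QoS) sends a message.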
Set up the Upload client
Setting up the Upload client is simple, because the bulk of the work has been done in the wrapper. The Upload class inherits from AWSIoTMQTTClientWrapper and defines only one method: upload_msg. This method uses the send method defined in AWSIoTMQTTClientWrapper, yet it offers more flexibility regarding the MQTT topic. By default, the topic is configured in the .env file (accessed by settings.upload_topic). However, if a different topic is needed, it can be passed directly to upload_msg to override the default topic.
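The default-topic behavior can be sketched as follows. To keep the example runnable without AWS credentials, FakeWrapper stands in for AWSIoTMQTTClientWrapper and merely records what would have been published. The send signature, the DEFAULT_UPLOAD_TOPIC constant (standing in for settings.upload_topic), and the JSON serialization of the message are all assumptions made for illustration.

```python
import json
from typing import Any, Dict, List, Optional, Tuple

# Stands in for settings.upload_topic loaded from the .env file
DEFAULT_UPLOAD_TOPIC = "vehicle_detector/test/raw"


class FakeWrapper:
    """Stand-in for AWSIoTMQTTClientWrapper; records instead of publishing."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, str]] = []

    def send(self, topic: str, payload: str) -> None:
        self.sent.append((topic, payload))


class Upload(FakeWrapper):
    def upload_msg(
        self, msg: Dict[str, Any], topic: Optional[str] = None
    ) -> None:
        """Publish msg to topic, or to the default topic when none is given."""
        target = topic if topic is not None else DEFAULT_UPLOAD_TOPIC
        self.send(target, json.dumps(msg))
```

Using None as the default argument, rather than the topic string itself, means the default is looked up at call time, so a changed configuration is picked up without redefining the method.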
Set up The Mock Vehicle Detection Function
detect_vehicle is a mock function for the vehicle detector. It has an internal counter
num_vehicles, which increments whenever a vehicle is detected. In reality, a vehicle detector would require sensors to determine when the counter can be incremented. In the mock implementation, this step is replaced by a random wait
It is worth noting that a data_q is used in detect_vehicle to accept newly created data packets. Since the detect_vehicle function will be running in a child process, data_q is needed to transfer data from the child process to the main process, where the data upload happens.
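A minimal sketch of such a mock function is shown below. It is an illustration under assumptions, not the code from the repo: the field names timestamp and vehicle_count, and the wait bounds min_wait and max_wait, are hypothetical. The function accepts any queue-like object with a put method and any event-like object, so it works with multiprocessing.Queue and multiprocessing.Event in a child process.

```python
import random
import time


def detect_vehicle(data_q, term_event, min_wait=1.0, max_wait=5.0):
    """Mock detector: wait a random time, then report one more vehicle."""
    num_vehicles = 0
    while not term_event.is_set():
        # Event.wait doubles as an interruptible sleep: it returns True
        # as soon as termination is requested, and False on timeout.
        if term_event.wait(random.uniform(min_wait, max_wait)):
            break
        num_vehicles += 1
        data_q.put(
            {
                "timestamp": int(time.time() * 1000),  # epoch milliseconds
                "vehicle_count": num_vehicles,
            }
        )
```

Waiting on the termination event instead of calling time.sleep lets the process stop promptly rather than finishing the current wait first.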
Set up Logging
This section can be safely skipped if logging to a local file is not needed.
Logging in Python with multiprocessing can be tricky, especially when the log records need to be saved to a file. For details, please check my previous article Python3: Logging with Multiprocessing. Briefly speaking, since Python’s logging delegates all the log records from anywhere in the program to the root logger for handling, we only need to set up one root logger to handle all output work. Such handling includes where the log records end up (console and/or a local file) and how they should be formatted. In our case, this root logger is the output_logger.
In addition, we need to set up another root logger that collects log records from all over the program, including those from a child process, and sends them to the output_logger synchronously. This root logger is the queue_logger, which uses a logger_q to collect and transfer log records. Without the queue_logger, writing logs to a local file can cause race conditions among the processes.
output_logger runs in its own child process. It must be set up before the queue_logger. Both loggers must be set up ahead of any other functions/processes; otherwise, some log records will be lost. Both loggers are initiated in
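The two-logger arrangement can be sketched with the standard library's logging.handlers.QueueHandler. This is a simplified illustration and not the logger_config.py from the repo. The function names set_up_queue_logger and output_logger follow the names used in this article, but their signatures are assumptions, as is the make_handler callable, which creates the real output handler (for example a file handler) inside the consuming process.

```python
import logging
import logging.handlers
import queue


def set_up_queue_logger(logger_q) -> logging.Logger:
    """Producer side: route every record reaching the root logger to logger_q."""
    root = logging.getLogger()
    root.handlers.clear()
    root.addHandler(logging.handlers.QueueHandler(logger_q))
    root.setLevel(logging.INFO)
    return root


def output_logger(logger_q, make_handler, term_event) -> None:
    """Consumer side: the only place where records are actually written."""
    handler = make_handler()
    # Keep running until termination is requested AND the queue is drained,
    # so records logged just before shutdown are not lost.
    while not (term_event.is_set() and logger_q.empty()):
        try:
            record = logger_q.get(timeout=0.1)
        except queue.Empty:
            continue
        handler.handle(record)
```

Because only output_logger touches the output handler, records from different processes are written one at a time, in the order they leave the queue.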
Set up A Class to Handle All Child Processes
This section can be skipped if one prefers other ways to manage child processes.
Given that we have at least two child processes, one for the vehicle detecting function and the other for the output_logger, and that the procedures to create, start, and terminate these two child processes are quite similar, a central class that standardizes the handling of child processes helps us reduce code repetition and improve code maintenance.
ChildProcesses uses an internal dictionary to record key information of each child process, including the process’s name, the process object, and the associated termination event. It is important to point out that only functions that accept a termination event as their last argument can be handled by ChildProcesses. A termination event is a simple way to end an infinite loop in a child process by calling term_event.set(), as can be seen in the
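A stripped-down version of such a class might look like the sketch below. It illustrates the pattern rather than reproducing child_processes.py from the repo: the method names create, start, and terminate and the layout of the internal dictionary are assumptions.

```python
import multiprocessing as mp
from typing import Any, Callable, Dict, Tuple


class ChildProcesses:
    """Create, start, and terminate child processes in a uniform way."""

    def __init__(self) -> None:
        # Maps process name -> {"process": Process, "term_event": Event}
        self._children: Dict[str, Dict[str, Any]] = {}

    def create(
        self,
        name: str,
        target: Callable[..., None],
        args: Tuple[Any, ...] = (),
    ) -> None:
        term_event = mp.Event()
        # The termination event is always appended as the LAST argument,
        # which is why target functions must accept it in that position.
        process = mp.Process(
            name=name, target=target, args=(*args, term_event)
        )
        self._children[name] = {"process": process, "term_event": term_event}

    def start(self, name: str) -> None:
        self._children[name]["process"].start()

    def terminate(self, name: str, timeout: float = 10.0) -> None:
        child = self._children.pop(name)
        child["term_event"].set()  # ask the loop in the child to stop
        if child["process"].is_alive():
            child["process"].join(timeout)
```

Setting the event and then joining gives the child a chance to finish its current iteration and clean up, which is gentler than killing the process outright.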
Set up The Main Entry Point
We have finally arrived at the main entry point of the program, which is the main function in main.py. The main function accomplishes the following tasks:
- Instantiate the ChildProcesses class to handle all child processes in this program.
- Use child_processes to create and start the output_logger process. Also set up the queue_logger.
- Use child_processes to create and start the detect_vehicle process.
- Run a session of the detect_vehicle process, which includes uploading five instances of data to AWS IoT.
- Use child_processes to terminate the detect_vehicle and output_logger processes when the session ends.
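The session step in the list above can be sketched as a small loop that moves data packets from the queue to the uploader. This is an illustration under assumptions: run_session, its parameters, and the timeout behavior are hypothetical, and the upload argument stands in for the Upload client's upload_msg method.

```python
import queue
from typing import Any, Callable, Dict


def run_session(
    data_q,
    upload: Callable[[Dict[str, Any]], None],
    num_uploads: int = 5,
    timeout: float = 30.0,
) -> int:
    """Forward up to num_uploads data packets from data_q to upload."""
    uploaded = 0
    while uploaded < num_uploads:
        try:
            packet = data_q.get(timeout=timeout)
        except queue.Empty:
            # The detector produced nothing in time; end the session early.
            break
        upload(packet)
        uploaded += 1
    return uploaded
```

Bounding each wait with a timeout keeps the main process from blocking forever if the detector process stops producing data.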
Run The Program And Observe The Logs
Run the command pipenv run python3 main.py. You can also choose to use python3 main.py directly, but you must make sure that you are in the virtual environment and that the environment variables from the .env file have been loaded into your current shell.
The terminal should output logging information like this:
$ pipenv run python3 main.py
Loading .env environment variables…
2020-08-03 11:44:36,921 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - MqttCore initialized
2020-08-03 11:44:36,922 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Client id: vehicle_detector_1_UPLOAD
2020-08-03 11:44:36,923 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Protocol version: MQTTv3.1.1
2020-08-03 11:44:36,923 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Authentication type: TLSv1.2 certificate based Mutual Auth.
2020-08-03 11:44:36,923 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring offline requests queueing: max queue size: 0
2020-08-03 11:44:36,924 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring offline requests queue draining interval: 0.100000 sec
2020-08-03 11:44:36,925 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring endpoint...
2020-08-03 11:44:36,925 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring certificates...
2020-08-03 11:44:36,921 - src.vehicle_detector.detect_vehicle - INFO - Vehicle detected! Current number of vehicles: 1
2020-08-03 11:44:36,927 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring reconnect back off timing...
2020-08-03 11:44:36,927 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Base quiet time: 1.000000 sec
2020-08-03 11:44:36,927 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Max quiet time: 32.000000 sec
2020-08-03 11:44:36,927 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Stable connection time: 20.000000 sec
2020-08-03 11:44:36,928 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring connect/disconnect time out: 10.000000 sec
2020-08-03 11:44:36,928 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring MQTT operation time out: 5.000000 sec
2020-08-03 11:44:36,928 - root - INFO - Publishing data to topic vehicle_detector/test/raw...
2020-08-03 11:44:36,928 - src.aws.aws_iot_client_wrapper - INFO - Connecting vehicle_detector_1_UPLOAD to MQTT client...
2020-08-03 11:44:36,929 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync connect...
2020-08-03 11:44:36,929 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing async connect...
2020-08-03 11:44:36,929 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Keep-alive: 600.000000 sec
2020-08-03 11:44:37,285 - src.aws.aws_iot_client_wrapper - INFO - vehicle_detector_1_UPLOAD ONLINE.
2020-08-03 11:44:37,286 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish...
2020-08-03 11:44:39,928 - src.vehicle_detector.detect_vehicle - INFO - Vehicle detected! Current number of vehicles: 2
2020-08-03 11:44:39,929 - root - INFO - Publishing data to topic vehicle_detector/test/raw...
2020-08-03 11:44:39,930 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish...
2020-08-03 11:44:41,930 - src.vehicle_detector.detect_vehicle - INFO - Vehicle detected! Current number of vehicles: 3
2020-08-03 11:44:41,932 - root - INFO - Publishing data to topic vehicle_detector/test/raw...
2020-08-03 11:44:41,932 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish...
2020-08-03 11:44:45,935 - src.vehicle_detector.detect_vehicle - INFO - Vehicle detected! Current number of vehicles: 4
2020-08-03 11:44:45,936 - root - INFO - Publishing data to topic vehicle_detector/test/raw...
2020-08-03 11:44:45,937 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish...
2020-08-03 11:44:50,941 - src.vehicle_detector.detect_vehicle - INFO - Vehicle detected! Current number of vehicles: 5
2020-08-03 11:44:50,942 - root - INFO - Publishing data to topic vehicle_detector/test/raw...
2020-08-03 11:44:50,942 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish...
2020-08-03 11:44:51,013 - root - INFO - Program ended
2020-08-03 11:44:52,943 - src.vehicle_detector.detect_vehicle - INFO - Vehicle detection terminated.
2020-08-03 11:44:53,947 - root - INFO - Logging terminated.
The same log records are also saved in a file called
Run The Program And Observe The Uploaded Data in Real Time
On AWS console, go to AWS IoT Core → Test. Under Subscription topic, input
vehicle_detector/test/raw, and then click Subscribe to topic.
Run the program again as described earlier, but this time keep an eye on the AWS console. As the program runs, you should see the uploaded data appear on the screen. Each data message looks like this:
It contains an epoch timestamp (in milliseconds) and the current vehicle count, both of which are produced by the detect_vehicle function.
We have successfully produced data on an actual IoT device and uploaded it to AWS IoT. To process the uploaded data, the next step is to set up a rule that passes each data message from the MQTT topic to a downstream AWS service. However, this is outside the scope of this series.
In this article, we have shown a non-trivial example of how to set up AWS IoT and a Python program to upload data from an IoT device to an AWS IoT MQTT topic.
In Part II, we will provide a working example of how to perform command-and-response with the IoT device using AWS IoT Jobs. With a remote command-and-response capability, we will gain better control of the IoT device once it’s deployed in the field.