Integrate IoT Device with AWS IoT using Python — Part I: upload data to MQTT topic

Workflow of uploading data from IoT device to AWS IoT MQTT topic

Update 08/15/2021

Introduction

This series attempts to build a fairly robust system for the IoT device. Thus, the source code follows proper design, linting, and type checking to the best of my ability. Unfortunately, due to time constraints, unit tests are not currently included.

The IoT device chosen for this series is a mock vehicle detector. It “detects” whether a vehicle has passed in front of its line of sight and increments an internal counter. Note that there is no real vehicle detector. I am using it as an example whose functionality is common among many IoT devices: collect data and upload it to the cloud periodically. In the source code, the vehicle detector is mocked by a simple function.

Before You Start

Clone the GitHub repo and switch to the upload_data tag (more specifically, we create a new branch that starts at the upload_data tag and check it out, which moves HEAD to the new branch):

git clone https://github.com/FanchenBao/aws_iot_integration.git
cd aws_iot_integration
git checkout -b new_branch_name upload_data

The directory tree looks like this:

.
├── Pipfile
├── Pipfile.lock
├── config.py
├── credentials
│   └── readme.md
├── main.py
├── pyproject.toml
├── setup.cfg
└── src
    ├── __init__.py
    ├── aws
    │   ├── __init__.py
    │   └── aws_iot_client_wrapper.py
    ├── child_processes
    │   ├── __init__.py
    │   └── child_processes.py
    ├── clients
    │   ├── __init__.py
    │   └── upload.py
    ├── errors
    │   ├── __init__.py
    │   └── network_connection_error.py
    ├── logger
    │   ├── __init__.py
    │   ├── logger_config.py
    │   └── ouput.py
    └── vehicle_detector
        ├── __init__.py
        └── detect_vehicle.py

Use pipenv to set up the virtual environment for the source code. If you don’t have pipenv, refer to its documentation for installation instructions.

Run command:

pipenv install

This command installs all the regular packages listed in the Pipfile, including AWSIoTPythonSDK and boto3. It will take a few minutes. Once it is done, run pipenv shell to enter the virtual environment.

If you want to extend the source code, I recommend running pipenv install --dev instead, which also installs the development packages for linting and type checking. After installing the dev packages, run pre-commit install. This sets up a pre-commit hook that runs linting and type checking each time you commit new changes.

Set up AWS IoT

Create a thing type

Create a thing group

To create a thing group, go to AWS IoT Core → Manage → Thing groups → Create → Create Thing Group. Under Name, input VehicleDetectorUpload. Under Description, input “The upload group for vehicle detector. All devices in this group are for uploading live data to the MQTT topic.” Then click Create thing group.

Create policies

To create the connect policy, go to AWS IoT Core → Secure → Policies → Create. Under Name, input vehicle_detector_connect_policy. Choose Advanced mode, and paste the following policy:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "iot:Connect"
      ],
      "Resource": [
        "arn:aws:iot:[region]:[aws_account]:client/${iot:Connection.Thing.ThingName}"
      ]
    }
  ]
}

In the policy, replace [region] with the region of your AWS account (e.g. us-east-1) and [aws_account] with your AWS account number. This policy allows a physical IoT device to connect to AWS IoT only if the client ID it connects with, which is the thing name specified in the Python program running on the device, matches the name of the IoT thing we are about to create. Then click Create.
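If you would rather keep the policy in code (for example, to script it with boto3 later), a small helper can fill in the two placeholders. This is a sketch; build_connect_policy is a hypothetical name, not part of the repo. Note that ${iot:Connection.Thing.ThingName} must stay literal, because AWS IoT substitutes it at connection time.

```python
import json


def build_connect_policy(region: str, account_id: str) -> dict:
    """Return the connect policy with [region] and [aws_account] filled in."""
    # Only the first part is an f-string; the policy variable in the second
    # part is kept verbatim so AWS IoT can resolve it when a device connects.
    client_arn = (
        f"arn:aws:iot:{region}:{account_id}:client/"
        "${iot:Connection.Thing.ThingName}"
    )
    return {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow", "Action": ["iot:Connect"], "Resource": [client_arn]}
        ],
    }


if __name__ == "__main__":
    # Placeholder values: use your own region and account number.
    print(json.dumps(build_connect_policy("us-east-1", "123456789012"), indent=2))
```

The printed JSON can be pasted into the Advanced mode editor exactly like the hand-edited version.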

To create the policy for the VehicleDetectorUpload group, go to AWS IoT Core → Secure → Policies → Create. Under Name, input vehicle_detector_upload_group_policy. Choose Advanced mode, and paste the following policy:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "iot:Publish"
      ],
      "Resource": [
        "arn:aws:iot:[region]:[aws_account]:topic/vehicle_detector/test/raw",
        "arn:aws:iot:[region]:[aws_account]:topic/vehicle_detector/dev/raw",
        "arn:aws:iot:[region]:[aws_account]:topic/vehicle_detector/prod/raw"
      ]
    }
  ]
}

This policy allows all IoT things in the VehicleDetectorUpload group to publish messages to three MQTT topics: vehicle_detector/test/raw, vehicle_detector/dev/raw, and vehicle_detector/prod/raw. The topics are separated by environment to facilitate the software development cycle.
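The same group policy can also be created and attached with boto3 instead of the console. This is a sketch, assuming iot_client is a boto3.client("iot") and that the VehicleDetectorUpload group already exists. create_policy, describe_thing_group, and attach_policy are boto3 IoT client calls, while build_publish_policy and create_group_policy are hypothetical helper names. The client is passed in so the functions can be exercised against a fake.

```python
import json
from typing import Any, Iterable


def build_publish_policy(region: str, account_id: str,
                         envs: Iterable[str] = ("test", "dev", "prod")) -> dict:
    """Publish policy covering vehicle_detector/<env>/raw for each env."""
    topic_arns = [
        f"arn:aws:iot:{region}:{account_id}:topic/vehicle_detector/{env}/raw"
        for env in envs
    ]
    return {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow", "Action": ["iot:Publish"], "Resource": topic_arns}
        ],
    }


def create_group_policy(iot_client: Any, region: str, account_id: str,
                        group_name: str = "VehicleDetectorUpload",
                        policy_name: str = "vehicle_detector_upload_group_policy") -> None:
    """Create the publish policy and attach it to the thing group."""
    iot_client.create_policy(
        policyName=policy_name,
        policyDocument=json.dumps(build_publish_policy(region, account_id)),
    )
    # Policies attach to a target ARN; for a group that is its thingGroupArn.
    group = iot_client.describe_thing_group(thingGroupName=group_name)
    iot_client.attach_policy(policyName=policy_name, target=group["thingGroupArn"])
```

With real credentials this would be called as create_group_policy(boto3.client("iot", region_name="us-east-1"), "us-east-1", "123456789012").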

Create IoT thing

Click Create certificate.

Download four files: the certificate for this thing, a public key, a private key, and Amazon Root CA 1. These four files are the credentials that must reside on the physical IoT device; they are used to authenticate the device when it connects to AWS IoT. For details on how asymmetric encryption and certificate authorities work in TLSv1.2 mutual authentication, please refer to this wonderfully explained video (it’s about HTTPS, but the underlying mechanism is similar). Keep all four credential files in a secure place. We will need them later.

Click Attach a policy, choose vehicle_detector_connect_policy, and click Register Thing. This attaches the connect policy to the thing’s certificate and registers the thing on AWS IoT.
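For reference, the console steps in this section roughly correspond to the boto3 calls below. This is a hypothetical sketch, not part of the repo: register_upload_thing is an illustrative name, it also adds the thing to VehicleDetectorUpload so that the group policy applies, and it names the saved files after the certificate ID to fit the [credential_id] pattern used in the .env template later. Amazon Root CA 1 is not returned by these calls and still has to be downloaded separately.

```python
from pathlib import Path
from typing import Any


def register_upload_thing(iot_client: Any, out_dir: Path,
                          thing_name: str = "vehicle_detector_1_UPLOAD",
                          connect_policy: str = "vehicle_detector_connect_policy",
                          group_name: str = "VehicleDetectorUpload") -> str:
    """Create the thing and its certificate, save the key material, return the cert ID."""
    iot_client.create_thing(thingName=thing_name)
    cert = iot_client.create_keys_and_certificate(setAsActive=True)
    cert_arn = cert["certificateArn"]
    # Equivalent of "Attach a policy" + "Register Thing" in the console.
    iot_client.attach_thing_principal(thingName=thing_name, principal=cert_arn)
    iot_client.attach_policy(policyName=connect_policy, target=cert_arn)
    # Group membership is what grants the group policy's publish permissions.
    iot_client.add_thing_to_thing_group(thingGroupName=group_name, thingName=thing_name)

    prefix = cert["certificateId"]
    out_dir.mkdir(parents=True, exist_ok=True)
    (out_dir / f"{prefix}-certificate.pem.crt").write_text(cert["certificatePem"])
    (out_dir / f"{prefix}-public.pem.key").write_text(cert["keyPair"]["PublicKey"])
    # The private key is only returned by this one call; it cannot be fetched later.
    private_key = out_dir / f"{prefix}-private.pem.key"
    private_key.write_text(cert["keyPair"]["PrivateKey"])
    private_key.chmod(0o600)  # owner read/write only
    return prefix
```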

Attach group policy

Summary

AWS IoT is now ready to accept connections from the device and messages published to any of the three MQTT topics specified in the group policy.

Set up Credentials

Set up Python Application Configuration

The configuration of the Python program is specified in a .env file and loaded in config.py using the pydantic package. The benefit of using pydantic is that it validates the types of the values it loads from .env or from environment variables. In addition, since the configuration is exposed as a Settings class, we can easily tweak how each value is presented to its consumers (e.g. see how the credential file names are exposed in Settings).

The .env file must not be committed, as it contains sensitive information. To create your own .env file, copy and paste the following template:

# AWS IoT config
sensor_name=vehicle_detector_1
endpoint=[iot_thing_https_endpoint]
port=8883
root_ca=AmazonRootCA1.pem
upload_private_key=[credential_id]-private.pem.key
upload_cert_file=[credential_id]-certificate.pem.crt
remote_private_key=
remote_cert_file=
upload_topic=vehicle_detector/test/raw
# Main program config
total_iterations=5
debug=False
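The repo loads these values with pydantic. The stdlib-only sketch below imitates the same idea (typed loading from .env, with real environment variables taking precedence, which is also pydantic’s behavior) so that it runs without extra packages. It is not the repo’s config.py: the field subset, the credentials_dir default, and the root_ca_path property are assumptions for illustration.

```python
import os
from dataclasses import dataclass, fields
from pathlib import Path
from typing import Dict


def read_env_file(path: Path) -> Dict[str, str]:
    """Parse KEY=VALUE lines, skipping blank lines and # comments."""
    values: Dict[str, str] = {}
    for line in path.read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values


def _to_bool(raw: str) -> bool:
    lowered = raw.lower()
    if lowered in ("1", "true", "yes"):
        return True
    if lowered in ("0", "false", "no"):
        return False
    raise ValueError(f"not a boolean: {raw!r}")


@dataclass(frozen=True)
class Settings:
    sensor_name: str
    endpoint: str
    port: int
    root_ca: str
    upload_private_key: str
    upload_cert_file: str
    upload_topic: str
    total_iterations: int = 5
    debug: bool = False
    credentials_dir: str = "credentials"

    @property
    def root_ca_path(self) -> str:
        # Consumers get a ready-to-use path instead of a bare file name.
        return os.path.join(self.credentials_dir, self.root_ca)

    @classmethod
    def load(cls, env_file: Path) -> "Settings":
        raw = read_env_file(env_file) if env_file.exists() else {}
        converters = {str: str, int: int, bool: _to_bool}
        kwargs = {}
        for fld in fields(cls):
            # A real environment variable wins over the .env entry.
            value = os.environ.get(fld.name, raw.get(fld.name))
            if value is not None:
                # Conversion fails loudly on bad input, e.g. port=abc.
                kwargs[fld.name] = converters[fld.type](value)
        return cls(**kwargs)
```

Keys in .env that the class does not declare (remote_private_key, remote_cert_file here) are simply ignored.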

Notice that sensor_name is set to vehicle_detector_1. This name must match the first part of the AWS IoT thing name we created earlier, vehicle_detector_1_UPLOAD. The remaining _UPLOAD suffix is appended by the Upload client.

The endpoint can be found in the AWS console under AWS IoT Core → Settings → Device data endpoint.

upload_private_key and upload_cert_file are the file names of the corresponding credential files downloaded earlier. We can leave remote_private_key and remote_cert_file blank for now.

upload_topic is the MQTT topic where the actual IoT device will upload the data. It must be one of the three topics specified in the vehicle_detector_upload_group_policy.
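Two of the rules above (the thing-name match and the allowed topics) only surface once the device tries to connect or publish. A small, hypothetical pre-flight check, not part of the repo, could catch them locally; the topic set mirrors the three resources in vehicle_detector_upload_group_policy.

```python
ALLOWED_UPLOAD_TOPICS = frozenset(
    f"vehicle_detector/{env}/raw" for env in ("test", "dev", "prod")
)


def check_upload_config(sensor_name: str, upload_topic: str) -> str:
    """Validate the .env values and return the thing name the client will use."""
    if upload_topic not in ALLOWED_UPLOAD_TOPICS:
        raise ValueError(
            f"upload_topic {upload_topic!r} is not permitted by "
            "vehicle_detector_upload_group_policy"
        )
    # The Upload client appends _UPLOAD; this must equal the AWS IoT thing name.
    return f"{sensor_name}_UPLOAD"
```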

Set up AWS IoT Upload Client

Set up the generic AWS IoT client wrapper

The generic AWS IoT client wrapper is built on the AWSIoTPythonSDK (see its GitHub repo). Being a generic wrapper, AWSIoTMQTTClientWrapper takes a client_type parameter, which determines whether the client is an Upload or a Remote client. If the client is an Upload client, as is the case in this article, self.thing_name is sensor_name from the .env file followed by _UPLOAD (e.g. vehicle_detector_1_UPLOAD). Note that self.thing_name must match the name of the AWS IoT thing created earlier.

The generic AWS IoT client wrapper holds two clients: self.myShadowClient and self.myAWSIoTMQTTClient. self.myShadowClient handles connecting the client to AWS IoT and disconnecting it (it can also update the thing shadow, but that is outside the scope of this article), while self.myAWSIoTMQTTClient handles publishing messages to an MQTT topic.
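A condensed sketch of what such a wrapper can look like with the SDK’s AWSIoTMQTTShadowClient. This is not the repo’s aws_iot_client_wrapper.py: the constructor parameters, the shadow_factory hook (added so the class can be exercised with a fake client), the plain-string client_type, the lazy connect, and QoS 1 are assumptions. The configure* values mirror what the sample log further down reports.

```python
import json
from typing import Any, Callable


def _sdk_shadow_client(client_id: str) -> Any:
    # Imported lazily so the class can be tried out without the SDK installed.
    from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTShadowClient
    return AWSIoTMQTTShadowClient(client_id)


class AWSIoTMQTTClientWrapper:
    def __init__(self, sensor_name: str, client_type: str, endpoint: str,
                 port: int, root_ca: str, private_key: str, cert_file: str,
                 shadow_factory: Callable[[str], Any] = _sdk_shadow_client) -> None:
        # "upload" -> vehicle_detector_1_UPLOAD, also used as the MQTT client ID.
        self.thing_name = f"{sensor_name}_{client_type.upper()}"
        self.myShadowClient = shadow_factory(self.thing_name)
        self.myShadowClient.configureEndpoint(endpoint, port)
        self.myShadowClient.configureCredentials(root_ca, private_key, cert_file)
        # Same values as in the sample log: 1 s base, 32 s max, 20 s stable.
        self.myShadowClient.configureAutoReconnectBackoffTime(1, 32, 20)
        self.myShadowClient.configureConnectDisconnectTimeout(10)
        self.myShadowClient.configureMQTTOperationTimeout(5)
        # The shadow client owns the connection; its MQTT client publishes.
        self.myAWSIoTMQTTClient = self.myShadowClient.getMQTTConnection()
        self._connected = False

    def connect(self) -> None:
        self.myShadowClient.connect()
        self._connected = True

    def disconnect(self) -> None:
        if self._connected:
            self.myShadowClient.disconnect()
            self._connected = False

    def send(self, topic: str, payload: dict, qos: int = 1) -> bool:
        if not self._connected:
            # Connect on first use, matching the order seen in the sample log.
            self.connect()
        return self.myAWSIoTMQTTClient.publish(topic, json.dumps(payload), qos)
```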

Set up the Upload client

Setting up the Upload client is simple, because the bulk of the work is done in AWSIoTMQTTClientWrapper. Upload inherits from AWSIoTMQTTClientWrapper and defines only one method: upload_msg. This method uses the send method defined in AWSIoTMQTTClientWrapper, but offers more flexibility regarding the MQTT topic. By default, the topic is the one configured in the .env file (accessed via settings.upload_topic). However, if a different topic is needed, it can be passed directly to upload_msg to override the default.
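A self-contained sketch of that override logic. ClientWrapperStub is a hypothetical stand-in for AWSIoTMQTTClientWrapper that only records what it would send, and DEFAULT_TOPIC stands in for settings.upload_topic; the point is just the shape of upload_msg.

```python
import json
from typing import List, Optional, Tuple

DEFAULT_TOPIC = "vehicle_detector/test/raw"  # stands in for settings.upload_topic


class ClientWrapperStub:
    """Hypothetical stand-in for AWSIoTMQTTClientWrapper; records sends."""

    def __init__(self, sensor_name: str, client_type: str) -> None:
        self.thing_name = f"{sensor_name}_{client_type}"
        self.sent: List[Tuple[str, str]] = []

    def send(self, topic: str, payload: dict) -> bool:
        self.sent.append((topic, json.dumps(payload)))
        return True


class Upload(ClientWrapperStub):
    def __init__(self, sensor_name: str, default_topic: str = DEFAULT_TOPIC) -> None:
        super().__init__(sensor_name, "UPLOAD")
        self.default_topic = default_topic

    def upload_msg(self, data: dict, topic: Optional[str] = None) -> bool:
        # Use the configured topic unless the caller passes another one.
        return self.send(topic or self.default_topic, data)
```

For example, Upload("vehicle_detector_1").upload_msg(packet, topic="vehicle_detector/dev/raw") publishes to the dev topic without touching .env.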

Set up The Mock Vehicle Detection Function

The function detect_vehicle is a mock function for the vehicle detector. It has an internal counter num_vehicles, which increments whenever a vehicle is detected. In reality, a vehicle detector would require sensors to determine when the counter can be incremented. In the mock implementation, this step is replaced by a random wait sleep(randint(1, 5)).

It is worth noting that detect_vehicle puts each newly created data packet on data_q. Since detect_vehicle runs in a child process, data_q is needed to transfer data from the child process to the main process, where the data upload happens.
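A minimal sketch of such a mock detector, assuming the term_event-last convention described under ChildProcesses below and the message format shown at the end of this article. There are two deliberate deviations from the description above: the wait bounds are parameters drawn with random.uniform (so a test can use fractions of a second), and term_event.wait(...) replaces sleep(...) so that a termination request interrupts the wait instead of sitting it out.

```python
import time
from random import uniform


def detect_vehicle(data_q, min_wait: float, max_wait: float, term_event) -> None:
    """Mock detector: 'sees' a vehicle after a random wait and queues a packet."""
    num_vehicles = 0
    while True:
        # wait() returns True as soon as the event is set, ending the loop.
        if term_event.wait(uniform(min_wait, max_wait)):
            break
        num_vehicles += 1
        data_q.put({
            "timestamp": int(time.time() * 1000),  # epoch milliseconds
            "cur_vehicle_count": num_vehicles,
        })
```

In the program it would run as multiprocessing.Process(target=detect_vehicle, args=(data_q, 1, 5, term_event)) with a multiprocessing.Queue and multiprocessing.Event.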

Set up Logging

This section can be safely skipped if logging to a local file is not needed.

Logging in Python with multiprocessing can be tricky, especially when log records need to be saved to a file. For details, please check my previous article Python3: Logging with Multiprocessing. Briefly, since Python’s logging propagates log records from anywhere in the program up to the root logger for handling, we only need to set up one root logger to handle all output. This includes deciding where each log record ends up (console and/or a local file) and how it is formatted. In our case, this root logger is the output_logger.

In addition, we need to set up another root logger that collects log records from all over the program, including those from child processes, and passes them to the output_logger through a queue, logger_q. This root logger is the queue_logger. Because the output_logger is then the only process writing to the log file and handles records one at a time, we avoid the race condition that would arise if several processes wrote to a local file directly.

The output_logger runs in its own child process and must be set up before the queue_logger. Both loggers must be set up before any other function or process starts; otherwise, some log records will be lost. Both loggers are initialized in main.py.
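The same two-logger arrangement can be sketched with the standard library’s logging.handlers.QueueHandler. This is an illustrative reconstruction, not the repo’s logger_config.py: setup_queue_logger plays the role of the queue_logger, and output_logger is meant to be the target of its own child process (with its termination event last). The record format matches the sample log further down.

```python
import logging
import logging.handlers
import queue
from typing import List

LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"


def setup_queue_logger(logger_q, level: int = logging.INFO) -> None:
    """Send every record emitted in this process to logger_q."""
    root = logging.getLogger()
    root.handlers.clear()
    root.addHandler(logging.handlers.QueueHandler(logger_q))
    root.setLevel(level)


def output_logger(logger_q, log_file: str, term_event) -> None:
    """Drain logger_q, writing each record to the console and to log_file."""
    formatter = logging.Formatter(LOG_FORMAT)
    handlers: List[logging.Handler] = [
        logging.StreamHandler(),
        logging.FileHandler(log_file),
    ]
    for handler in handlers:
        handler.setFormatter(formatter)
    while True:
        try:
            record = logger_q.get(timeout=0.1)
        except queue.Empty:
            # Stop only after termination was requested and no record
            # arrived within the timeout.
            if term_event.is_set():
                break
            continue
        # Calling the handlers directly (not a logger) avoids sending the
        # record back into the queue.
        for handler in handlers:
            handler.handle(record)
    for handler in handlers:
        handler.close()
```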

Set up A Class to Handle All Child Processes

This section can be skipped if one prefers other ways to manage child processes.

Given that we have at least two child processes, one for the vehicle detection function and the other for the output_logger, and that the procedures to create, start, and terminate them are quite similar, a central class that standardizes the handling of child processes reduces code repetition and improves maintainability.

The class ChildProcesses uses an internal dictionary to record key information about each child process: its name, the process object, and the associated termination event. Note that ChildProcesses can only handle functions that accept a termination event as their last argument. A termination event is a simple way to end a forever loop in a child process: calling term_event.set() signals the loop to exit, as can be seen in the ChildProcesses.terminate() method.
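A sketch of such a manager under the same convention that every target takes its termination event last. The method names create, start, terminate, and terminate_all, and the process_cls/event_factory hooks (which let the same class drive threading.Thread in a quick test), are assumptions rather than the repo’s exact API.

```python
import multiprocessing
from typing import Any, Callable, Dict, Tuple


class ChildProcesses:
    """Track name -> (process, term_event) and handle them uniformly."""

    def __init__(self, process_cls: Any = multiprocessing.Process,
                 event_factory: Callable[[], Any] = multiprocessing.Event) -> None:
        self._process_cls = process_cls
        self._event_factory = event_factory
        self._children: Dict[str, Tuple[Any, Any]] = {}

    def create(self, name: str, target: Callable[..., None], args: Tuple = ()) -> None:
        if name in self._children:
            raise ValueError(f"child process {name!r} already exists")
        term_event = self._event_factory()
        # term_event is appended, which is why it must be the last parameter.
        # daemon=True so a forgotten child cannot keep the program alive.
        proc = self._process_cls(target=target, args=(*args, term_event),
                                 name=name, daemon=True)
        self._children[name] = (proc, term_event)

    def start(self, name: str) -> None:
        self._children[name][0].start()

    def terminate(self, name: str, timeout: float = 5.0) -> None:
        proc, term_event = self._children.pop(name)
        term_event.set()  # ends the child's forever loop
        proc.join(timeout)

    def terminate_all(self) -> None:
        # Reverse creation order, so a logger created first is stopped last.
        for name in reversed(list(self._children)):
            self.terminate(name)
```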

Set up The Main Entry Point

We have finally arrived at the main entry point of the program: the main function in main.py. It accomplishes the following tasks:

  1. Instantiate child_processes from the ChildProcesses class to handle all child processes in this program.
  2. Initialize data_q and logger_q.
  3. Use child_processes to create and start the output_logger process, and set up the queue_logger.
  4. Use child_processes to create and start the detect_vehicle process.
  5. Run a session in which the main process takes each data packet from data_q and uploads it to AWS IoT, five packets in total (total_iterations in .env).
  6. Use child_processes to terminate the detect_vehicle and output_logger processes when the session ends.
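Step 5 is the only part of main that does real work in the main process. A hedged sketch of that loop, with run_session and its timeout as hypothetical names: it pulls packets off data_q and hands each one to an upload callable, such as an Upload instance’s upload_msg.

```python
import queue
from typing import Any, Callable


def run_session(data_q: Any, upload: Callable[[dict], bool],
                total_iterations: int, timeout: float = 30.0) -> int:
    """Upload total_iterations packets from data_q; return how many succeeded."""
    uploaded = 0
    for _ in range(total_iterations):
        try:
            # Blocks until the detect_vehicle child process puts a packet.
            packet = data_q.get(timeout=timeout)
        except queue.Empty:
            break  # detector went quiet; end the session early
        if upload(packet):
            uploaded += 1
    return uploaded
```

In main this would be called as run_session(data_q, uploader.upload_msg, settings.total_iterations) between steps 4 and 6.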

Run The Program And Observe The Logs

The terminal should output logging information like this:

$ pipenv run python3 main.py
Loading .env environment variables…
2020-08-03 11:44:36,921 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - MqttCore initialized
2020-08-03 11:44:36,922 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Client id: vehicle_detector_1_UPLOAD
2020-08-03 11:44:36,923 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Protocol version: MQTTv3.1.1
2020-08-03 11:44:36,923 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Authentication type: TLSv1.2 certificate based Mutual Auth.
2020-08-03 11:44:36,923 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring offline requests queueing: max queue size: 0
2020-08-03 11:44:36,924 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring offline requests queue draining interval: 0.100000 sec
2020-08-03 11:44:36,925 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring endpoint...
2020-08-03 11:44:36,925 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring certificates...
2020-08-03 11:44:36,921 - src.vehicle_detector.detect_vehicle - INFO - Vehicle detected! Current number of vehicles: 1
2020-08-03 11:44:36,927 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring reconnect back off timing...
2020-08-03 11:44:36,927 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Base quiet time: 1.000000 sec
2020-08-03 11:44:36,927 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Max quiet time: 32.000000 sec
2020-08-03 11:44:36,927 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Stable connection time: 20.000000 sec
2020-08-03 11:44:36,928 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring connect/disconnect time out: 10.000000 sec
2020-08-03 11:44:36,928 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring MQTT operation time out: 5.000000 sec
2020-08-03 11:44:36,928 - root - INFO - Publishing data to topic vehicle_detector/test/raw...
2020-08-03 11:44:36,928 - src.aws.aws_iot_client_wrapper - INFO - Connecting vehicle_detector_1_UPLOAD to MQTT client...
2020-08-03 11:44:36,929 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync connect...
2020-08-03 11:44:36,929 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing async connect...
2020-08-03 11:44:36,929 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Keep-alive: 600.000000 sec
2020-08-03 11:44:37,285 - src.aws.aws_iot_client_wrapper - INFO - vehicle_detector_1_UPLOAD ONLINE.
2020-08-03 11:44:37,286 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish...
2020-08-03 11:44:39,928 - src.vehicle_detector.detect_vehicle - INFO - Vehicle detected! Current number of vehicles: 2
2020-08-03 11:44:39,929 - root - INFO - Publishing data to topic vehicle_detector/test/raw...
2020-08-03 11:44:39,930 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish...
2020-08-03 11:44:41,930 - src.vehicle_detector.detect_vehicle - INFO - Vehicle detected! Current number of vehicles: 3
2020-08-03 11:44:41,932 - root - INFO - Publishing data to topic vehicle_detector/test/raw...
2020-08-03 11:44:41,932 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish...
2020-08-03 11:44:45,935 - src.vehicle_detector.detect_vehicle - INFO - Vehicle detected! Current number of vehicles: 4
2020-08-03 11:44:45,936 - root - INFO - Publishing data to topic vehicle_detector/test/raw...
2020-08-03 11:44:45,937 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish...
2020-08-03 11:44:50,941 - src.vehicle_detector.detect_vehicle - INFO - Vehicle detected! Current number of vehicles: 5
2020-08-03 11:44:50,942 - root - INFO - Publishing data to topic vehicle_detector/test/raw...
2020-08-03 11:44:50,942 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish...
2020-08-03 11:44:51,013 - root - INFO - Program ended
2020-08-03 11:44:52,943 - src.vehicle_detector.detect_vehicle - INFO - Vehicle detection terminated.
2020-08-03 11:44:53,947 - root - INFO - Logging terminated.

The same log records are also saved in a file called app.log.

Run The Program And Observe The Uploaded Data in Real Time

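Run the program again as described earlier, but this time keep an eye on the AWS console (for example, subscribe to vehicle_detector/test/raw in the MQTT test client under AWS IoT Core → Test). As the program runs, you should see the uploaded data appear on the screen. Each data message looks like this: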

{
  "timestamp": 1596470034182,
  "cur_vehicle_count": 1
}

It contains an epoch timestamp (in milliseconds) and the current vehicle count, both produced by the detect_vehicle function.
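On the consuming side, the millisecond timestamp needs converting before it is human-readable. A small hypothetical helper (decode_message is not part of the repo) that uses integer arithmetic so no precision is lost; for the message above it yields 2020-08-03 15:53:54.182 UTC and a count of 1.

```python
import json
from datetime import datetime, timedelta, timezone
from typing import Tuple


def decode_message(raw: str) -> Tuple[datetime, int]:
    """Return (UTC time, vehicle count) for one uploaded message."""
    msg = json.loads(raw)
    ms = msg["timestamp"]  # epoch time in milliseconds
    # Whole seconds plus leftover milliseconds, avoiding float rounding.
    ts = datetime.fromtimestamp(ms // 1000, tz=timezone.utc) + timedelta(milliseconds=ms % 1000)
    return ts, msg["cur_vehicle_count"]
```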

We have successfully produced data on an actual IoT device and uploaded it to AWS IoT. To process the uploaded data, the next step is to set up a rule that passes each data message from the MQTT topic to a downstream AWS service. However, this is outside the scope of this series.

Summary

In Part II, we will provide a working example of how to perform command-and-response with the IoT device using AWS IoT Jobs. With a remote command-and-response capability, we will gain better control of the IoT device once it’s deployed in the field.
