Device-to-Cloud Connectivity with Azure IoT Hub

IoT (Internet of Things) and IIoT (Industrial Internet of Things) are terms that come to mind when we buy a new smart electronic appliance, drive a new car or think about a sophisticated manufacturing plant.
To make these Things truly intelligent, we not only capture the events they generate; we also analyze the events, predict future behaviour, visualize the details and act based on the analysis and predictions.

Depending on the use case, all or some of these steps can be performed locally (on-premises), or the events can be transferred to the cloud for more complex processing.
For example, many sophisticated devices now ship with their own edge-computing AI/ML features embedded in the chip.
On the other hand, for IIoT cases there is software available at the plant level to display KPIs or highlight alerts.

For a large number of independent IoT devices (e.g. a set of sensors or cameras) we may also want to consolidate the events through an integration layer before pushing the data to the cloud.
Either way, cloud services are important for processing IoT/IIoT data because of their processing power (with advanced AI/ML features), storage capacity and enterprise data integration, which may not be available in the other layers.
In this blog, we’ll take a very simple device-to-cloud (D2C) scenario and send a stream of events to Azure to process them both in real time and in batch.
The Architecture we’ll follow
To process the event stream in both real time and batch, we’ll use the well-known Lambda architecture. As soon as the events are pushed into the Azure IoT Hub, they will be routed along two paths:
- Hot Path — Stream Analytics consumes the events and outputs them to a Power BI visualization dashboard with little or no transformation.
- Cold Path — The events are stored in an Azure Storage account, transformed periodically (any complex transformation goes here) and stored as Hive tables; Power BI visualizations are then refreshed periodically using the aggregated data from Hive.

Further implementation details are as follows:
Setting up an Azure IoT Hub
In the first step, we’ll create an Azure IoT Hub and register a device.



Once the ‘device’ has been set up, we’ll copy its Primary Connection String and verify that it is enabled.
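As an alternative to the portal, the device registration can also be scripted with the Azure IoT Hub service SDK for Python. Below is a minimal sketch, assuming the azure-iot-hub package is installed; the hub-level (service) connection string, hub name and device id shown are placeholders.
import base64
import os

from azure.iot.hub import IoTHubRegistryManager

# Hub-level (service) connection string, e.g. for the 'iothubowner' policy - placeholder value.
IOTHUB_CONNECTION_STRING = "<YOUR IOT HUB SERVICE CONNECTION STRING>"
DEVICE_ID = "device1"  # hypothetical device id

# Generate a pair of symmetric keys for SAS authentication.
primary_key = base64.b64encode(os.urandom(32)).decode()
secondary_key = base64.b64encode(os.urandom(32)).decode()

# Register the device in the IoT Hub identity registry.
registry_manager = IoTHubRegistryManager(IOTHUB_CONNECTION_STRING)
device = registry_manager.create_device_with_sas(DEVICE_ID, primary_key, secondary_key, "enabled")

# Compose the device connection string used by the device-side code later in this post.
print("HostName=<your-hub-name>.azure-devices.net;DeviceId={};SharedAccessKey={}".format(
    DEVICE_ID, device.authentication.symmetric_key.primary_key))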

IoT Hub Message Routing
We’ll go to our IoT Hub and create a couple of routing paths:

Path 1 — Azure Storage Account — Blob:

Path 2 — Stream Analytics: We’ll select the built-in endpoint (events) as the endpoint and Device Telemetry Messages as the data source.

Configuring the ‘Thing’
In this example, we’ll treat a laptop/computer as the ‘Thing’, i.e. an IoT device, using the Azure IoT Hub device SDKs. The SDKs are very useful for building apps that run directly on devices and send telemetry to the IoT Hub. Using the Python SDK, we’ll send the laptop’s CPU & memory information to our IoT Hub.
Find below sample code using the SDK:
import datetime
import time

import psutil
from azure.iot.device import IoTHubDeviceClient, Message

# Device connection string copied from the IoT Hub device details page.
CONNECTION_STRING = "<YOUR CONNECTION STRING>"

# Define the JSON message to send to IoT Hub.
MSG_TXT = '{{"timestamp":"{timestamp}","user":"{user}","cpu_percent":{cpu_percent},"total_mem":{total_mem},"available_mem":{available_mem}}}'

def iothub_client_init():
    # Create an IoT Hub client
    client = IoTHubDeviceClient.create_from_connection_string(CONNECTION_STRING)
    return client

def iothub_client_telemetry_sample_run():
    try:
        client = iothub_client_init()
        print("Sending periodic messages, press Ctrl-C to exit")
        while True:
            # Build the message with the current telemetry values.
            timestamp = datetime.datetime.fromtimestamp(time.time()).isoformat()
            user = psutil.users()[0].name
            mem = psutil.virtual_memory()
            cpu_percent = psutil.cpu_percent()
            total_mem = mem.total / (1024 * 1024 * 1024)
            available_mem = mem.available / (1024 * 1024 * 1024)
            msg_txt_formatted = MSG_TXT.format(timestamp=timestamp, user=user, cpu_percent=str(round(cpu_percent, 2)), total_mem=str(round(total_mem, 2)), available_mem=str(round(available_mem, 2)))
            message = Message(msg_txt_formatted)
            # Add a custom application property to the message.
            # An IoT hub can filter on these properties without access to the message body.
            if cpu_percent > 80:
                message.custom_properties["cpuAlert"] = "true"
            else:
                message.custom_properties["cpuAlert"] = "false"
            # Send the message.
            print("Sending message: {}".format(message))
            client.send_message(message)
            print("Message successfully sent")
            time.sleep(1)
    except KeyboardInterrupt:
        print("IoTHubClient sample stopped")

if __name__ == '__main__':
    iothub_client_telemetry_sample_run()
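To run the script, the azure-iot-device and psutil Python packages need to be installed (e.g. with pip), and the device’s Primary Connection String from the earlier step pasted into CONNECTION_STRING. While running, the script sends one telemetry message roughly every second.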
Batch Processing — Using Azure Databricks with Azure Data Factory
Once events reach the IoT Hub, it routes them to the Azure Storage account we have configured. By default the blobs are written under a time-based folder structure of the form {iothub}/{partition}/{YYYY}/{MM}/{DD}/{HH}/{mm}, so the data lands in appropriate partitions.

We’ll use Apache Spark (Azure Databricks) to read the records, aggregate them as per our requirements and store the results as Hive tables. The Azure Databricks notebook will be invoked by an Azure Data Factory v2 pipeline at regular intervals (e.g. daily).
We can use the following code to read the JSON blobs with appropriate schema:
// Define the schema
import org.apache.spark.sql.types._

val cpuInfo = new StructType()
  .add("available_mem", DoubleType)
  .add("cpu_percent", DoubleType)
  .add("timestamp", StringType)
  .add("total_mem", DoubleType)
  .add("user", StringType)

val properties = new StructType()
  .add("cpuAlert", StringType)

val systemProperties = new StructType()
  .add("connectionAuthMethod", StringType)
  .add("connectionDeviceGenerationId", StringType)
  .add("connectionDeviceId", StringType)
  .add("contentEncoding", StringType)
  .add("contentType", StringType)
  .add("enqueuedTime", StringType)

val event = new StructType()
  .add("Body", cpuInfo)
  .add("EnqueuedTimeUtc", StringType)
  .add("Properties", properties)
  .add("SystemProperties", systemProperties)

// Read the events
val eventsDf = spark.read
  .schema(event)
  .json(s"wasbs://<container-name>@<storage-account-name>.blob.core.windows.net/<directory-name>/*/*/*/*/*/*")
Select the required columns:
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.to_date

val deviceDF = eventsDf
  .withColumn("Available_Memory", col("Body.available_mem"))
  .withColumn("Total_Memory", col("Body.total_mem"))
  .withColumn("CPU_Percent", col("Body.cpu_percent"))
  .withColumn("CPU_Alert", col("Properties.cpuAlert"))
  .withColumn("Logon_User", col("Body.user"))
  .withColumn("Device_Timestamp", col("Body.timestamp"))
  .withColumn("Connection_Device_Id", col("SystemProperties.connectionDeviceId"))
  .withColumn("Event_Enqueued_Time", col("SystemProperties.enqueuedTime"))
  .withColumn("Event_Enqueued_Date", to_date(col("Event_Enqueued_Time"), "yyyy-MM-dd"))
  .select("Available_Memory", "Total_Memory", "CPU_Percent", "CPU_Alert", "Logon_User", "Device_Timestamp", "Connection_Device_Id", "Event_Enqueued_Time", "Event_Enqueued_Date")
Calculate the average & maximum CPU%:
import org.apache.spark.sql.functions._

val avgCPU_DayDf = deviceDF
  .groupBy("Event_Enqueued_Date")
  .agg(avg("CPU_Percent"), max("CPU_Percent"))
  .withColumnRenamed("avg(CPU_Percent)", "Avg_CPU_Percent")
  .withColumnRenamed("max(CPU_Percent)", "Max_CPU_Percent")
  .sort(desc("Event_Enqueued_Date"))
Calculate the total number of alerts received because of high CPU usage:
val avgCPU_AlertDf = deviceDF.where(col("CPU_Alert") === "true")
  .groupBy("Event_Enqueued_Date")
  .agg(count("CPU_Alert"))
  .withColumnRenamed("count(CPU_Alert)", "Total_CPU_Alert")
  .sort(desc("Event_Enqueued_Date"))
Save the CPU usage & CPU alert Spark DataFrames into the Data Lake:
avgCPU_DayDf.write.mode("overwrite").parquet("/mnt/iot/device1/CPU_Info")
avgCPU_AlertDf.write.mode("overwrite").parquet("/mnt/iot/device1/CPU_Alert")
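Note that the /mnt/iot paths assume the target storage container has already been mounted to DBFS (e.g. with dbutils.fs.mount); otherwise the full wasbs:// (or abfss://) URI can be used instead.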
Create two Hive tables on the Data Lake locations:
CREATE TABLE CPU_Info
USING PARQUET
LOCATION '/mnt/iot/device1/CPU_Info';

CREATE TABLE CPU_Alert
USING PARQUET
LOCATION '/mnt/iot/device1/CPU_Alert';
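Optionally, we can run a quick query from a notebook SQL cell to sanity-check the new tables (table and column names as defined in the code above):
SELECT Event_Enqueued_Date, Avg_CPU_Percent, Max_CPU_Percent
FROM CPU_Info
ORDER BY Event_Enqueued_Date DESC
LIMIT 7;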


(In our architecture, we may need to provision an Azure SQL Database or Cosmos DB to store reference data.)
Configure Stream Analytics
We’ll create an Azure Stream Analytics instance, add the IoT Hub as an input and Power BI as an output. For detailed steps, refer to the Azure Stream Analytics documentation.
The Stream Analytics query will look like the following (if we’re not transforming the input):
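A minimal pass-through version of that query is sketched below; the input and output alias names (iothub-input, powerbi-output) are placeholders chosen when the input and output were added.
SELECT
    *
INTO
    [powerbi-output]
FROM
    [iothub-input]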

We can start the Stream Analytics job once we’re happy with the configurations.
Power BI Dashboard
Once the Power BI service refreshes the datasets from the Hive tables (e.g. a daily refresh) and real-time events are coming in via Stream Analytics, the Power BI widgets will start displaying the graphs. In the following image, the top two widgets with a white background are updated in real time, whereas the two black widgets below them are refreshed daily from the two Hive tables (CPU_Info, CPU_Alert). We can use the Power BI mobile apps to access the dashboards while we’re on the move!

Points to note
- For simple one-way D2C connectivity we can use Azure Event Hubs, as this will be cheaper than IoT Hub.
- For both D2C and C2D (cloud-to-device) connectivity we need to use Azure IoT Hub. Verify this requirement before making the decision.
- To connect multiple devices we can use integration services such as Sigfox and channel the consolidated events to IoT Hub or Event Hubs depending on the C2D/D2C scenario.
- For IIoT cases, factory sensors and devices are generally kept inside a private network and configured in an OPC server, e.g. KEPServerEX. KEPServerEX can be connected to Azure IoT Hub directly or via Azure IoT Edge.
In the next blog, we’ll talk about configuring KEPServerEX to connect to Azure IoT Hub directly and transfer events generated from factory devices.