Real-Time Pulsar and Python Apps on a Pi

source link: https://dzone.com/articles/five-sensors-real-time-with-pulsar-and-python-on-a

Build a Python application on a Raspberry Pi that streams sensor data and more from the edge to any and all data stores while processing data in event time.

Apr. 05, 22 · IoT Zone · Tutorial

Today we will look at the easy way to build Python streaming applications from the edge to the cloud. Let's walk through how to build a Python application on a Raspberry Pi that streams sensor data and more from the edge to any and all data stores while processing data in event time.

My GitHub repository has all of the code, configuration, and scripts needed to build and run this application.

If you wish to use the same data source as I did, you will need a Raspberry Pi 3 or 4. I know it is hard to obtain a Pi right now, but you may already have one. I highly recommend the Pimoroni Breakout Garden HAT, as it lets you plug in and swap a variety of sensors with no soldering or other difficulty. If you are programming in Python like we are, Pimoroni provides solid drivers for all of the sensors.

Gear/Hardware

  • Raspberry Pi 3 Model B Rev 1.2, Raspbian (Raspberry Pi OS) Bullseye, armv7l
  • Pimoroni Breakout Garden HAT
  • 1.12" Mono OLED Breakout 128x128 White/Black Screen
  • BME680 Air Quality, Temperature, Pressure, Humidity Sensor
  • LSM303D 6DoF Motion Sensor (X, Y, Z Axes)
  • BH1745 Luminance and Color Sensor
  • LTR-559 Light and Proximity Sensor (0.01 lux to 64,000 lux)
  • VL53L1X Time of Flight (TOF) Sensor

Fully Set Up Raspberry Pi 3 with Garden HAT and Sensors

Software/Libraries

  • Python 3.9
  • Pulsar Python client 2.10 with Avro support: pip3 install pulsar-client[avro]
  • Pimoroni Python Breakout Garden libraries
  • Python psutil: https://pypi.org/project/psutil/
  • Python luma.oled: pip3 install --upgrade luma.oled
  • System libraries: sudo apt-get install python3 python3-pip python3-pil libjpeg-dev zlib1g-dev libfreetype6-dev liblcms2-dev libopenjp2-7 libtiff5 -y

I recommend installing Python 3.9 or later on your Pi, which may require upgrading to the Bullseye release of Raspberry Pi OS with the latest drivers. The upgrade can take a while, as there are many related libraries, SDKs, and build tools to update. Once you have rebooted and have the latest Python, install the Breakout Garden libraries and related sensor drivers. We also use the Python library psutil, which gives us access to Pi hardware metrics such as disk space, CPU and memory usage, and network interface information. We like to report these metrics either as fields in the record or as message metadata, which becomes helpful when you scale out to thousands of devices.
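To illustrate the metadata option, here is a minimal, stdlib-only sketch. It uses shutil.disk_usage as a stand-in for psutil.disk_usage (both report total, used, and free bytes), and the function name device_properties and its property keys are hypothetical. Pulsar message properties are string-to-string maps, so every value is converted to a string; a dict like this could then be passed through the properties argument of the Python client's producer.send().

```python
import shutil
import socket


def device_properties(path="/"):
    """Collect a few device metrics as a string-to-string properties map.

    Hypothetical helper: shutil.disk_usage stands in for psutil.disk_usage.
    """
    # Named tuple (total, used, free), all in bytes
    usage = shutil.disk_usage(path)
    used_pct = 100.0 * usage.used / usage.total if usage.total else 0.0
    return {
        "hostname": socket.gethostname(),
        # Same "<free> MB" layout the producer uses for its diskusage field
        "diskfree": "{:.1f} MB".format(usage.free / 1024 / 1024),
        "diskused_pct": "{:.1f}".format(used_pct),
    }


if __name__ == "__main__":
    print(device_properties())
```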

Python Edge Application with Apache Pulsar, Apache NiFi, and Apache Flink

In the above diagram, we can see the full application we are going to build today, from the sensor to the final data stores. You will build a Pulsar producer that sends data to the pi-sensors topic. This topic is consumed by a continuous Flink SQL query, by Apache NiFi, and by an Apache Spark SQL query, which together send the data to a NoSQL store and to the file storage of your choice.

The first thing to do is create the topic that will act as your real-time conduit for sensor and device data. This topic lives under a tenant and namespace inside our Pulsar cluster. I recommend creating a dedicated tenant and namespace for this application; for simplicity, the command below uses the default ones (public/default). You will need the pulsar-admin CLI installed. You can also create your topic from the StreamNative Cloud web UI if you are using StreamNative for your Apache Pulsar hosting.

StreamOps

bin/pulsar-admin topics create "persistent://public/default/pi-sensors"
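The fully qualified topic name encodes the domain, tenant, and namespace as {persistent|non-persistent}://tenant/namespace/topic, so moving to a dedicated tenant only changes the string you pass to the producer's --topic argument. Below is a small hypothetical helper (not part of the Pulsar client) that builds such names; "iot" and "edge" are made-up example tenant and namespace names.

```python
def topic_name(topic, tenant="public", namespace="default", persistent=True):
    """Build a fully qualified Pulsar topic name (hypothetical helper)."""
    for part in (tenant, namespace, topic):
        # Each component must be non-empty and must not contain the "/" separator
        if not part or "/" in part:
            raise ValueError("invalid topic name component: {!r}".format(part))
    domain = "persistent" if persistent else "non-persistent"
    return "{}://{}/{}/{}".format(domain, tenant, namespace, topic)


if __name__ == "__main__":
    # The default tenant and namespace, as in the pulsar-admin command above
    print(topic_name("pi-sensors"))
    # A dedicated (hypothetical) tenant and namespace for this application
    print(topic_name("pi-sensors", tenant="iot", namespace="edge"))
```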

Now that you have a topic, you can begin sending data to it.

First, let's build a schema to model our data, which is a best practice. We can use either JSON or Avro.

Python
# Pulsar schema field types used by the record below
from pulsar.schema import Record, String, Integer, Float
class breakoutsensor(Record):
    uuid = String()
    ipaddress = String()
    cputempf = Integer()
    runtime = Integer()
    host = String()
    hostname = String()
    macaddress = String()
    endtime = String()
    te = String()
    cpu = Float()
    diskusage = String()
    memory = Float()
    rowid = String()
    systemtime = String()
    ts = Integer()
    starttime = String()
    BH1745_red = Float()
    BH1745_green = Float()
    BH1745_blue = Float()
    BH1745_clear = Float()
    VL53L1X_distance_in_mm = Float()
    ltr559_lux = Float()
    ltr559_prox = Float()
    bme680_tempc = Float()
    bme680_tempf = Float()
    bme680_pressure = Float()
    bme680_humidity = Float()
    lsm303d_accelerometer = String() 
    lsm303d_magnetometer = String() 
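A Record class like this is registered with the topic as an Avro-style definition; the Pulsar SQL desc output later in this article shows each field as a nullable union such as ["null","string"]. The sketch below is a hypothetical, stdlib-only helper that reproduces that shape for a few fields, so you can see roughly what the registered definition looks like. It is not how the pulsar-client library builds its schema; in practice you would pass JsonSchema(breakoutsensor) or AvroSchema(breakoutsensor) to the producer.

```python
import json

# Avro primitive types for the pulsar.schema field classes used in breakoutsensor
AVRO_TYPES = {"String": "string", "Integer": "int", "Float": "float"}


def avro_record_schema(name, fields):
    """Return an Avro record schema in which every field is nullable.

    fields is a list of (field_name, field_class_name) pairs.
    """
    return {
        "type": "record",
        "name": name,
        "fields": [
            # A ["null", <type>] union makes the field optional
            {"name": field, "type": ["null", AVRO_TYPES[kind]]}
            for field, kind in fields
        ],
    }


if __name__ == "__main__":
    schema = avro_record_schema(
        "breakoutsensor",
        [("uuid", "String"), ("cputempf", "Integer"), ("bme680_tempf", "Float")],
    )
    print(json.dumps(schema, indent=2))
```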

Now that we have defined our schema, let's build the code to grab our sensor readings and send them as structured JSON to our Pulsar topic.

Python
import argparse
import datetime
import os
import socket
import time
import uuid
from time import gmtime, strftime

import psutil
# import paho.mqtt.client as paho
from luma.core.interface.serial import i2c
from luma.core.render import canvas
from luma.oled.device import sh1106
#
# Sensors
#
from bh1745 import BH1745
from ltr559 import LTR559
import VL53L1X
import bme680
from lsm303d import LSM303D
#
# Pulsar
#
import pulsar
from pulsar import AuthenticationOauth2
from pulsar.schema import Record, String, Integer, Float, AvroSchema, JsonSchema
###
# https://pulsar.apache.org/docs/en/client-libraries-python/
# http://pulsar.apache.org/api/python/schema/schema.m.html#pulsar.schema.schema.AvroSchema
class breakoutsensor(Record):
    uuid = String()
    ipaddress = String()
    cputempf = Integer()
    runtime = Integer()
    host = String()
    hostname = String()
    macaddress = String()
    endtime = String()
    te = String()
    cpu = Float()
    diskusage = String()
    memory = Float()
    rowid = String()
    systemtime = String()
    ts = Integer()
    starttime = String()
    BH1745_red = Float()
    BH1745_green = Float()
    BH1745_blue = Float()
    BH1745_clear = Float()
    VL53L1X_distance_in_mm = Float()
    ltr559_lux = Float()
    ltr559_prox = Float()
    bme680_tempc = Float()
    bme680_tempf = Float()
    bme680_pressure = Float()
    bme680_humidity = Float()
    lsm303d_accelerometer = String() 
    lsm303d_magnetometer = String() 
# parse arguments
parse = argparse.ArgumentParser(prog='breakoutsensor.py')
parse.add_argument('-su', '--service-url', dest='service_url', type=str, required=True,
                   help='The pulsar service you want to connect to')
parse.add_argument('-t', '--topic', dest='topic', type=str, required=True,
                   help='The topic you want to produce to')
parse.add_argument('-n', '--number', dest='number', type=int, default=1,
                   help='The number of messages you want to produce')
parse.add_argument('--auth-params', dest='auth_params', type=str, default="",
                   help='The auth params which you need to configure the client')
args = parse.parse_args()
# yyyy-mm-dd hh:mm:ss
currenttime = strftime("%Y-%m-%d %H:%M:%S", gmtime())
host = os.uname()[1]
# Remote address used only to find which local interface routes outward
external_IP_and_port = ('198.41.0.4', 53)  # a.root-servers.net
socket_family = socket.AF_INET

def do_nothing(obj):
    pass

def getCPUtemperature():
    # vcgencmd prints a line such as "temp=48.3'C"
    res = os.popen('vcgencmd measure_temp').readline()
    return res.strip().replace("temp=", "").replace("'C", "")

def IP_address():
    # Connecting a UDP socket sends no packets; it only selects the outbound interface
    try:
        s = socket.socket(socket_family, socket.SOCK_DGRAM)
        s.connect(external_IP_and_port)
        answer = s.getsockname()
        s.close()
        return answer[0] if answer else None
    except socket.error:
        return None

# Get MAC address of a local interface
def psutil_iface(iface):
    # type: (str) -> Optional[str]
    nics = psutil.net_if_addrs()
    if iface in nics:
        for i in nics[iface]:
            if i.family == psutil.AF_LINK:
                return i.address
    return None

# - start timing
starttime = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')
start = time.time()
# Set up OLED
oled = sh1106(i2c(port=1, address=0x3C), rotate=2, height=128, width=128)
oled.cleanup = do_nothing
# Set Constants
MAX_DISTANCE_MM = 800 # Distance at which our bar is full
TRIGGER_DISTANCE_MM = 80
# Ip address
host_name = socket.gethostname()
ipaddress = IP_address() 
# bh1745
bh1745 = BH1745()
bh1745.setup()
bh1745.set_leds(1)
r, g, b, c = bh1745.get_rgbc_raw()
bh1745.set_leds(0)
# VL53L1X
tof = VL53L1X.VL53L1X(i2c_bus=1, i2c_address=0x29)
# ltr559
ltr559 = LTR559()
# lsm303d
lsm = LSM303D(0x1d)
#print(args.service_url)
#print(args.auth_params)
# connect to pulsar
if (len(args.auth_params) == 0 ):
   client = pulsar.Client(args.service_url)
else:
   client = pulsar.Client(args.service_url, authentication=AuthenticationOauth2(args.auth_params))
#sensorschema = AvroSchema(breakoutsensor)
#print("Schema info is: " + sensorschema.schema_info().schema())
#producer = client.create_producer(topic='persistent://public/default/pi-sensors-avro' ,schema=sensorschema,properties={"producer-name": "sensoravro-py-sensor","producer-id": "sensor-avro-sensor" })
producer = client.create_producer(topic=args.topic ,schema=JsonSchema(breakoutsensor),properties={"producer-name": "sensor-py-sensor","producer-id": "sensor-sensor" })
# mqtt
#client = paho.Client()
# loop forever
# Start the time-of-flight sensor once; read distances only while it is ranging
tof.open()            # Initialise the i2c bus and configure the sensor
tof.start_ranging(2)  # Start ranging, 1 = Short Range, 2 = Medium Range, 3 = Long Range
# bme680: create and configure once, then refresh the reading on every loop
try:
    sensor = bme680.BME680(bme680.I2C_ADDR_PRIMARY)
except IOError:
    sensor = bme680.BME680(bme680.I2C_ADDR_SECONDARY)
sensor.set_humidity_oversample(bme680.OS_2X)
sensor.set_pressure_oversample(bme680.OS_4X)
sensor.set_temperature_oversample(bme680.OS_8X)
sensor.set_filter(bme680.FILTER_SIZE_3)
sensor.set_gas_status(bme680.ENABLE_GAS_MEAS)
sensor.set_gas_heater_temperature(320)
sensor.set_gas_heater_duration(150)
sensor.select_gas_heater_profile(0)
# loop forever
try:
    while True:
        distance_in_mm = tof.get_distance()  # Grab the range in mm
        distance_in_mm = min(MAX_DISTANCE_MM, distance_in_mm)  # Cap at our MAX_DISTANCE
        ltr559.update_sensor()
        lux = ltr559.get_lux()
        prox = ltr559.get_proximity()
        lsm3accl = lsm.accelerometer()
        lsm3mag = lsm.magnetometer()
        sensor.get_sensor_data()  # Take a fresh BME680 reading into sensor.data
        bh1745.set_leds(1)
        r, g, b, c = bh1745.get_rgbc_raw()
        bh1745.set_leds(0)
        uuid2 = '{0}_{1}'.format(strftime("%Y%m%d%H%M%S", gmtime()), uuid.uuid4())
        uniqueid = 'snr_{0}'.format(strftime("%Y%m%d%H%M%S", gmtime()))
        cpuTemp = int(float(getCPUtemperature()))
        cputempf = int(round(9.0 / 5.0 * float(cpuTemp) + 32))
        usage = psutil.disk_usage("/")
        end = time.time()
        sensorRec = breakoutsensor()
        sensorRec.uuid = uniqueid
        sensorRec.ipaddress = ipaddress
        sensorRec.cputempf = int(cputempf)
        sensorRec.runtime = int(round(end - start))
        sensorRec.host = os.uname()[1]
        sensorRec.hostname = host_name
        sensorRec.macaddress = psutil_iface('wlan0')
        sensorRec.endtime = str(end)
        sensorRec.te = str(end - start)
        sensorRec.cpu = float(psutil.cpu_percent(interval=1))
        sensorRec.diskusage = "{:.1f} MB".format(float(usage.free) / 1024 / 1024)
        sensorRec.memory = float(psutil.virtual_memory().percent)
        sensorRec.rowid = str(uuid2)
        sensorRec.systemtime = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')
        sensorRec.ts = int(time.time())
        sensorRec.starttime = str(starttime)
        sensorRec.BH1745_red = float('{:3.1f}'.format(r))
        sensorRec.BH1745_green = float('{:3.1f}'.format(g))
        sensorRec.BH1745_blue = float('{:3.1f}'.format(b))
        sensorRec.BH1745_clear = float('{:3.1f}'.format(c))
        sensorRec.VL53L1X_distance_in_mm = float(distance_in_mm)
        sensorRec.ltr559_lux = float('{:06.2f}'.format(lux))
        sensorRec.ltr559_prox = float('{:04d}'.format(prox))
        sensorRec.bme680_tempc = float('{0:.2f}'.format(sensor.data.temperature))
        sensorRec.bme680_tempf = float('{0:.2f}'.format((sensor.data.temperature * 1.8) + 32))
        sensorRec.bme680_pressure = float('{0:.2f}'.format(sensor.data.pressure))
        sensorRec.bme680_humidity = float('{0:.3f}'.format(sensor.data.humidity))
        sensorRec.lsm303d_accelerometer = "{:+06.2f}g : {:+06.2f}g : {:+06.2f}g".format(*lsm3accl)
        sensorRec.lsm303d_magnetometer = "{:+06.2f} : {:+06.2f} : {:+06.2f}".format(*lsm3mag)
        print(sensorRec)
        producer.send(sensorRec, partition_key=uniqueid)
        #client.connect("pulsar1", 1883, 60)
        #client.publish("aqsensor", payload=str(sensorRec))
        with canvas(oled) as draw:
            draw.rectangle(oled.bounding_box, outline="white", fill="black")
            draw.text((0, 0), "- Apache Pulsar -", fill="white")
            draw.text((0, 10), ipaddress, fill="white")
            draw.text((0, 20), starttime, fill="white")
            draw.text((0, 30), 'Temp: {}'.format(sensor.data.temperature), fill="white")
            draw.text((0, 40), 'Humidity: {}'.format(sensor.data.humidity), fill="white")
            draw.text((0, 50), 'Pressure: {}'.format(sensor.data.pressure), fill="white")
            draw.text((0, 60), 'Distance: {}'.format(str(distance_in_mm)), fill="white")
            draw.text((0, 70), 'CPUTemp: {}'.format(cpuTemp), fill="white")
            draw.text((0, 80), 'TempF: {}'.format(sensorRec.bme680_tempf), fill="white")
            draw.text((0, 90), 'A: {}'.format(sensorRec.lsm303d_accelerometer), fill="white")
            draw.text((0, 100), 'M: {}'.format(sensorRec.lsm303d_magnetometer), fill="white")
            draw.text((0, 110), 'DU: {}'.format(sensorRec.diskusage), fill="white")
            #time.sleep(0.5)
except KeyboardInterrupt:
    pass
finally:
    tof.stop_ranging()  # Stop ranging
    producer.flush()    # Send anything still buffered before closing
    client.close()
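The unit conversions and string formatting in this script are easy to get subtly wrong, so it can help to check them off the Pi. Below is a stdlib-only sketch (all helper names are hypothetical) that reproduces a few of the producer's fields as plain JSON using json.dumps rather than the client's JsonSchema. It assumes vcgencmd prints a line like "temp=48.3'C", as the script above does. One difference from the script: it parses that line with a regular expression and does not truncate Celsius to an integer before converting to Fahrenheit.

```python
import json
import re


def parse_vcgencmd_temp(output):
    """Parse `vcgencmd measure_temp` output such as "temp=48.3'C" into Celsius."""
    match = re.search(r"temp=(-?\d+(?:\.\d+)?)'C", output)
    if match is None:
        raise ValueError("unexpected vcgencmd output: {!r}".format(output))
    return float(match.group(1))


def c_to_f(celsius):
    return celsius * 9.0 / 5.0 + 32


def format_axes(values, unit=""):
    # Same "{:+06.2f}" layout the producer uses, e.g. "-00.08g : -01.00g : +00.01g"
    return " : ".join("{:+06.2f}{}".format(v, unit) for v in values)


def build_payload(cpu_output, temp_c, accel, mag):
    """Return a JSON string holding a subset of the breakoutsensor fields."""
    return json.dumps({
        "cputempf": int(round(c_to_f(parse_vcgencmd_temp(cpu_output)))),
        "bme680_tempc": round(temp_c, 2),
        "bme680_tempf": round(c_to_f(temp_c), 2),
        "lsm303d_accelerometer": format_axes(accel, "g"),
        "lsm303d_magnetometer": format_axes(mag),
    })


if __name__ == "__main__":
    print(build_payload("temp=37.0'C\n", 23.6, (-0.08, -1.0, 0.01), (0.06, 0.30, 0.07)))
```

The sample accelerometer and magnetometer values were chosen to match the record printed by the running device below, so the formatted strings can be compared directly.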

Next, we run the script on the device, and it starts producing records, as shown below.

Device Running

VL53L0X_GetDeviceInfo:
Device Name : VL53L1 cut1.1
Device Type : VL53L1
Device ID : 
ProductRevisionMajor : 1
ProductRevisionMinor : 15
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'snr_20220323200032', 'ipaddress': '192.168.1.229', 'cputempf': 99, 'runtime': 154, 'host': 'piups', 'hostname': 'piups', 'macaddress': 'b8:27:eb:4a:4b:61', 'endtime': '1648065632.645613', 'te': '154.00473523139954', 'cpu': 0.0, 'diskusage': '3895.3 MB', 'memory': 21.5, 'rowid': '20220323200032_6a66f9ea-1273-4e5d-b150-9300f6272482', 'systemtime': '03/23/2022 16:00:33', 'ts': 1648065633, 'starttime': '03/23/2022 15:57:58', 'BH1745_red': 112.2, 'BH1745_green': 82.0, 'BH1745_blue': 63.0, 'BH1745_clear': 110.0, 'VL53L1X_distance_in_mm': -1185.0, 'ltr559_lux': 6.65, 'ltr559_prox': 0.0, 'bme680_tempc': 23.6, 'bme680_tempf': 74.48, 'bme680_pressure': 1017.48, 'bme680_humidity': 33.931, 'lsm303d_accelerometer': '-00.08g : -01.00g : +00.01g', 'lsm303d_magnetometer': '+00.06 : +00.30 : +00.07'}
VL53L1X Start Ranging Address 0x29

We can easily verify that data is arriving in our topic with the command-line consumer tool; no code is required. Here, -s sets the subscription name and -n 0 keeps consuming messages indefinitely.

Consumer

bin/pulsar-client consume "persistent://public/default/pi-sensors" -s "pisnsrgrdnrdr" -n 0

If we want to do some reporting, run queries, or build SQL dashboards, we can use Pulsar SQL, which runs on Presto. We can use any JDBC tool for this, or the built-in Pulsar SQL command-line interface shown below.

SQL Consumers

Pulsar SQL/Presto/Trino


desc pulsar."public/default"."pi-sensors";

         Column         |   Type    | Extra |                                   Comment                                   
------------------------+-----------+-------+-----------------------------------------------------------------------------
 uuid                   | varchar   |       | ["null","string"]                                                           
 ipaddress              | varchar   |       | ["null","string"]                                                           
 cputempf               | integer   |       | ["null","int"]                                                              
 runtime                | integer   |       | ["null","int"]                                                              
 host                   | varchar   |       | ["null","string"]                                                           
 hostname               | varchar   |       | ["null","string"]                                                           
 macaddress             | varchar   |       | ["null","string"]                                                           
 endtime                | varchar   |       | ["null","string"]                                                           
 te                     | varchar   |       | ["null","string"]                                                           
 cpu                    | real      |       | ["null","float"]                                                            
 diskusage              | varchar   |       | ["null","string"]                                                           
 memory                 | real      |       | ["null","float"]                                                            
 rowid                  | varchar   |       | ["null","string"]                                                           
 systemtime             | varchar   |       | ["null","string"]                                                           
 ts                     | integer   |       | ["null","int"]                                                              
 starttime              | varchar   |       | ["null","string"]                                                           
 bh1745_red             | real      |       | ["null","float"]                                                            
 bh1745_green           | real      |       | ["null","float"]                                                            
 bh1745_blue            | real      |       | ["null","float"]                                                            
 bh1745_clear           | real      |       | ["null","float"]                                                            
 vl53l1x_distance_in_mm | real      |       | ["null","float"]                                                            
 ltr559_lux             | real      |       | ["null","float"]                                                            
 ltr559_prox            | real      |       | ["null","float"]                                                            
 bme680_tempc           | real      |       | ["null","float"]                                                            
 bme680_tempf           | real      |       | ["null","float"]                                                            
 bme680_pressure        | real      |       | ["null","float"]                                                            
 bme680_humidity        | real      |       | ["null","float"]                                                            
 lsm303d_accelerometer  | varchar   |       | ["null","string"]                                                           
 lsm303d_magnetometer   | varchar   |       | ["null","string"]                                                           
 __partition__          | integer   |       | The partition number which the message belongs to                           
 __event_time__         | timestamp |       | Application defined timestamp in milliseconds of when the event occurred    
 __publish_time__       | timestamp |       | The timestamp in milliseconds of when event as published                    
 __message_id__         | varchar   |       | The message ID of the message used to generate this row                     
 __sequence_id__        | bigint    |       | The sequence ID of the message used to generate this row                    
 __producer_name__      | varchar   |       | The name of the producer that publish the message used to generate this row 
 __key__                | varchar   |       | The partition key for the topic                                             
 __properties__         | varchar   |       | User defined properties                                                     
(37 rows)

presto> select * from pulsar."public/default"."pi-sensors";
        uuid        |   ipaddress   | cputempf | runtime | host  | hostname |    macaddress     |      endtime       |         te         | cpu | disk
--------------------+---------------+----------+---------+-------+----------+-------------------+--------------------+--------------------+-----+-----
 snr_20220323180318 | 192.168.1.229 |       99 |       4 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058598.8543017 | 4.47935152053833   | 0.2 | 3895
 snr_20220323180324 | 192.168.1.229 |       99 |      10 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058604.4054732 | 10.03052306175232  | 0.0 | 3895
 snr_20220323180329 | 192.168.1.229 |       99 |      16 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058609.8929565 | 15.518006324768066 | 6.5 | 3895
 snr_20220323180335 | 192.168.1.229 |       99 |      21 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058615.3783045 | 21.00335431098938  | 0.2 | 3895
 snr_20220323180340 | 192.168.1.229 |       99 |      26 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058620.8675282 | 26.49257802963257  | 4.6 | 3895
 snr_20220323180346 | 192.168.1.229 |       99 |      32 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058626.3639522 | 31.989001989364624 | 0.0 | 3895
 snr_20220323180351 | 192.168.1.229 |       99 |      38 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058631.8793604 | 37.50441026687622  | 0.0 | 3895
 snr_20220323180357 | 192.168.1.229 |      100 |      43 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058637.38347   | 43.008519887924194 | 0.0 | 3895
 snr_20220323180402 | 192.168.1.229 |       99 |      49 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058642.8820572 | 48.50710701942444  | 0.0 | 3895
 snr_20220323180408 | 192.168.1.229 |       99 |      54 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058648.3795574 | 54.00460720062256  | 6.2 | 3895
 snr_20220323180413 | 192.168.1.229 |       99 |      59 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058653.8280468 | 59.45309662818909  | 0.0 | 3895
 snr_20220323180419 | 192.168.1.229 |       99 |      65 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058659.3180714 | 64.94312119483948  | 4.9 | 3895
 snr_20220323180424 | 192.168.1.229 |       99 |      70 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058664.8023574 | 70.42740726470947  | 0.0 | 3895
 snr_20220323180430 | 192.168.1.229 |       99 |      76 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058670.286937  | 75.91198682785034  | 0.0 | 3895
 snr_20220323180435 | 192.168.1.229 |       97 |      81 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058675.7804654 | 81.40551519393921  | 0.0 | 3895
 snr_20220323180441 | 192.168.1.229 |       99 |      87 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058681.2751634 | 86.90021324157715  | 0.0 | 3895
 snr_20220323180446 | 192.168.1.229 |       99 |      92 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058686.7713509 | 92.39640069007874  | 5.9 | 3895
 snr_20220323180452 | 192.168.1.229 |       99 |      98 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058692.2672575 | 97.89230728149414  | 0.3 | 3895
 snr_20220323180457 | 192.168.1.229 |       99 |     103 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058697.7704427 | 103.39549255371094 | 5.4 | 3895
 snr_20220323180503 | 192.168.1.229 |       99 |     109 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058703.21333   | 108.83837985992432 | 0.3 | 3895
 snr_20220323180508 | 192.168.1.229 |       99 |     114 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058708.6879904 | 114.31304025650024 | 0.0 | 3895
 snr_20220323180514 | 192.168.1.229 |       99 |     120 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058714.1396198 | 119.76466965675354 | 0.3 | 3895
 snr_20220323180519 | 192.168.1.229 |       99 |     125 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058719.6158638 | 125.24091362953186 | 0.0 | 3895
 snr_20220323180525 | 192.168.1.229 |      100 |     131 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058725.0950723 | 130.72012209892273 | 6.5 | 3895
 snr_20220323180530 | 192.168.1.229 |       99 |     136 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058730.57256   | 136.19760990142822 | 0.0 | 3895
(25 rows)

Query 20220323_184946_00003_p66fs, FINISHED, 1 node

Table Layout

Example Rows

Detailed Schema for the Topic in Pulsar SQL

Below, we create a simple Spark Structured Streaming application in Scala that consumes the topic's JSON messages as they arrive and stores them in a file system as CSV.

Spark SQL

val dfPulsar = spark.readStream.format("pulsar").option("service.url", "pulsar://pulsar1:6650").option("admin.url", "http://pulsar1:8080").option("topic", "persistent://public/default/pi-sensors").load()

scala> dfPulsar.printSchema()
root
 |-- uuid: string (nullable = true)
 |-- ipaddress: string (nullable = true)
 |-- cputempf: integer (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- host: string (nullable = true)
 |-- hostname: string (nullable = true)
 |-- macaddress: string (nullable = true)
 |-- endtime: string (nullable = true)
 |-- te: string (nullable = true)
 |-- cpu: float (nullable = true)
 |-- diskusage: string (nullable = true)
 |-- memory: float (nullable = true)
 |-- rowid: string (nullable = true)
 |-- systemtime: string (nullable = true)
 |-- ts: integer (nullable = true)
 |-- starttime: string (nullable = true)
 |-- BH1745_red: float (nullable = true)
 |-- BH1745_green: float (nullable = true)
 |-- BH1745_blue: float (nullable = true)
 |-- BH1745_clear: float (nullable = true)
 |-- VL53L1X_distance_in_mm: float (nullable = true)
 |-- ltr559_lux: float (nullable = true)
 |-- ltr559_prox: float (nullable = true)
 |-- bme680_tempc: float (nullable = true)
 |-- bme680_tempf: float (nullable = true)
 |-- bme680_pressure: float (nullable = true)
 |-- bme680_humidity: float (nullable = true)
 |-- lsm303d_accelerometer: string (nullable = true)
 |-- lsm303d_magnetometer: string (nullable = true)
 |-- __key: binary (nullable = true)
 |-- __topic: string (nullable = true)
 |-- __messageId: binary (nullable = true)
 |-- __publishTime: timestamp (nullable = true)
 |-- __eventTime: timestamp (nullable = true)
 |-- __messageProperties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)


// Example queries

// Print every incoming record to the console
val consoleQuery = dfPulsar.selectExpr("*").writeStream.format("console").option("truncate", false).start()

// Land selected fields as CSV files; the format could also be "parquet", "json", or "orc".
// In spark-shell, enter this with :paste so the multi-line chain is read as one expression.
val pQuery = dfPulsar.selectExpr("CAST(__key AS STRING)",
                                 "CAST(uuid AS STRING)",
                                 "CAST(ipaddress AS STRING)",
                                 "CAST(cputempf AS STRING)",
                                 "CAST(host AS STRING)",
                                 "CAST(cpu AS STRING)",
                                 "CAST(diskusage AS STRING)",
                                 "CAST(memory AS STRING)",
                                 "CAST(systemtime AS STRING)",
                                 "CAST(BH1745_red AS STRING)",
                                 "CAST(BH1745_green AS STRING)",
                                 "CAST(BH1745_blue AS STRING)",
                                 "CAST(BH1745_clear AS STRING)",
                                 "CAST(VL53L1X_distance_in_mm AS STRING)",
                                 "CAST(ltr559_lux AS STRING)",
                                 "CAST(bme680_tempf AS STRING)",
                                 "CAST(bme680_pressure AS STRING)",
                                 "CAST(bme680_humidity AS STRING)")
                                 .as[(String, String, String, String, String, String, String, String,
                                 String, String, String, String, String, String, String, String, String, String)]
            .writeStream.format("csv")
            .option("header", true)
            .option("path", "/opt/demo/pisensordata")
            .option("checkpointLocation", "/tmp/checkpoint")
            .start()

pQuery.explain()
pQuery.awaitTermination() // Blocks until the query is stopped or fails
pQuery.stop()

This simple ETL job reads events as they arrive, converts them from JSON to CSV, and lands them in a file system, which could be S3, HDFS, or a vanilla Linux file system.
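To make the per-record transformation concrete outside Spark, here is a stdlib-only Python sketch of the same step: parse each JSON message and write the selected fields as CSV under a header matching the Spark output shown below. The function name and the (key, JSON bytes) input shape are assumptions for illustration. It handles one in-memory batch and provides none of Spark's streaming guarantees, such as checkpointing.

```python
import csv
import io
import json

# Fields selected by the Spark job, in the same order (after __key)
COLUMNS = [
    "uuid", "ipaddress", "cputempf", "host", "cpu", "diskusage", "memory",
    "systemtime", "BH1745_red", "BH1745_green", "BH1745_blue", "BH1745_clear",
    "VL53L1X_distance_in_mm", "ltr559_lux", "bme680_tempf",
    "bme680_pressure", "bme680_humidity",
]


def messages_to_csv(messages):
    """Convert (key, json_bytes) pairs into CSV text with a header row.

    Fields missing from a message become empty cells; extra fields are dropped.
    """
    out = io.StringIO()
    writer = csv.DictWriter(out, fieldnames=["__key"] + COLUMNS,
                            extrasaction="ignore", lineterminator="\n")
    writer.writeheader()
    for key, payload in messages:
        row = json.loads(payload)  # json.loads accepts bytes in Python 3.6+
        row["__key"] = key
        writer.writerow(row)
    return out.getvalue()


if __name__ == "__main__":
    sample = [("snr_1", b'{"uuid": "snr_1", "cputempf": 95, "ltr559_lux": 6.09}')]
    print(messages_to_csv(sample), end="")
```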

Spark Runtime Metrics

Spark Explain Plan for our SQL ETL Job

Output from our Spark job

Details for Query 7

Details for Stage 14

Job Details

Streaming Query Statistics

Performance Metrics for Spark server

Below we can see one of our loaded records in the resulting CSV file.

Example Spark ETL CSV Output

/opt/demo/pisensordata# cat part-00000-0425bfc8-5d25-4143-818c-bc7af5e1d82c-c000.csv
__key,uuid,ipaddress,cputempf,host,cpu,diskusage,memory,systemtime,BH1745_red,BH1745_green,BH1745_blue,BH1745_clear,VL53L1X_distance_in_mm,ltr559_lux,bme680_tempf,bme680_pressure,bme680_humidity
snr_20220324215723,snr_20220324215723,192.168.1.229,95,piups,0.0,3887.5 MB,20.6,03/24/2022 17:57:24,134.2,99.0,75.6,130.0,15.0,6.09,70.66,1006.11,44.737

Directory of CSV Files Produced



We can also run continuous Apache Flink SQL applications against our topic data using standard SQL. First, we connect Flink to Pulsar through a catalog that points to the Pulsar admin and service URLs. We can then examine our topics as Flink SQL tables and run continuous queries against them.

Flink SQL

CREATE CATALOG pulsar WITH (
   'type' = 'pulsar',
   'service-url' = 'pulsar://pulsar1:6650',
   'admin-url' = 'http://pulsar1:8080',
   'format' = 'json'
);

USE CATALOG pulsar;

SHOW TABLES;


describe `pi-sensors`;
> 
+------------------------+--------+------+-----+--------+-----------+
|                   name |   type | null | key | extras | watermark |
+------------------------+--------+------+-----+--------+-----------+
|                   uuid | STRING | true |     |        |           |
|              ipaddress | STRING | true |     |        |           |
|               cputempf |    INT | true |     |        |           |
|                runtime |    INT | true |     |        |           |
|                   host | STRING | true |     |        |           |
|               hostname | STRING | true |     |        |           |
|             macaddress | STRING | true |     |        |           |
|                endtime | STRING | true |     |        |           |
|                     te | STRING | true |     |        |           |
|                    cpu |  FLOAT | true |     |        |           |
|              diskusage | STRING | true |     |        |           |
|                 memory |  FLOAT | true |     |        |           |
|                  rowid | STRING | true |     |        |           |
|             systemtime | STRING | true |     |        |           |
|                     ts |    INT | true |     |        |           |
|              starttime | STRING | true |     |        |           |
|             BH1745_red |  FLOAT | true |     |        |           |
|           BH1745_green |  FLOAT | true |     |        |           |
|            BH1745_blue |  FLOAT | true |     |        |           |
|           BH1745_clear |  FLOAT | true |     |        |           |
| VL53L1X_distance_in_mm |  FLOAT | true |     |        |           |
|             ltr559_lux |  FLOAT | true |     |        |           |
|            ltr559_prox |  FLOAT | true |     |        |           |
|           bme680_tempc |  FLOAT | true |     |        |           |
|           bme680_tempf |  FLOAT | true |     |        |           |
|        bme680_pressure |  FLOAT | true |     |        |           |
|        bme680_humidity |  FLOAT | true |     |        |           |
|  lsm303d_accelerometer | STRING | true |     |        |           |
|   lsm303d_magnetometer | STRING | true |     |        |           |
+------------------------+--------+------+-----+--------+-----------+

select max(bme680_pressure) as maxpressure, max(bme680_tempf) as maxtemp, max(ltr559_lux) as maxlux,
       avg(BH1745_red) as avgred, max(VL53L1X_distance_in_mm) as maxdistance
from `pi-sensors`;

select * from `pi-sensors`;
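The describe output shows how the catalog maps each JSON field to a typed column. For example, cputempf is INT, cpu is FLOAT, and diskusage is STRING. If the Python producer on the Pi ever changes a field's type (say, by sending cputempf as a string), its JSON no longer matches those columns. The following is a minimal sketch of a check that could run before sending. It covers only a subset of the columns above, and schema_mismatches is a hypothetical helper, not part of the Pulsar or Flink APIs.

```python
# Subset of the `describe` output above: column name -> Flink SQL type.
PI_SENSORS_SCHEMA = {
    "uuid": "STRING",
    "ipaddress": "STRING",
    "cputempf": "INT",
    "cpu": "FLOAT",
    "diskusage": "STRING",
    "memory": "FLOAT",
    "ts": "INT",
    "bme680_tempf": "FLOAT",
    "bme680_humidity": "FLOAT",
    "lsm303d_accelerometer": "STRING",
}

# Python types accepted for each Flink type (JSON 83 and 83.0 both fit FLOAT).
ACCEPTED_TYPES = {"STRING": (str,), "INT": (int,), "FLOAT": (int, float)}


def schema_mismatches(record, schema=PI_SENSORS_SCHEMA):
    """Return (column, flink_type, python_type) for fields that don't fit the schema."""
    problems = []
    for column, flink_type in schema.items():
        value = record.get(column)
        if value is None:
            continue  # every column is nullable ("null" is true in the describe output)
        # bool is a subclass of int in Python, but JSON true/false is not a number.
        if isinstance(value, bool) or not isinstance(value, ACCEPTED_TYPES[flink_type]):
            problems.append((column, flink_type, type(value).__name__))
    return problems
```

An empty list means the record fits these columns. Otherwise, each tuple names the column, the expected Flink type, and the Python type that was actually sent.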

The screen captures below show the results of our Flink SQL queries.

Flink SQL Row Summary

Creating Our Flink SQL Catalog to Apache Pulsar

Flink SQL Running

Flink SQL Totals Updating
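The aggregate query has no GROUP BY and no window, so Flink evaluates it as a single result row that keeps updating: each new message on the pi-sensors topic can revise the maximums and the average, which is the behavior the "Flink SQL Totals Updating" capture refers to. The following plain-Python sketch only mimics those semantics for intuition. It is not how Flink implements them, and RunningAggregates is a hypothetical name.

```python
from dataclasses import dataclass
from typing import Optional


def _sql_max(current: Optional[float], value: Optional[float]) -> Optional[float]:
    """MAX semantics: NULL inputs are ignored; None means no value seen yet."""
    if value is None:
        return current
    return value if current is None else max(current, value)


@dataclass
class RunningAggregates:
    """Running state for the global (non-windowed) aggregate query."""
    maxpressure: Optional[float] = None
    maxtemp: Optional[float] = None
    maxlux: Optional[float] = None
    maxdistance: Optional[float] = None
    red_sum: float = 0.0
    red_count: int = 0

    def update(self, rec: dict) -> dict:
        """Fold one pi-sensors record in and return the updated result row."""
        self.maxpressure = _sql_max(self.maxpressure, rec.get("bme680_pressure"))
        self.maxtemp = _sql_max(self.maxtemp, rec.get("bme680_tempf"))
        self.maxlux = _sql_max(self.maxlux, rec.get("ltr559_lux"))
        self.maxdistance = _sql_max(self.maxdistance, rec.get("VL53L1X_distance_in_mm"))
        red = rec.get("BH1745_red")
        if red is not None:  # AVG also ignores NULLs
            self.red_sum += red
            self.red_count += 1
        return {
            "maxpressure": self.maxpressure,
            "maxtemp": self.maxtemp,
            "maxlux": self.maxlux,
            "avgred": self.red_sum / self.red_count if self.red_count else None,
            "maxdistance": self.maxdistance,
        }
```

Calling update() once per incoming record returns the row as it would look after that record. The unbounded query keeps this state for as long as it runs.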

In the final part of our application, we write an Apache NiFi flow that consumes the same data from the pi-sensors Pulsar topic and writes it as documents in MongoDB (or any other NoSQL database).

Apache NiFi: Pulsar Consumer, MongoDB Writer

Apache NiFi Process Group for Our Application

NiFi Data Flow

  1. Consume Pulsar records from the pi-sensors topic via the Pulsar Connection Pool.
  2. Use Apache Calcite to query, filter, route, and transform incoming records.
  3. Store records as BSON documents in a MongoDB NoSQL collection.
  4. On failure, route records to a temporary NiFi queue for later processing.

Consume Pulsar Records from listed topic

Limit records to where humidity is greater than 25%
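The filter and failure-handling steps above can be sketched in plain Python to show how records move through the flow. This is an illustration, not NiFi's implementation. It assumes the filter is equivalent to a WHERE bme680_humidity > 25 clause (the exact query used in the flow is not shown here). route_records and the store callable are hypothetical stand-ins for the Calcite-based filter and the MongoDB writer.

```python
from collections import deque

# Assumed threshold, taken from the "humidity is greater than 25%" step.
HUMIDITY_THRESHOLD = 25.0


def route_records(records, store, retry_queue):
    """Filter, store, and park failures, mirroring the NiFi steps above.

    `store` is a hypothetical callable that persists one record (for example,
    a MongoDB insert); any exception it raises sends the record to `retry_queue`.
    Returns the number of records stored successfully.
    """
    stored = 0
    for record in records:
        humidity = record.get("bme680_humidity")
        # Like a SQL WHERE clause: NULL or low-humidity rows are dropped.
        if humidity is None or humidity <= HUMIDITY_THRESHOLD:
            continue
        try:
            store(record)
        except Exception:  # broad on purpose: any write failure is retried later
            retry_queue.append(record)
        else:
            stored += 1
    return stored


if __name__ == "__main__":
    saved, retry = [], deque()
    batch = [{"uuid": "snr_20220323194514", "bme680_humidity": 34.432},
             {"uuid": "hypothetical-dry-reading", "bme680_humidity": 20.0}]
    print(route_records(batch, saved.append, retry), len(retry))  # → 1 0
```

Keeping failures in a queue instead of dropping them mirrors step 4: a later pass can feed retry_queue back through route_records once the database is reachable again.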

Store Records in PiSensors MongoDB collection via MongoDB Connection Pool

Configure MongoDB Connection Pool

In the code below, we log in to our MongoDB server and query the new collection populated by our Apache NiFi flow.

Data Store: MongoDB


mongo -u debezium -p dbz --authenticationDatabase admin pulsar1:27017/inventory

show databases

db.createCollection("pisensors")

show collections

db.pisensors.find().pretty()

{
        "_id" : ObjectId("623b812e5dae8913d42a93ee"),
        "uuid" : "snr_20220323194514",
        "ipaddress" : "192.168.1.229",
        "cputempf" : 100,
        "runtime" : 9,
        "host" : "piups",
        "hostname" : "piups",
        "macaddress" : "b8:27:eb:4a:4b:61",
        "endtime" : "1648064714.7820184",
        "te" : "9.371636629104614",
        "cpu" : 6.5,
        "diskusage" : "3895.4 MB",
        "memory" : 21.4,
        "rowid" : "20220323194514_c9ec900f-05c2-49c4-985f-ddd83e8b15c0",
        "systemtime" : "03/23/2022 15:45:15",
        "ts" : 1648064715,
        "starttime" : "03/23/2022 15:45:05",
        "BH1745_red" : 112.2,
        "BH1745_green" : 83,
        "BH1745_blue" : 64.8,
        "BH1745_clear" : 110,
        "VL53L1X_distance_in_mm" : 31,
        "ltr559_lux" : 6.65,
        "ltr559_prox" : 0,
        "bme680_tempc" : 23.47,
        "bme680_tempf" : 74.25,
        "bme680_pressure" : 1017.71,
        "bme680_humidity" : 34.432,
        "lsm303d_accelerometer" : "-00.08g : -01.01g : +00.01g",
        "lsm303d_magnetometer" : "+00.06 : +00.30 : +00.07"
}
MongoDB Command Line Query
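Several fields in the stored document are strings rather than numbers. The accelerometer and magnetometer readings pack three axes into one string, diskusage carries a unit, and endtime and te hold numeric values as text. If you want to compute on them (for example, on documents returned by a MongoDB driver query), a small normalization step helps. The following is a minimal sketch with hypothetical helper names, assuming the string formats shown in the document above.

```python
def parse_axes(reading):
    """Split a reading such as '-00.08g : -01.01g : +00.01g' into three floats."""
    return tuple(float(part.strip().rstrip("g")) for part in reading.split(":"))


def parse_megabytes(text):
    """Convert a disk-usage string such as '3895.4 MB' to a float in MB."""
    value, unit = text.split()
    if unit != "MB":
        raise ValueError(f"unexpected unit in {text!r}")
    return float(value)


def normalize_sensor_doc(doc):
    """Return a copy of a pisensors document with its numeric strings parsed."""
    out = dict(doc)  # leave the original document untouched
    for key in ("lsm303d_accelerometer", "lsm303d_magnetometer"):
        if isinstance(out.get(key), str):
            out[key] = parse_axes(out[key])
    if isinstance(out.get("diskusage"), str):
        out["diskusage_mb"] = parse_megabytes(out["diskusage"])
    for key in ("endtime", "te"):
        if isinstance(out.get(key), str):
            out[key] = float(out[key])
    return out
```

Adding a separate diskusage_mb field, rather than overwriting diskusage, keeps the original string available alongside the parsed value.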

Finally, if no one is watching, did it really happen? Let's monitor our Pulsar cluster and see what's going on.

Monitor Everything! Let Me See What's Going On!

Performance Metrics for Pulsar Brokers

Metrics on Apache Pulsar Topics and Data

Grafana Chart of Pulsar Metrics

Our application is complete, taking sensor data from start to final display in an easy and scalable way.
