Pulsar in Python on Pi for Sensors
source link: https://dzone.com/articles/pulsar-in-python-on-pi
Utilizing Apache Pulsar's Python Client on Raspberry Pi - FLiP-Py Stack
I have a new Raspberry Pi with a Breakout Garden with a thermal camera, 1.12" OLED screen, and a CO2+ sensor.
We first need to install the Pulsar Python client. If you are running on certain architectures, you will need to compile the Apache Pulsar C++ client first.
pip3 install fastavro
pip3 install pytz
pip3 install 'pulsar-client[avro]'
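Before running the sensor app, it can help to confirm that the three packages are importable. The following is a minimal sketch using only the standard library; the helper name `missing_modules` is hypothetical, and it assumes the packages import as `pulsar`, `fastavro`, and `pytz`.

```python
import importlib.util


def missing_modules(names):
    """Return the module names that cannot be found by this interpreter."""
    return [name for name in names if importlib.util.find_spec(name) is None]


# pulsar-client is imported as "pulsar"; the other two keep their package names.
missing = missing_modules(["pulsar", "fastavro", "pytz"])
print("missing:", missing if missing else "none")
```

An empty result only shows that the modules can be located; it does not check that the native Pulsar library loads on your architecture.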
Our Python 3 app reads the sensor values and publishes them to Pulsar as JSON schema records using the Pulsar Python client.
root@thermal:/opt/demo# python3 thermal.py
2022-02-22 19:41:44.380 INFO [3069204864] ClientConnection:182 | [<none> -> pulsar://pulsar1:6650] Create ClientConnection, timeout=10000
2022-02-22 19:41:44.380 INFO [3069204864] ConnectionPool:96 | Created connection for pulsar://pulsar1:6650
2022-02-22 19:41:44.386 INFO [3036730432] ClientConnection:368 | [192.168.1.204:34966 -> 192.168.1.230:6650] Connected to broker
2022-02-22 19:41:44.390 INFO [3036730432] HandlerBase:64 | [persistent://public/default/pi-thermal, ] Getting connection from pool
2022-02-22 19:41:44.392 INFO [3036730432] ClientConnection:182 | [<none> -> pulsar://pulsar1:6650] Create ClientConnection, timeout=10000
2022-02-22 19:41:44.392 INFO [3036730432] ConnectionPool:96 | Created connection for pulsar://127.0.0.1:6650
2022-02-22 19:41:44.396 INFO [3036730432] ClientConnection:370 | [192.168.1.204:34968 -> 192.168.1.230:6650] Connected to broker through proxy. Logical broker: pulsar://127.0.0.1:6650
2022-02-22 19:41:44.430 INFO [3036730432] ProducerImpl:189 | [persistent://public/default/pi-thermal, ] Created producer on broker [192.168.1.204:34968 -> 192.168.1.230:6650]
SCD4X, Serial: d3efd3efd3ef
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'wthr_uri_20220223004144', 'ipaddress': '192.168.1.204', 'cputempf': 108, 'runtime': 0, 'host': 'thermal', 'hostname': 'thermal', 'macaddress': 'e4:5f:01:7c:3f:34', 'endtime': '1645576904.9345362', 'te': '0.0006937980651855469', 'cpu': 6.5, 'diskusage': '106326.3 MB', 'memory': 8.7, 'rowid': '20220223004144_b9de27fa-fc0b-46a0-8d1f-04664360f3b0', 'systemtime': '02/22/2022 19:41:50', 'ts': 1645576910, 'starttime': '02/22/2022 19:41:44', 'datetimestamp': '2022-02-23 00:41:49.613734+00:00', 'temperature': 28.4543, 'humidity': 28.6, 'co2': 670.0}
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'wthr_qle_20220223004150', 'ipaddress': '192.168.1.204', 'cputempf': 108, 'runtime': 0, 'host': 'thermal', 'hostname': 'thermal', 'macaddress': 'e4:5f:01:7c:3f:34', 'endtime': '1645576910.6230323', 'te': '0.0004811286926269531', 'cpu': 0.0, 'diskusage': '106326.3 MB', 'memory': 8.7, 'rowid': '20220223004150_040bc286-7778-4b87-8f20-abbe3028fe29', 'systemtime': '02/22/2022 19:41:55', 'ts': 1645576915, 'starttime': '02/22/2022 19:41:50', 'datetimestamp': '2022-02-23 00:41:54.384983+00:00', 'temperature': 27.9977, 'humidity': 29.2, 'co2': 683.0}
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'wthr_sgh_20220223004155', 'ipaddress': '192.168.1.204', 'cputempf': 107, 'runtime': 0, 'host': 'thermal', 'hostname': 'thermal', 'macaddress': 'e4:5f:01:7c:3f:34', 'endtime': '1645576915.400389', 'te': '0.0007336139678955078', 'cpu': 0.0, 'diskusage': '106326.3 MB', 'memory': 8.7, 'rowid': '20220223004155_1a7c21b3-3e0b-48c9-8c3c-73e11b538602', 'systemtime': '02/22/2022 19:42:00', 'ts': 1645576920, 'starttime': '02/22/2022 19:41:55', 'datetimestamp': '2022-02-23 00:41:59.164362+00:00', 'temperature': 27.7413, 'humidity': 29.66, 'co2': 682.0}
^C2022-02-22 19:42:02.086 INFO [3069204864] ClientImpl:495 | Closing Pulsar client with 1 producers and 0 consumers
2022-02-22 19:42:02.086 INFO [3069204864] ProducerImpl:686 | [persistent://public/default/pi-thermal, standalone-1-2217] Closing producer for topic persistent://public/default/pi-thermal
2022-02-22 19:42:02.092 INFO [3036730432] ProducerImpl:729 | [persistent://public/default/pi-thermal, standalone-1-2217] Closed producer
2022-02-22 19:42:02.092 INFO [3036730432] ClientConnection:1548 | [192.168.1.204:34968 -> 192.168.1.230:6650] Connection closed
2022-02-22 19:42:02.092 INFO [3036730432] ClientConnection:1548 | [192.168.1.204:34966 -> 192.168.1.230:6650] Connection closed
2022-02-22 19:42:02.121 INFO [3069204864] ProducerImpl:655 | Producer - [persistent://public/default/pi-thermal, standalone-1-2217] , [batching = off]
2022-02-22 19:42:02.122 INFO [3069204864] ClientConnection:256 | [192.168.1.204:34966 -> 192.168.1.230:6650] Destroyed connection
2022-02-22 19:42:02.122 INFO [3069204864] ClientConnection:256 | [192.168.1.204:34968 -> 192.168.1.230:6650] Destroyed connection
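The records printed above carry several derived fields: a short `uuid` with a timestamp, a `rowid` that combines the timestamp with a random UUID, an epoch `ts`, and a UTC `datetimestamp`. Here is a minimal sketch of how such a record could be assembled and serialized with only the standard library. The helper name `build_record`, the subset of fields, and the use of `json.dumps` are assumptions for illustration; the actual app in the linked repository publishes through the Pulsar client with a schema.

```python
import json
import random
import string
import uuid
from datetime import datetime, timezone


def build_record(temperature, humidity, co2, now=None):
    """Assemble one sensor reading as a plain dict (hypothetical helper)."""
    if now is None:
        now = datetime.now(timezone.utc)
    stamp = now.strftime("%Y%m%d%H%M%S")
    # Three random lowercase letters, mirroring ids such as wthr_uri_20220223004144.
    suffix = "".join(random.choices(string.ascii_lowercase, k=3))
    return {
        "uuid": f"wthr_{suffix}_{stamp}",
        "rowid": f"{stamp}_{uuid.uuid4()}",
        "ts": int(now.timestamp()),
        "datetimestamp": str(now),
        "temperature": float(temperature),
        "humidity": float(humidity),
        "co2": float(co2),
    }


# Fixed timestamp so the example is reproducible.
fixed = datetime(2022, 2, 23, 0, 41, 49, 613734, tzinfo=timezone.utc)
record = build_record(28.4543, 28.6, 670.0, now=fixed)
payload = json.dumps(record).encode("utf-8")  # bytes that could be sent as a message body
print(payload.decode("utf-8"))
```

Passing `now` explicitly keeps the function easy to test; on the Pi it would normally be left out so the current time is used.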
We can also use an Avro schema instead of a JSON schema if that is to your liking.
root@thermal:/opt/demo# python3 thermalavro.py
Schema info is: {
"type": "record",
"name": "thermal",
"fields": [
{
"name": "uuid",
"type": [
"null",
"string"
]
},
{
"name": "ipaddress",
"type": [
"null",
"string"
]
},
{
"name": "cputempf",
"type": [
"null",
"int"
]
},
{
"name": "runtime",
"type": [
"null",
"int"
]
},
{
"name": "host",
"type": [
"null",
"string"
]
},
{
"name": "hostname",
"type": [
"null",
"string"
]
},
{
"name": "macaddress",
"type": [
"null",
"string"
]
},
{
"name": "endtime",
"type": [
"null",
"string"
]
},
{
"name": "te",
"type": [
"null",
"string"
]
},
{
"name": "cpu",
"type": [
"null",
"float"
]
},
{
"name": "diskusage",
"type": [
"null",
"string"
]
},
{
"name": "memory",
"type": [
"null",
"float"
]
},
{
"name": "rowid",
"type": [
"null",
"string"
]
},
{
"name": "systemtime",
"type": [
"null",
"string"
]
},
{
"name": "ts",
"type": [
"null",
"int"
]
},
{
"name": "starttime",
"type": [
"null",
"string"
]
},
{
"name": "datetimestamp",
"type": [
"null",
"string"
]
},
{
"name": "temperature",
"type": [
"null",
"float"
]
},
{
"name": "humidity",
"type": [
"null",
"float"
]
},
{
"name": "co2",
"type": [
"null",
"float"
]
}
]
}
2022-02-23 10:11:51.953 INFO [3069213056] ClientConnection:182 | [<none> -> pulsar://pulsar1:6650] Create ClientConnection, timeout=10000
2022-02-23 10:11:51.953 INFO [3069213056] ConnectionPool:96 | Created connection for pulsar://pulsar1:6650
2022-02-23 10:11:51.960 INFO [3034862656] ClientConnection:368 | [192.168.1.204:34984 -> 192.168.1.230:6650] Connected to broker
2022-02-23 10:11:51.966 INFO [3034862656] HandlerBase:64 | [persistent://public/default/pi-thermal-avro, ] Getting connection from pool
2022-02-23 10:11:51.970 INFO [3034862656] ClientConnection:182 | [<none> -> pulsar://pulsar1:6650] Create ClientConnection, timeout=10000
2022-02-23 10:11:51.970 INFO [3034862656] ConnectionPool:96 | Created connection for pulsar://127.0.0.1:6650
2022-02-23 10:11:51.974 INFO [3034862656] ClientConnection:370 | [192.168.1.204:34986 -> 192.168.1.230:6650] Connected to broker through proxy. Logical broker: pulsar://127.0.0.1:6650
2022-02-23 10:11:52.008 INFO [3034862656] ProducerImpl:189 | [persistent://public/default/pi-thermal-avro, ] Created producer on broker [192.168.1.204:34986 -> 192.168.1.230:6650]
SCD4X, Serial: d3efd3efd3ef
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'thrm_xtq_20220223151152', 'ipaddress': '192.168.1.204', 'cputempf': 110, 'runtime': 0, 'host': 'thermal', 'hostname': 'thermal', 'macaddress': 'e4:5f:01:7c:3f:34', 'endtime': '1645629112.5121799', 'te': '0.0006127357482910156', 'cpu': 0.0, 'diskusage': '106314.1 MB', 'memory': 9.3, 'rowid': '20220223151152_46867a0c-dcf1-4919-b4e6-47bd8ca87dc3', 'systemtime': '02/23/2022 10:11:58', 'ts': 1645629118, 'starttime': '02/23/2022 10:11:52', 'datetimestamp': '2022-02-23 15:11:57.192564+00:00', 'temperature': 30.0725, 'humidity': 28.01, 'co2': 1082.0}
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'thrm_kzt_20220223151158', 'ipaddress': '192.168.1.204', 'cputempf': 109, 'runtime': 0, 'host': 'thermal', 'hostname': 'thermal', 'macaddress': 'e4:5f:01:7c:3f:34', 'endtime': '1645629118.2040725', 'te': '0.0007352828979492188', 'cpu': 6.5, 'diskusage': '106314.1 MB', 'memory': 9.3, 'rowid': '20220223151158_fece78a9-4c51-4ad6-8ffa-87ae6bc970d6', 'systemtime': '02/23/2022 10:12:02', 'ts': 1645629122, 'starttime': '02/23/2022 10:11:58', 'datetimestamp': '2022-02-23 15:12:01.967448+00:00', 'temperature': 29.7414, 'humidity': 28.58, 'co2': 1076.0}
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'thrm_udz_20220223151202', 'ipaddress': '192.168.1.204', 'cputempf': 109, 'runtime': 0, 'host': 'thermal', 'hostname': 'thermal', 'macaddress': 'e4:5f:01:7c:3f:34', 'endtime': '1645629122.9787736', 'te': '0.0005147457122802734', 'cpu': 0.0, 'diskusage': '106314.1 MB', 'memory': 9.3, 'rowid': '20220223151202_8fe911ba-5f02-47e4-a5bc-5535f3c789b9', 'systemtime': '02/23/2022 10:12:07', 'ts': 1645629127, 'starttime': '02/23/2022 10:12:02', 'datetimestamp': '2022-02-23 15:12:06.742775+00:00', 'temperature': 29.4877, 'humidity': 28.99, 'co2': 1071.0}
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'thrm_kqr_20220223151207', 'ipaddress': '192.168.1.204', 'cputempf': 112, 'runtime': 0, 'host': 'thermal', 'hostname': 'thermal', 'macaddress': 'e4:5f:01:7c:3f:34', 'endtime': '1645629127.7585478', 'te': '0.0007393360137939453', 'cpu': 0.0, 'diskusage': '106314.1 MB', 'memory': 9.3, 'rowid': '20220223151207_c244eaa0-8b36-4040-8421-8bdafa0b46c0', 'systemtime': '02/23/2022 10:12:12', 'ts': 1645629132, 'starttime': '02/23/2022 10:12:07', 'datetimestamp': '2022-02-23 15:12:11.523104+00:00', 'temperature': 29.25, 'humidity': 29.35, 'co2': 1071.0}
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'thrm_dfn_20220223151212', 'ipaddress': '192.168.1.204', 'cputempf': 110, 'runtime': 0, 'host': 'thermal', 'hostname': 'thermal', 'macaddress': 'e4:5f:01:7c:3f:34', 'endtime': '1645629132.5379777', 'te': '0.0008678436279296875', 'cpu': 0.0, 'diskusage': '106314.1 MB', 'memory': 9.3, 'rowid': '20220223151212_253d220a-34cd-4f1c-a435-1cb62834e3a2', 'systemtime': '02/23/2022 10:12:17', 'ts': 1645629137, 'starttime': '02/23/2022 10:12:12', 'datetimestamp': '2022-02-23 15:12:16.298915+00:00', 'temperature': 29.0097, 'humidity': 29.78, 'co2': 1064.0}
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'thrm_flx_20220223151217', 'ipaddress': '192.168.1.204', 'cputempf': 111, 'runtime': 0, 'host': 'thermal', 'hostname': 'thermal', 'macaddress': 'e4:5f:01:7c:3f:34', 'endtime': '1645629137.3143985', 'te': '0.0007383823394775391', 'cpu': 0.0, 'diskusage': '106314.1 MB', 'memory': 9.3, 'rowid': '20220223151217_5d1cedb9-5fbb-4abe-bcf0-f836a6bec093', 'systemtime': '02/23/2022 10:12:22', 'ts': 1645629142, 'starttime': '02/23/2022 10:12:17', 'datetimestamp': '2022-02-23 15:12:21.179689+00:00', 'temperature': 28.8068, 'humidity': 30.11, 'co2': 1069.0}
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'thrm_fjc_20220223151222', 'ipaddress': '192.168.1.204', 'cputempf': 109, 'runtime': 0, 'host': 'thermal', 'hostname': 'thermal', 'macaddress': 'e4:5f:01:7c:3f:34', 'endtime': '1645629142.1899443', 'te': '0.0007307529449462891', 'cpu': 5.1, 'diskusage': '106314.1 MB', 'memory': 9.3, 'rowid': '20220223151222_0f31c89a-96b5-4d8c-9f69-db3ff721d362', 'systemtime': '02/23/2022 10:12:26', 'ts': 1645629146, 'starttime': '02/23/2022 10:12:22', 'datetimestamp': '2022-02-23 15:12:25.953985+00:00', 'temperature': 28.5931, 'humidity': 30.52, 'co2': 1063.0}
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'thrm_twe_20220223151226', 'ipaddress': '192.168.1.204', 'cputempf': 111, 'runtime': 0, 'host': 'thermal', 'hostname': 'thermal', 'macaddress': 'e4:5f:01:7c:3f:34', 'endtime': '1645629146.967186', 'te': '0.0004837512969970703', 'cpu': 0.0, 'diskusage': '106314.1 MB', 'memory': 9.3, 'rowid': '20220223151226_e85a8384-7f5b-4cc5-9bc7-ca5babd2fb54', 'systemtime': '02/23/2022 10:12:31', 'ts': 1645629151, 'starttime': '02/23/2022 10:12:26', 'datetimestamp': '2022-02-23 15:12:30.729362+00:00', 'temperature': 28.4009, 'humidity': 30.85, 'co2': 1061.0}
^C2022-02-23 10:12:31.743 INFO [3069213056] ClientImpl:495 | Closing Pulsar client with 1 producers and 0 consumers
2022-02-23 10:12:31.744 INFO [3069213056] ProducerImpl:686 | [persistent://public/default/pi-thermal-avro, standalone-1-2228] Closing producer for topic persistent://public/default/pi-thermal-avro
2022-02-23 10:12:31.746 INFO [3034862656] ProducerImpl:729 | [persistent://public/default/pi-thermal-avro, standalone-1-2228] Closed producer
2022-02-23 10:12:31.746 INFO [3034862656] ClientConnection:1548 | [192.168.1.204:34986 -> 192.168.1.230:6650] Connection closed
2022-02-23 10:12:31.747 INFO [3034862656] ClientConnection:1548 | [192.168.1.204:34984 -> 192.168.1.230:6650] Connection closed
2022-02-23 10:12:31.782 INFO [3069213056] ProducerImpl:655 | Producer - [persistent://public/default/pi-thermal-avro, standalone-1-2228] , [batching = off]
2022-02-23 10:12:31.782 INFO [3069213056] ClientConnection:256 | [192.168.1.204:34984 -> 192.168.1.230:6650] Destroyed connection
2022-02-23 10:12:31.783 INFO [3069213056] ClientConnection:256 | [192.168.1.204:34986 -> 192.168.1.230:6650] Destroyed connection
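We can consume the Avro messages with the Pulsar command-line client. The content is Avro binary, so it prints as raw bytes.
bin/pulsar-client consume "persistent://public/default/pi-thermal-avro" -s "thermalpiavro" -n 0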
----- got message -----
key:[thrm_twe_20220223151226], properties:[], content:�.thrm_twe_20220223151226�?192.168.1.204�����thermal�thermal�"e4:5f:01:7c:3f:34�"1645629146.967186�*0.0004837512969970703���106314.1 MB����A�f20220223151226_e85a8384-7f5b-4cc5-9bc7-ca5babd2fb54�&02/23/2022 10:12:31�����
�&02/23/2022 10:12:26�@2022-02-23 15:12:30.729362+00:00�
5�A����A���D
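The unreadable characters in the consumed message are Avro's binary encoding. In that encoding, a union such as ["null", "string"] is written as the branch index followed by the value, a string is written as its length followed by UTF-8 bytes, and both numbers are zigzag varints. The "." just before the uuid in the output is byte 0x2E, which decodes to a length of 23. Below is a minimal sketch of that decoding for a single nullable string field; the function names are hypothetical, the sample bytes are built by hand rather than captured from the topic, and a real consumer would rely on the Pulsar client or fastavro instead.

```python
def read_long(buf, pos):
    """Decode one Avro zigzag varint at buf[pos]; return (value, next_pos)."""
    shift = 0
    raw = 0
    while True:
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear marks the last byte
            break
        shift += 7
    # Undo zigzag: even numbers are non-negative, odd numbers are negative.
    return (raw >> 1) ^ -(raw & 1), pos


def read_nullable_string(buf, pos):
    """Decode a ["null", "string"] union value; return (value, next_pos)."""
    branch, pos = read_long(buf, pos)
    if branch == 0:  # first branch of the union is "null"
        return None, pos
    length, pos = read_long(buf, pos)
    return buf[pos:pos + length].decode("utf-8"), pos + length


# Hand-built bytes: union branch 1 (0x02), length 23 (0x2E), then the text.
sample = b"\x02\x2e" + b"thrm_twe_20220223151226"
value, end = read_nullable_string(sample, 0)
print(value)
```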
We can connect Apache Flink to Apache Pulsar and create a catalog of topics (tables) to query.
CREATE CATALOG pulsar WITH (
  'type' = 'pulsar',
  'service-url' = 'pulsar://pulsar1:6650',
  'admin-url' = 'http://pulsar1:8080',
  'format' = 'json'
);
USE CATALOG pulsar;
SHOW TABLES;
select * from `pi-thermal`;
We can query the table (topic) from Trino/PrestoSQL as well.
Schema object in Python:
### Schema Object
# https://pulsar.apache.org/docs/en/client-libraries-python/
from pulsar.schema import Record, String, Integer, Float

class thermal(Record):
    uuid = String()
    ipaddress = String()
    cputempf = Integer()
    runtime = Integer()
    host = String()
    hostname = String()
    macaddress = String()
    endtime = String()
    te = String()
    cpu = Float()
    diskusage = String()
    memory = Float()
    rowid = String()
    systemtime = String()
    ts = Integer()
    starttime = String()
    datetimestamp = String()
    temperature = Float()
    humidity = Float()
    co2 = Float()
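Each field in this class shows up in the printed Avro schema as a union of "null" and a primitive type: String becomes "string", Integer becomes "int", and Float becomes "float". The sketch below reproduces that shape with the standard library only. It illustrates the mapping seen in the output and is not how the Pulsar client generates the schema internally; `AVRO_TYPES` and `avro_record_schema` are hypothetical names, and only three of the fields are listed.

```python
import json

# Assumed mapping from the field classes used in the record to Avro type names,
# taken from the schema printed by the Avro run.
AVRO_TYPES = {"String": "string", "Integer": "int", "Float": "float"}


def avro_record_schema(name, fields):
    """Build an Avro record schema dict where every field is nullable."""
    return {
        "type": "record",
        "name": name,
        "fields": [
            {"name": field, "type": ["null", AVRO_TYPES[kind]]}
            for field, kind in fields
        ],
    }


fields = [("uuid", "String"), ("cputempf", "Integer"), ("cpu", "Float")]
schema = avro_record_schema("thermal", fields)
print(json.dumps(schema, indent=1))
```

Because every field allows "null", a reading can still be published when one sensor value is missing.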
Avro schema code:
import pulsar
from pulsar.schema import AvroSchema

# Connect to the broker and wrap the thermal record class in an Avro schema
client = pulsar.Client('pulsar://pulsar1:6650')
thermalschema = AvroSchema(thermal)
print("Schema info is: " + thermalschema.schema_info().schema())
producer = client.create_producer(
    topic='persistent://public/default/pi-thermal-avro',
    schema=thermalschema,
    properties={"producer-name": "thermal-pyavro-sensor", "producer-id": "thermal-avro-sensor"})
JSON schema code difference (the producer is created the same way, with this schema argument instead):
schema=JsonSchema(thermal)
Source:
https://github.com/tspannhw/FLiP-Pi-Thermal