5

阿里云IoT流转到postgresql数据库方案 - 波多尔斯基

 2 years ago
source link: https://www.cnblogs.com/podolski/p/16178761.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

之前写过一篇如使用阿里云上部署.NET 3.1自定义运行时的文章,吐槽一下,虽然现在已经2022年了,但是阿里云函数计算的支持依然停留在.NET Core 2.1,更新缓慢,由于程序解包大小的限制,也不能放太复杂的东西的上去,虽然现在.NET 6裁剪包能挺好地解决这个问题,但是心里还是不爽。

需求#

言归正传,有这么一个情景:发送数据想接入阿里云的IoT平台,然后直接插入postgresQL数据库中。正常来说,只要数据发送到了IoT平台,然后定义转发到RDS就可以了,不过阿里云有几个限制:

  1. 数据流转到RDS数据库,只能支持mysqlsql server
  2. 数据流转只支持json形式的数据流转,如果发送的是透传的数据,那么发送不了(更新:现在新版的数据流转已经支持了。)

思前想后,可能只能掏出阿里云的函数计算服务了,运用函数计算作为中转,将透传的数据流转给函数计算,然后在函数计算中执行sql语句。

IoT平台接收设置#

阿里云的物联网平台,设置了基本的产品和设备之后,如果是物模型的话,那么自行设置好对应的物模型。对于透传就比较简单了,支持MQTT的设备方只需要定义:

  • 透传的消息发送到/{productKey}/{deviceName}/user/update
  • 订阅阿里云的/{productKey}/{deviceName}/user/get
  • 设置阿里云的Mqtt IoT实例终端节点:({YourProductKey}.iot-as-mqtt.{YourRegionId}.aliyuncs.com:1883
  • 设置设备的ProductKey和ProductSecret

设置好之后,即可传输数据到阿里云IoT端,数据传输过来,看下日志,如果能看到:

img

那说明就已经发送OK了,接收到的是普通的字符串(不是json),需要进行进一步解析。

IoT流转设置#

云产品流转中,新建解析器,设置好数据源,数据目的选择函数计算:
img
解析器脚本比较简单:

var data = payload(); 
writeFc(1000, data);  

注意,payload函数payload(textEncoding)是內建的函数:

  • 不传入参数:默认按照UTF-8编码转换为字符串,即payload()等价于payload('utf-8')。
  • 'json':将payload数据转换成Map格式变量。如果payload不是JSON格式,则返回异常。
  • 'binary':将payload数据转换成二进制变量进行透传。
    这里我使用文本透传的形式,将数据转成UTF8文本传输。writeFc指将转换的内容传递给1000编号的目标函数。详情见文档

当然还可以使用更为复杂的脚本,实现对脚本数据的初步处理,由于我这里后面还有函数计算,我就直接将数据转到下一个节点。

函数计算配置#

按照官方文档新建函数,请注意不需要新建触发器!我们这里的函数使用python语言,通过psycopg2实现数据插入到postgres数据库中。

由于函数计算中,默认并没有该包,需要手动添加引用,官方建议使用Serverless Devs工具安装部署,这个玩意非常不好用,嗯,我不接受他的建议。推荐大家使用vscode,安装阿里云serverless的插件,这样其实更加方便。

按照插件的文档,自己建立好服务与函数,默认会给一个函数入口:

# To enable the initializer feature (https://help.aliyun.com/document_detail/158208.html)
# please implement the initializer function as below:
# def initializer(context):
#   logger = logging.getLogger()
#   logger.info('initializing')

def handler(event, context):
  logger = logging.getLogger()
  logger.info(event)
  return 'hello world'

我们首先在函数上右键,然后选择Install Package,选择pip安装psycopg2,依赖就自动被安装上了,这个非常方便。

请注意,通过IOT流转过来的字符串,是b'data'这样的形式的形式,需要先decode一下,然后在处理,修改函数为:

# -*- coding: utf-8 -*-
import logging
import psycopg2
import uuid
import time

def insert_database(device_id,data):
    timest = int(time.time()*1000)
    guid = str(uuid.uuid1())
    conn = psycopg2.connect(database="", user="", password="", host="", port="")
    cur = conn.cursor()
    sql = 'INSERT INTO "data"("Id","DeviceId","Timestamp", "DataArray") VALUES (\'{id}\', \'{deviceid}\', \'{timestamp}\', array{data})'
    sql = sql.format(id= guid, deviceid= device_id, timestamp= timest, data= data)
    cur.execute(sql)
    conn.commit()
    print(" Records inserted successfully")
    conn.close() 
    
def extract_string_array(data: bytes):
    arr = data.decode().strip().split(' ')
    # 写自己的逻辑
    return deviceid, resu   

def handler(event, context):
  logger = logging.getLogger()
  logger.info(event)
  device_id, result = extract_string_array(event)
  insert_database(device_id, result)
  return 'OK'

保存,然后在vscode中deploy即可。

提示:vscode中也可以进行本地的debug,还是比较方便的,不过这些功能依赖docker,所以还是提前装好比较好。

弄完了之后,应该是能看见这样的画面:

img

至此,数据就正常流转成功。

要点#

  1. 不要设置触发器,当时为了配置这个触发器弄了非常长的时间
  2. 函数计算与数据库的VPC应该相同,并且赋予权限,否则无法访问。
  3. 函数计算默认无法保持状态,如果有这个需求,最好试试别的方案,或者看下函数计算的预留实例(常驻实例)
  4. 提前在本地安装好Docker,要不会有各种各样的问题出现。
  5. Postgresql插入数组格式的数据,需要注意格式,可以参考这篇文档
  6. 如果长时间不用docker,导致docker无法启动,可以参考这篇文章

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK