Interacting with devices using MQTT

source link: https://atlemann.github.io/fsharp/2022/12/16/interacting-with-devices-mqtt.html

Dec 16, 2022

This is part of Sergey Tihon’s awesome F# advent calendar. Thanks for organizing it once again!

Code related to this post is located in the FsAdvent2022 branch in this repository.

In this post we’re going to try to interact with the smart home MQTT API for Futurehome. Fortunately, someone has already created an MQTT client library, available here. And someone has been so kind as to extend it with observables, so we don’t have to deal with the utterly useless thing called events. That library can be found here.

FIMP is the Futurehome IoT Messaging Protocol. The documentation can be found here. It’s a bit tricky to figure out how it works, but we’ll give it a shot anyway. Their Go implementation, found here, contains some examples, which were used to figure out what the request content should be for certain things.

FIMP message format

Messages sent using FIMP are JSON messages containing the following properties (see here for original docs):

| Property | Type | Required | Description |
| --- | --- | --- | --- |
| corid | String | No | Message correlation id. Used for request-response matching. |
| ctime | String | Yes | Message creation time, e.g. "2019-05-31 17:36:31 +0200" |
| props | Map<String, String> | Yes | Map of properties. |
| resp_to | String | No* | Response topic where the requester will expect to receive the response. |
| serv | String | Yes | Service name the interface is part of. |
| src | String | Yes | Source of the message; should be set only for commands. |
| tags | List | No | List of tags. |
| type | String | Yes | Interface type, defines message format. |
| uid | String | Yes | Unique message identifier. |
| val | dynamic | Yes | “payload” - type is defined by val_t. |
| val_t | String | Yes | Data format of val field. See below. |
| ver | String | Yes | Version of the message format, default: "1". |
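
The ctime format in the table is not one of .NET's standard formats: the `zzz` specifier renders the offset as `+02:00`, with a colon. The Ctime module is not shown in this post, so the following is only a minimal sketch of how such a timestamp could be produced. `formatCtime` is a hypothetical helper, and it takes a DateTimeOffset (an assumption, the post wraps a DateTime) so that the offset is explicit.

```fsharp
open System
open System.Globalization

/// Hypothetical helper: renders a timestamp as "yyyy-MM-dd HH:mm:ss +hhmm".
let formatCtime (time: DateTimeOffset) : string =
    let offset = time.Offset
    let sign = if offset < TimeSpan.Zero then "-" else "+"
    // Duration() gives the absolute value, so hours and minutes are non-negative.
    let magnitude = offset.Duration()
    let clock = time.ToString("yyyy-MM-dd HH:mm:ss", CultureInfo.InvariantCulture)
    sprintf "%s %s%02d%02d" clock sign magnitude.Hours magnitude.Minutes

// The sample value from the table above.
let example =
    DateTimeOffset(2019, 5, 31, 17, 36, 31, TimeSpan.FromHours(2.0))
    |> formatCtime
```

Formatting the clock part and the offset separately avoids post-processing the string to strip the colon.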

Value Types

Since val can be any type, val_t defines what type it is. List of supported val types:

| val_t | Sample val |
| --- | --- |
| string | 'Hello world!' |
| int | 3 |
| float | 3.1415 |
| bool | true |
| null | null |
| str_array | ['hello', 'world'] |
| int_array | [0, 1, 1, 2, 3, 5, 8, 13] |
| float_array | [3.14, 2.71] |
| int_map | {"answer": 42} |
| str_map | {"ip": ""} |
| float_map | {"pi": 3.14} |
| bool_map | {"normalityRestored": true} |
| object* | {"nested": {"objects": "supported"}} |
| base64 | U28gbG9uZywgYW5kIHRoYW5rcyBmb3IgYWxsIHRoZSBmaXNoLg== |

Creating the object type

All the possible types of val_t are pretty simple except the object type. After some digging around here, it seems to look something like this:

{
    "cmd": "delete",
    "component": "room",
    "id": 1,
    "param": {},
    "requestId": 7294000000006
}

where param could look like:

"param": {
    "components": [
        ...
    ]
}

So let’s start off with the possible commands. We’re going to use Thoth.Json.Net to encode our messages, like this:

type Cmd =
    | Get
    | Set
    | Delete
    | Edit

module Cmd =
    let encode (value: Cmd) : JsonValue =
        match value with
        | Cmd.Get -> Encode.string "get"
        | Cmd.Set -> Encode.string "set"
        | Cmd.Delete -> Encode.string "delete"
        | Cmd.Edit -> Encode.string "edit"

Next we have components, which can be any of the following:

type Component =
    | Config
    | Device
    | House
    | Hub
    | Mode
    | Room
    | Service
    | Shortcut
    | Thing

module Component =
    let encode (value: Component) : JsonValue =
        match value with
        | Component.Config -> Encode.string "config"
        | Component.Device -> Encode.string "device"
        | Component.House -> Encode.string "house"
        | Component.Hub -> Encode.string "hub"
        | Component.Mode -> Encode.string "mode"
        | Component.Room -> Encode.string "room"
        | Component.Service -> Encode.string "service"
        | Component.Shortcut -> Encode.string "shortcut"
        | Component.Thing -> Encode.string "thing"

The id can be things like room-id or house mode (home, sleep etc.):

type ObjectRequestId =
    | DeviceId of int
    | Mode of Mode
    | FireAlarm of Enabled: bool * Supported: bool

module ObjectRequestId =
    let encode (value: ObjectRequestId) : JsonValue =
        match value with
        | ObjectRequestId.DeviceId v -> Encode.int v
        | ObjectRequestId.Mode m -> Mode.encode m
        | ObjectRequestId.FireAlarm (isEnabled, isSupported) ->
            Encode.object [
                "enabled", Encode.bool isEnabled
                "supported", Encode.bool isSupported
            ]

And finally we have the Object type:

type ObjectVal =
    { Cmd: Cmd
      Component: Component option
      Id: ObjectRequestId option
      Param: JsonValue option }

module ObjectVal =
    let create cmd component' id param =
        { Cmd = cmd
          Component = component'
          Id = id
          Param = param }

    let encode (value: ObjectVal) =
        Encode.object [
            "cmd", Cmd.encode value.Cmd

            match value.Component with
            | Some c ->
                "component", Component.encode c
            | None ->
                "component", Encode.nil

            match value.Id with
            | Some id ->
                "id", ObjectRequestId.encode id
            | None ->
                "id", Encode.nil

            match value.Param with
            | Some p ->
                "param", p
            | None ->
                "param", Encode.nil
        ]

The Val type

Now that we have the object type, we can create the val type:

type Val =
    | String of string
    | Int of int
    | Float of float
    | Bool of bool
    | Null
    | Str_array of string array
    | Int_array of int array
    | Float_array of float array
    | Int_map of Map<string, int>
    | Str_map of Map<string, string>
    | Float_map of Map<string, float>
    | Bool_map of Map<string, bool>
    | Object of ObjectVal
    | Base64 of string

module Val =

    let encode (value: Val) =
        match value with
        | Val.String x -> ["val_t", Encode.string "string"; "val", Encode.string x]
        | Val.Int x -> ["val_t", Encode.string "int"; "val", Encode.int x]
        | Val.Float x -> ["val_t", Encode.string "float"; "val", Encode.float x]
        | Val.Bool x -> ["val_t", Encode.string "bool"; "val", Encode.bool x]
        | Val.Null -> ["val_t", Encode.string "null"; "val", Encode.nil]
        | Val.Str_array xs -> ["val_t", Encode.string "str_array"; "val", xs |> Array.map Encode.string |> Encode.array]
        | Val.Int_array xs -> ["val_t", Encode.string "int_array"; "val", xs |> Array.map Encode.int |> Encode.array]
        | Val.Float_array xs -> ["val_t", Encode.string "float_array"; "val", xs |> Array.map Encode.float |> Encode.array]
        | Val.Int_map x -> ["val_t", Encode.string "int_map"; "val", x |> Map.map (fun _ v -> Encode.int v) |> Encode.dict]
        | Val.Str_map x -> ["val_t", Encode.string "str_map"; "val", x |> Map.map (fun _ v -> Encode.string v) |> Encode.dict]
        | Val.Float_map x -> ["val_t", Encode.string "float_map"; "val", x |> Map.map (fun _ v -> Encode.float v) |> Encode.dict]
        | Val.Bool_map x -> ["val_t", Encode.string "bool_map"; "val", x |> Map.map (fun _ v -> Encode.bool v) |> Encode.dict]
        | Val.Object x -> ["val_t", Encode.string "object"; "val", ObjectVal.encode x]
        | Val.Base64 x -> ["val_t", Encode.string "base64"; "val", Encode.string x]

The message type

Finally we can create the FIMP message type that has to be sent as content to the server.

// Modules not included for brevity
type CorId = CorId of CorrelationId: string
type Ctime = Ctime of CreationTime: DateTime
type Props = Props of Properties: Map<string, string>
type RespTo = RespTo of ResponseTopic: string
type Serv = Serv of ServiceName: string
type Src = Src of string
type TagsList = TagsList of string list
type Type = Type of InterfaceType: string
type Uid = Uid of MessageIdentifier: Guid
type Ver = private Ver of string

type Message =
    { CorId: CorId option
      Ctime: Ctime
      Props: Props
      RespTo: RespTo option
      Serv: Serv
      Src: Src
      Tags: TagsList option
      Type: Type
      Uid: Uid
      Val: Val
      Ver: Ver }

module Message =

    let create ctime props serv src interfaceType uid value =
        { CorId = None
          Ctime = ctime
          Props = props
          RespTo = None
          Serv = serv
          Src = src
          Tags = None
          Type = interfaceType
          Uid = uid
          Val = value
          Ver = Ver.defaultVer }

    let createTimeStamped props serv src interfaceType value =
        create
            (Ctime.create DateTime.Now)
            props serv src interfaceType (Uid (Guid.NewGuid())) value

    let withCorrelationId corId msg =
        { msg with CorId = Some corId }

    let withResponseTopic topic msg =
        { msg with RespTo = Some topic }

    let withTags tags msg =
        { msg with Tags = Some tags }

    let private encodeOpt (f: 'a -> JsonValue) (value: 'a option) =
        value
        |> Option.map f
        |> Option.defaultValue Encode.nil

    let encode (msg: Message) =
        Encode.object [
            "corid", msg.CorId |> encodeOpt CorId.encode
            "ctime", msg.Ctime |> Ctime.encode
            "props", msg.Props |> Props.encode
            "resp_to", msg.RespTo |> encodeOpt RespTo.encode
            "serv", msg.Serv |> Serv.encode
            "src", msg.Src |> Src.encode
            "tags", msg.Tags |> encodeOpt TagsList.encode
            "type", msg.Type |> Type.encode
            "uid", msg.Uid |> Uid.encode
            yield! msg.Val |> Val.encode
            "ver", msg.Ver |> Ver.encode
        ]

Creating the MQTT client

Here we’re using MQTTnet.Extensions.External.RxMQTT.Client to create a client we can interact with using observables.

module MqttClient =
    open MQTTnet.Extensions.External.RxMQTT.Client

    type ClientId = ClientId of string
    type Topic = Topic of string

    type TcpServer =
        { Url: string
          Port: int }

    type Credentials =
        { UserName: string
          Password: string }

    // ClientId -> TcpServer -> Credentials -> Task<IRxMqttClient>
    let create (ClientId clientId) (server: TcpServer) (credentials: Credentials) = task {
        let options =
                    .WithTcpServer(server.Url, server.Port)
                    .WithCredentials(credentials.UserName, credentials.Password)

        let mqttClient = MqttFactory().CreateRxMqttClient()
        do! mqttClient.StartAsync options
        return mqttClient
    }

    // IRxMqttClient -> Topic -> IObservable<MqttApplicationMessageReceivedEventArgs>
    let createSubscription (mqttClient: IRxMqttClient) (Topic topic) =
    // Topic -> Message -> MqttApplicationMessage
    let createMessage (Topic topic) (message: Message) =
        MqttApplicationMessageBuilder()
            .WithTopic(topic)
            .WithPayload(message |> Message.encode |> Encode.toString 0)
            .Build()

Listing all devices

An example request can be seen here. Let’s try to create the request message:

module Things =

    let requestTopic = RequestTopic.create "pt:j1/mt:cmd/rt:app/rn:vinculum/ad:1"

    // We can select the `ad` for the response
    let responseTopic = ResponseTopic.create "pt:j1/mt:rsp/rt:app/rn:vinculum/ad:things"

    // Val -> Message
    let private createDefaultMessage =
            (Ctime.create DateTime.Now)
            (Props.create List.empty)
            (Type.create "cmd.pd7.request") // Not sure what pd7 is, but copied from their example.

    // Val -> Message
    let createMessage value =
        value
        |> createDefaultMessage
        |> Message.withResponseTopic responseTopic

    // Component list -> Val
    let private encodeComponents items =
        Encode.object [
            "components",
            items
            |> List.map Component.encode
            |> Encode.list
        ]
        |> Some
        |> ObjectVal.create Cmd.Get None None
        |> Val.Object

    // Val
    let listDevices =
        |> encodeComponents

Interacting with the API

The MqttClient we’re using returns responses using observables. If we want to get the list of devices in a request/response fashion, we have to leave the monad by converting the observable to a Task.

let getAsync (mqttClient: IRxMqttClient) requestTopic responseTopic message =
    task {    
        let respObs =
            responseTopic // The response topic we defined above
            |> MqttClient.createSubscription mqttClient
            // We want the first message and the observable to complete
            // so the task below will finish/return.
            |> Observable.first

        // Subscribe/start as task before sending the request
        let response = respObs.ToTask()

        do! message
            |> MqttClient.createMessage requestTopic
            |> mqttClient.PublishAsync

        return! response
    }

/// Gets all available devices
let getAllDevices (mqttClient: IRxMqttClient) = task {
    let! response =
        Things.listDevices
        |> Things.createMessage
        |> getAsync mqttClient Things.requestTopic Things.responseTopic

    return response.ApplicationMessage.Payload.ToUTF8String()
}
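
The important detail in the request/response pattern above is the ordering: the subscription must exist before the request is published, otherwise a fast response could be missed. Here is a minimal sketch of the same idea using only the base class library, without Rx. `firstAsync` is a hypothetical helper (it is not part of the post's repository), and an F# event stands in for the MQTT subscription.

```fsharp
open System
open System.Threading.Tasks

/// Hypothetical helper: a task that completes with the first value
/// the observable emits, then unsubscribes.
let firstAsync (source: IObservable<'T>) : Task<'T> =
    let completion = TaskCompletionSource<'T>()

    let observer =
        { new IObserver<'T> with
            member _.OnNext(value) = completion.TrySetResult(value) |> ignore
            member _.OnError(error) = completion.TrySetException(error) |> ignore
            member _.OnCompleted() =
                let error = InvalidOperationException("The observable completed without a value.") :> exn
                completion.TrySetException(error) |> ignore }

    // Subscribe right away, before the caller gets a chance to publish anything.
    let subscription = source.Subscribe(observer)

    task {
        try
            return! completion.Task
        finally
            subscription.Dispose()
    }

// Stand-in for the response topic: F# events implement IObservable.
let responses = Event<string>()
let pending = firstAsync responses.Publish // subscribe first
responses.Trigger "first response"         // then "send the request"
responses.Trigger "second response"        // ignored, the task is already done
let received = pending.Result
```

A real implementation would probably also want a timeout, since the task never completes if no response arrives.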

This is an example response for a motion sensor device:

{
    "client": {
        "name": "Motion sensor"
    },
    "fimp": {
        "adapter": "zwave-ad",
        "address": "19",
        "group": "ch_0"
    },
    "functionality": null,
    "id": 36,
    "lrn": true,
    "model": "zw_271_2049_4097",
    "param": {
        "alarms": {},
        "batteryLevel": "ok",
        "batteryPercentage": 100,
        "illuminance": 1.0,
        "presence": false,
        "supportedAlarms": {
            "burglar": [ ... ]
        },
        "temperature": 23.0,
        "timestamp": "2022-11-10 22:20:14 +0100",
        "zwaveConfigParameters": []
    },
    "problem": false,
    "room": 6,
    "services": {
        "alarm_burglar": {
            "addr": "/rt:dev/rn:zw/ad:1/sv:alarm_burglar/ad:19_0",
            "enabled": true,
            "intf": [ ... ],
            "props": {
                "is_secure": false,
                "is_unsecure": true,
                "sup_events": [ ... ]
            }
        },
        "basic": {
            "addr": "/rt:dev/rn:zw/ad:1/sv:basic/ad:19_0",
            "enabled": true,
            "intf": [ ... ],
            "props": {
                "is_secure": false,
                "is_unsecure": true
            }
        },
        "battery": {
            "addr": "/rt:dev/rn:zw/ad:1/sv:battery/ad:19_0",
            "enabled": true,
            "intf": [ ... ],
            "props": {
                "is_secure": false,
                "is_unsecure": true
            }
        },
        "dev_sys": {
            "addr": "/rt:dev/rn:zw/ad:1/sv:dev_sys/ad:19_0",
            "enabled": true,
            "intf": [ ... ],
            "props": {
                "is_secure": false,
                "is_unsecure": true
            }
        },
        "sensor_accelx": {
            "addr": "/rt:dev/rn:zw/ad:1/sv:sensor_accelx/ad:19_0",
            "enabled": true,
            "intf": [ ... ],
            "props": {
                "is_secure": false,
                "is_unsecure": true,
                "sup_units": [ ... ]
            }
        },
        "sensor_accely": {
            "addr": "/rt:dev/rn:zw/ad:1/sv:sensor_accely/ad:19_0",
            "enabled": true,
            "intf": [ ... ],
            "props": {
                "is_secure": false,
                "is_unsecure": true,
                "sup_units": [ ... ]
            }
        },
        "sensor_accelz": {
            "addr": "/rt:dev/rn:zw/ad:1/sv:sensor_accelz/ad:19_0",
            "enabled": true,
            "intf": [ ... ],
            "props": {
                "is_secure": false,
                "is_unsecure": true,
                "sup_units": [ ... ]
            }
        },
        "sensor_lumin": {
            "addr": "/rt:dev/rn:zw/ad:1/sv:sensor_lumin/ad:19_0",
            "enabled": true,
            "intf": [ ... ],
            "props": {
                "is_secure": false,
                "is_unsecure": true,
                "sup_units": [ ... ]
            }
        },
        "sensor_presence": {
            "addr": "/rt:dev/rn:zw/ad:1/sv:sensor_presence/ad:19_0",
            "enabled": true,
            "intf": [ ... ],
            "props": {
                "is_secure": false,
                "is_unsecure": true
            }
        },
        "sensor_seismicint": {
            "addr": "/rt:dev/rn:zw/ad:1/sv:sensor_seismicint/ad:19_0",
            "enabled": true,
            "intf": [ ... ],
            "props": {
                "is_secure": false,
                "is_unsecure": true,
                "sup_units": [ ... ]
            }
        },
        "sensor_temp": {
            "addr": "/rt:dev/rn:zw/ad:1/sv:sensor_temp/ad:19_0",
            "enabled": true,
            "intf": [ ... ],
            "props": {
                "is_secure": false,
                "is_unsecure": true,
                "sup_units": [ ... ],
                "thing_role": "main"
            }
        }
    },
    "supports": [ ... ],
    "thing": 13,
    "type": {
        "subtype": "presence",
        "supported": {
            "sensor": [ ... ]
        },
        "type": "sensor"
    }
}

Interaction between devices

Now that we know which devices we have, their features and topics, we can try to make some interaction between them. We have a Fibaro motion sensor and a Philips Hue Go lamp we can play with. First we need some requests for turning on lights and setting color.

module FsFimp.Commands

open System
open FsFimp.Fimp

let app = "fsfimp"
let src = Src.create app

module Units =
    type [<Measure>] Red
    type [<Measure>] Green
    type [<Measure>] Blue

type LevelSwitch = On | Off

module LevelSwitch =
    let interfaceType = Type.create "cmd.binary.set"
    let service = Serv.OutLevelSwitch

    let createMessage (toggle: LevelSwitch) =
        match toggle with
        | On -> true
        | Off -> false
        |> Val.Bool
        |> Message.createTimeStamped

type Red = int<Units.Red>
type Green = int<Units.Green>
type Blue = int<Units.Blue>

module Color =
    let interfaceType = Type.create "cmd.color.set"
    let service = Serv.ColorControl

    let createMessage (red: Red) (green: Green) (blue: Blue) =
        seq {
            "red", int red
            "green", int green
            "blue", int blue
        }
        |> Map.ofSeq
        |> Val.Int_map
        |> Message.createTimeStamped

Next we have to listen to events from the motion sensor. We’ll turn on the lights when presence is detected and make it red when the burglar event is triggered (e.g. when shaking the sensor). The necessary topics were found from the getAllDevices response above.

use! mqttClient = MqttClient.create (ClientId "FsFimp") server credentials
use _ =
        .Subscribe(fun msg ->
            printfn "%A, %A" msg.ConnectResult msg.Exception.Message)

let send (msg: MQTTnet.MqttApplicationMessage) : Task<unit> =
    task {
        // This returns Task, but we need Task<unit>, hence the CE.
        // There might be a simpler way to do this though, but I
        // couldn't figure it out in time.
        return! mqttClient.PublishAsync msg
    }

let turnOnLightOnPresence =
    |> ResponseTopic.create
    |> MqttClient.createSubscription mqttClient
    |> Observable.map (fun msg ->
        let requestTopic =
            |> RequestTopic.create

        LevelSwitch.createMessage LevelSwitch.On
        |> MqttClient.createMessage requestTopic)

let makeLightRedOnBurglar =
    |> ResponseTopic.create
    |> MqttClient.createSubscription mqttClient
    |> Observable.map (fun msg ->
        let requestTopic =
            |> RequestTopic.create

        Color.createMessage 255<Units.Red> 0<Units.Green> 0<Units.Blue>
        |> MqttClient.createMessage requestTopic)

// Merge the two streams into one and send the messages when the different
// events are reported by the motion sensor.
use _ =
    Observable.merge turnOnLightOnPresence makeLightRedOnBurglar
    |> Observable.flatmapTask send
    |> Observable.subscribe id
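
To see the merge pattern in isolation, here is a minimal sketch that runs without a hub, using only the Observable module from FSharp.Core. Everything in it is a stand-in: the two F# events play the role of the MQTT subscriptions, `LampCommand` is a hypothetical replacement for the real messages, and the presence stream is filtered on `true`, which is an assumption about what the sensor reports.

```fsharp
// Hypothetical stand-in for the messages sent to the lamp.
type LampCommand =
    | TurnOn
    | SetColor of red: int * green: int * blue: int

// F# events implement IObservable, so they can play the role of the subscriptions.
let presenceEvents = Event<bool>()
let burglarEvents = Event<unit>()

let turnOnLightOnPresence =
    presenceEvents.Publish
    |> Observable.filter id // only react when presence is reported as true
    |> Observable.map (fun _ -> TurnOn)

let makeLightRedOnBurglar =
    burglarEvents.Publish
    |> Observable.map (fun () -> SetColor (255, 0, 0))

// Collect the commands instead of publishing them over MQTT.
let sent = ResizeArray<LampCommand>()

let subscription =
    Observable.merge turnOnLightOnPresence makeLightRedOnBurglar
    |> Observable.subscribe (fun command -> sent.Add command)

presenceEvents.Trigger true  // results in TurnOn
presenceEvents.Trigger false // filtered out
burglarEvents.Trigger ()     // results in SetColor (255, 0, 0)
subscription.Dispose()
```

Because both streams are mapped to the same type before merging, a single subscriber can handle them, which is the same shape as the `send` function above.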

Next steps

We’d want to decode the devices response and capture the different services each device provides. We could do something like this:

type Service =
    | Battery of ResponseTopic
    | BurglarAlarm of ResponseTopic
    | Luminance of ResponseTopic
    | Presence of ResponseTopic
    | Temperature of ResponseTopic

module Service =
    let decode : Decoder<Service list> =
        Decode.object (fun get ->
            [
                get.Optional.At [ "battery"; "addr" ] Decode.string |> Option.map (ResponseTopic.createFromResourceType >> Battery)
                get.Optional.At [ "alarm_burglar"; "addr" ] Decode.string |> Option.map (ResponseTopic.createFromResourceType >> BurglarAlarm)
                get.Optional.At [ "sensor_lumin"; "addr" ] Decode.string |> Option.map (ResponseTopic.createFromResourceType >> Luminance)
                get.Optional.At [ "sensor_presence"; "addr" ] Decode.string |> Option.map (ResponseTopic.createFromResourceType >> Presence)
                get.Optional.At [ "sensor_temp"; "addr" ] Decode.string |> Option.map (ResponseTopic.createFromResourceType >> Temperature)
            ]
            |> List.choose id)

type Device =
    { Id: int
      Room: int option
      Model: string option
      Services: Service list }

module Device =
    let decoder : Decoder<Device> =
        Decode.object (fun get ->
            let id = get.Required.Field "id" Decode.int
            let room = get.Optional.Field "room" Decode.int

            let model = get.Optional.At [ "model" ] Decode.string
            let modelAlias = get.Optional.At [ "modelAlias" ] Decode.string

            let modelName =
                match model, modelAlias with
                | Some model, _ -> Some model
                | _, Some alias -> Some alias
                | _, _ -> None

            let services = get.Required.Field "services" Service.decode

            { Id = id
              Room = room
              Model = modelName
              Services = services })

    let devicesDecoder : Decoder<Device list> =
        Decode.object (fun get ->
            get.Required.At [ "val"; "param"; "device" ] (Decode.list decoder))

    let decodeAll json = Decode.fromString devicesDecoder json
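
The decoder above relies on ResponseTopic.createFromResourceType, which is not shown. Judging by the topics used earlier (`pt:j1/mt:cmd/...` and `pt:j1/mt:rsp/...`), turning a service `addr` into a topic is mostly a matter of adding a prefix. The following is a sketch under that assumption: `TopicKind` and `toTopic` are hypothetical names, and the `evt` message type for events is assumed rather than taken from this post.

```fsharp
/// Hypothetical: which kind of message the topic carries (the `mt:` part).
type TopicKind =
    | ForCommands
    | ForEvents
    | ForResponses

/// Hypothetical helper: prefixes a service address with the topic header.
let toTopic (kind: TopicKind) (addr: string) : string =
    let messageType =
        match kind with
        | ForCommands -> "cmd"
        | ForEvents -> "evt" // assumed, only cmd and rsp appear in this post
        | ForResponses -> "rsp"
    // The addr in the device list starts with a slash, a topic does not.
    sprintf "pt:j1/mt:%s/%s" messageType (addr.TrimStart('/'))

let presenceTopic =
    toTopic ForEvents "/rt:dev/rn:zw/ad:1/sv:sensor_presence/ad:19_0"
```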

Unfortunately, that’s all I had time for in this post. Next steps would be to try and add interaction with the lights to my smart house dashboard, written using Fable, where I already have the weather forecast, Netatmo weather station data and today’s electricity prices. But that’s a post for another time.
