ctucx.git: sdm2mqtt

Publish data from SDM120M meters via MQTT

commit 62bedcaf15af23c85354428fa1edafa1c2a67c2e
Author: Leah (ctucx) <git@ctu.cx>
Date: Mon, 5 Dec 2022 18:27:23 +0100

8 files changed, 372 insertions(+), 0 deletions(-)
diff --git a/.gitignore b/.gitignore
@@ -0,0 +1 @@
diff --git a/config.json b/config.json
@@ -0,0 +1,14 @@
+	"modbus": {
+		"host": "",
+		"port": 502
+	},
+	"mqtt": {
+		"host": "",
+		"port": 1883
+	},
+	"devices": {
+		"table": 50,
+	},
+	"updateInterval": 10
\ No newline at end of file
diff --git a/flake.lock b/flake.lock
@@ -0,0 +1,43 @@
+  "nodes": {
+    "flake-utils": {
+      "locked": {
+        "lastModified": 1667395993,
+        "narHash": "sha256-nuEHfE/LcWyuSWnS8t12N1wc105Qtau+/OdUAjtQ0rA=",
+        "owner": "numtide",
+        "repo": "flake-utils",
+        "rev": "5aed5285a952e0b949eb3ba02c12fa4fcfef535f",
+        "type": "github"
+      },
+      "original": {
+        "owner": "numtide",
+        "repo": "flake-utils",
+        "type": "github"
+      }
+    },
+    "nixpkgs": {
+      "locked": {
+        "lastModified": 1670543317,
+        "narHash": "sha256-4mMR56rtxKr+Gwz399jFr4i76SQZxsLWxxyfQlPXRm0=",
+        "owner": "NixOS",
+        "repo": "nixpkgs",
+        "rev": "7a6a010c3a1d00f8470a5ca888f2f927f1860a19",
+        "type": "github"
+      },
+      "original": {
+        "owner": "NixOS",
+        "ref": "nixos-22.11",
+        "repo": "nixpkgs",
+        "type": "github"
+      }
+    },
+    "root": {
+      "inputs": {
+        "flake-utils": "flake-utils",
+        "nixpkgs": "nixpkgs"
+      }
+    }
+  },
+  "root": "root",
+  "version": 7
diff --git a/flake.nix b/flake.nix
@@ -0,0 +1,54 @@
+  description = "Exporter for SDM120M meters to mqtt, written in nim";
+  inputs = {
+    flake-utils.url = "github:numtide/flake-utils";
+    nixpkgs.url     = "github:NixOS/nixpkgs/nixos-22.11";
+  };
+  outputs = { self, nixpkgs, flake-utils }: {
+    overlay = final: prev: {
+      sdm2mqtt = (
+        let
+          nmqtt = final.fetchFromGitHub {
+            owner  = "zevv";
+            repo   = "nmqtt";
+            rev    = "v1.0.4";
+            sha256 = "1by0xyqz754dny19lf8rpkg42passnj0rs6rk3jr763m1zr803mc";
+          };
+        in final.nimPackages.buildNimPackage {
+          name        = "sdm2mqtt";
+          src         = self;
+          buildInputs = [ nmqtt ];
+          nimBinOnly  = true;
+          nimRelease  = true;
+        }
+      );
+    };
+  } // (flake-utils.lib.eachDefaultSystem (system:
+    let
+      pkgs = import nixpkgs {
+        inherit system;
+        overlays = [ self.overlay ];
+      };
+    in rec {
+      packages.default  = pkgs.sdm2mqtt;
+      packages.sdm2mqtt = pkgs.sdm2mqtt;
+      apps.default = {
+        type = "app";
+        program = "${pkgs.sdm2mqtt}/bin/sdm2mqtt";
+      };
+    }
+  ));
\ No newline at end of file
diff --git a/sdm2mqtt.nimble b/sdm2mqtt.nimble
@@ -0,0 +1,15 @@
+# Package
+version       = "0.1.0"
+author        = "Leah(ctucx)"
+description   = "Exports data from SDM120M powermeters to mqtt"
+license       = "AGPL-3.0"
+srcDir        = "./src"
+bin           = @["sdm2mqtt"]
+# Dependencies
+requires "nim >= 0.20.0"
+requires "nmqtt == 1.0.4"+
\ No newline at end of file
diff --git a/src/modbus.nim b/src/modbus.nim
@@ -0,0 +1,136 @@
+import std/[asyncdispatch, asyncnet]
+import std/[strutils, tables]
+### modbus general ###
+var mb_host {.threadvar.}:        string
+var mb_port {.threadvar.}:        Port
+var modbusSocket* {.threadvar.}:  AsyncSocket
+var transaction_id {.threadvar.}: uint16
+var transactions {.threadvar.}:   Table[uint16, proc(msg: string)]
+proc mkPacket_mbTcp (mb_packet: string): string =
+  inc(transaction_id)
+  return parseHexStr(toHex(transaction_id) & toHex(0u16) & toHex(uint16(len(mb_packet)))) & mb_packet
+proc reconnect () {.async.} =
+  echo "verbindung putt, ich fix das mal"
+  await sleepAsync(5000)
+  echo "jetzt"
+  modbusSocket = await asyncnet.dial(mb_host, mb_port)
+proc readPacket_mbTcp (): Future[(uint16, string)] {.async.} =
+  var res = ""
+  res = await modbusSocket.recv(8)
+  while res == "":
+    await reconnect()
+    res = await modbusSocket.recv(8)
+  let transaction_id = fromHex[uint16](toHex(res[0..1]))
+  let length = fromHex[uint16](toHex(res[4..5]))
+  let function_code = cast[uint8](res[7])
+  res = await modbusSocket.recv(int(length) - 2)
+  while res == "":
+    await reconnect()
+    res = await modbusSocket.recv(8)
+  if function_code >= 128u8:
+    raise newException(OsError, "mordbus error: " & toHex(res))
+  return (transaction_id, res)
+proc processAnswers() {.async.} =
+  while true:
+    try:
+      let (transaction_id, mb_packet) = await readPacket_mbTcp()
+      transactions[transaction_id](mb_packet)
+    except:
+      let e = getCurrentException()
+      echo("error while processing mordbus answer: ", e.msg)
+proc doRequest[T] (req: string, parse_proc: proc(foo: string): T): Future[T] =
+  var fut = newFuture[T]()
+  let tcp_packet = mkPacket_mbTcp(req)
+  asyncCheck modbusSocket.send(tcp_packet)
+  transactions[transaction_id] = proc(answer: string) =
+    #transactions.del(transaction_id)
+    fut.complete(parse_proc(answer))
+  return fut
+proc retry[T] (req: string, parse_proc: proc(foo: string): T): Future[T] {.async.} =
+  var retries = 5
+  var res: T
+  while retries > 0:
+    try:
+      res = await doRequest(req, parse_proc)
+      return res
+    except:
+      retries = retries - 1
+      let e = getCurrentException()
+      echo("error while processing mordbus answer: ", e.msg)
+### readInputRegisters ###
+proc mkPacket_readInputRegisters (unit_id: uint8, address: uint16, count: uint16): string =
+  return parseHexStr(toHex(unit_id) & toHex(4u8) & toHex(address) & toHex(count))
+proc parsePacket_readInputRegisters (packet: string): seq[uint16] =
+  var res: seq[uint16] = @[]
+  let bytes = cast[uint8](packet[0])
+  var i = 1
+  while i < int(bytes):
+    res.add(fromHex[uint16](toHex(packet[i..i+1])))
+    i += 2
+  return res
+proc readInputRegisters* (unit_id: uint8, address: uint16, count: uint16): Future[seq[uint16]] {.async.} =
+  return await retry(mkPacket_readInputRegisters(unit_id, address, count), parsePacket_readInputRegisters)
+### readRegisters ###
+proc mkPacket_readRegisters (unit_id: uint8, address: uint16, count: uint16): string =
+  return parseHexStr(toHex(unit_id) & toHex(3u8) & toHex(address) & toHex(count))
+proc parsePacket_readRegisters (packet: string): seq[uint16] =
+  var res: seq[uint16] = @[]
+  let bytes = cast[uint8](packet[0])
+  var i = 1
+  while i < int(bytes):
+    res.add(fromHex[uint16](toHex(packet[i..i+1])))
+    i += 2
+  return res
+proc readRegisters* (unit_id: uint8, address: uint16, count: uint16): Future[seq[uint16]] {.async.} =
+  return await retry(mkPacket_readRegisters(unit_id, address, count), parsePacket_readRegisters)
+### writeRegister ###
+proc mkPacket_writeRegister (unit_id: uint8, address: uint16, value: uint16): string =
+  return parseHexStr(toHex(unit_id) & toHex(6u8) & toHex(address) & toHex(value))
+proc parsePacket_writeRegister (packet: string): bool =
+  return true
+proc writeRegister* (unit_id: uint8, address: uint16, value: uint16): Future[bool] {.async.} =
+  return await retry(mkPacket_writeRegister(unit_id, address, value), parsePacket_writeRegister)
+### conversion ###
+proc mbFloatDCBA* (input: seq[uint16]): float32 =
+  let i: uint32 = uint32(input[0]) * 65536u32 + uint32(input[1])
+  return cast[float32](i)
+### main ###
+proc initModbus* (host: string, port: int) {.async.} =
+  mb_host        = host
+  mb_port        = Port(port)
+  modbusSocket           = await asyncnet.dial(mb_host, mb_port)
+  transaction_id = 0u16
+  transactions   = initTable[uint16, proc(msg: string)]()
+  asyncCheck processAnswers()
diff --git a/src/sdm2mqtt.nim b/src/sdm2mqtt.nim
@@ -0,0 +1,88 @@
+import std/[asyncdispatch, asyncnet]
+import std/[os, posix]
+import std/[tables, json]
+import std/math
+import nmqtt
+import types, modbus
+var mqttContext* {.threadvar.} : MqttCtx
+proc round* [T: float32|float64](value: T, places: int = 0): float = 
+  if places == 0:
+    result = round(value)
+  else:
+    result = value * pow(10.0, T(places))
+    result = floor(result)
+    result = result / pow(10.0, T(places))
+proc CtrlCHook* () {.noconv.} =
+  echo "Ctrl+C fired! \nStopping Server now!"
+  waitFor mqttContext.disconnect()
+  quit()
+proc updatePowermeter (deviceAddress: uint8, deviceName: string) {.async.} = 
+  let voltage   = mbFloatDCBA(await readInputRegisters(deviceAddress, 0, 2)).round(3)
+  let frequency = mbFloatDCBA(await readInputRegisters(deviceAddress, 70, 2)).round(3)
+  let `import`  = mbFloatDCBA(await readInputRegisters(deviceAddress, 72, 2)).round(3)
+  let cosphi    = mbFloatDCBA(await readInputRegisters(deviceAddress, 30, 2)).round(3)
+  let power     = mbFloatDCBA(await readInputRegisters(deviceAddress, 12, 2)).round(3)
+  let json = %* {
+    "id":         deviceAddress,
+    "deviceName": deviceName,
+    "voltage":    voltage,
+    "power":      power,
+    "frequency":  frequency,
+    "cosphi":     cosphi,
+    "import":     `import`
+  }
+  await mqttContext.publish("sdm2mqtt/"&deviceName,              $json,        2, true)
+  await mqttContext.publish("sdm2mqtt/"&deviceName&"/id",        $deviceName,  2, true)
+  await mqttContext.publish("sdm2mqtt/"&deviceName&"/voltage",   $voltage,     2, true)
+  await mqttContext.publish("sdm2mqtt/"&deviceName&"/frequency", $frequency,   2, true)
+  await mqttContext.publish("sdm2mqtt/"&deviceName&"/import",    $`import`,    2, true)
+  await mqttContext.publish("sdm2mqtt/"&deviceName&"/cosphi",    $cosphi,      2, true)
+  await mqttContext.publish("sdm2mqtt/"&deviceName&"/power",     $power,       2, true)
+proc updatePowermeters (config: Config) {.async.} =
+  await sleepAsync(500)
+  while true:
+    for name, address in config.devices.pairs():
+      try:
+        await updatePowermeter(address, name)
+      except:
+        echo "Error[updatePowermeters]:\n", getCurrentExceptionMsg()
+    await sleepAsync(int(config.updateInterval * 1000))
+proc main () {.async.} =
+  setControlCHook(CtrlCHook)
+  onSignal(SIGTERM):
+    echo "Got SIGTERM! \nStopping Server now!"
+    waitFor mqttContext.disconnect()
+    quit()
+  var configFile = "./config.json"
+  if getEnv("CONFIG_PATH") != "":
+    configFile = getEnv("CONFIG_PATH")
+  if not fileExists(configFile):
+    echo "Config file not found"
+    quit()
+  let config = parseFile(configFile).to(Config)
+  await initModbus(config.modbus.host, config.modbus.port)
+  mqttContext = newMqttCtx("sdm2mqtt")
+  mqttContext.set_host(config.mqtt.host, config.mqtt.port)
+  await mqttContext.start()
+  asyncCheck updatePowermeters(config)
+  runForever()
+waitFor main()+
\ No newline at end of file
diff --git a/src/types.nim b/src/types.nim
@@ -0,0 +1,17 @@
+import std/[options, tables]
+type ModbusConfig* = object
+  host*: string
+  port*: int
+type MqttConfig* = object
+  host*:     string
+  port*:     int
+  username*: Option[string]
+  password*: Option[string]
+type Config* = object
+  modbus*: ModbusConfig
+  mqtt*: MqttConfig
+  devices*: Table[string, uint8]
+  updateInterval*: int