ctucx.git: sdm2mqtt

Publish data from SDM120M meters via MQTT

1 
2 
3 
4 
5 
6 
7 
8 
9 
10 
11 
12 
13 
14 
15 
16 
17 
18 
19 
20 
21 
22 
23 
24 
25 
26 
27 
28 
29 
30 
31 
32 
33 
34 
35 
36 
37 
38 
39 
40 
41 
42 
43 
44 
45 
46 
47 
48 
49 
50 
51 
52 
53 
54 
55 
56 
57 
58 
59 
60 
61 
62 
63 
64 
65 
66 
67 
68 
69 
70 
71 
72 
73 
74 
75 
76 
77 
78 
79 
80 
81 
82 
83 
84 
85 
86 
87 
88 
89 
90 
91 
92 
93 
94 
95 
96 
97 
98 
99 
100 
101 
102 
103 
104 
105 
106 
107 
108 
109 
110 
111 
112 
113 
114 
115 
116 
117 
118 
119 
120 
121 
122 
123 
124 
125 
126 
127 
128 
129 
130 
131 
132 
133 
134 
135 
136 
import std/[asyncdispatch, asyncnet]
import std/[strutils, tables]
### modbus general ###

var mb_host {.threadvar.}:        string
var mb_port {.threadvar.}:        Port
var modbusSocket* {.threadvar.}:  AsyncSocket
var transaction_id {.threadvar.}: uint16
var transactions {.threadvar.}:   Table[uint16, proc(msg: string)]

proc mkPacket_mbTcp (mb_packet: string): string =
  inc(transaction_id)
  return parseHexStr(toHex(transaction_id) & toHex(0u16) & toHex(uint16(len(mb_packet)))) & mb_packet

proc reconnect () {.async.} =
  echo "verbindung putt, ich fix das mal"
  await sleepAsync(5000)
  echo "jetzt"
  modbusSocket = await asyncnet.dial(mb_host, mb_port)

proc readPacket_mbTcp (): Future[(uint16, string)] {.async.} =
  var res = ""

  res = await modbusSocket.recv(8)
  while res == "":
    await reconnect()
    res = await modbusSocket.recv(8)

  let transaction_id = fromHex[uint16](toHex(res[0..1]))
  let length = fromHex[uint16](toHex(res[4..5]))
  let function_code = cast[uint8](res[7])

  res = await modbusSocket.recv(int(length) - 2)
  while res == "":
    await reconnect()
    res = await modbusSocket.recv(8)

  if function_code >= 128u8:
    raise newException(OsError, "mordbus error: " & toHex(res))

  return (transaction_id, res)

proc processAnswers() {.async.} =
  while true:
    try:
      let (transaction_id, mb_packet) = await readPacket_mbTcp()
      transactions[transaction_id](mb_packet)
    except:
      let e = getCurrentException()
      echo("error while processing mordbus answer: ", e.msg)

proc doRequest[T] (req: string, parse_proc: proc(foo: string): T): Future[T] =
  var fut = newFuture[T]()

  let tcp_packet = mkPacket_mbTcp(req)
  asyncCheck modbusSocket.send(tcp_packet)
  transactions[transaction_id] = proc(answer: string) =
    #transactions.del(transaction_id)
    fut.complete(parse_proc(answer))

  return fut

proc retry[T] (req: string, parse_proc: proc(foo: string): T): Future[T] {.async.} =
  var retries = 5
  var res: T

  while retries > 0:
    try:
      res = await doRequest(req, parse_proc)
      return res
    except:
      retries = retries - 1
      let e = getCurrentException()
      echo("error while processing mordbus answer: ", e.msg)

### readInputRegisters ###

proc mkPacket_readInputRegisters (unit_id: uint8, address: uint16, count: uint16): string =
  return parseHexStr(toHex(unit_id) & toHex(4u8) & toHex(address) & toHex(count))

proc parsePacket_readInputRegisters (packet: string): seq[uint16] =
  var res: seq[uint16] = @[]
  let bytes = cast[uint8](packet[0])
  var i = 1
  while i < int(bytes):
    res.add(fromHex[uint16](toHex(packet[i..i+1])))
    i += 2
  return res

proc readInputRegisters* (unit_id: uint8, address: uint16, count: uint16): Future[seq[uint16]] {.async.} =
  return await retry(mkPacket_readInputRegisters(unit_id, address, count), parsePacket_readInputRegisters)

### readRegisters ###

proc mkPacket_readRegisters (unit_id: uint8, address: uint16, count: uint16): string =
  return parseHexStr(toHex(unit_id) & toHex(3u8) & toHex(address) & toHex(count))

proc parsePacket_readRegisters (packet: string): seq[uint16] =
  var res: seq[uint16] = @[]
  let bytes = cast[uint8](packet[0])
  var i = 1
  while i < int(bytes):
    res.add(fromHex[uint16](toHex(packet[i..i+1])))
    i += 2
  return res

proc readRegisters* (unit_id: uint8, address: uint16, count: uint16): Future[seq[uint16]] {.async.} =
  return await retry(mkPacket_readRegisters(unit_id, address, count), parsePacket_readRegisters)

### writeRegister ###

proc mkPacket_writeRegister (unit_id: uint8, address: uint16, value: uint16): string =
  return parseHexStr(toHex(unit_id) & toHex(6u8) & toHex(address) & toHex(value))

proc parsePacket_writeRegister (packet: string): bool =
  return true

proc writeRegister* (unit_id: uint8, address: uint16, value: uint16): Future[bool] {.async.} =
  return await retry(mkPacket_writeRegister(unit_id, address, value), parsePacket_writeRegister)

### conversion ###

proc mbFloatDCBA* (input: seq[uint16]): float32 =
  let i: uint32 = uint32(input[0]) * 65536u32 + uint32(input[1])
  return cast[float32](i)

### main ###

proc initModbus* (host: string, port: int) {.async.} =
  mb_host        = host
  mb_port        = Port(port)
  modbusSocket           = await asyncnet.dial(mb_host, mb_port)
  transaction_id = 0u16
  transactions   = initTable[uint16, proc(msg: string)]()

  asyncCheck processAnswers()