commit 39fecfe655a104cdee2a974abbe1c78a44ff6b06
parent 63b29fec04dc5ab42172f55d277c42168eee9932
Author: Leah (ctucx) <leah@ctu.cx>
Date: Thu, 18 Feb 2021 12:36:02 +0100
parent 63b29fec04dc5ab42172f55d277c42168eee9932
Author: Leah (ctucx) <leah@ctu.cx>
Date: Thu, 18 Feb 2021 12:36:02 +0100
improve connection handling
7 files changed, 59 insertions(+), 10 deletions(-)
diff --git a/config.json b/config.json @@ -74,7 +74,7 @@ "icon": "lightbulb", "type": "switches", "switches": [ - { "name": "Decke (agedunkelt)", "device": "modbus-10", "relay": 0 }, + { "name": "Decke (abgedunkelt)", "device": "modbus-10", "relay": 0 }, { "name": "Decke", "device": "modbus-10", "relay": 2 }, { "name": "Küche", "device": "modbus-10", "relay": 1 }, { "name": "Bett", "device": "modbus-10", "relay": 3 },
diff --git a/src/devices/lacrosseSensors.nim b/src/devices/lacrosseSensors.nim @@ -51,9 +51,10 @@ proc lacrosseConnectLoop () {.async.} = while true: try: let config = server.config.serverConfig.lacrosse.get - let sock = await asyncnet.dial(config.host, Port(config.port)) - await lacrosseHandleLoop(sock) + lacrosseSocket = await asyncnet.dial(config.host, Port(config.port)) + await lacrosseHandleLoop(lacrosseSocket) except: + lacrosseSocket.close() let e = getCurrentException() echo("error while connectiong to lacrosse relay: ", e.msg) await sleepAsync(1000)
diff --git a/src/frontend.nim b/src/frontend.nim @@ -8,7 +8,7 @@ proc processWsClient(req: Request) {.async,gcsafe.} = try: ws = await newWebsocket(req) - lastClientId += 1 + wsConnections.add(ws) setupPings(ws, 2) except: @@ -117,3 +117,12 @@ proc processRequest(req: Request) {.async,gcsafe.} = proc serveFrontend*() {.async.} = var httpServer = newAsyncHttpServer() await httpServer.serve(Port(server.config.serverConfig.frontendPort), processRequest) + #send empty paket every 2 seconds to every subscribed client + addTimer(2000, false, proc (fd: AsyncFD): bool {.gcsafe.} = + broadcast("") + ) + + #clean up broken websocket connections every 60 seconds + addTimer(60000, false, proc (fd: AsyncFD): bool {.gcsafe.} = + cleanupConnections() + )+ \ No newline at end of file
diff --git a/src/influx.nim b/src/influx.nim @@ -39,6 +39,8 @@ proc insertDatabase* (config: InfluxConfig, databaseName: string, tableName: str let response = await client.request(baseUrl & "write?db=" & databaseName, httpMethod = HttpPost, body = body) + client.close() + if response.code != Http204: return false
diff --git a/src/smartied.nim b/src/smartied.nim @@ -1,9 +1,21 @@ -import asyncdispatch, json, tables, os +import asyncdispatch, json, tables, os, posix import types, modbus, mqtt, influx, vars, utils, options import frontend import devices/[modbusPowermeter, modbusRelayboard, lacrosseSensors, zigbee2mqttLamp, zigbee2mqttRelay, zigbee2mqttRemote] +proc CtrlCHook() {.noconv.} = + echo "Ctrl+C fired! \nStopping Server now!" + closeOpenConnections() + quit() + proc main() {.async.} = + setControlCHook(CtrlCHook) + + onSignal(SIGTERM): + echo "Got SIGTERM! \nStopping Server now!" + closeOpenConnections() + quit() + var configFile = "./config.json" if getEnv("CONFIG_PATH") != "": @@ -13,7 +25,13 @@ proc main() {.async.} = echo "Config file not found" quit() - server = Server(config: parseFile(configFile).to(Config)) + try: + server = Server(config: parseFile(configFile).to(Config)) + except: + let e = getCurrentException() + echo "Error while parsing config: " & e.msg + quit() + if getEnv("ACCESS_TOKEN") != "": server.config.serverConfig.accessToken = getEnv("ACCESS_TOKEN")
diff --git a/src/utils.nim b/src/utils.nim @@ -1,6 +1,6 @@ -import asyncdispatch, sequtils, json, tables, math, options -import ws -import types, vars +import asyncdispatch, asyncnet, sequtils, json, tables, math, options +import ws, nmqtt +import types, vars, modbus proc initUtil*() = lastClientId = 1 @@ -25,6 +25,7 @@ proc fmtBool* (b: bool): string = proc cleanupConnections*() = # echo "subscribedConnections: " & $subscribedConnections.len subscribedConnections.keepIf(proc(x: Websocket): bool = x.readyState != Closed) + wsConnections.keepIf(proc(x: Websocket): bool = x.readyState != Closed) # echo "subscribedConnections(after clenup): " & $subscribedConnections.len proc broadcast* (msg: string) = @@ -58,3 +59,17 @@ proc checkAccessToken* (token: Option[string]): bool = return false return true + +proc closeOpenConnections* () = + #close mqtt connection + waitFor mqttContext.disconnect() + + #close modbus connection + deinitModbus() + + #close lacrosse socket + lacrosseSocket.close() + + #close all websocket connections + for wsConnection in wsConnections: + wsConnection.close()+ \ No newline at end of file
diff --git a/src/vars.nim b/src/vars.nim @@ -1,10 +1,12 @@ -import tables +import asyncnet, tables import ws, nmqtt import types var subscribedConnections* {.threadvar.}: seq[Websocket] +var wsConnections* {.threadvar.}: seq[Websocket] var server* {.threadvar.}: Server var mb* {.threadvar.}: modbus var mqttContext* {.threadvar.}: MqttCtx +var lacrosseSocket* {.threadvar.}: AsyncSocket var lastClientId* {.threadvar.}: int var zigbee2mqttDevices* {.threadvar.}: Table[string, string]