From ec8590c30f1dac03cbdb5f8040033dd1a63a0fc8 Mon Sep 17 00:00:00 2001 From: Patrick Amrhein Date: Thu, 11 Jul 2024 18:42:07 +0200 Subject: [PATCH] mqtt redesign mit subscribe / unsubcribe der topic (Teamwork tictrick + Daniel92) --- src/app.cpp | 187 +++++++++++++- src/app.h | 5 +- src/appInterface.h | 3 + src/config/settings.h | 1 + src/plugins/zeroExport/powermeter.h | 127 ++++++---- src/plugins/zeroExport/zeroExport.h | 365 ++++++++++++++-------------- src/publisher/pubMqtt.h | 98 +++----- src/web/RestApi.h | 60 ++++- src/web/html/setup.html | 4 +- src/web/lang.json | 15 ++ 10 files changed, 550 insertions(+), 315 deletions(-) diff --git a/src/app.cpp b/src/app.cpp index 4b1b2c81..5ebae6da 100644 --- a/src/app.cpp +++ b/src/app.cpp @@ -102,7 +102,7 @@ void app::setup() { if (mMqttEnabled) { mMqtt.setup(this, &mConfig->mqtt, mConfig->sys.deviceName, mVersion, &mSys, &mTimestamp, &mUptime); mMqtt.setConnectionCb(std::bind(&app::mqttConnectCb, this)); - mMqtt.setSubscriptionCb(std::bind(&app::mqttSubRxCb, this, std::placeholders::_1)); + mMqtt.setSubscriptionCb(std::bind(&app::mqttSubRxCb, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5)); mCommunication.addAlarmListener([this](Inverter<> *iv) { mMqtt.alarmEvent(iv); }); } #endif @@ -610,13 +610,177 @@ void app::mqttConnectCb(void) { } //----------------------------------------------------------------------------- -void app::mqttSubRxCb(JsonObject obj) { - mApi.ctrlRequest(obj); -#if defined(PLUGIN_ZEROEXPORT) - mZeroExport.onMqttMessage(obj); -#endif +void app::mqttSubRxCb(const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) +{ + if (mConfig->serial.debug) + { + DPRINT(DBG_INFO, mqttStr[MQTT_STR_GOT_TOPIC]); + DBGPRINTLN(String(topic)); + } + + #if defined(PLUGIN_ZEROEXPORT) + // FremdTopic ist für ZeroExport->Powermeter + // AhoyTopic ist für ZeroExport + if (mZeroExport.onMqttMessage(topic, payload, len)) return; + #endif + + // AhoyTopic ist für Ahoy + int baseTopicLen = strlen(mConfig->mqtt.topic) + strlen("/ctrl") + 1; + char baseTopic[baseTopicLen]; + + strcpy(baseTopic, mConfig->mqtt.topic); // copy mqtt.topic + strcat(baseTopic, "/ctrl"); // '/ctrl' concat + + if (strncmp(topic, baseTopic, strlen(baseTopic)) == 0) + { + DPRINT(DBG_INFO, mqttStr[MQTT_STR_GOT_TOPIC]); + DBGPRINT("alles super"); + DBGPRINTLN(String(topic)); + + const char* p = topic + strlen(baseTopic); + + // extract number from topic + int IvID = -1; + while (*p) { + if (isdigit(*p)) { + IvID = atoi(p); + break; + } + p++; + } + + // reset to pointer with offset + p = topic + strlen(baseTopic); + + Inverter<> *iv = mSys.getInverterByPos(IvID); + String sPayload = String((const char*)payload).substring(0, len); + + // ???/ctrl/limit/+ 100 % oder 400 W + if (strncmp(p, "/limit", strlen("/limit")) == 0) { + // immer + DBGPRINT(String("limit ")); + DBGPRINTLN(String(IvID)); + + iv->powerLimit[0] = static_cast(sPayload.toInt() * 10.0); + + if (sPayload.endsWith("W")) + iv->powerLimit[1] = AbsolutNonPersistent; + else if (sPayload.endsWith("%")) + iv->powerLimit[1] = RelativNonPersistent; + + if (iv->setDevControlRequest(ActivePowerContr)) + triggerTickSend(iv->id); + + return; + } + + // ???/ctrl/power/+ 0/1 + if (strncmp(p, "/power", strlen("/power")) == 0) { + // immer + DBGPRINT(String("power ")); + DBGPRINTLN(String(IvID)); + + if (sPayload.equals("1") || sPayload.equals("true")) + { + if (iv->setDevControlRequest(TurnOn)) + triggerTickSend(iv->id); + } + else if (sPayload.equals("0") || sPayload.equals("false")) + { + if (iv->setDevControlRequest(TurnOff)) + triggerTickSend(iv->id); + } + return; + } + + // ???/ctrl/restart/+ 0/1 + if (strncmp(p, "/restart", strlen("/restart")) == 0) { + // mit NR = WR + if (IvID != -1) + { + DBGPRINT(String("restart Iv ")); + DBGPRINTLN(String(IvID)); + + if (sPayload.equals("1") || sPayload.equals("true")) + { + if (iv->setDevControlRequest(Restart)) { + triggerTickSend(iv->id); + } + mMqtt.publish(topic, "successful", false, QOS_2); + } + + } + // ohne NR = Ahoy + else + { + //TODO: set mqtt-topic back to false (=>successful)? wait a moment + DBGPRINTLN(String("restart Ahoy")); + if (sPayload.equals("1") || sPayload.equals("true")) + { + mMqtt.publish(topic, "successful", false, QOS_2); + yield(); + delay(1000); + yield(); + ESP.restart(); + } + } + return; + } + return; + } + +/// TODO: discuss setup??? + // ???/setup/set_time unix timestamp +/// TODO: Wunschdenken? + } +/* + bool limitAbs = false; + if(len > 0) { + char *pyld = new char[len + 1]; + memcpy(pyld, payload, len); + pyld[len] = '\0'; + if(NULL == strstr(topic, "limit")) + root[F("val")] = atoi(pyld); + else + root[F("val")] = atof(pyld); + + if(pyld[len-1] == 'W') + limitAbs = true; + delete[] pyld; + } + + const char *p = topic + strlen(mCfgMqtt->topic); + uint8_t pos = 0, elm = 0; + char tmp[30]; + + while(1) { + if(('/' == p[pos]) || ('\0' == p[pos])) { + memcpy(tmp, p, pos); + tmp[pos] = '\0'; + switch(elm++) { + case 1: root[F("path")] = String(tmp); break; + case 2: + if(strncmp("limit", tmp, 5) == 0) { + if(limitAbs) + root[F("cmd")] = F("limit_nonpersistent_absolute"); + else + root[F("cmd")] = F("limit_nonpersistent_relative"); + } else + root[F("cmd")] = String(tmp); + break; + case 3: root[F("id")] = atoi(tmp); break; + default: break; + } + if('\0' == p[pos]) + break; + p = p + pos + 1; + pos = 0; + } + pos++; + } +*/ //----------------------------------------------------------------------------- void app::setupLed(void) { uint8_t led_off = (mConfig->led.high_active) ? 0 : 255; @@ -663,3 +827,14 @@ void app::updateLed(void) { analogWrite(mConfig->led.led[2], led_off); } } + + + + +void app::subscribe(const char *subTopic, uint8_t qos) { + mMqtt.subscribe(subTopic, qos); +} + +void app::unsubscribe(const char *subTopic) { + mMqtt.unsubscribe(subTopic); +} \ No newline at end of file diff --git a/src/app.h b/src/app.h index f432e52f..18d656c3 100644 --- a/src/app.h +++ b/src/app.h @@ -360,6 +360,9 @@ class app : public IApp, public ah::Scheduler { } #endif + void subscribe(const char *subTopic, uint8_t qos = QOS_0); + void unsubscribe(const char *subTopic); + private: #define CHECK_AVAIL true #define SKIP_YIELD_DAY true @@ -381,7 +384,7 @@ class app : public IApp, public ah::Scheduler { } void mqttConnectCb(void); - void mqttSubRxCb(JsonObject obj); + void mqttSubRxCb(const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total); void setupLed(); void updateLed(); diff --git a/src/appInterface.h b/src/appInterface.h index f2292ec8..e81580cf 100644 --- a/src/appInterface.h +++ b/src/appInterface.h @@ -73,6 +73,9 @@ class IApp { virtual void addValueToHistory(uint8_t historyType, uint8_t valueType, uint32_t value) = 0; #endif virtual void* getRadioObj(bool nrf) = 0; + + virtual void subscribe(const char *subTopic, uint8_t qos) = 0; + virtual void unsubscribe(const char *subTopic) = 0; }; #endif /*__IAPP_H__*/ diff --git a/src/config/settings.h b/src/config/settings.h index eda4a7b4..120177c4 100644 --- a/src/config/settings.h +++ b/src/config/settings.h @@ -221,6 +221,7 @@ typedef struct { // Plugin ZeroExport #if defined(PLUGIN_ZEROEXPORT) +//#define ZEROEXPORT_DEBUG #define ZEROEXPORT_MAX_QUEUE_ENTRIES 64 #define ZEROEXPORT_MAX_GROUPS 8 #define ZEROEXPORT_GROUP_MAX_LEN_NAME 25 diff --git a/src/plugins/zeroExport/powermeter.h b/src/plugins/zeroExport/powermeter.h index ed49950e..17bce333 100644 --- a/src/plugins/zeroExport/powermeter.h +++ b/src/plugins/zeroExport/powermeter.h @@ -20,6 +20,7 @@ #include "plugins/zeroExport/lib/sml.h" #include "utils/DynamicJsonHandler.h" + typedef struct { const unsigned char OBIS[6]; void (*Fn)(double &); @@ -46,9 +47,10 @@ class powermeter { * @param *log * @returns void */ - bool setup(IApp *app, zeroExport_t *cfg, PubMqttType *mqtt, DynamicJsonHandler *log) { + bool setup(IApp *app, zeroExport_t *cfg, settings_t *config, PubMqttType *mqtt, DynamicJsonHandler *log) { mApp = app; mCfg = cfg; + mConfig = config; mMqtt = mqtt; mLog = log; @@ -65,7 +67,9 @@ class powermeter { if (millis() - mPreviousTsp <= 1000) return; // skip when it is to fast mPreviousTsp = millis(); - if (mCfg->debug) DBGPRINTLN(F("pm Takt:")); + #ifdef ZEROEXPORT_DEBUG + if (mCfg->debug) DBGPRINTLN(F("pm Takt:")); + #endif /*ZEROEXPORT_DEBUG*/ bool result = false; float power = 0.0; @@ -76,7 +80,9 @@ class powermeter { if ((millis() - mCfg->groups[group].pm_peviousTsp) < ((uint16_t)mCfg->groups[group].pm_refresh * 1000)) continue; mCfg->groups[group].pm_peviousTsp = millis(); - if (mCfg->debug) DBGPRINTLN(F("pm Do:")); + #ifdef ZEROEXPORT_DEBUG + if (mCfg->debug) DBGPRINTLN(F("pm Do:")); + #endif /*ZEROEXPORT_DEBUG*/ result = false; power = 0.0; @@ -120,11 +126,9 @@ class powermeter { mCfg->groups[group].power = power; // MQTT - Powermeter -/// BUG: 002 Anfang - Muss dieser Teil raus? Führt er zu abstürzen wie BUG 001? if (mMqtt->isConnected()) { mMqtt->publish(String("zero/state/groups/" + String(group) + "/powermeter/P").c_str(), String(ah::round1(power)).c_str(), false); } -/// BUG: 002 Ende } } } @@ -203,50 +207,63 @@ class powermeter { /** onMqttMessage * This function is needed for all mqtt connections between ahoy and other devices. */ - void onMqttMessage(JsonObject obj) { - String topic = String(obj["topic"]); - -#if defined(ZEROEXPORT_POWERMETER_MQTT) - - for (uint8_t group = 0; group < ZEROEXPORT_MAX_GROUPS; group++) { - if (!mCfg->groups[group].enabled) continue; - - if (!mCfg->groups[group].pm_type == zeroExportPowermeterType_t::Mqtt) continue; + bool onMqttMessage(const char* topic, const uint8_t* payload, size_t len) + { + bool result = false; - if (!strcmp(mCfg->groups[group].pm_src, "")) continue; + #if defined(ZEROEXPORT_POWERMETER_MQTT) + for (uint8_t group = 0; group < ZEROEXPORT_MAX_GROUPS; group++) + { + if (!mCfg->groups[group].enabled) continue; + if (!mCfg->groups[group].pm_type == zeroExportPowermeterType_t::Mqtt) continue; + if (!strcmp(mCfg->groups[group].pm_src, "")) continue; + if (strcmp(mCfg->groups[group].pm_src, topic) != 0) continue; // strcmp liefert 0 wenn gleich + + float power = 0.0; + String sPayload = String((const char*)payload).substring(0, len); + + if (sPayload.startsWith("{") && sPayload.endsWith("}") || sPayload.startsWith("[") && sPayload.endsWith("]")) + { + #ifdef ZEROEXPORT_DEBUG + DPRINTLN(DBG_INFO, String("ze: mqtt powermeter val: ") + sPayload); + #endif /*ZEROEXPORT_DEBUG*/ + + DynamicJsonDocument datajson(2048); // TODO: JSON größe dynamisch machen? + if(!deserializeJson(datajson, sPayload.c_str())) + { + #ifdef ZEROEXPORT_DEBUG + DPRINTLN(DBG_INFO, String("ze: mqtt powermeter deserialize ok")); + DPRINTLN(DBG_INFO, String(datajson.as())); + #endif /*ZEROEXPORT_DEBUG*/ + power = extractJsonKey(datajson, mCfg->groups[group].pm_jsonPath); + } - if (strcmp(mCfg->groups[group].pm_src, String(topic).c_str())) continue; + } + else + { + #ifdef ZEROEXPORT_DEBUG + DPRINTLN(DBG_INFO, String("ze: mqtt powermeter kein json")); + #endif /*ZEROEXPORT_DEBUG*/ + power = sPayload.toFloat(); + } - float power = 0.0; + bufferWrite(power, group); + mCfg->groups[group].power = power; - DynamicJsonDocument datajson(512); - if (!deserializeJson(datajson, String(obj["val"]))) - { - switch (mCfg->groups[group].pm_target) { - case 0: power = (float)datajson["total_act_power"]; break; - case 1: power = (float)datajson["a_act_power"]; break; - case 2: power = (float)datajson["b_act_power"]; break; - case 3: power = (float)datajson["c_act_power"]; break; + // MQTT - Powermeter + DPRINTLN(DBG_INFO, String("ze: mqtt powermeter ") + String(power)); + if (mCfg->debug) { + if (mMqtt->isConnected()) { + mMqtt->publish(String("zero/state/groups/" + String(group) + "/powermeter/P").c_str(), String(ah::round1(power)).c_str(), false); + } } - } else { - power = (float)obj["val"]; + + result = true; } - bufferWrite(power, group); - mCfg->groups[group].power = power; // TODO: join two sites together (PM & MQTT) - - // MQTT - Powermeter -DPRINTLN(DBG_INFO, String("ze: mqtt powermeter") + String(power)); -/// BUG: 001 Anfang - Dieser Teil ist deaktiviert weil er zu abstürzen der DTU führt -// if (mCfg->debug) { -// if (mMqtt->isConnected()) { -// mMqtt->publish(String("zero/state/groups/" + String(group) + "/powermeter/P").c_str(), String(ah::round1(power)).c_str(), false); -// } -// } -/// BUG: 001 Ende - } + #endif /*defined(ZEROEXPORT_POWERMETER_MQTT)*/ -#endif /*defined(ZEROEXPORT_POWERMETER_MQTT)*/ + return result; } private: @@ -261,6 +278,11 @@ DPRINTLN(DBG_INFO, String("ze: mqtt powermeter") + String(power)); mMqtt->subscribe(gr.c_str(), QOS_2); } + /*uint16_t mqttUnsubscribe(const char *subTopic,) + { TODO: hier weiter? + return mMqtt->unsubscribe(topic); // add as many topics as you like + }*/ + /** mqttPublish * when a MQTT Msg is needed to Publish, but not to subscribe. * @param gr @@ -275,8 +297,9 @@ DPRINTLN(DBG_INFO, String("ze: mqtt powermeter") + String(power)); HTTPClient http; zeroExport_t *mCfg; + settings_t *mConfig = nullptr; PubMqttType *mMqtt = nullptr; - DynamicJsonHandler* mLog; + DynamicJsonHandler *mLog; IApp *mApp = nullptr; unsigned long mPreviousTsp = millis(); @@ -292,8 +315,8 @@ DPRINTLN(DBG_INFO, String("ze: mqtt powermeter") + String(power)); */ void setHeader(HTTPClient *h, String auth = "", u8_t realm = 0) { h->setFollowRedirects(HTTPC_STRICT_FOLLOW_REDIRECTS); -/// h->setUserAgent("Ahoy-Agent"); -/// // TODO: Ahoy-0.8.850024-zero + /// h->setUserAgent("Ahoy-Agent"); + /// // TODO: Ahoy-0.8.850024-zero h->setUserAgent(mApp->getVersion()); h->setConnectTimeout(500); h->setTimeout(1000); @@ -322,7 +345,6 @@ DPRINTLN(DBG_INFO, String("ze: mqtt powermeter") + String(power)); */ - /*if (auth != NULL && realm) http.addHeader("WWW-Authenticate", "Digest qop=\"auth\", realm=\"" + shellypro4pm-f008d1d8b8b8 + "\", nonce=\"60dc59c6\", algorithm=SHA-256"); else if (auth != NULL) http.addHeader("Authorization", "Basic " + auth);*/ /* @@ -338,6 +360,20 @@ DPRINTLN(DBG_INFO, String("ze: mqtt powermeter") + String(power)); */ } + /** + * + * + */ + float extractJsonKey(DynamicJsonDocument data, const char* key) + { + if (data.containsKey(key)) + return (float)data[key]; + else { + DPRINTLN(DBG_INFO, String("ze: mqtt powermeter deserialize no key ") + String(key)); + return 0.0F; + } + } + #if defined(ZEROEXPORT_POWERMETER_SHELLY) /** getPowermeterWattsShelly * ... @@ -578,6 +614,7 @@ DPRINTLN(DBG_INFO, String("ze: mqtt powermeter") + String(power)); switch (smlCurrentState) { case SML_FINAL: *power = _powerMeterTotal; +// TODO: pm_taget auswerten und damit eine Regelung auf Sum, L1, L2, L3 ermöglichen (setup.html nicht vergessen) result = true; break; case SML_LISTEND: diff --git a/src/plugins/zeroExport/zeroExport.h b/src/plugins/zeroExport/zeroExport.h index b522212a..356b14e0 100644 --- a/src/plugins/zeroExport/zeroExport.h +++ b/src/plugins/zeroExport/zeroExport.h @@ -15,6 +15,9 @@ #include "AsyncJson.h" #include "powermeter.h" #include "utils/DynamicJsonHandler.h" +#include "utils/mqttHelper.h" + +using namespace mqttHelper; template @@ -48,15 +51,9 @@ class ZeroExport { mApi = api; mMqtt = mqtt; - mIsInitialized = mPowermeter.setup(mApp, mCfg, mqtt, &_log); + mIsInitialized = mPowermeter.setup(mApp, mCfg, mConfig, mMqtt, &_log); } - /*void printJson() { - serializeJson(doc, Serial); - Serial.println(); - serializeJsonPretty(doc, Serial); - }*/ - /** loop * Arbeitsschleife * @param void @@ -75,13 +72,17 @@ class ZeroExport { if (mLastRun > (Tsp - 1000)) return; mLastRun = Tsp; - if (mCfg->debug) DBGPRINTLN(F("Takt:")); + #ifdef ZEROEXPORT_DEBUG + if (mCfg->debug) DBGPRINTLN(F("Takt:")); + #endif /*ZEROEXPORT_DEBUG*/ // Exit if Queue is empty zeroExportQueue_t Queue; if (!getQueue(&Queue)) return; - if (mCfg->debug) DBGPRINTLN(F("Queue:")); + #ifdef ZEROEXPORT_DEBUG + if (mCfg->debug) DBGPRINTLN(F("Queue:")); + #endif /*ZEROEXPORT_DEBUG*/ // Load Data from Queue uint8_t group = Queue.group; @@ -641,6 +642,8 @@ class ZeroExport { * @returns void */ void onMqttConnect(void) { + mMqtt->subscribe("zero/ctrl/#", QOS_2); + if (!mCfg->enabled) return; mPowermeter.onMqttConnect(); @@ -662,150 +665,31 @@ class ZeroExport { * @param * @returns void */ - void onMqttMessage(JsonObject obj) { - if (!mIsInitialized) return; - - mPowermeter.onMqttMessage(obj); - - String topic = String(obj["topic"]); - - // "topic":"userdefined battSoCTopic" oder "userdefinedUTopic" - for (uint8_t group = 0; group < ZEROEXPORT_MAX_GROUPS; group++) { - if (!mCfg->groups[group].enabled) continue; - - if ((!mCfg->groups[group].battCfg == zeroExportBatteryCfg::mqttU) && (!mCfg->groups[group].battCfg == zeroExportBatteryCfg::mqttSoC)) continue; - - if (!strcmp(mCfg->groups[group].battTopic, "")) continue; - - if (strcmp(mCfg->groups[group].battTopic, String(topic).c_str())) { - mCfg->groups[group].battValue = (bool)obj["val"]; - - _log.addProperty("k", mCfg->groups[group].battTopic); - _log.addProperty("v", mCfg->groups[group].battValue); - } - } - - // "topic":"ctrl/zero" - if (topic.indexOf("ctrl/zero") == -1) return; - - _log.addProperty("d", obj); - - if (obj["path"] == "ctrl" && obj["cmd"] == "zero") { - int8_t topicGroup = getGroupFromTopic(topic.c_str()); - int8_t topicInverter = getInverterFromTopic(topic.c_str()); - - if (topicGroup != -1) { - _log.addProperty("g", topicGroup); - } - if (topicInverter == -1) { - _log.addProperty("i", topicInverter); - } - - _log.addProperty("k", topic); - - // "topic":"ctrl/zero/enabled" - if (topic.indexOf("ctrl/zero/enabled") != -1) { - _log.addProperty("v", (bool)obj["val"]); - mCfg->enabled = (bool)obj["val"]; - } - - // "topic":"ctrl/zero/sleep" - else if (topic.indexOf("ctrl/zero/sleep") != -1) { - _log.addProperty("v", (bool)obj["val"]); - mCfg->sleep = (bool)obj["val"]; - } - - else if ((topicGroup >= 0) && (topicGroup < ZEROEXPORT_MAX_GROUPS)) { - String stopicGroup = String(topicGroup); - - // "topic":"ctrl/zero/groups/+/enabled" - if (topic.endsWith("/enabled")) { - _log.addProperty("v", (bool)obj["val"]); - mCfg->groups[topicGroup].enabled = (bool)obj["val"]; - } - - // "topic":"ctrl/zero/groups/+/sleep" - else if (topic.endsWith("/sleep")) { - _log.addProperty("v", (bool)obj["val"]); - mCfg->groups[topicGroup].sleep = (bool)obj["val"]; - } - - // Auf Eis gelegt, dafür 2 Gruppen mehr - // 0.8.103008.2 - // // "topic":"ctrl/zero/groups/+/pm_ip" - // if (topic.indexOf("ctrl/zero/groups/" + String(topicGroup) + "/pm_ip") != -1) { - // snprintf(mCfg->groups[topicGroup].pm_src, ZEROEXPORT_GROUP_MAX_LEN_PM_SRC, "%s", obj[F("val")].as()); - /// TODO: - // snprintf(mCfg->groups[topicGroup].pm_src, ZEROEXPORT_GROUP_MAX_LEN_PM_SRC, "%s", obj[F("val")].as()); - // strncpy(mCfg->groups[topicGroup].pm_src, obj[F("val")], ZEROEXPORT_GROUP_MAX_LEN_PM_SRC); - // strncpy(mCfg->groups[topicGroup].pm_src, String(obj[F("val")]).c_str(), ZEROEXPORT_GROUP_MAX_LEN_PM_SRC); - // snprintf(mCfg->groups[topicGroup].pm_src, ZEROEXPORT_GROUP_MAX_LEN_PM_SRC, "%s", String(obj[F("val")]).c_str()); - // mLog["k"] = "ctrl/zero/groups/" + String(topicGroup) + "/pm_ip"; - // mLog["v"] = mCfg->groups[topicGroup].pm_src; - // } - // - // // "topic":"ctrl/zero/groups/+/pm_jsonPath" - // if (topic.indexOf("ctrl/zero/groups/" + String(topicGroup) + "/pm_jsonPath") != -1) { - /// TODO: - // snprintf(mCfg->groups[topicGroup].pm_jsonPath, ZEROEXPORT_GROUP_MAX_LEN_PM_JSONPATH, "%s", obj[F("val")].as()); - // mLog["k"] = "ctrl/zero/groups/" + String(topicGroup) + "/pm_jsonPath"; - // mLog["v"] = mCfg->groups[topicGroup].pm_jsonPath; - // } - - // "topic":"ctrl/zero/groups/+/battery/switch" - else if (topic.endsWith("/battery/switch")) { - _log.addProperty("v", (bool)obj["val"]); - mCfg->groups[topicGroup].battSwitch = (bool)obj["val"]; - } - - else if (topic.indexOf("/advanced/") != -1) { - // "topic":"ctrl/zero/groups/+/advanced/setPoint" - if (topic.endsWith("/setPoint")) { - _log.addProperty("v", (int16_t)obj["val"]); - mCfg->groups[topicGroup].setPoint = (int16_t)obj["val"]; - } - - // "topic":"ctrl/zero/groups/+/advanced/powerTolerance" - else if (topic.endsWith("/powerTolerance")) { - _log.addProperty("v", (uint8_t)obj["val"]); - mCfg->groups[topicGroup].powerTolerance = (uint8_t)obj["val"]; - } - - // "topic":"ctrl/zero/groups/+/advanced/powerMax" - else if (topic.endsWith("/powerMax")) { - _log.addProperty("v", (uint16_t)obj["val"]); - mCfg->groups[topicGroup].powerMax = (uint16_t)obj["val"]; - } - } else if (topic.indexOf("/inverter/") != -1) { - if ((topicInverter >= 0) && (topicInverter < ZEROEXPORT_GROUP_MAX_INVERTERS)) { - // "topic":"ctrl/zero/groups/+/inverter/+/enabled" - if (topic.endsWith("/enabled")) { - _log.addProperty("v", (bool)obj["val"]); - mCfg->groups[topicGroup].inverters[topicInverter].enabled = (bool)obj["val"]; - } - - // "topic":"ctrl/zero/groups/+/inverter/+/powerMin" - else if (topic.endsWith("/powerMin")) { - _log.addProperty("v", (uint16_t)obj["val"]); - mCfg->groups[topicGroup].inverters[topicInverter].powerMin = (uint16_t)obj["val"]; - } - // "topic":"ctrl/zero/groups/+/inverter/+/powerMax" - else if (topic.endsWith("/powerMax")) { - _log.addProperty("v", (uint16_t)obj["val"]); - mCfg->groups[topicGroup].inverters[topicInverter].powerMax = (uint16_t)obj["val"]; - } else { - _log.addProperty("k", "error"); - } - } - } else { - _log.addProperty("k", "error"); + bool onMqttMessage(const char* topic, const uint8_t* payload, size_t len) + { + // Check if ZE is init, when not, directly out of here! + if (!mIsInitialized) return false; + + bool result = true; + + // FremdTopic "topic":"userdefined power" ist für ZeroExport->Powermeter + if (!mPowermeter.onMqttMessage(topic, payload, len)) + { + // FremdTopic "topic":"userdefined battSoCTopic" oder "userdefinedUTopic" ist für ZeroExport(Batterie) + if (!onMqttMessageBattery(topic, payload, len)) + { + // LokalerTopic "topic": ???/zero ist für ZeroExport + if (!onMqttMessageZeroExport(topic, payload, len)) + { + result = false; } } } sendLog(); clearLog(); - return; + + return result; } private: @@ -833,42 +717,6 @@ class ZeroExport { return true; } - /** getGroupFromTopic - * Extahiert die Gruppe aus dem mqttTopic. - * @param *topic - * @returns group - */ - int8_t getGroupFromTopic(const char *topic) { - const char *pGroupSection = strstr(topic, "groups/"); - if (pGroupSection == NULL) return -1; - pGroupSection += 7; - char strGroup[3]; - uint8_t digitsCopied = 0; - while (*pGroupSection != '/' && digitsCopied < 2) strGroup[digitsCopied++] = *pGroupSection++; - strGroup[digitsCopied] = '\0'; - int8_t group = atoi(strGroup); - - _log.addProperty("getGroupFromTopic", "group"); - return group; - } - - /** getInverterFromTopic - * Extrahiert dden Inverter aus dem mqttTopic - * @param *topic - * @returns inv - */ - int8_t getInverterFromTopic(const char *topic) { - const char *pInverterSection = strstr(topic, "inverters/"); - if (pInverterSection == NULL) return -1; - pInverterSection += 10; - char strInverter[3]; - uint8_t digitsCopied = 0; - while (*pInverterSection != '/' && digitsCopied < 2) strInverter[digitsCopied++] = *pInverterSection++; - strInverter[digitsCopied] = '\0'; - int8_t inverter = atoi(strInverter); - return inverter; - } - /** mqttSubscribe * when a MQTT Msg is needed to subscribe, then a publish is leading * @param gr @@ -891,6 +739,154 @@ class ZeroExport { mMqtt->publish(gr.c_str(), payload.c_str(), retain); } + /** onMqttMessageBattery + * Subscribe section + * @param + * @returns void + */ + bool onMqttMessageBattery(const char* topic, const uint8_t* payload, size_t len) { + // check if topic is Fremdtopic + String baseTopic = String(mConfig->mqtt.topic); + + if (strncmp(topic, baseTopic.c_str(), baseTopic.length()) != 0) + { + + for (uint8_t group = 0; group < ZEROEXPORT_MAX_GROUPS; group++) { + if (!mCfg->groups[group].enabled) continue; + + if ((!mCfg->groups[group].battCfg == zeroExportBatteryCfg::mqttU) && (!mCfg->groups[group].battCfg == zeroExportBatteryCfg::mqttSoC)) continue; + + if (!strcmp(mCfg->groups[group].battTopic, "")) continue; + + if (checkIntegerProperty(topic, mCfg->groups[group].battTopic, payload, len, &mCfg->groups[group].battValue, &_log)) return true; + + } + + } + + return false; + } + + /** onMqttMessageZeroExport + * Subscribe section + * @param + * @returns true when topic is for this class specified or false when its not fit in here. + */ + bool onMqttMessageZeroExport(const char* topic, const uint8_t* payload, size_t len) + { + // check if topic is for zeroExport + String baseTopic = String(mConfig->mqtt.topic) + String("/zero/ctrl"); + + if (strncmp(topic, baseTopic.c_str(), baseTopic.length()) == 0) + { + _log.addProperty("k", topic); + + const char* p = topic + strlen(baseTopic.c_str()); + + // "topic":"???/zero/ctrl/enabled" + if (checkBoolProperty(p, "/enabled", payload, len, &mCfg->enabled, &_log)) return true; +// reconnect + // "topic":"???/zero/ctrl/sleep" + if (checkBoolProperty(p, "/sleep", payload, len, &mCfg->sleep, &_log)) return true; + + // "topic":"???/zero/ctrl/groups" + if (strncmp(p, "/groups", strlen("/groups")) == 0) { + + baseTopic += String("/groups"); // add '/groups' + p = topic + strlen(baseTopic.c_str()); + + // extract number from topic + int topicGroup = -1; + while (*p) { + if (isdigit(*p)) { + topicGroup = atoi(p); + break; + } + p++; + } + + // reset to pointer with offset + p = topic + strlen(baseTopic.c_str()); + + #ifdef ZEROEXPORT_DEBUG + DBGPRINT(String("groups ")); + DBGPRINTLN(String(topicGroup)); + #endif /*ZEROEXPORT_DEBUG*/ + + baseTopic += String("/") + String(topicGroup); // add '/+' + p = topic + strlen(baseTopic.c_str()); + + // "topic":"???/zero/ctrl/groups/+/enabled" + if (checkBoolProperty(p, "/enabled", payload, len, &mCfg->groups[topicGroup].enabled, &_log)) return true; +// Reconnect + // "topic":"???/zero/ctrl/groups/+/sleep" + if (checkBoolProperty(p, "/sleep", payload, len, &mCfg->groups[topicGroup].sleep, &_log)) return true; + + // "topic":"???/zero/ctrl/groups/+/pm_ip" + if (checkCharProperty(p, "/pm_ip", payload, len, mCfg->groups[topicGroup].pm_src, ZEROEXPORT_GROUP_MAX_LEN_PM_SRC, &_log)) return true; +// Reconnect + + // "topic":"???/zero/ctrl/groups/+/pm_jsonPath" + if (checkCharProperty(p, "/pm_jsonPath", payload, len, mCfg->groups[topicGroup].pm_jsonPath, ZEROEXPORT_GROUP_MAX_LEN_PM_JSONPATH, &_log)) return true; +// Reconnect + + // "topic":"???/zero/ctrl/groups/+/battery/switch" + if (checkBoolProperty(p, "/battery/switch", payload, len, &mCfg->groups[topicGroup].battSwitch, &_log)) return true; + + // "topic":"???/zero/ctrl/groups/+/advanced/setPoint" + if (checkIntegerProperty(p, "/advanced/setPoint", payload, len, &mCfg->groups[topicGroup].setPoint, &_log)) return true; + + // "topic":"???/zero/ctrl/groups/+/advanced/powerTolerance" + if (checkIntegerProperty(p, "/advanced/powerTolerance", payload, len, &mCfg->groups[topicGroup].powerTolerance, &_log)) return true; + + // "topic":"???/zero/ctrl/groups/+/advanced/powerMax" + if (checkIntegerProperty(p, "/advanced/powerMax", payload, len, &mCfg->groups[topicGroup].powerMax, &_log)) return true; + + // "topic":"???/zero/ctrl/groups/+/inverter" + if (strncmp(p, "/inverter", strlen("/inverter")) == 0) { + + baseTopic += String("/inverter"); // add '/inverter' + p = topic + strlen(baseTopic.c_str()); + + // extract number from topic + int topicInverter = -1; + while (*p) { + if (isdigit(*p)) { + topicInverter = atoi(p); + break; + } + p++; + } + + // reset to pointer with offset + p = topic + strlen(baseTopic.c_str()); + + #ifdef ZEROEXPORT_DEBUG + DBGPRINT(String("inverter ")); + DBGPRINTLN(String(topicInverter)); + #endif /*ZEROEXPORT_DEBUG*/ + + baseTopic += String("/") + String(topicInverter); // add '/+' + p = topic + strlen(baseTopic.c_str()); + + // "topic":"???/zero/ctrl/groups/+/inverter/+/enabled" + if (checkBoolProperty(p, "/enabled", payload, len, &mCfg->groups[topicGroup].inverters[topicInverter].enabled, &_log)) return true; + + // "topic":"???/zero/ctrl/groups/+/inverter/+/powerMin" + if (checkIntegerProperty(p, "/powerMin", payload, len, &mCfg->groups[topicGroup].inverters[topicInverter].powerMin, &_log)) return true; + + // "topic":"???/zero/ctrl/groups/+/inverter/+/powerMax" + if (checkIntegerProperty(p, "/powerMax", payload, len, &mCfg->groups[topicGroup].inverters[topicInverter].powerMax, &_log)) return true; + + } + + return true; + } + } + + return false; + } + /** sendLog * Sendet den LogSpeicher über Webserial und/oder MQTT */ @@ -915,6 +911,9 @@ class ZeroExport { _log.clear(); } + + + // private member variables bool mIsInitialized = false; diff --git a/src/publisher/pubMqtt.h b/src/publisher/pubMqtt.h index 90750881..ebe03a52 100644 --- a/src/publisher/pubMqtt.h +++ b/src/publisher/pubMqtt.h @@ -28,7 +28,7 @@ #include "pubMqttDefs.h" #include "pubMqttIvData.h" -typedef std::function subscriptionCb; +typedef std::function subscriptionCb; typedef std::function connectionCb; typedef struct { @@ -115,9 +115,16 @@ class PubMqtt { queue.swap(mReceiveQueue); xSemaphoreGive(mutex); - while (!queue.empty()) { + while (!queue.empty()) + { +#warning "TODO: sTopic und sPayload hier fällen und dann übergeben?"; +#warning "Ist es wirklich so, dass die Payload eines Topics aus mehreren Nachrichten bestehen kann und erst zusammengesetzt werden muss? Was ist dann mit der Reihenfolge?"; message_s *entry = &queue.front(); - handleMessage(entry->topic, entry->payload, entry->len, entry->index, entry->total); + if(NULL != mSubscriptionCb) + { + (mSubscriptionCb)(entry->topic, entry->payload, entry->len, entry->index, entry->total); + mRxCnt++; + } queue.pop(); } @@ -244,6 +251,12 @@ class PubMqtt { mClient.subscribe(topic, qos); } + // new - need to unsubscribe the topics. + void unsubscribe(const char *subTopic) + { + mClient.unsubscribe(subTopic); // add as many topics as you like + } + void subscribeExtern(const char *subTopic, uint8_t qos = QOS_0) { char topic[MQTT_TOPIC_LEN + 20]; snprintf(topic, (MQTT_TOPIC_LEN + 20), "%s", subTopic); @@ -339,77 +352,24 @@ class PubMqtt { } } - void onMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) { - if(len == 0) + void onMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) + { +#warning "TODO: if aktivieren nach Logprüfung. Was bedeutet index und total?"; +// if (total != 1) { +// DPRINTLN(DBG_ERROR, String("pubMqtt.h: onMessage ERROR: index=") + String(index) + String(" total=") + String(total)); +// return; +// } + + if (len == 0) { + DPRINT(DBG_INFO, String("MQTT-topic: ")); + DPRINT(DBG_INFO, String(topic)); + DPRINTLN(DBG_INFO, String(" is empty.")); return; + } xSemaphoreTake(mutex, portMAX_DELAY); mReceiveQueue.push(message_s(topic, payload, len, index, total)); xSemaphoreGive(mutex); - - } - - inline void handleMessage(const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) { - DPRINT(DBG_INFO, mqttStr[MQTT_STR_GOT_TOPIC]); - DBGPRINTLN(String(topic)); - if(NULL == mSubscriptionCb) - return; - - DynamicJsonDocument json(128); - JsonObject root = json.to(); - root["topic"] = String(topic); - - bool limitAbs = false; - if(len > 0) { - char *pyld = new char[len + 1]; - memcpy(pyld, payload, len); - pyld[len] = '\0'; - if(NULL == strstr(topic, "limit")) - root[F("val")] = atoi(pyld); - else - root[F("val")] = atof(pyld); - - if(pyld[len-1] == 'W') - limitAbs = true; - delete[] pyld; - } - - const char *p = topic + strlen(mCfgMqtt->topic); - uint8_t pos = 0, elm = 0; - char tmp[30]; - - while(1) { - if(('/' == p[pos]) || ('\0' == p[pos])) { - memcpy(tmp, p, pos); - tmp[pos] = '\0'; - switch(elm++) { - case 1: root[F("path")] = String(tmp); break; - case 2: - if(strncmp("limit", tmp, 5) == 0) { - if(limitAbs) - root[F("cmd")] = F("limit_nonpersistent_absolute"); - else - root[F("cmd")] = F("limit_nonpersistent_relative"); - } else - root[F("cmd")] = String(tmp); - break; - case 3: root[F("id")] = atoi(tmp); break; - default: break; - } - if('\0' == p[pos]) - break; - p = p + pos + 1; - pos = 0; - } - pos++; - } - - /*char out[128]; - serializeJson(root, out, 128); - DPRINTLN(DBG_INFO, "json: " + String(out));*/ - (mSubscriptionCb)(root); - - mRxCnt++; } void discoveryConfigLoop(void) { diff --git a/src/web/RestApi.h b/src/web/RestApi.h index a6662181..71a8a304 100644 --- a/src/web/RestApi.h +++ b/src/web/RestApi.h @@ -46,6 +46,7 @@ class RestApi { mRadioCmt = (CmtRadio<>*)mApp->getRadioObj(false); #endif mConfig = config; + #if defined(ENABLE_HISTORY_LOAD_DATA) mSrv->on("/api/addYDHist", HTTP_POST, std::bind(&RestApi::onApiPost, this, std::placeholders::_1), @@ -65,14 +66,14 @@ class RestApi { return mTimezoneOffset; } - void ctrlRequest(JsonObject obj) { - DynamicJsonDocument json(128); - JsonObject dummy = json.as(); - if(obj[F("path")] == "ctrl") - setCtrl(obj, dummy, "*"); - else if(obj[F("path")] == "setup") - setSetup(obj, dummy, "*"); - } +// void ctrlRequest(const char* topic, const uint8_t* payload, size_t len) { +// DynamicJsonDocument json(128); +// JsonObject dummy = json.as(); +// if(obj[F("path")] == "ctrl") +// setCtrl(obj, dummy, "*"); +// else if(obj[F("path")] == "setup") +// setSetup(obj, dummy, "*"); +// } private: void onApi(AsyncWebServerRequest *request) { @@ -884,6 +885,7 @@ class RestApi { // General objGroup[F("id")] = (uint8_t)group; objGroup[F("enabled")] = (bool)mConfig->plugin.zeroExport.groups[group].enabled; + objGroup[F("sleep")] = (bool)mConfig->plugin.zeroExport.groups[group].sleep; objGroup[F("name")] = String(mConfig->plugin.zeroExport.groups[group].name); // Powermeter objGroup[F("pm_refresh")] = (uint8_t)mConfig->plugin.zeroExport.groups[group].pm_refresh; @@ -1202,7 +1204,25 @@ class RestApi { // Powermeter mConfig->plugin.zeroExport.groups[group].pm_refresh = jsonIn[F("pm_refresh")]; mConfig->plugin.zeroExport.groups[group].pm_type = jsonIn[F("pm_type")]; - snprintf(mConfig->plugin.zeroExport.groups[group].pm_src, ZEROEXPORT_GROUP_MAX_LEN_PM_SRC, "%s", jsonIn[F("pm_src")].as()); + + // pm_src + const char *neu = jsonIn[F("pm_src")].as(); + if (strncmp(mConfig->plugin.zeroExport.groups[group].pm_src, neu, strlen(neu)) != 0) { + // unsubscribe + if(mConfig->plugin.zeroExport.groups[group].pm_type == zeroExportPowermeterType_t::Mqtt) + { + mApp->unsubscribe(mConfig->plugin.zeroExport.groups[group].pm_src); + + } + // save + snprintf(mConfig->plugin.zeroExport.groups[group].pm_src, ZEROEXPORT_GROUP_MAX_LEN_PM_SRC, "%s", jsonIn[F("pm_src")].as()); + // subsrcribe + if(mConfig->plugin.zeroExport.groups[group].pm_type == zeroExportPowermeterType_t::Mqtt) + { + mApp->subscribe(mConfig->plugin.zeroExport.groups[group].pm_src, QOS_2); + } + } + snprintf(mConfig->plugin.zeroExport.groups[group].pm_jsonPath, ZEROEXPORT_GROUP_MAX_LEN_PM_JSONPATH, "%s", jsonIn[F("pm_jsonPath")].as()); @@ -1224,7 +1244,27 @@ class RestApi { } // Battery mConfig->plugin.zeroExport.groups[group].battCfg = jsonIn[F("battCfg")]; - snprintf(mConfig->plugin.zeroExport.groups[group].battTopic, ZEROEXPORT_GROUP_MAX_LEN_BATT_TOPIC, "%s", jsonIn[F("battTopic")].as()); + + // battTopic + const char *battneu = jsonIn[F("battTopic")].as(); + if (strncmp(mConfig->plugin.zeroExport.groups[group].battTopic, battneu, strlen(battneu)) != 0) { + // unsubscribe + if(mConfig->plugin.zeroExport.groups[group].pm_type == zeroExportBatteryCfg::mqttSoC || + mConfig->plugin.zeroExport.groups[group].pm_type == zeroExportBatteryCfg::mqttU ) + { + mApp->unsubscribe(mConfig->plugin.zeroExport.groups[group].battTopic); + + } + // save + snprintf(mConfig->plugin.zeroExport.groups[group].battTopic, ZEROEXPORT_GROUP_MAX_LEN_BATT_TOPIC, "%s", jsonIn[F("battTopic")].as()); + // subsrcribe + if(mConfig->plugin.zeroExport.groups[group].pm_type == zeroExportBatteryCfg::mqttSoC || + mConfig->plugin.zeroExport.groups[group].pm_type == zeroExportBatteryCfg::mqttU) + { + mApp->subscribe(mConfig->plugin.zeroExport.groups[group].battTopic, QOS_2); + } + } + mConfig->plugin.zeroExport.groups[group].battLimitOn = jsonIn[F("battLimitOn")]; mConfig->plugin.zeroExport.groups[group].battLimitOff = jsonIn[F("battLimitOff")]; // Advanced diff --git a/src/web/html/setup.html b/src/web/html/setup.html index 323d161d..3e434464 100644 --- a/src/web/html/setup.html +++ b/src/web/html/setup.html @@ -1671,7 +1671,7 @@ } else if(value == "Mqtt") { divsToHide.childNodes[1].style.display = 'none'; - divsToHide.childNodes[4].style.display = 'none'; + divsToHide.childNodes[2].style.display = 'none'; divsToHide.childNodes[5].style.display = 'none'; divsToHide.childNodes[6].style.display = 'none'; } @@ -1835,6 +1835,7 @@ lines.push(ml("tr", {}, [ ml("th", {style: "width: 10%; text-align: center;"}, "{#ZE_GROUP_ENABLED}"), + ml("th", {style: "width: 10%; text-align: center;"}, "{#ZE_GROUP_MODE}"), ml("th", {style: "width: 10%; text-align: center;"}, "{#ZE_GROUP_ID}"), ml("th", {style: "text-align: center;"}, "{#ZE_GROUP_NAME}"), ml("th", {style: "width: 10%; text-align: center;"}, "{#ZE_GROUP_POWERTOTAL}"), @@ -1845,6 +1846,7 @@ for(let group = 0; group < obj.groups.length; group++) { lines.push(ml("tr", {}, [ ml("td", {style: "text-align: left;", }, badge(obj.groups[group].enabled, (obj.groups[group].enabled) ? "{#ENABLED}" : "{#DISABLED}")), + ml("td", {style: "text-align: left;", }, badge(!obj.groups[group].sleep, (obj.groups[group].sleep) ? "{#ZE_MODE_SLEEP}" : "{#ZE_MODE_NORMAL}" , "warning")), ml("td", {style: "text-align: center;", }, String(obj.groups[group].id)), ml("td", {style: "text-align: left;", }, String(obj.groups[group].name)), // ml("td", {style: "text-align: right;", id: "groupPowerTotal"+group}, "n/a"), diff --git a/src/web/lang.json b/src/web/lang.json index 196538e8..aa759473 100644 --- a/src/web/lang.json +++ b/src/web/lang.json @@ -843,6 +843,21 @@ "en": "State:", "de": "Status:" }, + { + "token": "ZE_GROUP_MODE", + "en": "Mode:", + "de": "Modus:" + }, + { + "token": "ZE_MODE_SLEEP", + "en": "sleep", + "de": "Standby" + }, + { + "token": "ZE_MODE_NORMAL", + "en": "normal", + "de": "Normal" + }, { "token": "ZE_GROUP_ID", "en": "Group:",