Browse Source

mqtt redesign mit subscribe / unsubcribe der topic (Teamwork tictrick + Daniel92)

pull/1701/head
Patrick Amrhein 8 months ago
parent
commit
ec8590c30f
  1. 183
      src/app.cpp
  2. 5
      src/app.h
  3. 3
      src/appInterface.h
  4. 1
      src/config/settings.h
  5. 95
      src/plugins/zeroExport/powermeter.h
  6. 357
      src/plugins/zeroExport/zeroExport.h
  7. 98
      src/publisher/pubMqtt.h
  8. 56
      src/web/RestApi.h
  9. 4
      src/web/html/setup.html
  10. 15
      src/web/lang.json

183
src/app.cpp

@ -102,7 +102,7 @@ void app::setup() {
if (mMqttEnabled) {
mMqtt.setup(this, &mConfig->mqtt, mConfig->sys.deviceName, mVersion, &mSys, &mTimestamp, &mUptime);
mMqtt.setConnectionCb(std::bind(&app::mqttConnectCb, this));
mMqtt.setSubscriptionCb(std::bind(&app::mqttSubRxCb, this, std::placeholders::_1));
mMqtt.setSubscriptionCb(std::bind(&app::mqttSubRxCb, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5));
mCommunication.addAlarmListener([this](Inverter<> *iv) { mMqtt.alarmEvent(iv); });
}
#endif
@ -610,13 +610,177 @@ void app::mqttConnectCb(void) {
}
//-----------------------------------------------------------------------------
void app::mqttSubRxCb(JsonObject obj) {
mApi.ctrlRequest(obj);
void app::mqttSubRxCb(const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total)
{
if (mConfig->serial.debug)
{
DPRINT(DBG_INFO, mqttStr[MQTT_STR_GOT_TOPIC]);
DBGPRINTLN(String(topic));
}
#if defined(PLUGIN_ZEROEXPORT)
mZeroExport.onMqttMessage(obj);
// FremdTopic ist für ZeroExport->Powermeter
// AhoyTopic ist für ZeroExport
if (mZeroExport.onMqttMessage(topic, payload, len)) return;
#endif
// AhoyTopic ist für Ahoy
int baseTopicLen = strlen(mConfig->mqtt.topic) + strlen("/ctrl") + 1;
char baseTopic[baseTopicLen];
strcpy(baseTopic, mConfig->mqtt.topic); // copy mqtt.topic
strcat(baseTopic, "/ctrl"); // '/ctrl' concat
if (strncmp(topic, baseTopic, strlen(baseTopic)) == 0)
{
DPRINT(DBG_INFO, mqttStr[MQTT_STR_GOT_TOPIC]);
DBGPRINT("alles super");
DBGPRINTLN(String(topic));
const char* p = topic + strlen(baseTopic);
// extract number from topic
int IvID = -1;
while (*p) {
if (isdigit(*p)) {
IvID = atoi(p);
break;
}
p++;
}
// reset to pointer with offset
p = topic + strlen(baseTopic);
Inverter<> *iv = mSys.getInverterByPos(IvID);
String sPayload = String((const char*)payload).substring(0, len);
// ???/ctrl/limit/+ 100 % oder 400 W
if (strncmp(p, "/limit", strlen("/limit")) == 0) {
// immer
DBGPRINT(String("limit "));
DBGPRINTLN(String(IvID));
iv->powerLimit[0] = static_cast<uint16_t>(sPayload.toInt() * 10.0);
if (sPayload.endsWith("W"))
iv->powerLimit[1] = AbsolutNonPersistent;
else if (sPayload.endsWith("%"))
iv->powerLimit[1] = RelativNonPersistent;
if (iv->setDevControlRequest(ActivePowerContr))
triggerTickSend(iv->id);
return;
}
// ???/ctrl/power/+ 0/1
if (strncmp(p, "/power", strlen("/power")) == 0) {
// immer
DBGPRINT(String("power "));
DBGPRINTLN(String(IvID));
if (sPayload.equals("1") || sPayload.equals("true"))
{
if (iv->setDevControlRequest(TurnOn))
triggerTickSend(iv->id);
}
else if (sPayload.equals("0") || sPayload.equals("false"))
{
if (iv->setDevControlRequest(TurnOff))
triggerTickSend(iv->id);
}
return;
}
// ???/ctrl/restart/+ 0/1
if (strncmp(p, "/restart", strlen("/restart")) == 0) {
// mit NR = WR
if (IvID != -1)
{
DBGPRINT(String("restart Iv "));
DBGPRINTLN(String(IvID));
if (sPayload.equals("1") || sPayload.equals("true"))
{
if (iv->setDevControlRequest(Restart)) {
triggerTickSend(iv->id);
}
mMqtt.publish(topic, "successful", false, QOS_2);
}
}
// ohne NR = Ahoy
else
{
//TODO: set mqtt-topic back to false (=>successful)? wait a moment
DBGPRINTLN(String("restart Ahoy"));
if (sPayload.equals("1") || sPayload.equals("true"))
{
mMqtt.publish(topic, "successful", false, QOS_2);
yield();
delay(1000);
yield();
ESP.restart();
}
}
return;
}
return;
}
/// TODO: discuss setup???
// ???/setup/set_time unix timestamp
/// TODO: Wunschdenken?
}
/*
bool limitAbs = false;
if(len > 0) {
char *pyld = new char[len + 1];
memcpy(pyld, payload, len);
pyld[len] = '\0';
if(NULL == strstr(topic, "limit"))
root[F("val")] = atoi(pyld);
else
root[F("val")] = atof(pyld);
if(pyld[len-1] == 'W')
limitAbs = true;
delete[] pyld;
}
const char *p = topic + strlen(mCfgMqtt->topic);
uint8_t pos = 0, elm = 0;
char tmp[30];
while(1) {
if(('/' == p[pos]) || ('\0' == p[pos])) {
memcpy(tmp, p, pos);
tmp[pos] = '\0';
switch(elm++) {
case 1: root[F("path")] = String(tmp); break;
case 2:
if(strncmp("limit", tmp, 5) == 0) {
if(limitAbs)
root[F("cmd")] = F("limit_nonpersistent_absolute");
else
root[F("cmd")] = F("limit_nonpersistent_relative");
} else
root[F("cmd")] = String(tmp);
break;
case 3: root[F("id")] = atoi(tmp); break;
default: break;
}
if('\0' == p[pos])
break;
p = p + pos + 1;
pos = 0;
}
pos++;
}
*/
//-----------------------------------------------------------------------------
void app::setupLed(void) {
uint8_t led_off = (mConfig->led.high_active) ? 0 : 255;
@ -663,3 +827,14 @@ void app::updateLed(void) {
analogWrite(mConfig->led.led[2], led_off);
}
}
void app::subscribe(const char *subTopic, uint8_t qos) {
mMqtt.subscribe(subTopic, qos);
}
void app::unsubscribe(const char *subTopic) {
mMqtt.unsubscribe(subTopic);
}

5
src/app.h

@ -360,6 +360,9 @@ class app : public IApp, public ah::Scheduler {
}
#endif
void subscribe(const char *subTopic, uint8_t qos = QOS_0);
void unsubscribe(const char *subTopic);
private:
#define CHECK_AVAIL true
#define SKIP_YIELD_DAY true
@ -381,7 +384,7 @@ class app : public IApp, public ah::Scheduler {
}
void mqttConnectCb(void);
void mqttSubRxCb(JsonObject obj);
void mqttSubRxCb(const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total);
void setupLed();
void updateLed();

3
src/appInterface.h

@ -73,6 +73,9 @@ class IApp {
virtual void addValueToHistory(uint8_t historyType, uint8_t valueType, uint32_t value) = 0;
#endif
virtual void* getRadioObj(bool nrf) = 0;
virtual void subscribe(const char *subTopic, uint8_t qos) = 0;
virtual void unsubscribe(const char *subTopic) = 0;
};
#endif /*__IAPP_H__*/

1
src/config/settings.h

@ -221,6 +221,7 @@ typedef struct {
// Plugin ZeroExport
#if defined(PLUGIN_ZEROEXPORT)
//#define ZEROEXPORT_DEBUG
#define ZEROEXPORT_MAX_QUEUE_ENTRIES 64
#define ZEROEXPORT_MAX_GROUPS 8
#define ZEROEXPORT_GROUP_MAX_LEN_NAME 25

95
src/plugins/zeroExport/powermeter.h

@ -20,6 +20,7 @@
#include "plugins/zeroExport/lib/sml.h"
#include "utils/DynamicJsonHandler.h"
typedef struct {
const unsigned char OBIS[6];
void (*Fn)(double &);
@ -46,9 +47,10 @@ class powermeter {
* @param *log
* @returns void
*/
bool setup(IApp *app, zeroExport_t *cfg, PubMqttType *mqtt, DynamicJsonHandler *log) {
bool setup(IApp *app, zeroExport_t *cfg, settings_t *config, PubMqttType *mqtt, DynamicJsonHandler *log) {
mApp = app;
mCfg = cfg;
mConfig = config;
mMqtt = mqtt;
mLog = log;
@ -65,7 +67,9 @@ class powermeter {
if (millis() - mPreviousTsp <= 1000) return; // skip when it is to fast
mPreviousTsp = millis();
#ifdef ZEROEXPORT_DEBUG
if (mCfg->debug) DBGPRINTLN(F("pm Takt:"));
#endif /*ZEROEXPORT_DEBUG*/
bool result = false;
float power = 0.0;
@ -76,7 +80,9 @@ class powermeter {
if ((millis() - mCfg->groups[group].pm_peviousTsp) < ((uint16_t)mCfg->groups[group].pm_refresh * 1000)) continue;
mCfg->groups[group].pm_peviousTsp = millis();
#ifdef ZEROEXPORT_DEBUG
if (mCfg->debug) DBGPRINTLN(F("pm Do:"));
#endif /*ZEROEXPORT_DEBUG*/
result = false;
power = 0.0;
@ -120,11 +126,9 @@ class powermeter {
mCfg->groups[group].power = power;
// MQTT - Powermeter
/// BUG: 002 Anfang - Muss dieser Teil raus? Führt er zu abstürzen wie BUG 001?
if (mMqtt->isConnected()) {
mMqtt->publish(String("zero/state/groups/" + String(group) + "/powermeter/P").c_str(), String(ah::round1(power)).c_str(), false);
}
/// BUG: 002 Ende
}
}
}
@ -203,50 +207,63 @@ class powermeter {
/** onMqttMessage
* This function is needed for all mqtt connections between ahoy and other devices.
*/
void onMqttMessage(JsonObject obj) {
String topic = String(obj["topic"]);
bool onMqttMessage(const char* topic, const uint8_t* payload, size_t len)
{
bool result = false;
#if defined(ZEROEXPORT_POWERMETER_MQTT)
for (uint8_t group = 0; group < ZEROEXPORT_MAX_GROUPS; group++) {
for (uint8_t group = 0; group < ZEROEXPORT_MAX_GROUPS; group++)
{
if (!mCfg->groups[group].enabled) continue;
if (!mCfg->groups[group].pm_type == zeroExportPowermeterType_t::Mqtt) continue;
if (!strcmp(mCfg->groups[group].pm_src, "")) continue;
if (strcmp(mCfg->groups[group].pm_src, String(topic).c_str())) continue;
if (strcmp(mCfg->groups[group].pm_src, topic) != 0) continue; // strcmp liefert 0 wenn gleich
float power = 0.0;
String sPayload = String((const char*)payload).substring(0, len);
DynamicJsonDocument datajson(512);
if (!deserializeJson(datajson, String(obj["val"])))
if (sPayload.startsWith("{") && sPayload.endsWith("}") || sPayload.startsWith("[") && sPayload.endsWith("]"))
{
switch (mCfg->groups[group].pm_target) {
case 0: power = (float)datajson["total_act_power"]; break;
case 1: power = (float)datajson["a_act_power"]; break;
case 2: power = (float)datajson["b_act_power"]; break;
case 3: power = (float)datajson["c_act_power"]; break;
#ifdef ZEROEXPORT_DEBUG
DPRINTLN(DBG_INFO, String("ze: mqtt powermeter val: ") + sPayload);
#endif /*ZEROEXPORT_DEBUG*/
DynamicJsonDocument datajson(2048); // TODO: JSON größe dynamisch machen?
if(!deserializeJson(datajson, sPayload.c_str()))
{
#ifdef ZEROEXPORT_DEBUG
DPRINTLN(DBG_INFO, String("ze: mqtt powermeter deserialize ok"));
DPRINTLN(DBG_INFO, String(datajson.as<String>()));
#endif /*ZEROEXPORT_DEBUG*/
power = extractJsonKey(datajson, mCfg->groups[group].pm_jsonPath);
}
} else {
power = (float)obj["val"];
}
else
{
#ifdef ZEROEXPORT_DEBUG
DPRINTLN(DBG_INFO, String("ze: mqtt powermeter kein json"));
#endif /*ZEROEXPORT_DEBUG*/
power = sPayload.toFloat();
}
bufferWrite(power, group);
mCfg->groups[group].power = power; // TODO: join two sites together (PM & MQTT)
mCfg->groups[group].power = power;
// MQTT - Powermeter
DPRINTLN(DBG_INFO, String("ze: mqtt powermeter ") + String(power));
/// BUG: 001 Anfang - Dieser Teil ist deaktiviert weil er zu abstürzen der DTU führt
// if (mCfg->debug) {
// if (mMqtt->isConnected()) {
// mMqtt->publish(String("zero/state/groups/" + String(group) + "/powermeter/P").c_str(), String(ah::round1(power)).c_str(), false);
// }
// }
/// BUG: 001 Ende
if (mCfg->debug) {
if (mMqtt->isConnected()) {
mMqtt->publish(String("zero/state/groups/" + String(group) + "/powermeter/P").c_str(), String(ah::round1(power)).c_str(), false);
}
}
result = true;
}
#endif /*defined(ZEROEXPORT_POWERMETER_MQTT)*/
return result;
}
private:
@ -261,6 +278,11 @@ DPRINTLN(DBG_INFO, String("ze: mqtt powermeter") + String(power));
mMqtt->subscribe(gr.c_str(), QOS_2);
}
/*uint16_t mqttUnsubscribe(const char *subTopic,)
{ TODO: hier weiter?
return mMqtt->unsubscribe(topic); // add as many topics as you like
}*/
/** mqttPublish
* when a MQTT Msg is needed to Publish, but not to subscribe.
* @param gr
@ -275,6 +297,7 @@ DPRINTLN(DBG_INFO, String("ze: mqtt powermeter") + String(power));
HTTPClient http;
zeroExport_t *mCfg;
settings_t *mConfig = nullptr;
PubMqttType *mMqtt = nullptr;
DynamicJsonHandler *mLog;
IApp *mApp = nullptr;
@ -322,7 +345,6 @@ DPRINTLN(DBG_INFO, String("ze: mqtt powermeter") + String(power));
*/
/*if (auth != NULL && realm) http.addHeader("WWW-Authenticate", "Digest qop=\"auth\", realm=\"" + shellypro4pm-f008d1d8b8b8 + "\", nonce=\"60dc59c6\", algorithm=SHA-256");
else if (auth != NULL) http.addHeader("Authorization", "Basic " + auth);*/
/*
@ -338,6 +360,20 @@ DPRINTLN(DBG_INFO, String("ze: mqtt powermeter") + String(power));
*/
}
/**
*
*
*/
float extractJsonKey(DynamicJsonDocument data, const char* key)
{
if (data.containsKey(key))
return (float)data[key];
else {
DPRINTLN(DBG_INFO, String("ze: mqtt powermeter deserialize no key ") + String(key));
return 0.0F;
}
}
#if defined(ZEROEXPORT_POWERMETER_SHELLY)
/** getPowermeterWattsShelly
* ...
@ -578,6 +614,7 @@ DPRINTLN(DBG_INFO, String("ze: mqtt powermeter") + String(power));
switch (smlCurrentState) {
case SML_FINAL:
*power = _powerMeterTotal;
// TODO: pm_taget auswerten und damit eine Regelung auf Sum, L1, L2, L3 ermöglichen (setup.html nicht vergessen)
result = true;
break;
case SML_LISTEND:

357
src/plugins/zeroExport/zeroExport.h

@ -15,6 +15,9 @@
#include "AsyncJson.h"
#include "powermeter.h"
#include "utils/DynamicJsonHandler.h"
#include "utils/mqttHelper.h"
using namespace mqttHelper;
template <class HMSYSTEM>
@ -48,15 +51,9 @@ class ZeroExport {
mApi = api;
mMqtt = mqtt;
mIsInitialized = mPowermeter.setup(mApp, mCfg, mqtt, &_log);
mIsInitialized = mPowermeter.setup(mApp, mCfg, mConfig, mMqtt, &_log);
}
/*void printJson() {
serializeJson(doc, Serial);
Serial.println();
serializeJsonPretty(doc, Serial);
}*/
/** loop
* Arbeitsschleife
* @param void
@ -75,13 +72,17 @@ class ZeroExport {
if (mLastRun > (Tsp - 1000)) return;
mLastRun = Tsp;
#ifdef ZEROEXPORT_DEBUG
if (mCfg->debug) DBGPRINTLN(F("Takt:"));
#endif /*ZEROEXPORT_DEBUG*/
// Exit if Queue is empty
zeroExportQueue_t Queue;
if (!getQueue(&Queue)) return;
#ifdef ZEROEXPORT_DEBUG
if (mCfg->debug) DBGPRINTLN(F("Queue:"));
#endif /*ZEROEXPORT_DEBUG*/
// Load Data from Queue
uint8_t group = Queue.group;
@ -641,6 +642,8 @@ class ZeroExport {
* @returns void
*/
void onMqttConnect(void) {
mMqtt->subscribe("zero/ctrl/#", QOS_2);
if (!mCfg->enabled) return;
mPowermeter.onMqttConnect();
@ -662,150 +665,31 @@ class ZeroExport {
* @param
* @returns void
*/
void onMqttMessage(JsonObject obj) {
if (!mIsInitialized) return;
mPowermeter.onMqttMessage(obj);
String topic = String(obj["topic"]);
// "topic":"userdefined battSoCTopic" oder "userdefinedUTopic"
for (uint8_t group = 0; group < ZEROEXPORT_MAX_GROUPS; group++) {
if (!mCfg->groups[group].enabled) continue;
if ((!mCfg->groups[group].battCfg == zeroExportBatteryCfg::mqttU) && (!mCfg->groups[group].battCfg == zeroExportBatteryCfg::mqttSoC)) continue;
if (!strcmp(mCfg->groups[group].battTopic, "")) continue;
if (strcmp(mCfg->groups[group].battTopic, String(topic).c_str())) {
mCfg->groups[group].battValue = (bool)obj["val"];
_log.addProperty("k", mCfg->groups[group].battTopic);
_log.addProperty("v", mCfg->groups[group].battValue);
}
}
// "topic":"ctrl/zero"
if (topic.indexOf("ctrl/zero") == -1) return;
_log.addProperty("d", obj);
if (obj["path"] == "ctrl" && obj["cmd"] == "zero") {
int8_t topicGroup = getGroupFromTopic(topic.c_str());
int8_t topicInverter = getInverterFromTopic(topic.c_str());
if (topicGroup != -1) {
_log.addProperty("g", topicGroup);
}
if (topicInverter == -1) {
_log.addProperty("i", topicInverter);
}
_log.addProperty("k", topic);
// "topic":"ctrl/zero/enabled"
if (topic.indexOf("ctrl/zero/enabled") != -1) {
_log.addProperty("v", (bool)obj["val"]);
mCfg->enabled = (bool)obj["val"];
}
// "topic":"ctrl/zero/sleep"
else if (topic.indexOf("ctrl/zero/sleep") != -1) {
_log.addProperty("v", (bool)obj["val"]);
mCfg->sleep = (bool)obj["val"];
}
else if ((topicGroup >= 0) && (topicGroup < ZEROEXPORT_MAX_GROUPS)) {
String stopicGroup = String(topicGroup);
// "topic":"ctrl/zero/groups/+/enabled"
if (topic.endsWith("/enabled")) {
_log.addProperty("v", (bool)obj["val"]);
mCfg->groups[topicGroup].enabled = (bool)obj["val"];
}
// "topic":"ctrl/zero/groups/+/sleep"
else if (topic.endsWith("/sleep")) {
_log.addProperty("v", (bool)obj["val"]);
mCfg->groups[topicGroup].sleep = (bool)obj["val"];
}
// Auf Eis gelegt, dafür 2 Gruppen mehr
// 0.8.103008.2
// // "topic":"ctrl/zero/groups/+/pm_ip"
// if (topic.indexOf("ctrl/zero/groups/" + String(topicGroup) + "/pm_ip") != -1) {
// snprintf(mCfg->groups[topicGroup].pm_src, ZEROEXPORT_GROUP_MAX_LEN_PM_SRC, "%s", obj[F("val")].as<const char *>());
/// TODO:
// snprintf(mCfg->groups[topicGroup].pm_src, ZEROEXPORT_GROUP_MAX_LEN_PM_SRC, "%s", obj[F("val")].as<const char *>());
// strncpy(mCfg->groups[topicGroup].pm_src, obj[F("val")], ZEROEXPORT_GROUP_MAX_LEN_PM_SRC);
// strncpy(mCfg->groups[topicGroup].pm_src, String(obj[F("val")]).c_str(), ZEROEXPORT_GROUP_MAX_LEN_PM_SRC);
// snprintf(mCfg->groups[topicGroup].pm_src, ZEROEXPORT_GROUP_MAX_LEN_PM_SRC, "%s", String(obj[F("val")]).c_str());
// mLog["k"] = "ctrl/zero/groups/" + String(topicGroup) + "/pm_ip";
// mLog["v"] = mCfg->groups[topicGroup].pm_src;
// }
//
// // "topic":"ctrl/zero/groups/+/pm_jsonPath"
// if (topic.indexOf("ctrl/zero/groups/" + String(topicGroup) + "/pm_jsonPath") != -1) {
/// TODO:
// snprintf(mCfg->groups[topicGroup].pm_jsonPath, ZEROEXPORT_GROUP_MAX_LEN_PM_JSONPATH, "%s", obj[F("val")].as<const char *>());
// mLog["k"] = "ctrl/zero/groups/" + String(topicGroup) + "/pm_jsonPath";
// mLog["v"] = mCfg->groups[topicGroup].pm_jsonPath;
// }
// "topic":"ctrl/zero/groups/+/battery/switch"
else if (topic.endsWith("/battery/switch")) {
_log.addProperty("v", (bool)obj["val"]);
mCfg->groups[topicGroup].battSwitch = (bool)obj["val"];
}
bool onMqttMessage(const char* topic, const uint8_t* payload, size_t len)
{
// Check if ZE is init, when not, directly out of here!
if (!mIsInitialized) return false;
else if (topic.indexOf("/advanced/") != -1) {
// "topic":"ctrl/zero/groups/+/advanced/setPoint"
if (topic.endsWith("/setPoint")) {
_log.addProperty("v", (int16_t)obj["val"]);
mCfg->groups[topicGroup].setPoint = (int16_t)obj["val"];
}
bool result = true;
// "topic":"ctrl/zero/groups/+/advanced/powerTolerance"
else if (topic.endsWith("/powerTolerance")) {
_log.addProperty("v", (uint8_t)obj["val"]);
mCfg->groups[topicGroup].powerTolerance = (uint8_t)obj["val"];
}
// "topic":"ctrl/zero/groups/+/advanced/powerMax"
else if (topic.endsWith("/powerMax")) {
_log.addProperty("v", (uint16_t)obj["val"]);
mCfg->groups[topicGroup].powerMax = (uint16_t)obj["val"];
}
} else if (topic.indexOf("/inverter/") != -1) {
if ((topicInverter >= 0) && (topicInverter < ZEROEXPORT_GROUP_MAX_INVERTERS)) {
// "topic":"ctrl/zero/groups/+/inverter/+/enabled"
if (topic.endsWith("/enabled")) {
_log.addProperty("v", (bool)obj["val"]);
mCfg->groups[topicGroup].inverters[topicInverter].enabled = (bool)obj["val"];
}
// "topic":"ctrl/zero/groups/+/inverter/+/powerMin"
else if (topic.endsWith("/powerMin")) {
_log.addProperty("v", (uint16_t)obj["val"]);
mCfg->groups[topicGroup].inverters[topicInverter].powerMin = (uint16_t)obj["val"];
}
// "topic":"ctrl/zero/groups/+/inverter/+/powerMax"
else if (topic.endsWith("/powerMax")) {
_log.addProperty("v", (uint16_t)obj["val"]);
mCfg->groups[topicGroup].inverters[topicInverter].powerMax = (uint16_t)obj["val"];
} else {
_log.addProperty("k", "error");
}
}
} else {
_log.addProperty("k", "error");
// FremdTopic "topic":"userdefined power" ist für ZeroExport->Powermeter
if (!mPowermeter.onMqttMessage(topic, payload, len))
{
// FremdTopic "topic":"userdefined battSoCTopic" oder "userdefinedUTopic" ist für ZeroExport(Batterie)
if (!onMqttMessageBattery(topic, payload, len))
{
// LokalerTopic "topic": ???/zero ist für ZeroExport
if (!onMqttMessageZeroExport(topic, payload, len))
{
result = false;
}
}
}
sendLog();
clearLog();
return;
return result;
}
private:
@ -833,42 +717,6 @@ class ZeroExport {
return true;
}
/** getGroupFromTopic
* Extahiert die Gruppe aus dem mqttTopic.
* @param *topic
* @returns group
*/
int8_t getGroupFromTopic(const char *topic) {
const char *pGroupSection = strstr(topic, "groups/");
if (pGroupSection == NULL) return -1;
pGroupSection += 7;
char strGroup[3];
uint8_t digitsCopied = 0;
while (*pGroupSection != '/' && digitsCopied < 2) strGroup[digitsCopied++] = *pGroupSection++;
strGroup[digitsCopied] = '\0';
int8_t group = atoi(strGroup);
_log.addProperty("getGroupFromTopic", "group");
return group;
}
/** getInverterFromTopic
* Extrahiert dden Inverter aus dem mqttTopic
* @param *topic
* @returns inv
*/
int8_t getInverterFromTopic(const char *topic) {
const char *pInverterSection = strstr(topic, "inverters/");
if (pInverterSection == NULL) return -1;
pInverterSection += 10;
char strInverter[3];
uint8_t digitsCopied = 0;
while (*pInverterSection != '/' && digitsCopied < 2) strInverter[digitsCopied++] = *pInverterSection++;
strInverter[digitsCopied] = '\0';
int8_t inverter = atoi(strInverter);
return inverter;
}
/** mqttSubscribe
* when a MQTT Msg is needed to subscribe, then a publish is leading
* @param gr
@ -891,6 +739,154 @@ class ZeroExport {
mMqtt->publish(gr.c_str(), payload.c_str(), retain);
}
/** onMqttMessageBattery
* Subscribe section
* @param
* @returns void
*/
bool onMqttMessageBattery(const char* topic, const uint8_t* payload, size_t len) {
// check if topic is Fremdtopic
String baseTopic = String(mConfig->mqtt.topic);
if (strncmp(topic, baseTopic.c_str(), baseTopic.length()) != 0)
{
for (uint8_t group = 0; group < ZEROEXPORT_MAX_GROUPS; group++) {
if (!mCfg->groups[group].enabled) continue;
if ((!mCfg->groups[group].battCfg == zeroExportBatteryCfg::mqttU) && (!mCfg->groups[group].battCfg == zeroExportBatteryCfg::mqttSoC)) continue;
if (!strcmp(mCfg->groups[group].battTopic, "")) continue;
if (checkIntegerProperty(topic, mCfg->groups[group].battTopic, payload, len, &mCfg->groups[group].battValue, &_log)) return true;
}
}
return false;
}
/** onMqttMessageZeroExport
* Subscribe section
* @param
* @returns true when topic is for this class specified or false when its not fit in here.
*/
bool onMqttMessageZeroExport(const char* topic, const uint8_t* payload, size_t len)
{
// check if topic is for zeroExport
String baseTopic = String(mConfig->mqtt.topic) + String("/zero/ctrl");
if (strncmp(topic, baseTopic.c_str(), baseTopic.length()) == 0)
{
_log.addProperty("k", topic);
const char* p = topic + strlen(baseTopic.c_str());
// "topic":"???/zero/ctrl/enabled"
if (checkBoolProperty(p, "/enabled", payload, len, &mCfg->enabled, &_log)) return true;
// reconnect
// "topic":"???/zero/ctrl/sleep"
if (checkBoolProperty(p, "/sleep", payload, len, &mCfg->sleep, &_log)) return true;
// "topic":"???/zero/ctrl/groups"
if (strncmp(p, "/groups", strlen("/groups")) == 0) {
baseTopic += String("/groups"); // add '/groups'
p = topic + strlen(baseTopic.c_str());
// extract number from topic
int topicGroup = -1;
while (*p) {
if (isdigit(*p)) {
topicGroup = atoi(p);
break;
}
p++;
}
// reset to pointer with offset
p = topic + strlen(baseTopic.c_str());
#ifdef ZEROEXPORT_DEBUG
DBGPRINT(String("groups "));
DBGPRINTLN(String(topicGroup));
#endif /*ZEROEXPORT_DEBUG*/
baseTopic += String("/") + String(topicGroup); // add '/+'
p = topic + strlen(baseTopic.c_str());
// "topic":"???/zero/ctrl/groups/+/enabled"
if (checkBoolProperty(p, "/enabled", payload, len, &mCfg->groups[topicGroup].enabled, &_log)) return true;
// Reconnect
// "topic":"???/zero/ctrl/groups/+/sleep"
if (checkBoolProperty(p, "/sleep", payload, len, &mCfg->groups[topicGroup].sleep, &_log)) return true;
// "topic":"???/zero/ctrl/groups/+/pm_ip"
if (checkCharProperty(p, "/pm_ip", payload, len, mCfg->groups[topicGroup].pm_src, ZEROEXPORT_GROUP_MAX_LEN_PM_SRC, &_log)) return true;
// Reconnect
// "topic":"???/zero/ctrl/groups/+/pm_jsonPath"
if (checkCharProperty(p, "/pm_jsonPath", payload, len, mCfg->groups[topicGroup].pm_jsonPath, ZEROEXPORT_GROUP_MAX_LEN_PM_JSONPATH, &_log)) return true;
// Reconnect
// "topic":"???/zero/ctrl/groups/+/battery/switch"
if (checkBoolProperty(p, "/battery/switch", payload, len, &mCfg->groups[topicGroup].battSwitch, &_log)) return true;
// "topic":"???/zero/ctrl/groups/+/advanced/setPoint"
if (checkIntegerProperty(p, "/advanced/setPoint", payload, len, &mCfg->groups[topicGroup].setPoint, &_log)) return true;
// "topic":"???/zero/ctrl/groups/+/advanced/powerTolerance"
if (checkIntegerProperty(p, "/advanced/powerTolerance", payload, len, &mCfg->groups[topicGroup].powerTolerance, &_log)) return true;
// "topic":"???/zero/ctrl/groups/+/advanced/powerMax"
if (checkIntegerProperty(p, "/advanced/powerMax", payload, len, &mCfg->groups[topicGroup].powerMax, &_log)) return true;
// "topic":"???/zero/ctrl/groups/+/inverter"
if (strncmp(p, "/inverter", strlen("/inverter")) == 0) {
baseTopic += String("/inverter"); // add '/inverter'
p = topic + strlen(baseTopic.c_str());
// extract number from topic
int topicInverter = -1;
while (*p) {
if (isdigit(*p)) {
topicInverter = atoi(p);
break;
}
p++;
}
// reset to pointer with offset
p = topic + strlen(baseTopic.c_str());
#ifdef ZEROEXPORT_DEBUG
DBGPRINT(String("inverter "));
DBGPRINTLN(String(topicInverter));
#endif /*ZEROEXPORT_DEBUG*/
baseTopic += String("/") + String(topicInverter); // add '/+'
p = topic + strlen(baseTopic.c_str());
// "topic":"???/zero/ctrl/groups/+/inverter/+/enabled"
if (checkBoolProperty(p, "/enabled", payload, len, &mCfg->groups[topicGroup].inverters[topicInverter].enabled, &_log)) return true;
// "topic":"???/zero/ctrl/groups/+/inverter/+/powerMin"
if (checkIntegerProperty(p, "/powerMin", payload, len, &mCfg->groups[topicGroup].inverters[topicInverter].powerMin, &_log)) return true;
// "topic":"???/zero/ctrl/groups/+/inverter/+/powerMax"
if (checkIntegerProperty(p, "/powerMax", payload, len, &mCfg->groups[topicGroup].inverters[topicInverter].powerMax, &_log)) return true;
}
return true;
}
}
return false;
}
/** sendLog
* Sendet den LogSpeicher über Webserial und/oder MQTT
*/
@ -915,6 +911,9 @@ class ZeroExport {
_log.clear();
}
// private member variables
bool mIsInitialized = false;

98
src/publisher/pubMqtt.h

@ -28,7 +28,7 @@
#include "pubMqttDefs.h"
#include "pubMqttIvData.h"
typedef std::function<void(JsonObject)> subscriptionCb;
typedef std::function<void(const char*, const uint8_t*, size_t, size_t, size_t)> subscriptionCb;
typedef std::function<void(void)> connectionCb;
typedef struct {
@ -115,9 +115,16 @@ class PubMqtt {
queue.swap(mReceiveQueue);
xSemaphoreGive(mutex);
while (!queue.empty()) {
while (!queue.empty())
{
#warning "TODO: sTopic und sPayload hier fällen und dann übergeben?";
#warning "Ist es wirklich so, dass die Payload eines Topics aus mehreren Nachrichten bestehen kann und erst zusammengesetzt werden muss? Was ist dann mit der Reihenfolge?";
message_s *entry = &queue.front();
handleMessage(entry->topic, entry->payload, entry->len, entry->index, entry->total);
if(NULL != mSubscriptionCb)
{
(mSubscriptionCb)(entry->topic, entry->payload, entry->len, entry->index, entry->total);
mRxCnt++;
}
queue.pop();
}
@ -244,6 +251,12 @@ class PubMqtt {
mClient.subscribe(topic, qos);
}
// new - need to unsubscribe the topics.
void unsubscribe(const char *subTopic)
{
mClient.unsubscribe(subTopic); // add as many topics as you like
}
void subscribeExtern(const char *subTopic, uint8_t qos = QOS_0) {
char topic[MQTT_TOPIC_LEN + 20];
snprintf(topic, (MQTT_TOPIC_LEN + 20), "%s", subTopic);
@ -339,77 +352,24 @@ class PubMqtt {
}
}
void onMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) {
if(len == 0)
void onMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total)
{
#warning "TODO: if aktivieren nach Logprüfung. Was bedeutet index und total?";
// if (total != 1) {
// DPRINTLN(DBG_ERROR, String("pubMqtt.h: onMessage ERROR: index=") + String(index) + String(" total=") + String(total));
// return;
// }
if (len == 0) {
DPRINT(DBG_INFO, String("MQTT-topic: "));
DPRINT(DBG_INFO, String(topic));
DPRINTLN(DBG_INFO, String(" is empty."));
return;
}
xSemaphoreTake(mutex, portMAX_DELAY);
mReceiveQueue.push(message_s(topic, payload, len, index, total));
xSemaphoreGive(mutex);
}
inline void handleMessage(const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) {
DPRINT(DBG_INFO, mqttStr[MQTT_STR_GOT_TOPIC]);
DBGPRINTLN(String(topic));
if(NULL == mSubscriptionCb)
return;
DynamicJsonDocument json(128);
JsonObject root = json.to<JsonObject>();
root["topic"] = String(topic);
bool limitAbs = false;
if(len > 0) {
char *pyld = new char[len + 1];
memcpy(pyld, payload, len);
pyld[len] = '\0';
if(NULL == strstr(topic, "limit"))
root[F("val")] = atoi(pyld);
else
root[F("val")] = atof(pyld);
if(pyld[len-1] == 'W')
limitAbs = true;
delete[] pyld;
}
const char *p = topic + strlen(mCfgMqtt->topic);
uint8_t pos = 0, elm = 0;
char tmp[30];
while(1) {
if(('/' == p[pos]) || ('\0' == p[pos])) {
memcpy(tmp, p, pos);
tmp[pos] = '\0';
switch(elm++) {
case 1: root[F("path")] = String(tmp); break;
case 2:
if(strncmp("limit", tmp, 5) == 0) {
if(limitAbs)
root[F("cmd")] = F("limit_nonpersistent_absolute");
else
root[F("cmd")] = F("limit_nonpersistent_relative");
} else
root[F("cmd")] = String(tmp);
break;
case 3: root[F("id")] = atoi(tmp); break;
default: break;
}
if('\0' == p[pos])
break;
p = p + pos + 1;
pos = 0;
}
pos++;
}
/*char out[128];
serializeJson(root, out, 128);
DPRINTLN(DBG_INFO, "json: " + String(out));*/
(mSubscriptionCb)(root);
mRxCnt++;
}
void discoveryConfigLoop(void) {

56
src/web/RestApi.h

@ -46,6 +46,7 @@ class RestApi {
mRadioCmt = (CmtRadio<>*)mApp->getRadioObj(false);
#endif
mConfig = config;
#if defined(ENABLE_HISTORY_LOAD_DATA)
mSrv->on("/api/addYDHist",
HTTP_POST, std::bind(&RestApi::onApiPost, this, std::placeholders::_1),
@ -65,14 +66,14 @@ class RestApi {
return mTimezoneOffset;
}
void ctrlRequest(JsonObject obj) {
DynamicJsonDocument json(128);
JsonObject dummy = json.as<JsonObject>();
if(obj[F("path")] == "ctrl")
setCtrl(obj, dummy, "*");
else if(obj[F("path")] == "setup")
setSetup(obj, dummy, "*");
}
// void ctrlRequest(const char* topic, const uint8_t* payload, size_t len) {
// DynamicJsonDocument json(128);
// JsonObject dummy = json.as<JsonObject>();
// if(obj[F("path")] == "ctrl")
// setCtrl(obj, dummy, "*");
// else if(obj[F("path")] == "setup")
// setSetup(obj, dummy, "*");
// }
private:
void onApi(AsyncWebServerRequest *request) {
@ -884,6 +885,7 @@ class RestApi {
// General
objGroup[F("id")] = (uint8_t)group;
objGroup[F("enabled")] = (bool)mConfig->plugin.zeroExport.groups[group].enabled;
objGroup[F("sleep")] = (bool)mConfig->plugin.zeroExport.groups[group].sleep;
objGroup[F("name")] = String(mConfig->plugin.zeroExport.groups[group].name);
// Powermeter
objGroup[F("pm_refresh")] = (uint8_t)mConfig->plugin.zeroExport.groups[group].pm_refresh;
@ -1202,7 +1204,25 @@ class RestApi {
// Powermeter
mConfig->plugin.zeroExport.groups[group].pm_refresh = jsonIn[F("pm_refresh")];
mConfig->plugin.zeroExport.groups[group].pm_type = jsonIn[F("pm_type")];
// pm_src
const char *neu = jsonIn[F("pm_src")].as<const char*>();
if (strncmp(mConfig->plugin.zeroExport.groups[group].pm_src, neu, strlen(neu)) != 0) {
// unsubscribe
if(mConfig->plugin.zeroExport.groups[group].pm_type == zeroExportPowermeterType_t::Mqtt)
{
mApp->unsubscribe(mConfig->plugin.zeroExport.groups[group].pm_src);
}
// save
snprintf(mConfig->plugin.zeroExport.groups[group].pm_src, ZEROEXPORT_GROUP_MAX_LEN_PM_SRC, "%s", jsonIn[F("pm_src")].as<const char*>());
// subsrcribe
if(mConfig->plugin.zeroExport.groups[group].pm_type == zeroExportPowermeterType_t::Mqtt)
{
mApp->subscribe(mConfig->plugin.zeroExport.groups[group].pm_src, QOS_2);
}
}
snprintf(mConfig->plugin.zeroExport.groups[group].pm_jsonPath, ZEROEXPORT_GROUP_MAX_LEN_PM_JSONPATH, "%s", jsonIn[F("pm_jsonPath")].as<const char*>());
@ -1224,7 +1244,27 @@ class RestApi {
}
// Battery
mConfig->plugin.zeroExport.groups[group].battCfg = jsonIn[F("battCfg")];
// battTopic
const char *battneu = jsonIn[F("battTopic")].as<const char*>();
if (strncmp(mConfig->plugin.zeroExport.groups[group].battTopic, battneu, strlen(battneu)) != 0) {
// unsubscribe
if(mConfig->plugin.zeroExport.groups[group].pm_type == zeroExportBatteryCfg::mqttSoC ||
mConfig->plugin.zeroExport.groups[group].pm_type == zeroExportBatteryCfg::mqttU )
{
mApp->unsubscribe(mConfig->plugin.zeroExport.groups[group].battTopic);
}
// save
snprintf(mConfig->plugin.zeroExport.groups[group].battTopic, ZEROEXPORT_GROUP_MAX_LEN_BATT_TOPIC, "%s", jsonIn[F("battTopic")].as<const char*>());
// subsrcribe
if(mConfig->plugin.zeroExport.groups[group].pm_type == zeroExportBatteryCfg::mqttSoC ||
mConfig->plugin.zeroExport.groups[group].pm_type == zeroExportBatteryCfg::mqttU)
{
mApp->subscribe(mConfig->plugin.zeroExport.groups[group].battTopic, QOS_2);
}
}
mConfig->plugin.zeroExport.groups[group].battLimitOn = jsonIn[F("battLimitOn")];
mConfig->plugin.zeroExport.groups[group].battLimitOff = jsonIn[F("battLimitOff")];
// Advanced

4
src/web/html/setup.html

@ -1671,7 +1671,7 @@
}
else if(value == "Mqtt") {
divsToHide.childNodes[1].style.display = 'none';
divsToHide.childNodes[4].style.display = 'none';
divsToHide.childNodes[2].style.display = 'none';
divsToHide.childNodes[5].style.display = 'none';
divsToHide.childNodes[6].style.display = 'none';
}
@ -1835,6 +1835,7 @@
lines.push(ml("tr", {}, [
ml("th", {style: "width: 10%; text-align: center;"}, "{#ZE_GROUP_ENABLED}"),
ml("th", {style: "width: 10%; text-align: center;"}, "{#ZE_GROUP_MODE}"),
ml("th", {style: "width: 10%; text-align: center;"}, "{#ZE_GROUP_ID}"),
ml("th", {style: "text-align: center;"}, "{#ZE_GROUP_NAME}"),
ml("th", {style: "width: 10%; text-align: center;"}, "{#ZE_GROUP_POWERTOTAL}"),
@ -1845,6 +1846,7 @@
for(let group = 0; group < obj.groups.length; group++) {
lines.push(ml("tr", {}, [
ml("td", {style: "text-align: left;", }, badge(obj.groups[group].enabled, (obj.groups[group].enabled) ? "{#ENABLED}" : "{#DISABLED}")),
ml("td", {style: "text-align: left;", }, badge(!obj.groups[group].sleep, (obj.groups[group].sleep) ? "{#ZE_MODE_SLEEP}" : "{#ZE_MODE_NORMAL}" , "warning")),
ml("td", {style: "text-align: center;", }, String(obj.groups[group].id)),
ml("td", {style: "text-align: left;", }, String(obj.groups[group].name)),
// ml("td", {style: "text-align: right;", id: "groupPowerTotal"+group}, "n/a"),

15
src/web/lang.json

@ -843,6 +843,21 @@
"en": "State:",
"de": "Status:"
},
{
"token": "ZE_GROUP_MODE",
"en": "Mode:",
"de": "Modus:"
},
{
"token": "ZE_MODE_SLEEP",
"en": "sleep",
"de": "Standby"
},
{
"token": "ZE_MODE_NORMAL",
"en": "normal",
"de": "Normal"
},
{
"token": "ZE_GROUP_ID",
"en": "Group:",

Loading…
Cancel
Save