From 5d00cba981a9ad15333574c6c95150593fbb8c4d Mon Sep 17 00:00:00 2001 From: HorstG-57 Date: Wed, 3 Aug 2022 21:40:44 +0200 Subject: [PATCH] MQTT Reconnect Fehler behoben, MQTT Anmeldung mit Device Name, MQTT Aussendung der Daten nach Verarbeitung der WR Daten --- tools/esp8266/app.cpp | 57 +++++++++++++++------ tools/esp8266/mqtt.h | 112 ++++++++++++++++++++++++++++++++++++------ 2 files changed, 140 insertions(+), 29 deletions(-) diff --git a/tools/esp8266/app.cpp b/tools/esp8266/app.cpp index c1c3d78c..0a333990 100644 --- a/tools/esp8266/app.cpp +++ b/tools/esp8266/app.cpp @@ -129,12 +129,15 @@ void app::setup(uint32_t timeout) { char mqttUser[MQTT_USER_LEN]; char mqttPwd[MQTT_PWD_LEN]; char mqttTopic[MQTT_TOPIC_LEN]; + char mDeviceName[DEVNAME_LEN]; + mEep->read(ADDR_MQTT_ADDR, mqttAddr, MQTT_ADDR_LEN); mEep->read(ADDR_MQTT_USER, mqttUser, MQTT_USER_LEN); mEep->read(ADDR_MQTT_PWD, mqttPwd, MQTT_PWD_LEN); mEep->read(ADDR_MQTT_TOPIC, mqttTopic, MQTT_TOPIC_LEN); //mEep->read(ADDR_MQTT_INTERVAL, &mMqttInterval); mEep->read(ADDR_MQTT_PORT, &mqttPort); + mEep->read(ADDR_DEVNAME, mDeviceName, DEVNAME_LEN); if(mqttAddr[0] > 0) { mMqttActive = true; @@ -147,13 +150,14 @@ void app::setup(uint32_t timeout) { if(0 == mqttPort) mqttPort = 1883; - mMqtt.setup(mqttAddr, mqttTopic, mqttUser, mqttPwd, mqttPort); + mMqtt.setup(mqttAddr, mqttTopic, mqttUser, mqttPwd, mqttPort, mDeviceName); mMqttTicker = 0; - + mSerialTicker = 0; if(mqttAddr[0] > 0) { char topic[30]; + mMqtt.sendMsg("Device", mDeviceName); mMqtt.sendMsg("version", mVersion); for(uint8_t i = 0; i < MAX_NUM_INVERTERS; i ++) { iv = mSys->getInverterByPos(i); @@ -167,6 +171,8 @@ void app::setup(uint32_t timeout) { } } } + // erste MQTT Übertragung aus der Loop in 2 Sekunden + mMqttTicker = mMqttInterval - 2; } } else { @@ -231,20 +237,16 @@ void app::loop(void) { Inverter<> *iv = mSys->findInverter(&p->packet[1]); if(NULL != iv) { uint8_t *pid = &p->packet[9]; - if (*pid == 0x00) { - DPRINT(DBG_DEBUG, "fragment number zero received and ignored"); - } else { - if((*pid & 0x7F) < 5) { - memcpy(mPayload[iv->id].data[(*pid & 0x7F) - 1], &p->packet[10], len-11); - mPayload[iv->id].len[(*pid & 0x7F) - 1] = len-11; - } + if((*pid & 0x7F) < 5) { + memcpy(mPayload[iv->id].data[(*pid & 0x7F) - 1], &p->packet[10], len-11); + mPayload[iv->id].len[(*pid & 0x7F) - 1] = len-11; + } - if((*pid & 0x80) == 0x80) { - if((*pid & 0x7f) > mPayload[iv->id].maxPackId) { - mPayload[iv->id].maxPackId = (*pid & 0x7f); - if(*pid > 0x81) - mLastPacketId = *pid; - } + if((*pid & 0x80) == 0x80) { + if((*pid & 0x7f) > mPayload[iv->id].maxPackId) { + mPayload[iv->id].maxPackId = (*pid & 0x7f); + if(*pid > 0x81) + mLastPacketId = *pid; } } } @@ -265,6 +267,10 @@ void app::loop(void) { mMqtt.loop(); if(checkTicker(&mTicker, 1000)) { + // Zählt den MQTT Disconnect Counter runter, Damit wird ein sofortiger reconnect im Fall von Loss_Connect + // unterdrückt. + mMqtt.decReconnect(); + if((++mMqttTicker >= mMqttInterval) && (mMqttInterval != 0xffff)) { mMqttTicker = 0; mMqtt.isConnected(true); @@ -284,7 +290,12 @@ void app::loop(void) { } snprintf(val, 10, "%ld", millis()/1000); sendMqttDiscoveryConfig(); + mMqtt.sendMsg("uptime", val); + + // für einfacheren Test mit MQTT, den MQTT abschnitt in 10 Sekunden wieder ausführen + // mMqttTicker = mMqttInterval -10; + } if(mSerialValues) { @@ -295,6 +306,7 @@ void app::loop(void) { Inverter<> *iv = mSys->getInverterByPos(id); if(NULL != iv) { if(iv->isAvailable(mTimestamp)) { + DPRINTLN(DBG_INFO, " Inverter : " + String(id)); for(uint8_t i = 0; i < iv->listLen; i++) { if(0.0f != iv->getValue(i)) { snprintf(topic, 30, "%s/ch%d/%s", iv->name, iv->assign[i].ch, iv->getFieldName(i)); @@ -303,6 +315,7 @@ void app::loop(void) { } yield(); } + DPRINTLN(DBG_INFO, " "); } } } @@ -400,6 +413,8 @@ bool app::buildPayload(uint8_t id) { //----------------------------------------------------------------------------- void app::processPayload(bool retransmit) { DPRINTLN(DBG_VERBOSE, F("app::processPayload")); + boolean doMQTT = false; + for(uint8_t id = 0; id < mSys->getNumInverters(); id++) { Inverter<> *iv = mSys->getInverterByPos(id); if(NULL != iv) { @@ -455,11 +470,23 @@ void app::processPayload(bool retransmit) { yield(); } iv->doCalculations(); + + doMQTT = true; } } yield(); } } + +// ist MQTT aktiviert und es wurden Daten vom einem oder mehreren WR aufbereitet ( doMQTT = true) +// dann die den mMqttTicker auf mMqttIntervall -2 setzen, also +// MQTT aussenden in 2 sek aktivieren +// dies sollte noch über einen Schalter im Setup aktivier / deaktivierbar gemacht werden + if( (mMqttInterval != 0xffff) && doMQTT ) { + ++mMqttTicker = mMqttInterval -2; + DPRINT(DBG_DEBUG, F("MQTTticker auf Intervall -2 sec ")) ; + } + } diff --git a/tools/esp8266/mqtt.h b/tools/esp8266/mqtt.h index 89570344..c92398a8 100644 --- a/tools/esp8266/mqtt.h +++ b/tools/esp8266/mqtt.h @@ -15,24 +15,34 @@ class mqtt { mqtt() { mClient = new PubSubClient(mEspClient); mAddressSet = false; + mInsend = false; memset(mUser, 0, MQTT_USER_LEN); memset(mPwd, 0, MQTT_PWD_LEN); memset(mTopic, 0, MQTT_TOPIC_LEN); + memset(mDeviceName, 0, DEVNAME_LEN); + memset(mBroker, 0, MQTT_ADDR_LEN); } ~mqtt() { } - void setup(const char *broker, const char *topic, const char *user, const char *pwd, uint16_t port) { + void setup(const char *broker, const char *topic, const char *user, const char *pwd, uint16_t port, const char *devname ) { DPRINTLN(DBG_VERBOSE, F("mqtt.h:setup")); mAddressSet = true; - mClient->setServer(broker, port); - mClient->setBufferSize(MQTT_MAX_PACKET_SIZE); + mInsend = false; + + mReconnect = 1; mPort = port; snprintf(mUser, MQTT_USER_LEN, "%s", user); snprintf(mPwd, MQTT_PWD_LEN, "%s", pwd); snprintf(mTopic, MQTT_TOPIC_LEN, "%s", topic); + snprintf(mDeviceName, DEVNAME_LEN, "%s", devname); + snprintf(mBroker, MQTT_ADDR_LEN, "%s", broker); + + mClient->setServer(mBroker, mPort); + mClient->setBufferSize(MQTT_MAX_PACKET_SIZE); + } void sendMsg(const char *topic, const char *msg) { @@ -44,16 +54,25 @@ class mqtt { void sendMsg2(const char *topic, const char *msg, boolean retained) { if(mAddressSet) { - if(!mClient->connected()) - reconnect(); - if(mClient->connected()) - mClient->publish(topic, msg, retained); + if (!mInsend) { + mInsend = true; + if(!mClient->connected()) + reconnect(); + if(mClient->connected()) { + mClient->publish(topic, msg, retained); + } + mInsend = false; + } + else + DPRINTLN(DBG_INFO, F(" SendMsg2 inSend is set ") + String(mClient->state()) + " "); + } } bool isConnected(bool doRecon = false) { - //DPRINTLN(DBG_VERBOSE, F("mqtt.h:isConnected")); + DPRINTLN(DBG_DEBUG, F("mqtt.h:isConnected")); if(doRecon) + DPRINTLN(DBG_DEBUG, F(" doRecon ") + String(doRecon) + " "); reconnect(); return mClient->connected(); } @@ -77,6 +96,14 @@ class mqtt { return mPort; } + void decReconnect(void) { + if ( mReconnect > 1) { + mReconnect--; + DPRINTLN(DBG_INFO, F(" mqtt.h:decReconnect mReconnect = ") + String(mReconnect) + " "); + } + // return mReconnect; + } + void loop() { //DPRINT(F("m")); //if(!mClient->connected()) @@ -86,12 +113,65 @@ class mqtt { private: void reconnect(void) { - //DPRINTLN(DBG_VERBOSE, F("mqtt.h:reconnect")); - if(!mClient->connected()) { - if((strlen(mUser) > 0) && (strlen(mPwd) > 0)) - mClient->connect(DEF_DEVICE_NAME, mUser, mPwd); - else - mClient->connect(DEF_DEVICE_NAME); + boolean rc; + if (mReconnect < 2 ) { + DPRINTLN(DBG_DEBUG, F("----------------")); + DPRINTLN(DBG_DEBUG, F("mqtt.h:reconnect")); + + rc = mClient->connected(); + DPRINTLN(DBG_DEBUG, F(" Connected 1 = ") + String(rc) + " "); + DPRINTLN(DBG_DEBUG, F(" MQTT Client->_state ") + String(mClient->state()) + " "); + DPRINTLN(DBG_DEBUG, F(" WIFI Client.status ") + String(mEspClient.status()) + " "); + } + else + rc = false; + + + if(!rc) { + // HorstG-57; Test wegen MQTT Reconnect problem + + if ( mReconnect == 0 ) { + // es hat schon mal einen Connect gegeben + DPRINTLN(DBG_INFO, F("mqtt.h:Disconnect")); + DPRINTLN(DBG_INFO, F(" 1. MQTT Client->_state ") + String(mClient->state()) + " "); + DPRINTLN(DBG_INFO, F(" WIFI Client.status ") + String(mEspClient.status()) + " "); + + // der Server und der Port müssen neu gesetzt werden, + // da ein Loss_Connect die Werte zerstört hat. + mClient->setServer(mBroker, mPort); + mClient->setBufferSize(MQTT_MAX_PACKET_SIZE); + + // Verzögerung des MQTT reconnects um 5 Sekunden + mReconnect = 5; + + } + + + if ( mReconnect < 2 ) { + // ein MQTT Connect findet statt wenn mreconnect auf 1 bzw 0 steht. + // 1 = es hat noch nie ein MQTT Connect stattgefunden + // 0 = ein MQTT Connect war schon mal erfogreich + + if((strlen(mUser) > 0) && (strlen(mPwd) > 0)) + rc = mClient->connect(mDeviceName, mUser, mPwd); + else + rc = mClient->connect(mDeviceName); + + DPRINTLN(DBG_DEBUG, F(" Connect = ") + String(rc) + " "); + DPRINTLN(DBG_DEBUG, F(" 2. MQTT Client->_state ") + String(mClient->state()) + " "); + DPRINTLN(DBG_DEBUG, F(" WIFI Client.status ") + String(mEspClient.status()) + " "); + + if ( rc ) + mReconnect = 0; + else + mReconnect = 5; + + // rc = mClient->connected(); + // DPRINTLN(DBG_DEBUG, F(" Connected = ") + String(rc) + " "); + + } + + } } @@ -99,10 +179,14 @@ class mqtt { PubSubClient *mClient; bool mAddressSet; + bool mInsend; uint16_t mPort; + uint8_t mReconnect; char mUser[MQTT_USER_LEN]; char mPwd[MQTT_PWD_LEN]; char mTopic[MQTT_TOPIC_LEN]; + char mDeviceName[DEVNAME_LEN]; + char mBroker[MQTT_ADDR_LEN]; }; #endif /*__MQTT_H_*/