Browse Source

MQTT Reconnect Fehler behoben, MQTT Anmeldung mit Device Name, MQTT Aussendung der Daten nach Verarbeitung der WR Daten

pull/120/head
HorstG-57 3 years ago
parent
commit
5d00cba981
  1. 37
      tools/esp8266/app.cpp
  2. 102
      tools/esp8266/mqtt.h

37
tools/esp8266/app.cpp

@ -129,12 +129,15 @@ void app::setup(uint32_t timeout) {
char mqttUser[MQTT_USER_LEN];
char mqttPwd[MQTT_PWD_LEN];
char mqttTopic[MQTT_TOPIC_LEN];
char mDeviceName[DEVNAME_LEN];
mEep->read(ADDR_MQTT_ADDR, mqttAddr, MQTT_ADDR_LEN);
mEep->read(ADDR_MQTT_USER, mqttUser, MQTT_USER_LEN);
mEep->read(ADDR_MQTT_PWD, mqttPwd, MQTT_PWD_LEN);
mEep->read(ADDR_MQTT_TOPIC, mqttTopic, MQTT_TOPIC_LEN);
//mEep->read(ADDR_MQTT_INTERVAL, &mMqttInterval);
mEep->read(ADDR_MQTT_PORT, &mqttPort);
mEep->read(ADDR_DEVNAME, mDeviceName, DEVNAME_LEN);
if(mqttAddr[0] > 0) {
mMqttActive = true;
@ -147,13 +150,14 @@ void app::setup(uint32_t timeout) {
if(0 == mqttPort)
mqttPort = 1883;
mMqtt.setup(mqttAddr, mqttTopic, mqttUser, mqttPwd, mqttPort);
mMqtt.setup(mqttAddr, mqttTopic, mqttUser, mqttPwd, mqttPort, mDeviceName);
mMqttTicker = 0;
mSerialTicker = 0;
if(mqttAddr[0] > 0) {
char topic[30];
mMqtt.sendMsg("Device", mDeviceName);
mMqtt.sendMsg("version", mVersion);
for(uint8_t i = 0; i < MAX_NUM_INVERTERS; i ++) {
iv = mSys->getInverterByPos(i);
@ -167,6 +171,8 @@ void app::setup(uint32_t timeout) {
}
}
}
// erste MQTT Übertragung aus der Loop in 2 Sekunden
mMqttTicker = mMqttInterval - 2;
}
}
else {
@ -231,9 +237,6 @@ void app::loop(void) {
Inverter<> *iv = mSys->findInverter(&p->packet[1]);
if(NULL != iv) {
uint8_t *pid = &p->packet[9];
if (*pid == 0x00) {
DPRINT(DBG_DEBUG, "fragment number zero received and ignored");
} else {
if((*pid & 0x7F) < 5) {
memcpy(mPayload[iv->id].data[(*pid & 0x7F) - 1], &p->packet[10], len-11);
mPayload[iv->id].len[(*pid & 0x7F) - 1] = len-11;
@ -249,7 +252,6 @@ void app::loop(void) {
}
}
}
}
mSys->BufCtrl.popBack();
}
@ -265,6 +267,10 @@ void app::loop(void) {
mMqtt.loop();
if(checkTicker(&mTicker, 1000)) {
// Zählt den MQTT Disconnect Counter runter, Damit wird ein sofortiger reconnect im Fall von Loss_Connect
// unterdrückt.
mMqtt.decReconnect();
if((++mMqttTicker >= mMqttInterval) && (mMqttInterval != 0xffff)) {
mMqttTicker = 0;
mMqtt.isConnected(true);
@ -284,7 +290,12 @@ void app::loop(void) {
}
snprintf(val, 10, "%ld", millis()/1000);
sendMqttDiscoveryConfig();
mMqtt.sendMsg("uptime", val);
// für einfacheren Test mit MQTT, den MQTT abschnitt in 10 Sekunden wieder ausführen
// mMqttTicker = mMqttInterval -10;
}
if(mSerialValues) {
@ -295,6 +306,7 @@ void app::loop(void) {
Inverter<> *iv = mSys->getInverterByPos(id);
if(NULL != iv) {
if(iv->isAvailable(mTimestamp)) {
DPRINTLN(DBG_INFO, " Inverter : " + String(id));
for(uint8_t i = 0; i < iv->listLen; i++) {
if(0.0f != iv->getValue(i)) {
snprintf(topic, 30, "%s/ch%d/%s", iv->name, iv->assign[i].ch, iv->getFieldName(i));
@ -303,6 +315,7 @@ void app::loop(void) {
}
yield();
}
DPRINTLN(DBG_INFO, " ");
}
}
}
@ -400,6 +413,8 @@ bool app::buildPayload(uint8_t id) {
//-----------------------------------------------------------------------------
void app::processPayload(bool retransmit) {
DPRINTLN(DBG_VERBOSE, F("app::processPayload"));
boolean doMQTT = false;
for(uint8_t id = 0; id < mSys->getNumInverters(); id++) {
Inverter<> *iv = mSys->getInverterByPos(id);
if(NULL != iv) {
@ -455,11 +470,23 @@ void app::processPayload(bool retransmit) {
yield();
}
iv->doCalculations();
doMQTT = true;
}
}
yield();
}
}
// ist MQTT aktiviert und es wurden Daten vom einem oder mehreren WR aufbereitet ( doMQTT = true)
// dann die den mMqttTicker auf mMqttIntervall -2 setzen, also
// MQTT aussenden in 2 sek aktivieren
// dies sollte noch über einen Schalter im Setup aktivier / deaktivierbar gemacht werden
if( (mMqttInterval != 0xffff) && doMQTT ) {
++mMqttTicker = mMqttInterval -2;
DPRINT(DBG_DEBUG, F("MQTTticker auf Intervall -2 sec ")) ;
}
}

102
tools/esp8266/mqtt.h

@ -15,24 +15,34 @@ class mqtt {
mqtt() {
mClient = new PubSubClient(mEspClient);
mAddressSet = false;
mInsend = false;
memset(mUser, 0, MQTT_USER_LEN);
memset(mPwd, 0, MQTT_PWD_LEN);
memset(mTopic, 0, MQTT_TOPIC_LEN);
memset(mDeviceName, 0, DEVNAME_LEN);
memset(mBroker, 0, MQTT_ADDR_LEN);
}
~mqtt() { }
void setup(const char *broker, const char *topic, const char *user, const char *pwd, uint16_t port) {
void setup(const char *broker, const char *topic, const char *user, const char *pwd, uint16_t port, const char *devname ) {
DPRINTLN(DBG_VERBOSE, F("mqtt.h:setup"));
mAddressSet = true;
mClient->setServer(broker, port);
mClient->setBufferSize(MQTT_MAX_PACKET_SIZE);
mInsend = false;
mReconnect = 1;
mPort = port;
snprintf(mUser, MQTT_USER_LEN, "%s", user);
snprintf(mPwd, MQTT_PWD_LEN, "%s", pwd);
snprintf(mTopic, MQTT_TOPIC_LEN, "%s", topic);
snprintf(mDeviceName, DEVNAME_LEN, "%s", devname);
snprintf(mBroker, MQTT_ADDR_LEN, "%s", broker);
mClient->setServer(mBroker, mPort);
mClient->setBufferSize(MQTT_MAX_PACKET_SIZE);
}
void sendMsg(const char *topic, const char *msg) {
@ -44,16 +54,25 @@ class mqtt {
void sendMsg2(const char *topic, const char *msg, boolean retained) {
if(mAddressSet) {
if (!mInsend) {
mInsend = true;
if(!mClient->connected())
reconnect();
if(mClient->connected())
if(mClient->connected()) {
mClient->publish(topic, msg, retained);
}
mInsend = false;
}
else
DPRINTLN(DBG_INFO, F(" SendMsg2 inSend is set ") + String(mClient->state()) + " ");
}
}
bool isConnected(bool doRecon = false) {
//DPRINTLN(DBG_VERBOSE, F("mqtt.h:isConnected"));
DPRINTLN(DBG_DEBUG, F("mqtt.h:isConnected"));
if(doRecon)
DPRINTLN(DBG_DEBUG, F(" doRecon ") + String(doRecon) + " ");
reconnect();
return mClient->connected();
}
@ -77,6 +96,14 @@ class mqtt {
return mPort;
}
void decReconnect(void) {
if ( mReconnect > 1) {
mReconnect--;
DPRINTLN(DBG_INFO, F(" mqtt.h:decReconnect mReconnect = ") + String(mReconnect) + " ");
}
// return mReconnect;
}
void loop() {
//DPRINT(F("m"));
//if(!mClient->connected())
@ -86,12 +113,65 @@ class mqtt {
private:
void reconnect(void) {
//DPRINTLN(DBG_VERBOSE, F("mqtt.h:reconnect"));
if(!mClient->connected()) {
boolean rc;
if (mReconnect < 2 ) {
DPRINTLN(DBG_DEBUG, F("----------------"));
DPRINTLN(DBG_DEBUG, F("mqtt.h:reconnect"));
rc = mClient->connected();
DPRINTLN(DBG_DEBUG, F(" Connected 1 = ") + String(rc) + " ");
DPRINTLN(DBG_DEBUG, F(" MQTT Client->_state ") + String(mClient->state()) + " ");
DPRINTLN(DBG_DEBUG, F(" WIFI Client.status ") + String(mEspClient.status()) + " ");
}
else
rc = false;
if(!rc) {
// HorstG-57; Test wegen MQTT Reconnect problem
if ( mReconnect == 0 ) {
// es hat schon mal einen Connect gegeben
DPRINTLN(DBG_INFO, F("mqtt.h:Disconnect"));
DPRINTLN(DBG_INFO, F(" 1. MQTT Client->_state ") + String(mClient->state()) + " ");
DPRINTLN(DBG_INFO, F(" WIFI Client.status ") + String(mEspClient.status()) + " ");
// der Server und der Port müssen neu gesetzt werden,
// da ein Loss_Connect die Werte zerstört hat.
mClient->setServer(mBroker, mPort);
mClient->setBufferSize(MQTT_MAX_PACKET_SIZE);
// Verzögerung des MQTT reconnects um 5 Sekunden
mReconnect = 5;
}
if ( mReconnect < 2 ) {
// ein MQTT Connect findet statt wenn mreconnect auf 1 bzw 0 steht.
// 1 = es hat noch nie ein MQTT Connect stattgefunden
// 0 = ein MQTT Connect war schon mal erfogreich
if((strlen(mUser) > 0) && (strlen(mPwd) > 0))
mClient->connect(DEF_DEVICE_NAME, mUser, mPwd);
rc = mClient->connect(mDeviceName, mUser, mPwd);
else
mClient->connect(DEF_DEVICE_NAME);
rc = mClient->connect(mDeviceName);
DPRINTLN(DBG_DEBUG, F(" Connect = ") + String(rc) + " ");
DPRINTLN(DBG_DEBUG, F(" 2. MQTT Client->_state ") + String(mClient->state()) + " ");
DPRINTLN(DBG_DEBUG, F(" WIFI Client.status ") + String(mEspClient.status()) + " ");
if ( rc )
mReconnect = 0;
else
mReconnect = 5;
// rc = mClient->connected();
// DPRINTLN(DBG_DEBUG, F(" Connected = ") + String(rc) + " ");
}
}
}
@ -99,10 +179,14 @@ class mqtt {
PubSubClient *mClient;
bool mAddressSet;
bool mInsend;
uint16_t mPort;
uint8_t mReconnect;
char mUser[MQTT_USER_LEN];
char mPwd[MQTT_PWD_LEN];
char mTopic[MQTT_TOPIC_LEN];
char mDeviceName[DEVNAME_LEN];
char mBroker[MQTT_ADDR_LEN];
};
#endif /*__MQTT_H_*/

Loading…
Cancel
Save