Browse Source

MQTT Yield Day zero, next try to fix #671, thx @beegee3

added Solenso inverter to supported devices
improved reconnection of MQTT #650
pull/698/head
lumapu 2 years ago
parent
commit
b69ab06637
  1. 5
      src/CHANGES.md
  2. 3
      src/app.h
  3. 183
      src/publisher/pubMqtt.h

5
src/CHANGES.md

@ -2,6 +2,11 @@
(starting from release version `0.5.66`) (starting from release version `0.5.66`)
## 0.5.88
* MQTT Yield Day zero, next try to fix #671, thx @beegee3
* added Solenso inverter to supported devices
* improved reconnection of MQTT #650
## 0.5.87 ## 0.5.87
* fix yield total correction as module (inverter input) value #570 * fix yield total correction as module (inverter input) value #570
* reneabled instant start communication (once NTP is synced) #674 * reneabled instant start communication (once NTP is synced) #674

3
src/app.h

@ -198,7 +198,8 @@ class app : public IApp, public ah::Scheduler {
void payloadEventListener(uint8_t cmd) { void payloadEventListener(uint8_t cmd) {
#if !defined(AP_ONLY) #if !defined(AP_ONLY)
mMqtt.payloadEventListener(cmd); if (mMqttEnabled)
mMqtt.payloadEventListener(cmd);
#endif #endif
if(mConfig->plugin.display.type != 0) if(mConfig->plugin.display.type != 0)
mMonoDisplay.payloadEventListener(cmd); mMonoDisplay.payloadEventListener(cmd);

183
src/publisher/pubMqtt.h

@ -40,6 +40,7 @@ class PubMqtt {
mTxCnt = 0; mTxCnt = 0;
mSubscriptionCb = NULL; mSubscriptionCb = NULL;
memset(mLastIvState, MQTT_STATUS_NOT_AVAIL_NOT_PROD, MAX_NUM_INVERTERS); memset(mLastIvState, MQTT_STATUS_NOT_AVAIL_NOT_PROD, MAX_NUM_INVERTERS);
mLastAnyAvail = false;
} }
~PubMqtt() { } ~PubMqtt() { }
@ -51,7 +52,6 @@ class PubMqtt {
mSys = sys; mSys = sys;
mUtcTimestamp = utcTs; mUtcTimestamp = utcTs;
mIntervalTimeout = 1; mIntervalTimeout = 1;
mReconnectRequest = false;
snprintf(mLwtTopic, MQTT_TOPIC_LEN + 5, "%s/mqtt", mCfgMqtt->topic); snprintf(mLwtTopic, MQTT_TOPIC_LEN + 5, "%s/mqtt", mCfgMqtt->topic);
@ -73,29 +73,31 @@ class PubMqtt {
} }
inline void connect() { inline void connect() {
mReconnectRequest = false;
if(!mClient.connected()) if(!mClient.connected())
mClient.connect(); mClient.connect();
} }
void tickerSecond() { void tickerSecond() {
if (mIntervalTimeout > 0)
mIntervalTimeout--;
if(!mClient.connected()) {
mClient.connect();
return; // next try in a second
}
if(0 == mCfgMqtt->interval) // no fixed interval, publish once new data were received (from inverter) if(0 == mCfgMqtt->interval) // no fixed interval, publish once new data were received (from inverter)
sendIvData(); sendIvData();
else { // send mqtt data in a fixed interval else { // send mqtt data in a fixed interval
if(--mIntervalTimeout == 0) { if(mIntervalTimeout == 0) {
mIntervalTimeout = mCfgMqtt->interval; mIntervalTimeout = mCfgMqtt->interval;
mSendList.push(RealTimeRunData_Debug); mSendList.push(RealTimeRunData_Debug);
sendIvData(); sendIvData();
} }
} }
if(mReconnectRequest) {
connect();
return;
}
} }
void tickerMinute() { void tickerMinute() {
processIvStatus();
char val[12]; char val[12];
snprintf(val, 12, "%ld", millis() / 1000); snprintf(val, 12, "%ld", millis() / 1000);
publish("uptime", val); publish("uptime", val);
@ -317,7 +319,6 @@ class PubMqtt {
switch (reason) { switch (reason) {
case espMqttClientTypes::DisconnectReason::TCP_DISCONNECTED: case espMqttClientTypes::DisconnectReason::TCP_DISCONNECTED:
DBGPRINTLN(F("TCP disconnect")); DBGPRINTLN(F("TCP disconnect"));
mReconnectRequest = true;
break; break;
case espMqttClientTypes::DisconnectReason::MQTT_UNACCEPTABLE_PROTOCOL_VERSION: case espMqttClientTypes::DisconnectReason::MQTT_UNACCEPTABLE_PROTOCOL_VERSION:
DBGPRINTLN(F("wrong protocol version")); DBGPRINTLN(F("wrong protocol version"));
@ -414,7 +415,7 @@ class PubMqtt {
} }
bool processIvStatus() { bool processIvStatus() {
// returns true if all inverters are available // returns true if any inverter is available
bool allAvail = true; // shows if all enabled inverters are available bool allAvail = true; // shows if all enabled inverters are available
bool anyAvail = false; // shows if at least one enabled inverter is available bool anyAvail = false; // shows if at least one enabled inverter is available
bool changed = false; bool changed = false;
@ -439,6 +440,10 @@ class PubMqtt {
} }
if(mLastIvState[id] != status) { if(mLastIvState[id] != status) {
// if status changed from producing to not producing send last data immediately
if (MQTT_STATUS_AVAIL_PROD == mLastIvState[id])
sendData(iv, RealTimeRunData_Debug);
mLastIvState[id] = status; mLastIvState[id] = status;
changed = true; changed = true;
@ -455,10 +460,9 @@ class PubMqtt {
if(changed) { if(changed) {
snprintf(val, 32, "%d", ((allAvail) ? MQTT_STATUS_ONLINE : ((anyAvail) ? MQTT_STATUS_PARTIAL : MQTT_STATUS_OFFLINE))); snprintf(val, 32, "%d", ((allAvail) ? MQTT_STATUS_ONLINE : ((anyAvail) ? MQTT_STATUS_PARTIAL : MQTT_STATUS_OFFLINE)));
publish("status", val, true); publish("status", val, true);
//sendIvData(false); // false prevents loop of same function
} }
return allAvail; return anyAvail;
} }
void sendAlarmData() { void sendAlarmData() {
@ -474,93 +478,114 @@ class PubMqtt {
} }
} }
void sendData(Inverter<> *iv, uint8_t curInfoCmd) {
char topic[7 + MQTT_TOPIC_LEN], val[40];
record_t<> *rec = iv->getRecordStruct(curInfoCmd);
for (uint8_t i = 0; i < rec->length; i++) {
bool retained = false;
if (curInfoCmd == RealTimeRunData_Debug) {
switch (rec->assign[i].fieldId) {
case FLD_YT:
case FLD_YD:
if ((rec->assign[i].ch == CH0) && (!iv->isProducing(*mUtcTimestamp))) // avoids returns to 0 on restart
continue;
retained = true;
break;
}
}
snprintf(topic, 32 + MAX_NAME_LENGTH, "%s/ch%d/%s", iv->config->name, rec->assign[i].ch, fields[rec->assign[i].fieldId]);
snprintf(val, 40, "%g", ah::round3(iv->getValue(i, rec)));
publish(topic, val, retained);
yield();
}
}
void sendIvData() { void sendIvData() {
bool anyAvail = processIvStatus();
if (mLastAnyAvail != anyAvail)
mSendList.push(RealTimeRunData_Debug); // makes shure that total values are calculated
if(mSendList.empty()) if(mSendList.empty())
return; return;
char topic[7 + MQTT_TOPIC_LEN], val[40]; char topic[7 + MQTT_TOPIC_LEN], val[40];
float total[4]; float total[4];
bool sendTotal = false; bool RTRDataHasBeenSent = false;
while(!mSendList.empty()) { while(!mSendList.empty()) {
memset(total, 0, sizeof(float) * 4); memset(total, 0, sizeof(float) * 4);
for (uint8_t id = 0; id < mSys->getNumInverters(); id++) { uint8_t curInfoCmd = mSendList.front();
Inverter<> *iv = mSys->getInverterByPos(id);
if ((NULL == iv) || (MQTT_STATUS_NOT_AVAIL_NOT_PROD == mLastIvState[id]))
continue; // skip to next inverter
record_t<> *rec = iv->getRecordStruct(mSendList.front());
// data
//if(iv->isAvailable(*mUtcTimestamp, rec) || (0 != mCfgMqtt->interval)) { // is avail or fixed pulish interval was set
for (uint8_t i = 0; i < rec->length; i++) {
bool retained = false;
if (mSendList.front() == RealTimeRunData_Debug) {
switch (rec->assign[i].fieldId) {
case FLD_YT:
case FLD_YD:
if ((rec->assign[i].ch == CH0) && (!iv->isProducing(*mUtcTimestamp))) // avoids returns to 0 on restart
continue;
retained = true;
break;
}
}
snprintf(topic, 32 + MAX_NAME_LENGTH, "%s/ch%d/%s", iv->config->name, rec->assign[i].ch, fields[rec->assign[i].fieldId]); if ((curInfoCmd != RealTimeRunData_Debug) || !RTRDataHasBeenSent) { // send RTR Data only once
snprintf(val, 40, "%g", ah::round3(iv->getValue(i, rec))); for (uint8_t id = 0; id < mSys->getNumInverters(); id++) {
publish(topic, val, retained); Inverter<> *iv = mSys->getInverterByPos(id);
if (NULL == iv)
continue; // skip to next inverter
// send RTR Data only if status is available
if ((curInfoCmd != RealTimeRunData_Debug) || (MQTT_STATUS_AVAIL_PROD == mLastIvState[id]))
sendData(iv, curInfoCmd);
// calculate total values for RealTimeRunData_Debug // calculate total values for RealTimeRunData_Debug
if (mSendList.front() == RealTimeRunData_Debug) { if (curInfoCmd == RealTimeRunData_Debug) {
if (CH0 == rec->assign[i].ch) { record_t<> *rec = iv->getRecordStruct(curInfoCmd);
switch (rec->assign[i].fieldId) {
case FLD_PAC: for (uint8_t i = 0; i < rec->length; i++) {
total[0] += iv->getValue(i, rec); if (CH0 == rec->assign[i].ch) {
break; switch (rec->assign[i].fieldId) {
case FLD_YT: case FLD_PAC:
total[1] += iv->getValue(i, rec); total[0] += iv->getValue(i, rec);
break; break;
case FLD_YD: case FLD_YT:
total[2] += iv->getValue(i, rec); total[1] += iv->getValue(i, rec);
break; break;
case FLD_PDC: case FLD_YD:
total[3] += iv->getValue(i, rec); total[2] += iv->getValue(i, rec);
break; break;
case FLD_PDC:
total[3] += iv->getValue(i, rec);
break;
}
} }
sendTotal = true;
} }
yield();
} }
yield();
} }
//}
}
mSendList.pop(); // remove from list once all inverters were processed
if ((true == sendTotal) && processIvStatus()) { if (curInfoCmd == RealTimeRunData_Debug) {
uint8_t fieldId; uint8_t fieldId;
for (uint8_t i = 0; i < 4; i++) { for (uint8_t i = 0; i < 4; i++) {
switch (i) { switch (i) {
default: default:
case 0: case 0:
fieldId = FLD_PAC; fieldId = FLD_PAC;
break; break;
case 1: case 1:
fieldId = FLD_YT; fieldId = FLD_YT;
break; break;
case 2: case 2:
fieldId = FLD_YD; fieldId = FLD_YD;
break; break;
case 3: case 3:
fieldId = FLD_PDC; fieldId = FLD_PDC;
break; break;
}
snprintf(topic, 32 + MAX_NAME_LENGTH, "total/%s", fields[fieldId]);
snprintf(val, 40, "%g", ah::round3(total[i]));
publish(topic, val, true);
} }
snprintf(topic, 32 + MAX_NAME_LENGTH, "total/%s", fields[fieldId]); RTRDataHasBeenSent = true;
snprintf(val, 40, "%g", ah::round3(total[i])); yield();
publish(topic, val, true);
} }
} }
mSendList.pop(); // remove from list once all inverters were processed
} }
mLastAnyAvail = anyAvail;
} }
espMqttClient mClient; espMqttClient mClient;
@ -575,7 +600,7 @@ class PubMqtt {
std::queue<uint8_t> mSendList; std::queue<uint8_t> mSendList;
std::queue<alarm_t> mAlarmList; std::queue<alarm_t> mAlarmList;
subscriptionCb mSubscriptionCb; subscriptionCb mSubscriptionCb;
bool mReconnectRequest; bool mLastAnyAvail;
uint8_t mLastIvState[MAX_NUM_INVERTERS]; uint8_t mLastIvState[MAX_NUM_INVERTERS];
uint16_t mIntervalTimeout; uint16_t mIntervalTimeout;

Loading…
Cancel
Save