Browse Source

fix mqtt send all received records

renamed inverter address to serial number in setup
pull/374/head
lumapu 2 years ago
parent
commit
57b8a0abab
  1. 232
      tools/esp8266/app.cpp
  2. 2
      tools/esp8266/app.h
  3. 2
      tools/esp8266/defines.h
  4. 2
      tools/esp8266/html/setup.html
  5. 4
      tools/esp8266/html/style.css

232
tools/esp8266/app.cpp

@ -292,110 +292,106 @@ bool app::buildPayload(uint8_t id) {
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
void app::processPayload(bool retransmit) { void app::processPayload(bool retransmit) {
bool doMQTT = false;
// DPRINTLN(DBG_INFO, F("processPayload"));
for (uint8_t id = 0; id < mSys->getNumInverters(); id++) { for (uint8_t id = 0; id < mSys->getNumInverters(); id++) {
Inverter<> *iv = mSys->getInverterByPos(id); Inverter<> *iv = mSys->getInverterByPos(id);
if (NULL != iv) { if (NULL != iv)
if ((mPayload[iv->id].txId != (TX_REQ_INFO + ALL_FRAMES)) && (0 != mPayload[iv->id].txId)) { break; // skip to next inverter
// no processing needed if txId is not 0x95
// DPRINTLN(DBG_INFO, F("processPayload - set complete, txId: ") + String(mPayload[iv->id].txId, HEX));
mPayload[iv->id].complete = true;
}
if (!mPayload[iv->id].complete) { if ((mPayload[iv->id].txId != (TX_REQ_INFO + ALL_FRAMES)) && (0 != mPayload[iv->id].txId)) {
if (!buildPayload(iv->id)) // payload not complete // no processing needed if txId is not 0x95
{ // DPRINTLN(DBG_INFO, F("processPayload - set complete, txId: ") + String(mPayload[iv->id].txId, HEX));
if (mPayload[iv->id].requested) { mPayload[iv->id].complete = true;
if (retransmit) { }
if (iv->devControlCmd == Restart || iv->devControlCmd == CleanState_LockAndAlarm) {
// This is required to prevent retransmissions without answer. if (!mPayload[iv->id].complete) {
DPRINTLN(DBG_INFO, F("Prevent retransmit on Restart / CleanState_LockAndAlarm...")); if (!buildPayload(iv->id)) { // payload not complete
mPayload[iv->id].retransmits = mConfig.maxRetransPerPyld; if ((mPayload[iv->id].requested) && (retransmit)) {
} else { if (iv->devControlCmd == Restart || iv->devControlCmd == CleanState_LockAndAlarm) {
if (mPayload[iv->id].retransmits < mConfig.maxRetransPerPyld) { // This is required to prevent retransmissions without answer.
mPayload[iv->id].retransmits++; DPRINTLN(DBG_INFO, F("Prevent retransmit on Restart / CleanState_LockAndAlarm..."));
if (mPayload[iv->id].maxPackId != 0) { mPayload[iv->id].retransmits = mConfig.maxRetransPerPyld;
for (uint8_t i = 0; i < (mPayload[iv->id].maxPackId - 1); i++) { } else {
if (mPayload[iv->id].len[i] == 0) { if (mPayload[iv->id].retransmits < mConfig.maxRetransPerPyld) {
if (mConfig.serialDebug) mPayload[iv->id].retransmits++;
DPRINTLN(DBG_WARN, F("while retrieving data: Frame ") + String(i + 1) + F(" missing: Request Retransmit")); if (mPayload[iv->id].maxPackId != 0) {
mSys->Radio.sendCmdPacket(iv->radioId.u64, TX_REQ_INFO, (SINGLE_FRAME + i), true); for (uint8_t i = 0; i < (mPayload[iv->id].maxPackId - 1); i++) {
break; // only retransmit one frame per loop if (mPayload[iv->id].len[i] == 0) {
}
yield();
}
} else {
if (mConfig.serialDebug) if (mConfig.serialDebug)
DPRINTLN(DBG_WARN, F("while retrieving data: last frame missing: Request Retransmit")); DPRINTLN(DBG_WARN, F("while retrieving data: Frame ") + String(i + 1) + F(" missing: Request Retransmit"));
if (0x00 != mLastPacketId) mSys->Radio.sendCmdPacket(iv->radioId.u64, TX_REQ_INFO, (SINGLE_FRAME + i), true);
mSys->Radio.sendCmdPacket(iv->radioId.u64, TX_REQ_INFO, mLastPacketId, true); break; // only retransmit one frame per loop
else {
mPayload[iv->id].txCmd = iv->getQueuedCmd();
mSys->Radio.sendTimePacket(iv->radioId.u64, mPayload[iv->id].txCmd, mPayload[iv->id].ts, iv->alarmMesIndex);
}
} }
mSys->Radio.switchRxCh(100); yield();
}
} else {
if (mConfig.serialDebug)
DPRINTLN(DBG_WARN, F("while retrieving data: last frame missing: Request Retransmit"));
if (0x00 != mLastPacketId)
mSys->Radio.sendCmdPacket(iv->radioId.u64, TX_REQ_INFO, mLastPacketId, true);
else {
mPayload[iv->id].txCmd = iv->getQueuedCmd();
mSys->Radio.sendTimePacket(iv->radioId.u64, mPayload[iv->id].txCmd, mPayload[iv->id].ts, iv->alarmMesIndex);
} }
} }
mSys->Radio.switchRxCh(100);
} }
} }
} else { // payload complete }
DPRINTLN(DBG_INFO, F("procPyld: cmd: ") + String(mPayload[iv->id].txCmd)); } else { // payload complete
DPRINTLN(DBG_INFO, F("procPyld: txid: 0x") + String(mPayload[iv->id].txId, HEX)); DPRINTLN(DBG_INFO, F("procPyld: cmd: ") + String(mPayload[iv->id].txCmd));
DPRINTLN(DBG_DEBUG, F("procPyld: max: ") + String(mPayload[iv->id].maxPackId)); DPRINTLN(DBG_INFO, F("procPyld: txid: 0x") + String(mPayload[iv->id].txId, HEX));
record_t<> *rec = iv->getRecordStruct(mPayload[iv->id].txCmd); // choose the parser DPRINTLN(DBG_DEBUG, F("procPyld: max: ") + String(mPayload[iv->id].maxPackId));
mPayload[iv->id].complete = true; record_t<> *rec = iv->getRecordStruct(mPayload[iv->id].txCmd); // choose the parser
mPayload[iv->id].complete = true;
uint8_t payload[128]; uint8_t payload[128];
uint8_t payloadLen = 0; uint8_t payloadLen = 0;
memset(payload, 0, 128); memset(payload, 0, 128);
for (uint8_t i = 0; i < (mPayload[iv->id].maxPackId); i++) { for (uint8_t i = 0; i < (mPayload[iv->id].maxPackId); i++) {
memcpy(&payload[payloadLen], mPayload[iv->id].data[i], (mPayload[iv->id].len[i])); memcpy(&payload[payloadLen], mPayload[iv->id].data[i], (mPayload[iv->id].len[i]));
payloadLen += (mPayload[iv->id].len[i]); payloadLen += (mPayload[iv->id].len[i]);
yield(); yield();
} }
payloadLen -= 2; payloadLen -= 2;
if (mConfig.serialDebug) {
DPRINT(DBG_INFO, F("Payload (") + String(payloadLen) + "): ");
mSys->Radio.dumpBuf(NULL, payload, payloadLen);
}
if (NULL == rec) { if (mConfig.serialDebug) {
DPRINTLN(DBG_ERROR, F("record is NULL!")); DPRINT(DBG_INFO, F("Payload (") + String(payloadLen) + "): ");
} else if ((rec->pyldLen == payloadLen) || (0 == rec->pyldLen)) { mSys->Radio.dumpBuf(NULL, payload, payloadLen);
if (mPayload[iv->id].txId == (TX_REQ_INFO + 0x80)) }
mStat.rxSuccess++;
rec->ts = mPayload[iv->id].ts; if (NULL == rec) {
for (uint8_t i = 0; i < rec->length; i++) { DPRINTLN(DBG_ERROR, F("record is NULL!"));
iv->addValue(i, payload, rec); } else if ((rec->pyldLen == payloadLen) || (0 == rec->pyldLen)) {
yield(); if (mPayload[iv->id].txId == (TX_REQ_INFO + 0x80))
} mStat.rxSuccess++;
iv->doCalculations();
doMQTT = true; rec->ts = mPayload[iv->id].ts;
} else { for (uint8_t i = 0; i < rec->length; i++) {
DPRINTLN(DBG_ERROR, F("plausibility check failed, expected ") + String(rec->pyldLen) + F(" bytes")); iv->addValue(i, payload, rec);
mStat.rxFail++; yield();
} }
iv->doCalculations();
iv->setQueuedCmdFinished(); mMqttSendList.push(mPayload[iv->id].txCmd);
} else {
DPRINTLN(DBG_ERROR, F("plausibility check failed, expected ") + String(rec->pyldLen) + F(" bytes"));
mStat.rxFail++;
} }
}
yield(); iv->setQueuedCmdFinished();
}
} }
yield();
} }
// ist MQTT aktiviert und es wurden Daten vom einem oder mehreren WR aufbereitet ( doMQTT = true) // ist MQTT aktiviert und es wurden Daten vom einem oder mehreren WR aufbereitet
// dann die den mMqttTicker auf mMqttIntervall -2 setzen, also // dann die den mMqttTicker auf mMqttIntervall -2 setzen, also
// MQTT aussenden in 2 sek aktivieren // MQTT aussenden in 2 sek aktivieren
if ((mMqttInterval != 0xffff) && doMQTT) { if ((mMqttInterval != 0xffff) && (!mMqttSendList.empty())) {
mMqttTicker = mMqttInterval - 2; mMqttTicker = mMqttInterval - 2;
} }
} }
@ -529,8 +525,6 @@ void app::sendMqttDiscoveryConfig(void) {
Inverter<> *iv = mSys->getInverterByPos(id); Inverter<> *iv = mSys->getInverterByPos(id);
if (NULL != iv) { if (NULL != iv) {
record_t<> *rec = iv->getRecordStruct(RealTimeRunData_Debug); record_t<> *rec = iv->getRecordStruct(RealTimeRunData_Debug);
// TODO: next line makes no sense if discovery config is send manually by button
// if(iv->isAvailable(mUtcTimestamp, rec) && mMqttConfigSendState[id] != true) {
DynamicJsonDocument deviceDoc(128); DynamicJsonDocument deviceDoc(128);
deviceDoc["name"] = iv->name; deviceDoc["name"] = iv->name;
deviceDoc["ids"] = String(iv->serial.u64, HEX); deviceDoc["ids"] = String(iv->serial.u64, HEX);
@ -573,7 +567,6 @@ void app::sendMqttDiscoveryConfig(void) {
mMqttConfigSendState[id] = true; mMqttConfigSendState[id] = true;
yield(); yield();
//}
} }
} }
} }
@ -589,41 +582,52 @@ void app::sendMqtt(void) {
mMqtt.sendMsg("uptime", val); mMqtt.sendMsg("uptime", val);
for (uint8_t id = 0; id < mSys->getNumInverters(); id++) { if(mMqttSendList.empty())
Inverter<> *iv = mSys->getInverterByPos(id); return;
if (NULL != iv) {
record_t<> *rec = iv->getRecordStruct(RealTimeRunData_Debug);
uint8_t status = MQTT_STATUS_AVAIL_PROD;
if (!iv->isAvailable(mUtcTimestamp, rec))
status = MQTT_STATUS_NOT_AVAIL_NOT_PROD;
if (!iv->isProducing(mUtcTimestamp, rec)) {
if (MQTT_STATUS_AVAIL_PROD == status)
status = MQTT_STATUS_AVAIL_NOT_PROD;
}
snprintf(topic, 32 + MAX_NAME_LENGTH, "%s/available_text", iv->name); while(!mMqttSendList.empty()) {
snprintf(val, 32, "%s%s%s%s", for (uint8_t id = 0; id < mSys->getNumInverters(); id++) {
(MQTT_STATUS_NOT_AVAIL_NOT_PROD) ? "not " : "", Inverter<> *iv = mSys->getInverterByPos(id);
"available and ", if (NULL != iv)
(MQTT_STATUS_NOT_AVAIL_NOT_PROD || MQTT_STATUS_AVAIL_NOT_PROD) ? "not " : "", break; // skip to next inverter
"producing"
); record_t<> *rec = iv->getRecordStruct(mMqttSendList.front());
mMqtt.sendMsg(topic, val);
if(mMqttSendList.front() == RealTimeRunData_Debug) {
// inverter status
uint8_t status = MQTT_STATUS_AVAIL_PROD;
if (!iv->isAvailable(mUtcTimestamp, rec))
status = MQTT_STATUS_NOT_AVAIL_NOT_PROD;
if (!iv->isProducing(mUtcTimestamp, rec)) {
if (MQTT_STATUS_AVAIL_PROD == status)
status = MQTT_STATUS_AVAIL_NOT_PROD;
}
snprintf(topic, 32 + MAX_NAME_LENGTH, "%s/available_text", iv->name);
snprintf(val, 32, "%s%s%s%s",
(MQTT_STATUS_NOT_AVAIL_NOT_PROD) ? "not " : "",
"available and ",
(MQTT_STATUS_NOT_AVAIL_NOT_PROD || MQTT_STATUS_AVAIL_NOT_PROD) ? "not " : "",
"producing"
);
mMqtt.sendMsg(topic, val);
snprintf(topic, 32 + MAX_NAME_LENGTH, "%s/available", iv->name); snprintf(topic, 32 + MAX_NAME_LENGTH, "%s/available", iv->name);
snprintf(val, 32, "%d", status); snprintf(val, 32, "%d", status);
mMqtt.sendMsg(topic, val); mMqtt.sendMsg(topic, val);
if (iv->isAvailable(mUtcTimestamp, rec)) {
snprintf(topic, 32 + MAX_NAME_LENGTH, "%s/last_success", iv->name); snprintf(topic, 32 + MAX_NAME_LENGTH, "%s/last_success", iv->name);
snprintf(val, 48, "%i", iv->getLastTs(rec) * 1000); snprintf(val, 48, "%i", iv->getLastTs(rec) * 1000);
mMqtt.sendMsg(topic, val); mMqtt.sendMsg(topic, val);
}
// data
for (uint8_t i = 0; i < rec->length; i++) {
snprintf(topic, 32 + MAX_NAME_LENGTH, "%s/ch%d/%s", iv->name, rec->assign[i].ch, fields[rec->assign[i].fieldId]);
snprintf(val, 10, "%.3f", iv->getValue(i, rec));
mMqtt.sendMsg(topic, val);
for (uint8_t i = 0; i < rec->length; i++) { // calculate total values for RealTimeRunData_Debug
snprintf(topic, 32 + MAX_NAME_LENGTH, "%s/ch%d/%s", iv->name, rec->assign[i].ch, fields[rec->assign[i].fieldId]); if (mMqttSendList.front() == RealTimeRunData_Debug) {
snprintf(val, 10, "%.3f", iv->getValue(i, rec));
mMqtt.sendMsg(topic, val);
if (CH0 == rec->assign[i].ch) { if (CH0 == rec->assign[i].ch) {
switch (rec->assign[i].fieldId) { switch (rec->assign[i].fieldId) {
case FLD_PAC: case FLD_PAC:
@ -640,11 +644,13 @@ void app::sendMqtt(void) {
break; break;
} }
} }
yield(); sendTotal = true;
} }
sendTotal = true; yield();
} }
} }
mMqttSendList.pop(); // remove from list once all inverters were processed
} }
if (true == sendTotal) { if (true == sendTotal) {

2
tools/esp8266/app.h

@ -10,6 +10,7 @@
#include "Arduino.h" #include "Arduino.h"
#include <queue>
#include <RF24.h> #include <RF24.h>
#include <RF24_config.h> #include <RF24_config.h>
#include <ArduinoJson.h> #include <ArduinoJson.h>
@ -287,6 +288,7 @@ class app {
uint16_t mMqttInterval; uint16_t mMqttInterval;
bool mMqttActive; bool mMqttActive;
bool mMqttConfigSendState[MAX_NUM_INVERTERS]; bool mMqttConfigSendState[MAX_NUM_INVERTERS];
std::queue<uint8_t> mMqttSendList;
// serial // serial
uint16_t mSerialTicker; uint16_t mSerialTicker;

2
tools/esp8266/defines.h

@ -13,7 +13,7 @@
//------------------------------------- //-------------------------------------
#define VERSION_MAJOR 0 #define VERSION_MAJOR 0
#define VERSION_MINOR 5 #define VERSION_MINOR 5
#define VERSION_PATCH 26 #define VERSION_PATCH 27
//------------------------------------- //-------------------------------------

2
tools/esp8266/html/setup.html

@ -242,7 +242,7 @@
id = "inv" + id; id = "inv" + id;
iv.appendChild(lbl(id + "Addr", "Address*")); iv.appendChild(lbl(id + "Addr", "Serial Number (12 digits)*"));
var addr = inp(id + "Addr", obj["serial"], 12) var addr = inp(id + "Addr", obj["serial"], 12)
iv.appendChild(addr); iv.appendChild(addr);
['keyup', 'change'].forEach(function(evt) { ['keyup', 'change'].forEach(function(evt) {

4
tools/esp8266/html/style.css

@ -81,8 +81,8 @@ span.seperator {
} }
#footer { #footer {
height: 120px; height: 121px;
margin-top: -120px; margin-top: -121px;
background-color: #555; background-color: #555;
width: 100%; width: 100%;
font-size: 13px; font-size: 13px;

Loading…
Cancel
Save