Browse Source

added statemachine

pull/935/head
lumapu 2 years ago
parent
commit
c004c83bf9
  1. 123
      src/publisher/pubMqtt.h
  2. 187
      src/publisher/pubMqttIvData.h

123
src/publisher/pubMqtt.h

@ -22,6 +22,7 @@
#include "../hm/hmSystem.h" #include "../hm/hmSystem.h"
#include "pubMqttDefs.h" #include "pubMqttDefs.h"
#include "pubMqttIvData.h"
#define QOS_0 0 #define QOS_0 0
@ -34,6 +35,13 @@ struct alarm_t {
alarm_t(uint16_t c, uint32_t s, uint32_t e) : code(c), start(s), end(e) {} alarm_t(uint16_t c, uint32_t s, uint32_t e) : code(c), start(s), end(e) {}
}; };
typedef struct {
bool running;
uint8_t lastIvId;
uint8_t sub;
uint8_t foundIvCnt;
} discovery_t;
template<class HMSYSTEM> template<class HMSYSTEM>
class PubMqtt { class PubMqtt {
public: public:
@ -56,8 +64,8 @@ class PubMqtt {
mUtcTimestamp = utcTs; mUtcTimestamp = utcTs;
mIntervalTimeout = 1; mIntervalTimeout = 1;
mSendIvData.setup(sys, utcTs);
mDiscovery.running = false; mDiscovery.running = false;
mSendIvData.running = false;
snprintf(mLwtTopic, MQTT_TOPIC_LEN + 5, "%s/mqtt", mCfgMqtt->topic); snprintf(mLwtTopic, MQTT_TOPIC_LEN + 5, "%s/mqtt", mCfgMqtt->topic);
@ -82,16 +90,13 @@ class PubMqtt {
} }
void loop() { void loop() {
mSendIvData.loop();
#if defined(ESP8266) #if defined(ESP8266)
mClient.loop(); mClient.loop();
yield(); yield();
#endif #endif
if(mSendIvData.running) {
sendIvDataLoop();
return;
}
if(mDiscovery.running) if(mDiscovery.running)
discoveryConfigLoop(); discoveryConfigLoop();
} }
@ -106,14 +111,13 @@ class PubMqtt {
return; // next try in a second return; // next try in a second
} }
if(0 == mCfgMqtt->interval) { // no fixed interval, publish once new data were received (from inverter) if(0 == mCfgMqtt->interval) // no fixed interval, publish once new data were received (from inverter)
sendIvDataStart(); sendIvData();
}
else { // send mqtt data in a fixed interval else { // send mqtt data in a fixed interval
if(mIntervalTimeout == 0) { if(mIntervalTimeout == 0) {
mIntervalTimeout = mCfgMqtt->interval; mIntervalTimeout = mCfgMqtt->interval;
mSendList.push(RealTimeRunData_Debug); mSendList.push(RealTimeRunData_Debug);
sendIvDataStart(); sendIvData();
} }
} }
} }
@ -554,65 +558,55 @@ class PubMqtt {
} }
} }
void sendIvDataStart() { void sendIvData() {
mSendIvData.RTRDataHasBeenSent = false;
memset(mSendIvData.total, 0, sizeof(float) * 4);
for (uint8_t id = 0; id < mSys->getNumInverters(); id++) {
Inverter<> *iv = mSys->getInverterByPos(id);
if (NULL != iv) {
if (iv->config->enabled) {
mSendIvData.lastIvId = id;
mSendIvData.running = true;
mSendIvData.lastIvReached = false;
mSendIvData.sendTotals = false;
break;
}
}
}
}
void sendIvDataLoop(void) {
bool anyAvail = processIvStatus(); bool anyAvail = processIvStatus();
if (mLastAnyAvail != anyAvail) if (mLastAnyAvail != anyAvail)
mSendList.push(RealTimeRunData_Debug); // makes sure that total values are calculated mSendList.push(RealTimeRunData_Debug); // makes shure that total values are calculated
if(mSendList.empty()) { if(mSendList.empty())
mSendIvData.running = false;
return; return;
}
float total[4];
bool RTRDataHasBeenSent = false;
//while(!mSendList.empty()) { while(!mSendList.empty()) {
memset(total, 0, sizeof(float) * 4);
uint8_t curInfoCmd = mSendList.front(); uint8_t curInfoCmd = mSendList.front();
if ((curInfoCmd != RealTimeRunData_Debug) || !mSendIvData.RTRDataHasBeenSent) { // send RTR Data only once
mSendIvData.sendTotals = (curInfoCmd == RealTimeRunData_Debug);
if(!mSendIvData.lastIvReached) { if ((curInfoCmd != RealTimeRunData_Debug) || !RTRDataHasBeenSent) { // send RTR Data only once
Inverter<> *iv = mSys->getInverterByPos(mSendIvData.lastIvId); bool sendTotals = (curInfoCmd == RealTimeRunData_Debug);
for (uint8_t id = 0; id < mSys->getNumInverters(); id++) {
Inverter<> *iv = mSys->getInverterByPos(id);
if (NULL == iv)
continue; // skip to next inverter
if (!iv->config->enabled)
continue; // skip to next inverter
// send RTR Data only if status is available // send RTR Data only if status is available
if ((curInfoCmd != RealTimeRunData_Debug) || (MQTT_STATUS_NOT_AVAIL_NOT_PROD != mLastIvState[mSendIvData.lastIvId])) if ((curInfoCmd != RealTimeRunData_Debug) || (MQTT_STATUS_NOT_AVAIL_NOT_PROD != mLastIvState[id]))
sendData(iv, curInfoCmd); sendData(iv, curInfoCmd);
// calculate total values for RealTimeRunData_Debug // calculate total values for RealTimeRunData_Debug
if (mSendIvData.sendTotals) { if (sendTotals) {
record_t<> *rec = iv->getRecordStruct(curInfoCmd); record_t<> *rec = iv->getRecordStruct(curInfoCmd);
mSendIvData.sendTotals &= (iv->getLastTs(rec) > 0); sendTotals &= (iv->getLastTs(rec) > 0);
if (mSendIvData.sendTotals) { if (sendTotals) {
for (uint8_t i = 0; i < rec->length; i++) { for (uint8_t i = 0; i < rec->length; i++) {
if (CH0 == rec->assign[i].ch) { if (CH0 == rec->assign[i].ch) {
switch (rec->assign[i].fieldId) { switch (rec->assign[i].fieldId) {
case FLD_PAC: case FLD_PAC:
mSendIvData.total[0] += iv->getValue(i, rec); total[0] += iv->getValue(i, rec);
break; break;
case FLD_YT: case FLD_YT:
mSendIvData.total[1] += iv->getValue(i, rec); total[1] += iv->getValue(i, rec);
break; break;
case FLD_YD: case FLD_YD:
mSendIvData.total[2] += iv->getValue(i, rec); total[2] += iv->getValue(i, rec);
break; break;
case FLD_PDC: case FLD_PDC:
mSendIvData.total[3] += iv->getValue(i, rec); total[3] += iv->getValue(i, rec);
break; break;
} }
} }
@ -620,21 +614,9 @@ class PubMqtt {
} }
} }
yield(); yield();
// get next inverter
for (uint8_t id = mSendIvData.lastIvId; id < mSys->getNumInverters(); id++) {
Inverter<> *iv = mSys->getInverterByPos(id);
if (NULL != iv) {
if (iv->config->enabled) {
mSendIvData.lastIvId = id;
return;
}
}
}
mSendIvData.lastIvReached = true;
} }
if (mSendIvData.sendTotals) { if (sendTotals) {
uint8_t fieldId; uint8_t fieldId;
for (uint8_t i = 0; i < 4; i++) { for (uint8_t i = 0; i < 4; i++) {
bool retained = true; bool retained = true;
@ -656,36 +638,20 @@ class PubMqtt {
break; break;
} }
snprintf(mSubTopic, 32 + MAX_NAME_LENGTH, "total/%s", fields[fieldId]); snprintf(mSubTopic, 32 + MAX_NAME_LENGTH, "total/%s", fields[fieldId]);
snprintf(mVal, 40, "%g", ah::round3(mSendIvData.total[i])); snprintf(mVal, 40, "%g", ah::round3(total[i]));
publish(mSubTopic, mVal, retained); publish(mSubTopic, mVal, retained);
} }
mSendIvData.RTRDataHasBeenSent = true; RTRDataHasBeenSent = true;
yield(); yield();
} }
} }
mSendList.pop(); // remove from list once all inverters were processed mSendList.pop(); // remove from list once all inverters were processed
//} // end while }
mLastAnyAvail = anyAvail; mLastAnyAvail = anyAvail;
} }
typedef struct {
bool running;
uint8_t lastIvId;
bool lastIvReached;
bool sendTotals;
float total[4];
bool RTRDataHasBeenSent;
} publish_t;
typedef struct {
bool running;
uint8_t lastIvId;
uint8_t sub;
uint8_t foundIvCnt;
} discovery_t;
espMqttClient mClient; espMqttClient mClient;
cfgMqtt_t *mCfgMqtt; cfgMqtt_t *mCfgMqtt;
#if defined(ESP8266) #if defined(ESP8266)
@ -693,6 +659,8 @@ class PubMqtt {
#endif #endif
HMSYSTEM *mSys; HMSYSTEM *mSys;
PubMqttIvData<HMSYSTEM> mSendIvData;
uint32_t *mUtcTimestamp; uint32_t *mUtcTimestamp;
uint32_t mRxCnt, mTxCnt; uint32_t mRxCnt, mTxCnt;
std::queue<uint8_t> mSendList; std::queue<uint8_t> mSendList;
@ -712,7 +680,6 @@ class PubMqtt {
char mSubTopic[32 + MAX_NAME_LENGTH + 1]; char mSubTopic[32 + MAX_NAME_LENGTH + 1];
char mVal[40]; char mVal[40];
discovery_t mDiscovery; discovery_t mDiscovery;
publish_t mSendIvData;
}; };
#endif /*__PUB_MQTT_H__*/ #endif /*__PUB_MQTT_H__*/

187
src/publisher/pubMqttIvData.h

@ -0,0 +1,187 @@
#ifndef __PUB_MQTT_IV_DATA_H__
#define __PUB_MQTT_IV_DATA_H__
#include "../utils/dbg.h"
#include "../hm/hmSystem.h"
#include "pubMqttDefs.h"
typedef std::function<void(const char *subTopic, const char *payload, bool retained)> pubMqttPublisherType;
template<class HMSYSTEM>
class PubMqttIvData {
public:
void setup(HMSYSTEM *sys, uint32_t *utcTs) {
mSys = sys;
mUtcTimestamp = utcTs;
mState = IDLE;
memset(mIvLastRTRpub, 0, MAX_NUM_INVERTERS * 4);
mTable[IDLE] = &PubMqttIvData::stateIdle;
mTable[START] = &PubMqttIvData::stateStart;
mTable[FIND_NXT_IV] = &PubMqttIvData::stateFindNxtIv;
mTable[SEND_DATA] = &PubMqttIvData::stateSend;
mTable[SEND_TOTALS] = &PubMqttIvData::stateSendTotals;
}
void loop() {
(this->*mTable[mState])();
yield();
}
bool start(uint8_t cmd) {
if(IDLE != mState)
return false;
mCmd = cmd;
mState = START;
return true;
}
void setPublishFunc(pubMqttPublisherType cb) {
mPublish = cb;
}
private:
enum State {IDLE, START, FIND_NXT_IV, SEND_DATA, SEND_TOTALS, NUM_STATES};
typedef void (PubMqttIvData::*StateFunction)();
void stateIdle() {
; // nothing to do
}
void stateStart() {
mLastIvId = 0;
mSendTotals = (RealTimeRunData_Debug == mCmd);
memset(mTotal, 0, sizeof(float) * 4);
mState = FIND_NXT_IV;
}
void stateFindNxtIv() {
bool found = false;
for (; mLastIvId < mSys->getNumInverters(); mLastIvId++) {
mIv = mSys->getInverterByPos(mLastIvId);
if (NULL != mIv) {
if (mIv->config->enabled) {
found = true;
break;
}
}
}
mPos = 0;
if(found)
mState = SEND_DATA;
else if(mSendTotals)
mState = SEND_TOTALS;
else
mState = IDLE;
}
void stateSend() {
record_t<> *rec = mIv->getRecordStruct(mCmd);
uint32_t lastTs = mIv->getLastTs(rec);
bool pubData = (lastTs > 0);
if (mCmd == RealTimeRunData_Debug)
pubData &= (lastTs != mIvLastRTRpub[mIv->id]);
if (pubData) {
mIvLastRTRpub[mIv->id] = lastTs;
//for (uint8_t i = 0; i < rec->length; i++) {
if(mPos < rec->length) {
bool retained = false;
if (mCmd == RealTimeRunData_Debug) {
switch (rec->assign[mPos].fieldId) {
case FLD_YT:
case FLD_YD:
if ((rec->assign[mPos].ch == CH0) && (!mIv->isProducing(*mUtcTimestamp))) { // avoids returns to 0 on restart
mPos++;
return;
}
retained = true;
break;
}
// calculate total values for RealTimeRunData_Debug
if (CH0 == rec->assign[mPos].ch) {
switch (rec->assign[mPos].fieldId) {
case FLD_PAC:
mTotal[0] += mIv->getValue(mPos, rec);
break;
case FLD_YT:
mTotal[1] += mIv->getValue(mPos, rec);
break;
case FLD_YD:
mTotal[2] += mIv->getValue(mPos, rec);
break;
case FLD_PDC:
mTotal[3] += mIv->getValue(mPos, rec);
break;
}
}
}
snprintf(mSubTopic, 32 + MAX_NAME_LENGTH, "%s/ch%d/%s", mIv->config->name, rec->assign[mPos].ch, fields[rec->assign[mPos].fieldId]);
snprintf(mVal, 40, "%g", ah::round3(mIv->getValue(mPos, rec)));
mPublish(mSubTopic, mVal, retained);
mPos++;
} else
mState = FIND_NXT_IV;
} else
mState = FIND_NXT_IV;
}
void stateSendTotals() {
uint8_t fieldId;
//for (uint8_t i = 0; i < 4; i++) {
if(mPos < 4) {
bool retained = true;
switch (mPos) {
default:
case 0:
fieldId = FLD_PAC;
retained = false;
break;
case 1:
fieldId = FLD_YT;
break;
case 2:
fieldId = FLD_YD;
break;
case 3:
fieldId = FLD_PDC;
retained = false;
break;
}
snprintf(mSubTopic, 32 + MAX_NAME_LENGTH, "total/%s", fields[fieldId]);
snprintf(mVal, 40, "%g", ah::round3(mTotal[mPos]));
mPublish(mSubTopic, mVal, retained);
mPos++;
} else
mState = IDLE;
RTRDataHasBeenSent = true;
}
HMSYSTEM *mSys;
uint32_t *mUtcTimestamp;
pubMqttPublisherType mPublish;
State mState;
StateFunction mTable[NUM_STATES];
uint8_t mCmd;
uint8_t mLastIvId;
bool mSendTotals;
float mTotal[4];
Inverter<> *mIv;
uint8_t mPos;
uint32_t mIvLastRTRpub[MAX_NUM_INVERTERS];
char mSubTopic[32 + MAX_NAME_LENGTH + 1];
char mVal[40];
};
#endif /*__PUB_MQTT_IV_DATA_H__*/
Loading…
Cancel
Save