diff --git a/src/publisher/pubMqtt.h b/src/publisher/pubMqtt.h index 7b766c92..e2bb96f5 100644 --- a/src/publisher/pubMqtt.h +++ b/src/publisher/pubMqtt.h @@ -22,6 +22,7 @@ #include "../hm/hmSystem.h" #include "pubMqttDefs.h" +#include "pubMqttIvData.h" #define QOS_0 0 @@ -34,6 +35,13 @@ struct alarm_t { alarm_t(uint16_t c, uint32_t s, uint32_t e) : code(c), start(s), end(e) {} }; +typedef struct { + bool running; + uint8_t lastIvId; + uint8_t sub; + uint8_t foundIvCnt; +} discovery_t; + template class PubMqtt { public: @@ -56,8 +64,8 @@ class PubMqtt { mUtcTimestamp = utcTs; mIntervalTimeout = 1; + mSendIvData.setup(sys, utcTs); mDiscovery.running = false; - mSendIvData.running = false; snprintf(mLwtTopic, MQTT_TOPIC_LEN + 5, "%s/mqtt", mCfgMqtt->topic); @@ -82,16 +90,13 @@ class PubMqtt { } void loop() { + mSendIvData.loop(); + #if defined(ESP8266) mClient.loop(); yield(); #endif - if(mSendIvData.running) { - sendIvDataLoop(); - return; - } - if(mDiscovery.running) discoveryConfigLoop(); } @@ -106,14 +111,13 @@ class PubMqtt { return; // next try in a second } - if(0 == mCfgMqtt->interval) { // no fixed interval, publish once new data were received (from inverter) - sendIvDataStart(); - } + if(0 == mCfgMqtt->interval) // no fixed interval, publish once new data were received (from inverter) + sendIvData(); else { // send mqtt data in a fixed interval if(mIntervalTimeout == 0) { mIntervalTimeout = mCfgMqtt->interval; mSendList.push(RealTimeRunData_Debug); - sendIvDataStart(); + sendIvData(); } } } @@ -554,65 +558,55 @@ class PubMqtt { } } - void sendIvDataStart() { - mSendIvData.RTRDataHasBeenSent = false; - memset(mSendIvData.total, 0, sizeof(float) * 4); - for (uint8_t id = 0; id < mSys->getNumInverters(); id++) { - Inverter<> *iv = mSys->getInverterByPos(id); - if (NULL != iv) { - if (iv->config->enabled) { - mSendIvData.lastIvId = id; - mSendIvData.running = true; - mSendIvData.lastIvReached = false; - mSendIvData.sendTotals = false; - break; - } - } - } - } - - void sendIvDataLoop(void) { + void sendIvData() { bool anyAvail = processIvStatus(); if (mLastAnyAvail != anyAvail) - mSendList.push(RealTimeRunData_Debug); // makes sure that total values are calculated + mSendList.push(RealTimeRunData_Debug); // makes shure that total values are calculated - if(mSendList.empty()) { - mSendIvData.running = false; + if(mSendList.empty()) return; - } + float total[4]; + bool RTRDataHasBeenSent = false; - //while(!mSendList.empty()) { + while(!mSendList.empty()) { + memset(total, 0, sizeof(float) * 4); uint8_t curInfoCmd = mSendList.front(); - if ((curInfoCmd != RealTimeRunData_Debug) || !mSendIvData.RTRDataHasBeenSent) { // send RTR Data only once - mSendIvData.sendTotals = (curInfoCmd == RealTimeRunData_Debug); - if(!mSendIvData.lastIvReached) { - Inverter<> *iv = mSys->getInverterByPos(mSendIvData.lastIvId); + if ((curInfoCmd != RealTimeRunData_Debug) || !RTRDataHasBeenSent) { // send RTR Data only once + bool sendTotals = (curInfoCmd == RealTimeRunData_Debug); + + for (uint8_t id = 0; id < mSys->getNumInverters(); id++) { + Inverter<> *iv = mSys->getInverterByPos(id); + if (NULL == iv) + continue; // skip to next inverter + if (!iv->config->enabled) + continue; // skip to next inverter + // send RTR Data only if status is available - if ((curInfoCmd != RealTimeRunData_Debug) || (MQTT_STATUS_NOT_AVAIL_NOT_PROD != mLastIvState[mSendIvData.lastIvId])) + if ((curInfoCmd != RealTimeRunData_Debug) || (MQTT_STATUS_NOT_AVAIL_NOT_PROD != mLastIvState[id])) sendData(iv, curInfoCmd); // calculate total values for RealTimeRunData_Debug - if (mSendIvData.sendTotals) { + if (sendTotals) { record_t<> *rec = iv->getRecordStruct(curInfoCmd); - mSendIvData.sendTotals &= (iv->getLastTs(rec) > 0); - if (mSendIvData.sendTotals) { + sendTotals &= (iv->getLastTs(rec) > 0); + if (sendTotals) { for (uint8_t i = 0; i < rec->length; i++) { if (CH0 == rec->assign[i].ch) { switch (rec->assign[i].fieldId) { case FLD_PAC: - mSendIvData.total[0] += iv->getValue(i, rec); + total[0] += iv->getValue(i, rec); break; case FLD_YT: - mSendIvData.total[1] += iv->getValue(i, rec); + total[1] += iv->getValue(i, rec); break; case FLD_YD: - mSendIvData.total[2] += iv->getValue(i, rec); + total[2] += iv->getValue(i, rec); break; case FLD_PDC: - mSendIvData.total[3] += iv->getValue(i, rec); + total[3] += iv->getValue(i, rec); break; } } @@ -620,21 +614,9 @@ class PubMqtt { } } yield(); - - // get next inverter - for (uint8_t id = mSendIvData.lastIvId; id < mSys->getNumInverters(); id++) { - Inverter<> *iv = mSys->getInverterByPos(id); - if (NULL != iv) { - if (iv->config->enabled) { - mSendIvData.lastIvId = id; - return; - } - } - } - mSendIvData.lastIvReached = true; } - if (mSendIvData.sendTotals) { + if (sendTotals) { uint8_t fieldId; for (uint8_t i = 0; i < 4; i++) { bool retained = true; @@ -656,36 +638,20 @@ class PubMqtt { break; } snprintf(mSubTopic, 32 + MAX_NAME_LENGTH, "total/%s", fields[fieldId]); - snprintf(mVal, 40, "%g", ah::round3(mSendIvData.total[i])); + snprintf(mVal, 40, "%g", ah::round3(total[i])); publish(mSubTopic, mVal, retained); } - mSendIvData.RTRDataHasBeenSent = true; + RTRDataHasBeenSent = true; yield(); } } mSendList.pop(); // remove from list once all inverters were processed - //} // end while + } mLastAnyAvail = anyAvail; } - typedef struct { - bool running; - uint8_t lastIvId; - bool lastIvReached; - bool sendTotals; - float total[4]; - bool RTRDataHasBeenSent; - } publish_t; - - typedef struct { - bool running; - uint8_t lastIvId; - uint8_t sub; - uint8_t foundIvCnt; - } discovery_t; - espMqttClient mClient; cfgMqtt_t *mCfgMqtt; #if defined(ESP8266) @@ -693,6 +659,8 @@ class PubMqtt { #endif HMSYSTEM *mSys; + PubMqttIvData mSendIvData; + uint32_t *mUtcTimestamp; uint32_t mRxCnt, mTxCnt; std::queue mSendList; @@ -712,7 +680,6 @@ class PubMqtt { char mSubTopic[32 + MAX_NAME_LENGTH + 1]; char mVal[40]; discovery_t mDiscovery; - publish_t mSendIvData; }; #endif /*__PUB_MQTT_H__*/ diff --git a/src/publisher/pubMqttIvData.h b/src/publisher/pubMqttIvData.h new file mode 100644 index 00000000..361ce307 --- /dev/null +++ b/src/publisher/pubMqttIvData.h @@ -0,0 +1,187 @@ +#ifndef __PUB_MQTT_IV_DATA_H__ +#define __PUB_MQTT_IV_DATA_H__ + +#include "../utils/dbg.h" +#include "../hm/hmSystem.h" +#include "pubMqttDefs.h" + +typedef std::function pubMqttPublisherType; + +template +class PubMqttIvData { + public: + void setup(HMSYSTEM *sys, uint32_t *utcTs) { + mSys = sys; + mUtcTimestamp = utcTs; + mState = IDLE; + + memset(mIvLastRTRpub, 0, MAX_NUM_INVERTERS * 4); + + mTable[IDLE] = &PubMqttIvData::stateIdle; + mTable[START] = &PubMqttIvData::stateStart; + mTable[FIND_NXT_IV] = &PubMqttIvData::stateFindNxtIv; + mTable[SEND_DATA] = &PubMqttIvData::stateSend; + mTable[SEND_TOTALS] = &PubMqttIvData::stateSendTotals; + } + + void loop() { + (this->*mTable[mState])(); + yield(); + } + + bool start(uint8_t cmd) { + if(IDLE != mState) + return false; + + mCmd = cmd; + mState = START; + return true; + } + + void setPublishFunc(pubMqttPublisherType cb) { + mPublish = cb; + } + + private: + enum State {IDLE, START, FIND_NXT_IV, SEND_DATA, SEND_TOTALS, NUM_STATES}; + typedef void (PubMqttIvData::*StateFunction)(); + + void stateIdle() { + ; // nothing to do + } + + void stateStart() { + mLastIvId = 0; + mSendTotals = (RealTimeRunData_Debug == mCmd); + memset(mTotal, 0, sizeof(float) * 4); + + mState = FIND_NXT_IV; + } + + void stateFindNxtIv() { + bool found = false; + + for (; mLastIvId < mSys->getNumInverters(); mLastIvId++) { + mIv = mSys->getInverterByPos(mLastIvId); + if (NULL != mIv) { + if (mIv->config->enabled) { + found = true; + break; + } + } + } + + mPos = 0; + if(found) + mState = SEND_DATA; + else if(mSendTotals) + mState = SEND_TOTALS; + else + mState = IDLE; + } + + void stateSend() { + record_t<> *rec = mIv->getRecordStruct(mCmd); + uint32_t lastTs = mIv->getLastTs(rec); + bool pubData = (lastTs > 0); + if (mCmd == RealTimeRunData_Debug) + pubData &= (lastTs != mIvLastRTRpub[mIv->id]); + + if (pubData) { + mIvLastRTRpub[mIv->id] = lastTs; + //for (uint8_t i = 0; i < rec->length; i++) { + if(mPos < rec->length) { + bool retained = false; + if (mCmd == RealTimeRunData_Debug) { + switch (rec->assign[mPos].fieldId) { + case FLD_YT: + case FLD_YD: + if ((rec->assign[mPos].ch == CH0) && (!mIv->isProducing(*mUtcTimestamp))) { // avoids returns to 0 on restart + mPos++; + return; + } + retained = true; + break; + } + + // calculate total values for RealTimeRunData_Debug + if (CH0 == rec->assign[mPos].ch) { + switch (rec->assign[mPos].fieldId) { + case FLD_PAC: + mTotal[0] += mIv->getValue(mPos, rec); + break; + case FLD_YT: + mTotal[1] += mIv->getValue(mPos, rec); + break; + case FLD_YD: + mTotal[2] += mIv->getValue(mPos, rec); + break; + case FLD_PDC: + mTotal[3] += mIv->getValue(mPos, rec); + break; + } + } + } + + snprintf(mSubTopic, 32 + MAX_NAME_LENGTH, "%s/ch%d/%s", mIv->config->name, rec->assign[mPos].ch, fields[rec->assign[mPos].fieldId]); + snprintf(mVal, 40, "%g", ah::round3(mIv->getValue(mPos, rec))); + mPublish(mSubTopic, mVal, retained); + mPos++; + } else + mState = FIND_NXT_IV; + } else + mState = FIND_NXT_IV; + } + + void stateSendTotals() { + uint8_t fieldId; + //for (uint8_t i = 0; i < 4; i++) { + if(mPos < 4) { + bool retained = true; + switch (mPos) { + default: + case 0: + fieldId = FLD_PAC; + retained = false; + break; + case 1: + fieldId = FLD_YT; + break; + case 2: + fieldId = FLD_YD; + break; + case 3: + fieldId = FLD_PDC; + retained = false; + break; + } + snprintf(mSubTopic, 32 + MAX_NAME_LENGTH, "total/%s", fields[fieldId]); + snprintf(mVal, 40, "%g", ah::round3(mTotal[mPos])); + mPublish(mSubTopic, mVal, retained); + mPos++; + } else + mState = IDLE; + + RTRDataHasBeenSent = true; + } + + HMSYSTEM *mSys; + uint32_t *mUtcTimestamp; + pubMqttPublisherType mPublish; + State mState; + StateFunction mTable[NUM_STATES]; + + uint8_t mCmd; + uint8_t mLastIvId; + bool mSendTotals; + float mTotal[4]; + + Inverter<> *mIv; + uint8_t mPos; + uint32_t mIvLastRTRpub[MAX_NUM_INVERTERS]; + + char mSubTopic[32 + MAX_NAME_LENGTH + 1]; + char mVal[40]; +}; + +#endif /*__PUB_MQTT_IV_DATA_H__*/