From 6f04379cb142cae995e8b6793d4b642b20f17a7c Mon Sep 17 00:00:00 2001 From: lumapu Date: Sun, 29 Sep 2024 17:32:42 +0200 Subject: [PATCH] further improved queue --- src/hm/CommQueue.h | 160 ++++++++++++++++++++++++---------------- src/hm/Communication.h | 54 +++++++------- src/web/html/about.html | 2 +- 3 files changed, 126 insertions(+), 90 deletions(-) diff --git a/src/hm/CommQueue.h b/src/hm/CommQueue.h index bfb1e130..b07010ea 100644 --- a/src/hm/CommQueue.h +++ b/src/hm/CommQueue.h @@ -18,44 +18,47 @@ template class CommQueue { public: - CommQueue() { - mutex = xSemaphoreCreateBinaryStatic(&mutex_buffer); - xSemaphoreGive(mutex); + CommQueue() + : wrPtr {0} + , rdPtr {0} + { + this->mutex = xSemaphoreCreateBinaryStatic(&this->mutex_buffer); + xSemaphoreGive(this->mutex); } ~CommQueue() { - vSemaphoreDelete(mutex); + vSemaphoreDelete(this->mutex); } void addImportant(Inverter<> *iv, uint8_t cmd) { queue_s q(iv, cmd, true); - xSemaphoreTake(mutex, portMAX_DELAY); + xSemaphoreTake(this->mutex, portMAX_DELAY); if(!isIncluded(&q)) { - dec(&mRdPtr); - mQueue[mRdPtr] = q; - DPRINTLN(DBG_INFO, "addI, not incl.: " + String(iv->id)); - } else - DPRINTLN(DBG_INFO, "addI, incl.: " + String(iv->id)); - xSemaphoreGive(mutex); + dec(&this->rdPtr); + mQueue[this->rdPtr] = q; + } + xSemaphoreGive(this->mutex); } void add(Inverter<> *iv, uint8_t cmd) { - xSemaphoreTake(mutex, portMAX_DELAY); + xSemaphoreTake(this->mutex, portMAX_DELAY); queue_s q(iv, cmd, false); if(!isIncluded(&q)) { - mQueue[mWrPtr] = q; - inc(&mWrPtr); + mQueue[this->wrPtr] = q; + inc(&this->wrPtr); } - xSemaphoreGive(mutex); + xSemaphoreGive(this->mutex); } void chgCmd(Inverter<> *iv, uint8_t cmd) { - mQueue[mWrPtr] = queue_s(iv, cmd, false); + xSemaphoreTake(this->mutex, portMAX_DELAY); + mQueue[this->wrPtr] = queue_s(iv, cmd, false); + xSemaphoreGive(this->mutex); } uint8_t getFillState(void) const { - //DPRINTLN(DBG_INFO, "wr: " + String(mWrPtr) + ", rd: " + String(mRdPtr)); - return abs(mRdPtr - mWrPtr); + //DPRINTLN(DBG_INFO, "wr: " + String(this->wrPtr) + ", rd: " + String(this->rdPtr)); + return abs(this->rdPtr - this->wrPtr); } uint8_t getMaxFill(void) const { @@ -70,68 +73,100 @@ class CommQueue { uint8_t attemptsMax; uint32_t ts; bool isDevControl; + queue_s() {} - queue_s(Inverter<> *i, uint8_t c, bool dev) : - iv(i), cmd(c), attempts(DEFAULT_ATTEMPS), attemptsMax(DEFAULT_ATTEMPS), ts(0), isDevControl(dev) {} + + queue_s(Inverter<> *i, uint8_t c, bool dev) + : iv {i} + , cmd {c} + , attempts {DEFAULT_ATTEMPS} + , attemptsMax {DEFAULT_ATTEMPS} + , ts {0} + , isDevControl {dev} + {} + + queue_s(const queue_s &other) // copy constructor + : iv {other.iv} + , cmd {other.cmd} + , attempts {other.attempts} + , attemptsMax {other.attemptsMax} + , ts {other.ts} + , isDevControl {other.isDevControl} + {} }; protected: void add(queue_s q) { - xSemaphoreTake(mutex, portMAX_DELAY); - mQueue[mWrPtr] = q; - inc(&mWrPtr); - xSemaphoreGive(mutex); + xSemaphoreTake(this->mutex, portMAX_DELAY); + mQueue[this->wrPtr] = q; + inc(&this->wrPtr); + xSemaphoreGive(this->mutex); } - void add(const queue_s *q, bool rstAttempts = false) { - mQueue[mWrPtr] = *q; - xSemaphoreTake(mutex, portMAX_DELAY); + void add(queue_s *q, bool rstAttempts = false) { + xSemaphoreTake(this->mutex, portMAX_DELAY); + mQueue[this->wrPtr] = *q; if(rstAttempts) { - mQueue[mWrPtr].attempts = DEFAULT_ATTEMPS; - mQueue[mWrPtr].attemptsMax = DEFAULT_ATTEMPS; + mQueue[this->wrPtr].attempts = DEFAULT_ATTEMPS; + mQueue[this->wrPtr].attemptsMax = DEFAULT_ATTEMPS; } - inc(&mWrPtr); - xSemaphoreGive(mutex); + inc(&this->wrPtr); + xSemaphoreGive(this->mutex); } - void chgCmd(uint8_t cmd) { - mQueue[mRdPtr].cmd = cmd; - mQueue[mRdPtr].isDevControl = false; + void chgCmd(queue_s *q, uint8_t cmd) { + xSemaphoreTake(this->mutex, portMAX_DELAY); + q->cmd = cmd; + q->isDevControl = false; + xSemaphoreGive(this->mutex); } - void get(std::function cb) { - if(mRdPtr == mWrPtr) - cb(false, &mQueue[mRdPtr]); // empty - else - cb(true, &mQueue[mRdPtr]); + void get(std::function cb) { + if(this->rdPtr == this->wrPtr) + cb(false, nullptr); // empty + else { + //xSemaphoreTake(this->mutex, portMAX_DELAY); + //uint8_t tmp = this->rdPtr; + //xSemaphoreGive(this->mutex); + cb(true, &mQueue[this->rdPtr]); + } } - void cmdDone(bool keep = false) { - xSemaphoreTake(mutex, portMAX_DELAY); + void cmdDone(queue_s *q, bool keep = false) { + xSemaphoreTake(this->mutex, portMAX_DELAY); if(keep) { - mQueue[mRdPtr].attempts = DEFAULT_ATTEMPS; - mQueue[mRdPtr].attemptsMax = DEFAULT_ATTEMPS; - add(mQueue[mRdPtr]); // add to the end again + q->attempts = DEFAULT_ATTEMPS; + q->attemptsMax = DEFAULT_ATTEMPS; + xSemaphoreGive(this->mutex); + add(q); // add to the end again + xSemaphoreTake(this->mutex, portMAX_DELAY); } - inc(&mRdPtr); - xSemaphoreGive(mutex); + inc(&this->rdPtr); + xSemaphoreGive(this->mutex); } - void setTs(const uint32_t *ts) { - mQueue[mRdPtr].ts = *ts; + void setTs(queue_s *q, const uint32_t *ts) { + xSemaphoreTake(this->mutex, portMAX_DELAY); + q->ts = *ts; + xSemaphoreGive(this->mutex); } - void setAttempt(void) { - if(mQueue[mRdPtr].attempts) - mQueue[mRdPtr].attempts--; + void setAttempt(queue_s *q) { + xSemaphoreTake(this->mutex, portMAX_DELAY); + if(q->attempts) + q->attempts--; + xSemaphoreGive(this->mutex); } - void incrAttempt(uint8_t attempts = 1) { - mQueue[mRdPtr].attempts += attempts; - if (mQueue[mRdPtr].attempts > mQueue[mRdPtr].attemptsMax) - mQueue[mRdPtr].attemptsMax = mQueue[mRdPtr].attempts; + void incrAttempt(queue_s *q, uint8_t attempts = 1) { + xSemaphoreTake(this->mutex, portMAX_DELAY); + q->attempts += attempts; + if (q->attempts > q->attemptsMax) + q->attemptsMax = q->attempts; + xSemaphoreGive(this->mutex); } + private: void inc(uint8_t *ptr) { if(++(*ptr) >= N) *ptr = 0; @@ -143,13 +178,14 @@ class CommQueue { --(*ptr); } - private: bool isIncluded(const queue_s *q) { - uint8_t ptr = mRdPtr; - while (ptr != mWrPtr) { + uint8_t ptr = this->rdPtr; + while (ptr != this->wrPtr) { if(mQueue[ptr].cmd == q->cmd) { - if(mQueue[ptr].iv->id == q->iv->id) - return true; + if(mQueue[ptr].iv->id == q->iv->id) { + if(mQueue[ptr].isDevControl == q->isDevControl) + return true; + } } inc(&ptr); } @@ -158,10 +194,10 @@ class CommQueue { protected: std::array mQueue; - uint8_t mWrPtr = 0; - uint8_t mRdPtr = 0; private: + uint8_t wrPtr; + uint8_t rdPtr; SemaphoreHandle_t mutex; StaticSemaphore_t mutex_buffer; }; diff --git a/src/hm/Communication.h b/src/hm/Communication.h index 90462b2c..d60d0d38 100644 --- a/src/hm/Communication.h +++ b/src/hm/Communication.h @@ -52,7 +52,7 @@ class Communication : public CommQueue<> { } void loop() { - get([this](bool valid, const queue_s *q) { + get([this](bool valid, queue_s *q) { if(!valid) { if(mPrintSequenceDuration) { mPrintSequenceDuration = false; @@ -72,7 +72,7 @@ class Communication : public CommQueue<> { } private: - inline void innerLoop(const queue_s *q) { + inline void innerLoop(queue_s *q) { switch(mState) { case States::RESET: if (!mWaitTime.isTimeout()) @@ -95,20 +95,20 @@ class Communication : public CommQueue<> { q->iv->radioStatistics.txCnt++; mIsRetransmit = false; if(NULL == q->iv->radio) - cmdDone(false); // can't communicate while radio is not defined! + cmdDone(q, false); // can't communicate while radio is not defined! mFirstTry = (INV_RADIO_TYPE_NRF == q->iv->ivRadioType) && (q->iv->isAvailable()); q->iv->mCmd = q->cmd; q->iv->mIsSingleframeReq = false; mFramesExpected = getFramesExpected(q); // function to get expected frame count. mTimeout = DURATION_TXFRAME + mFramesExpected*DURATION_ONEFRAME + duration_reserve[q->iv->ivRadioType]; if((q->iv->ivGen == IV_MI) && ((q->cmd == MI_REQ_CH1) || (q->cmd == MI_REQ_4CH))) - incrAttempt(q->iv->channels); // 2 more attempts for 2ch, 4 more for 4ch + incrAttempt(q, q->iv->channels); // 2 more attempts for 2ch, 4 more for 4ch mState = States::START; break; case States::START: - setTs(mTimestamp); + setTs(q, mTimestamp); if(INV_RADIO_TYPE_CMT == q->iv->ivRadioType) { // frequency was changed during runtime if(q->iv->curCmtFreq != q->iv->config->frequency) { @@ -127,10 +127,10 @@ class Communication : public CommQueue<> { //q->iv->radioStatistics.txCnt++; q->iv->radio->mRadioWaitTime.startTimeMonitor(mTimeout); if((!mIsRetransmit && (q->cmd == AlarmData)) || (q->cmd == GridOnProFilePara)) - incrAttempt((q->cmd == AlarmData)? MORE_ATTEMPS_ALARMDATA : MORE_ATTEMPS_GRIDONPROFILEPARA); + incrAttempt(q, (q->cmd == AlarmData)? MORE_ATTEMPS_ALARMDATA : MORE_ATTEMPS_GRIDONPROFILEPARA); mIsRetransmit = false; - setAttempt(); + setAttempt(q); mState = States::WAIT; break; @@ -191,7 +191,7 @@ class Communication : public CommQueue<> { q->iv->mDtuRxCnt++; if (p->packet[0] == (TX_REQ_INFO + ALL_FRAMES)) { // response from get information command - if(parseFrame(p)) { + if(parseFrame(q, p)) { q->iv->curFrmCnt++; if(!mIsRetransmit && ((p->packet[9] == 0x02) || (p->packet[9] == 0x82)) && (p->millis < LIMIT_FAST_IV)) mHeu.setIvRetriesGood(q->iv,p->millis < LIMIT_VERYFAST_IV); @@ -292,7 +292,7 @@ class Communication : public CommQueue<> { } } - setAttempt(); + setAttempt(q); if(*mSerialDebug) { DPRINT_IVID(DBG_WARN, q->iv->id); @@ -321,7 +321,7 @@ class Communication : public CommQueue<> { } } - inline void printRxInfo(const queue_s *q, packet_t *p) { + inline void printRxInfo(queue_s *q, packet_t *p) { DPRINT_IVID(DBG_INFO, q->iv->id); DBGPRINT(F("RX ")); if(p->millis < 100) @@ -355,7 +355,7 @@ class Communication : public CommQueue<> { } - inline uint8_t getFramesExpected(const queue_s *q) { + inline uint8_t getFramesExpected(queue_s *q) { if(q->isDevControl) return 1; @@ -419,7 +419,7 @@ class Communication : public CommQueue<> { return (ah::crc8(buf, len - 1) == buf[len-1]); } - inline bool parseFrame(packet_t *p) { + inline bool parseFrame(queue_s *q, packet_t *p) { uint8_t *frameId = &p->packet[9]; if(0x00 == *frameId) { DPRINTLN(DBG_WARN, F("invalid frameId 0x00")); @@ -438,7 +438,7 @@ class Communication : public CommQueue<> { if((*frameId & ALL_FRAMES) == ALL_FRAMES) { mMaxFrameId = (*frameId & 0x7f); if(mMaxFrameId > 8) // large payloads, e.g. AlarmData - incrAttempt(mMaxFrameId - 6); + incrAttempt(q, mMaxFrameId - 6); } frame_t *f = &mLocalBuf[(*frameId & 0x7f) - 1]; @@ -449,7 +449,7 @@ class Communication : public CommQueue<> { return true; } - inline void parseMiFrame(packet_t *p, const queue_s *q) { + inline void parseMiFrame(packet_t *p, queue_s *q) { if((!mIsRetransmit && p->packet[9] == 0x00) && (p->millis < LIMIT_FAST_IV_MI)) //first frame is fast? mHeu.setIvRetriesGood(q->iv,p->millis < LIMIT_VERYFAST_IV_MI); if ((p->packet[0] == MI_REQ_CH1 + ALL_FRAMES) @@ -473,7 +473,7 @@ class Communication : public CommQueue<> { } } - inline bool parseDevCtrl(const packet_t *p, const queue_s *q) { + inline bool parseDevCtrl(const packet_t *p, queue_s *q) { switch(p->packet[12]) { case ActivePowerContr: if(p->packet[13] != 0x00) @@ -511,7 +511,7 @@ class Communication : public CommQueue<> { return accepted; } - inline bool compilePayload(const queue_s *q) { + inline bool compilePayload(queue_s *q) { uint16_t crc = 0xffff, crcRcv = 0x0000; for(uint8_t i = 0; i < mMaxFrameId; i++) { if(i == (mMaxFrameId - 1)) { @@ -611,7 +611,7 @@ class Communication : public CommQueue<> { return true; } - void sendRetransmit(const queue_s *q, uint8_t i) { + void sendRetransmit(queue_s *q, uint8_t i) { mFramesExpected = 1; q->iv->radio->setExpectedFrames(mFramesExpected); q->iv->radio->sendCmdPacket(q->iv, TX_REQ_INFO, (SINGLE_FRAME + i), true); @@ -622,7 +622,7 @@ class Communication : public CommQueue<> { } private: - void closeRequest(const queue_s *q, bool crcPass) { + void closeRequest(queue_s *q, bool crcPass) { mHeu.evalTxChQuality(q->iv, crcPass, (q->attemptsMax - 1 - q->attempts), q->iv->curFrmCnt); if(crcPass) q->iv->radioStatistics.rxSuccess++; @@ -636,7 +636,7 @@ class Communication : public CommQueue<> { if(q->isDevControl) keep = !crcPass; - cmdDone(keep); + cmdDone(q, keep); q->iv->mGotFragment = false; q->iv->mGotLastMsg = false; q->iv->miMultiParts = 0; @@ -646,7 +646,7 @@ class Communication : public CommQueue<> { DBGPRINTLN(F("-----")); } - inline void miHwDecode(packet_t *p, const queue_s *q) { + inline void miHwDecode(packet_t *p, queue_s *q) { record_t<> *rec = q->iv->getRecordStruct(InverterDevInform_All); // choose the record structure rec->ts = q->ts; /* @@ -760,7 +760,7 @@ class Communication : public CommQueue<> { } } - inline void miGPFDecode(packet_t *p, const queue_s *q) { + inline void miGPFDecode(packet_t *p, queue_s *q) { record_t<> *rec = q->iv->getRecordStruct(InverterDevInform_Simple); // choose the record structure rec->ts = q->ts; rec->mqttSentStatus = MqttSentStatus::NEW_DATA; @@ -786,7 +786,7 @@ class Communication : public CommQueue<> { q->iv->miMultiParts = 7; // indicate we are ready } - inline void miDataDecode(packet_t *p, const queue_s *q) { + inline void miDataDecode(packet_t *p, queue_s *q) { record_t<> *rec = q->iv->getRecordStruct(RealTimeRunData_Debug); // choose the parser rec->ts = q->ts; //mState = States::RESET; @@ -839,7 +839,7 @@ class Communication : public CommQueue<> { q->iv->miMultiParts += 6; // indicate we are ready } - void miNextRequest(uint8_t cmd, const queue_s *q) { + void miNextRequest(uint8_t cmd, queue_s *q) { mHeu.evalTxChQuality(q->iv, true, (q->attemptsMax - 1 - q->attempts), q->iv->curFrmCnt); mHeu.getTxCh(q->iv); q->iv->radioStatistics.ivSent++; @@ -859,12 +859,12 @@ class Communication : public CommQueue<> { DBGHEXLN(cmd); } mIsRetransmit = true; - chgCmd(cmd); + chgCmd(q, cmd); //mState = States::WAIT; } - void miRepeatRequest(const queue_s *q) { - setAttempt(); // if function is called, we got something, and we necessarily need more transmissions for MI types... + void miRepeatRequest(queue_s *q) { + setAttempt(q); // if function is called, we got something, and we necessarily need more transmissions for MI types... q->iv->radio->sendCmdPacket(q->iv, q->cmd, 0x00, true); q->iv->radioStatistics.retransmits++; q->iv->radio->mRadioWaitTime.startTimeMonitor(DURATION_TXFRAME + DURATION_ONEFRAME + duration_reserve[q->iv->ivRadioType]); @@ -878,7 +878,7 @@ class Communication : public CommQueue<> { //mIsRetransmit = false; } - void miStsConsolidate(const queue_s *q, uint8_t stschan, record_t<> *rec, uint8_t uState, uint8_t uEnum, uint8_t lState = 0, uint8_t lEnum = 0) { + void miStsConsolidate(queue_s *q, uint8_t stschan, record_t<> *rec, uint8_t uState, uint8_t uEnum, uint8_t lState = 0, uint8_t lEnum = 0) { //uint8_t status = (p->packet[11] << 8) + p->packet[12]; uint16_t statusMi = 3; // regular status for MI, change to 1 later? if ( uState == 2 ) { diff --git a/src/web/html/about.html b/src/web/html/about.html index 1b27ac9d..b03aff78 100644 --- a/src/web/html/about.html +++ b/src/web/html/about.html @@ -14,7 +14,7 @@
Used Libraries
- +