|
|
@ -11,6 +11,8 @@ |
|
|
|
#if defined(ENABLE_MQTT) |
|
|
|
#ifdef ESP8266 |
|
|
|
#include <ESP8266WiFi.h> |
|
|
|
#define xSemaphoreTake(a, b) { while(a) { yield(); } a = true; } |
|
|
|
#define xSemaphoreGive(a) { a = false; } |
|
|
|
#elif defined(ESP32) |
|
|
|
#include <WiFi.h> |
|
|
|
#endif |
|
|
@ -40,6 +42,13 @@ template<class HMSYSTEM> |
|
|
|
class PubMqtt { |
|
|
|
public: |
|
|
|
PubMqtt() : SendIvData() { |
|
|
|
#if defined(ESP32) |
|
|
|
mutex = xSemaphoreCreateBinaryStatic(&mutexBuffer); |
|
|
|
xSemaphoreGive(mutex); |
|
|
|
#else |
|
|
|
mutex = false; |
|
|
|
#endif |
|
|
|
|
|
|
|
mLastIvState.fill(InverterStatus::OFF); |
|
|
|
mIvLastRTRpub.fill(0); |
|
|
|
|
|
|
@ -51,7 +60,11 @@ class PubMqtt { |
|
|
|
mSendAlarm.fill(false); |
|
|
|
} |
|
|
|
|
|
|
|
~PubMqtt() { } |
|
|
|
~PubMqtt() { |
|
|
|
#if defined(ESP32) |
|
|
|
vSemaphoreDelete(mutex); |
|
|
|
#endif |
|
|
|
} |
|
|
|
|
|
|
|
void setup(IApp *app, cfgMqtt_t *cfg_mqtt, const char *devName, const char *version, HMSYSTEM *sys, uint32_t *utcTs, uint32_t *uptime) { |
|
|
|
mApp = app; |
|
|
@ -97,6 +110,17 @@ class PubMqtt { |
|
|
|
} |
|
|
|
|
|
|
|
void loop() { |
|
|
|
std::queue<message_s> queue; |
|
|
|
xSemaphoreTake(mutex, portMAX_DELAY); |
|
|
|
std::swap(queue, mReceiveQueue); |
|
|
|
xSemaphoreGive(mutex); |
|
|
|
|
|
|
|
while (!queue.empty()) { |
|
|
|
message_s *entry = &queue.front(); |
|
|
|
handleMessage(entry->topic, entry->payload, entry->len, entry->index, entry->total); |
|
|
|
queue.pop(); |
|
|
|
} |
|
|
|
|
|
|
|
SendIvData.loop(); |
|
|
|
|
|
|
|
#if defined(ESP8266) |
|
|
@ -272,14 +296,14 @@ class PubMqtt { |
|
|
|
tickerMinute(); |
|
|
|
publish(mLwtTopic.data(), mqttStr[MQTT_STR_LWT_CONN], true, false); |
|
|
|
|
|
|
|
// for(uint8_t i = 0; i < MAX_NUM_INVERTERS; i++) {
|
|
|
|
// snprintf(mVal.data(), mVal.size(), "ctrl/limit/%d", i);
|
|
|
|
// subscribe(mVal.data(), QOS_2);
|
|
|
|
// snprintf(mVal.data(), mVal.size(), "ctrl/restart/%d", i);
|
|
|
|
// subscribe(mVal.data());
|
|
|
|
// snprintf(mVal.data(), mVal.size(), "ctrl/power/%d", i);
|
|
|
|
// subscribe(mVal.data());
|
|
|
|
// }
|
|
|
|
for(uint8_t i = 0; i < MAX_NUM_INVERTERS; i++) { |
|
|
|
snprintf(mVal.data(), mVal.size(), "ctrl/limit/%d", i); |
|
|
|
subscribe(mVal.data(), QOS_2); |
|
|
|
snprintf(mVal.data(), mVal.size(), "ctrl/restart/%d", i); |
|
|
|
subscribe(mVal.data()); |
|
|
|
snprintf(mVal.data(), mVal.size(), "ctrl/power/%d", i); |
|
|
|
subscribe(mVal.data()); |
|
|
|
} |
|
|
|
snprintf(mVal.data(), mVal.size(), "ctrl/#"); |
|
|
|
subscribe(mVal.data(), QOS_2); |
|
|
|
subscribe(subscr[MQTT_SUBS_SET_TIME]); |
|
|
@ -318,6 +342,14 @@ class PubMqtt { |
|
|
|
void onMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) { |
|
|
|
if(len == 0) |
|
|
|
return; |
|
|
|
|
|
|
|
xSemaphoreTake(mutex, portMAX_DELAY); |
|
|
|
mReceiveQueue.push(message_s(topic, payload, len, index, total)); |
|
|
|
xSemaphoreGive(mutex); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
inline void handleMessage(const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) { |
|
|
|
DPRINT(DBG_INFO, mqttStr[MQTT_STR_GOT_TOPIC]); |
|
|
|
DBGPRINTLN(String(topic)); |
|
|
|
if(NULL == mSubscriptionCb) |
|
|
@ -631,12 +663,40 @@ class PubMqtt { |
|
|
|
private: |
|
|
|
enum {MQTT_STATUS_OFFLINE = 0, MQTT_STATUS_PARTIAL, MQTT_STATUS_ONLINE}; |
|
|
|
|
|
|
|
struct message_s { |
|
|
|
char* topic; |
|
|
|
uint8_t* payload; |
|
|
|
size_t len; |
|
|
|
size_t index; |
|
|
|
size_t total; |
|
|
|
|
|
|
|
message_s(const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) { |
|
|
|
this->topic = new char[strlen(topic) + 1]; |
|
|
|
this->payload = new uint8_t[len]; |
|
|
|
|
|
|
|
memcpy(this->topic, topic, strlen(topic)); |
|
|
|
memcpy(this->payload, payload, len); |
|
|
|
this->len = len; |
|
|
|
this->index = index; |
|
|
|
this->total = total; |
|
|
|
} |
|
|
|
|
|
|
|
~message_s() { |
|
|
|
delete[] this->topic; |
|
|
|
delete[] this->payload; |
|
|
|
} |
|
|
|
}; |
|
|
|
|
|
|
|
private: |
|
|
|
espMqttClient mClient; |
|
|
|
cfgMqtt_t *mCfgMqtt = nullptr; |
|
|
|
IApp *mApp; |
|
|
|
#if defined(ESP8266) |
|
|
|
WiFiEventHandler mHWifiCon, mHWifiDiscon; |
|
|
|
volatile bool mutex; |
|
|
|
#else |
|
|
|
SemaphoreHandle_t mutex; |
|
|
|
StaticSemaphore_t mutexBuffer; |
|
|
|
#endif |
|
|
|
|
|
|
|
HMSYSTEM *mSys = nullptr; |
|
|
@ -653,6 +713,8 @@ class PubMqtt { |
|
|
|
std::array<uint32_t, MAX_NUM_INVERTERS> mIvLastRTRpub; |
|
|
|
uint16_t mIntervalTimeout = 0; |
|
|
|
|
|
|
|
std::queue<message_s> mReceiveQueue; |
|
|
|
|
|
|
|
// last will topic and payload must be available through lifetime of 'espMqttClient'
|
|
|
|
std::array<char, (MQTT_TOPIC_LEN + 5)> mLwtTopic; |
|
|
|
const char *mDevName = nullptr, *mVersion = nullptr; |
|
|
|