@ -29,7 +29,9 @@ template<class HMSYSTEM>
class PubMqtt {
public :
PubMqtt ( ) {
mRxCnt = 0 ;
mTxCnt = 0 ;
mEnReconnect = false ;
}
~ PubMqtt ( ) { }
@ -43,6 +45,8 @@ class PubMqtt {
mSunrise = sunrise ;
mSunset = sunset ;
snprintf ( mLwtTopic , MQTT_TOPIC_LEN + 7 , " %s/status " , mCfgMqtt - > topic ) ;
mHWifiCon = WiFi . onStationModeGotIP ( std : : bind ( & PubMqtt : : onWifiConnect , this , std : : placeholders : : _1 ) ) ;
mHWifiDiscon = WiFi . onStationModeDisconnected ( std : : bind ( & PubMqtt : : onWifiDisconnect , this , std : : placeholders : : _1 ) ) ;
@ -51,11 +55,10 @@ class PubMqtt {
mClient . setCredentials ( mCfgMqtt - > user , mCfgMqtt - > pwd ) ;
mClient . setClientId ( mDevName ) ; // TODO: add mac?
mClient . setServer ( mCfgMqtt - > broker , mCfgMqtt - > port ) ;
mClient . setWill ( mLwtTopic , QOS_0 , true , mLwtOffline ) ;
mClient . onConnect ( std : : bind ( & PubMqtt : : onConnect , this , std : : placeholders : : _1 ) ) ;
mClient . onDisconnect ( std : : bind ( & PubMqtt : : onDisconnect , this , std : : placeholders : : _1 ) ) ;
mClient . onSubscribe ( std : : bind ( & PubMqtt : : onSubscribe , this , std : : placeholders : : _1 , std : : placeholders : : _2 , std : : placeholders : : _3 ) ) ;
mClient . onPublish ( std : : bind ( & PubMqtt : : onPublish , this , std : : placeholders : : _1 ) ) ;
//mClient.setWill
mClient . onMessage ( std : : bind ( & PubMqtt : : onMessage , this , std : : placeholders : : _1 , std : : placeholders : : _2 , std : : placeholders : : _3 , std : : placeholders : : _4 , std : : placeholders : : _5 , std : : placeholders : : _6 ) ) ;
}
void loop ( ) {
@ -71,6 +74,11 @@ class PubMqtt {
snprintf ( val , 12 , " %ld " , millis ( ) / 1000 ) ;
publish ( " uptime " , val ) ;
publish ( " wifi_rssi " , String ( WiFi . RSSI ( ) ) . c_str ( ) ) ;
if ( ! mClient . connected ( ) ) {
if ( mEnReconnect )
mClient . connect ( ) ;
}
}
void tickerHour ( ) {
@ -78,10 +86,13 @@ class PubMqtt {
publish ( " sunset " , String ( * mSunset ) . c_str ( ) , true ) ;
}
void publish ( const char * subTopic , const char * payload , bool retained = false ) {
void publish ( const char * subTopic , const char * payload , bool retained = false , bool addTopic = true ) {
char topic [ MQTT_TOPIC_LEN + 2 ] ;
snprintf ( topic , ( MQTT_TOPIC_LEN + 2 ) , " %s/%s " , mCfgMqtt - > topic , subTopic ) ;
mClient . publish ( topic , QOS_0 , retained , payload ) ;
if ( addTopic )
mClient . publish ( topic , QOS_0 , retained , payload ) ;
else
mClient . publish ( subTopic , QOS_0 , retained , payload ) ;
mTxCnt + + ;
}
@ -99,6 +110,10 @@ class PubMqtt {
return mTxCnt ;
}
inline uint32_t getRxCnt ( void ) {
return mRxCnt ;
}
void payloadEventListener ( uint8_t cmd ) {
mSendList . push ( cmd ) ;
}
@ -112,11 +127,11 @@ class PubMqtt {
if ( NULL ! = iv ) {
record_t < > * rec = iv - > getRecordStruct ( RealTimeRunData_Debug ) ;
DynamicJsonDocument deviceDoc ( 128 ) ;
deviceDoc [ " name " ] = iv - > config - > name ;
deviceDoc [ " ids " ] = String ( iv - > config - > serial . u64 , HEX ) ;
deviceDoc [ " cu " ] = F ( " http:// " ) + String ( WiFi . localIP ( ) . toString ( ) ) ;
deviceDoc [ " mf " ] = " Hoymiles " ;
deviceDoc [ " mdl " ] = iv - > config - > name ;
deviceDoc [ F ( " name " ) ] = iv - > config - > name ;
deviceDoc [ F ( " ids " ) ] = String ( iv - > config - > serial . u64 , HEX ) ;
deviceDoc [ F ( " cu " ) ] = F ( " http:// " ) + String ( WiFi . localIP ( ) . toString ( ) ) ;
deviceDoc [ F ( " mf " ) ] = F ( " Hoymiles " ) ;
deviceDoc [ F ( " mdl " ) ] = iv - > config - > name ;
JsonObject deviceObj = deviceDoc . as < JsonObject > ( ) ;
DynamicJsonDocument doc ( 384 ) ;
@ -132,19 +147,19 @@ class PubMqtt {
const char * devCls = getFieldDeviceClass ( rec - > assign [ i ] . fieldId ) ;
const char * stateCls = getFieldStateClass ( rec - > assign [ i ] . fieldId ) ;
doc [ " name " ] = name ;
doc [ " stat_t " ] = stateTopic ;
doc [ " unit_of_meas " ] = iv - > getUnit ( i , rec ) ;
doc [ " uniq_id " ] = String ( iv - > config - > serial . u64 , HEX ) + " _ " + uniq_id ;
doc [ " dev " ] = deviceObj ;
doc [ " exp_aft " ] = MQTT_INTERVAL + 5 ; // add 5 sec if connection is bad or ESP too slow @TODO: stimmt das wirklich als expire!?
doc [ F ( " name " ) ] = name ;
doc [ F ( " stat_t " ) ] = stateTopic ;
doc [ F ( " unit_of_meas " ) ] = iv - > getUnit ( i , rec ) ;
doc [ F ( " uniq_id " ) ] = String ( iv - > config - > serial . u64 , HEX ) + " _ " + uniq_id ;
doc [ F ( " dev " ) ] = deviceObj ;
doc [ F ( " exp_aft " ) ] = MQTT_INTERVAL + 5 ; // add 5 sec if connection is bad or ESP too slow @TODO: stimmt das wirklich als expire!?
if ( devCls ! = NULL )
doc [ " dev_cla " ] = devCls ;
doc [ F ( " dev_cla " ) ] = devCls ;
if ( stateCls ! = NULL )
doc [ " stat_cla " ] = stateCls ;
doc [ F ( " stat_cla " ) ] = stateCls ;
serializeJson ( doc , buffer ) ;
publish ( discoveryTopic , buffer , true ) ;
publish ( discoveryTopic , buffer , true , false ) ;
doc . clear ( ) ;
}
@ -160,17 +175,22 @@ class PubMqtt {
}
void onWifiDisconnect ( const WiFiEventStationModeDisconnected & event ) {
DPRINTLN ( DBG_WARN , F ( " TODO: MQTT reconnect! " ) ) ;
mEnReconnect = false ;
}
void onConnect ( bool sessionPreset ) {
DPRINTLN ( DBG_INFO , F ( " MQTT connected " ) ) ;
mEnReconnect = true ;
publish ( " version " , mVersion , true ) ;
publish ( " device " , mDevName , true ) ;
publish ( " uptime " , " 0 " ) ;
subscribe ( " devcontrol/# " ) ; // TODO: register onMessage callback!
publish ( mLwtTopic , mLwtOnline , true , false ) ;
subscribe ( " ctrl/# " ) ;
subscribe ( " setup/# " ) ;
subscribe ( " status/# " ) ;
}
void onDisconnect ( espMqttClientTypes : : DisconnectReason reason ) {
@ -199,58 +219,57 @@ class PubMqtt {
}
}
void onSubscribe ( uint16_t packetId , const espMqttClientTypes : : SubscribeReturncode * codes , size_t len ) {
DPRINTLN ( DBG_INFO , F ( " MQTT Subscribe " ) ) ;
Serial . print ( " packetId: " ) ;
Serial . println ( packetId ) ;
for ( size_t i = 0 ; i < len ; + + i ) {
Serial . print ( " qos: " ) ;
Serial . println ( static_cast < uint8_t > ( codes [ i ] ) ) ;
void onMessage ( const espMqttClientTypes : : MessageProperties & properties , const char * topic , const uint8_t * payload , size_t len , size_t index , size_t total ) {
DPRINTLN ( DBG_VERBOSE , F ( " MQTT got topic: " ) + String ( topic ) ) ;
char * tpc = new char [ strlen ( topic ) + 1 ] ;
uint8_t cnt = 0 ;
DynamicJsonDocument json ( 128 ) ;
JsonObject root = json . to < JsonObject > ( ) ;
strncpy ( tpc , topic , strlen ( topic ) + 1 ) ;
if ( len > 0 ) {
char * pyld = new char [ len + 1 ] ;
strncpy ( pyld , ( const char * ) payload , len ) ;
pyld [ len ] = ' \0 ' ;
root [ " val " ] = atoi ( pyld ) ;
delete [ ] pyld ;
}
}
void onPublish ( uint16_t packetId ) {
Serial . println ( " Publish acknowledged. " ) ;
Serial . print ( " packetId: " ) ;
Serial . println ( packetId ) ;
}
/*void reconnect(void) {
DPRINTLN ( DBG_DEBUG , F ( " mqtt.h:reconnect " ) ) ;
DPRINTLN ( DBG_DEBUG , F ( " MQTT mClient->_state " ) + String ( mClient - > state ( ) ) ) ;
# ifdef ESP8266
DPRINTLN ( DBG_DEBUG , F ( " WIFI mEspClient.status " ) + String ( mEspClient . status ( ) ) ) ;
# endif
boolean resub = false ;
if ( ! mClient - > connected ( ) & & ( millis ( ) - mLastReconnect ) > MQTT_RECONNECT_DELAY ) {
mLastReconnect = millis ( ) ;
if ( strlen ( mDevName ) > 0 ) {
// der Server und der Port müssen neu gesetzt werden,
// da ein MQTT_CONNECTION_LOST -3 die Werte zerstört hat.
mClient - > setServer ( mCfgMqtt - > broker , mCfgMqtt - > port ) ;
mClient - > setBufferSize ( MQTT_MAX_PACKET_SIZE ) ;
char lwt [ MQTT_TOPIC_LEN + 7 ] ; // "/uptime" --> + 7 byte
snprintf ( lwt , MQTT_TOPIC_LEN + 7 , " %s/uptime " , mCfgMqtt - > topic ) ;
if ( ( strlen ( mCfgMqtt - > user ) > 0 ) & & ( strlen ( mCfgMqtt - > pwd ) > 0 ) )
resub = mClient - > connect ( mDevName , mCfgMqtt - > user , mCfgMqtt - > pwd , lwt , 0 , false , " offline " ) ;
else
resub = mClient - > connect ( mDevName , lwt , 0 , false , " offline " ) ;
// ein Subscribe ist nur nach einem connect notwendig
if ( resub ) {
char topic [ MQTT_TOPIC_LEN + 13 ] ; // "/devcontrol/#" --> + 6 byte
// ToDo: "/devcontrol/#" is hardcoded
snprintf ( topic , MQTT_TOPIC_LEN + 13 , " %s/devcontrol/# " , mCfgMqtt - > topic ) ;
DPRINTLN ( DBG_INFO , F ( " subscribe to " ) + String ( topic ) ) ;
mClient - > subscribe ( topic ) ; // subscribe to mTopic + "/devcontrol/#"
char * p = strtok ( tpc , " / " ) ;
p = strtok ( NULL , " / " ) ; // remove mCfgMqtt->topic
while ( NULL ! = p ) {
if ( 0 = = cnt ) {
if ( 0 = = strncmp ( p , " ctrl " , 4 ) ) {
if ( NULL ! = ( p = strtok ( NULL , " / " ) ) ) {
root [ F ( " path " ) ] = F ( " ctrl " ) ;
root [ F ( " cmd " ) ] = p ;
}
} else if ( 0 = = strncmp ( p , " setup " , 5 ) ) {
if ( NULL ! = ( p = strtok ( NULL , " / " ) ) ) {
root [ F ( " path " ) ] = F ( " setup " ) ;
root [ F ( " setup " ) ] = p ;
}
} else if ( 0 = = strncmp ( p , " status " , 6 ) ) {
if ( NULL ! = ( p = strtok ( NULL , " / " ) ) ) {
root [ F ( " path " ) ] = F ( " status " ) ;
root [ F ( " cmd " ) ] = p ;
}
}
}
else if ( 1 = = cnt ) {
root [ " id " ] = atoi ( p ) ;
}
p = strtok ( NULL , " / " ) ;
cnt + + ;
}
} */
delete [ ] tpc ;
char out [ 128 ] ;
serializeJson ( root , out , 128 ) ;
DPRINTLN ( DBG_INFO , " json: " + String ( out ) ) ;
mRxCnt + + ;
}
const char * getFieldDeviceClass ( uint8_t fieldId ) {
uint8_t pos = 0 ;
@ -487,11 +506,16 @@ class PubMqtt {
uint32_t * mSunrise , * mSunset ;
HMSYSTEM * mSys ;
uint32_t * mUtcTimestamp ;
uint32_t mTxCnt ;
uint32_t mRxCnt , m TxCnt ;
std : : queue < uint8_t > mSendList ;
bool mEnReconnect ;
// last will topic and payload must be available trough lifetime of 'espMqttClient'
char mLwtTopic [ MQTT_TOPIC_LEN + 7 ] ;
const char * mLwtOnline = " online " ;
const char * mLwtOffline = " offline " ;
const char * mDevName , * mVersion ;
//uint32_t mLastReconnect;
} ;
# endif /*__PUB_MQTT_H__*/