diff --git a/src/Helpers.h b/src/Helpers.h index 05ab136..50b4c2f 100644 --- a/src/Helpers.h +++ b/src/Helpers.h @@ -1,7 +1,7 @@ /* Copyright (c) 2022 Bert Melis. All rights reserved. -This work is licensed under the terms of the MIT license. +This work is licensed under the terms of the MIT license. For a copy, see <https://opensource.org/licenses/MIT> or the LICENSE file. */ @@ -13,6 +13,7 @@ the LICENSE file. #include "freertos/FreeRTOS.h" #include "freertos/task.h" #include "esp_task_wdt.h" + #define EMC_SEMAPHORE_TAKE_CHECK() if(pdTRUE == xSemaphoreTake(_xSemaphore, portMAX_DELAY)) #define EMC_SEMAPHORE_TAKE() xSemaphoreTake(_xSemaphore, portMAX_DELAY) #define EMC_SEMAPHORE_GIVE() xSemaphoreGive(_xSemaphore) #define EMC_GET_FREE_MEMORY() std::max(ESP.getMaxAllocHeap(), ESP.getMaxAllocPsram()) @@ -25,9 +26,11 @@ the LICENSE file. // _xSemaphore defined as std::atomic<bool> #define EMC_SEMAPHORE_TAKE() while (_xSemaphore) { /*ESP.wdtFeed();*/ } _xSemaphore = true #define EMC_SEMAPHORE_GIVE() _xSemaphore = false + #define EMC_SEMAPHORE_TAKE_CHECK() EMC_SEMAPHORE_TAKE #else #define EMC_SEMAPHORE_TAKE() #define EMC_SEMAPHORE_GIVE() + #define EMC_SEMAPHORE_TAKE_CHECK() #endif #define EMC_GET_FREE_MEMORY() ESP.getMaxFreeBlockSize() // no need to yield for ESP8266, the Arduino framework does this internally diff --git a/src/MqttClient.cpp b/src/MqttClient.cpp index dc21f74..d524e50 100644 --- a/src/MqttClient.cpp +++ b/src/MqttClient.cpp @@ -1,7 +1,7 @@ /* Copyright (c) 2022 Bert Melis. All rights reserved. -This work is licensed under the terms of the MIT license. +This work is licensed under the terms of the MIT license. For a copy, see <https://opensource.org/licenses/MIT> or the LICENSE file. */ @@ -148,16 +148,20 @@ uint16_t MqttClient::publish(const char* topic, uint8_t qos, bool retain, const #endif return 0; } - EMC_SEMAPHORE_TAKE(); - uint16_t packetId = (qos > 0) ? _getNextPacketId() : 1; - if (!_addPacket(packetId, topic, payload, length, qos, retain)) { - emc_log_e("Could not create PUBLISH packet"); + uint16_t packetId = 0; + EMC_SEMAPHORE_TAKE_CHECK() { + packetId = (qos > 0) ? _getNextPacketId() : 1; + if (!_addPacket(packetId, topic, payload, length, qos, retain)) { + emc_log_e("Could not create PUBLISH packet"); + EMC_SEMAPHORE_GIVE(); + _onError(packetId, Error::OUT_OF_MEMORY); + EMC_SEMAPHORE_TAKE_CHECK() { + packetId = 0; + } + } EMC_SEMAPHORE_GIVE(); - _onError(packetId, Error::OUT_OF_MEMORY); - EMC_SEMAPHORE_TAKE(); - packetId = 0; + yield(); } - EMC_SEMAPHORE_GIVE(); return packetId; } @@ -174,16 +178,20 @@ uint16_t MqttClient::publish(const char* topic, uint8_t qos, bool retain, espMqt #endif return 0; } - EMC_SEMAPHORE_TAKE(); - uint16_t packetId = (qos > 0) ? _getNextPacketId() : 1; - if (!_addPacket(packetId, topic, callback, length, qos, retain)) { - emc_log_e("Could not create PUBLISH packet"); + uint16_t packetId = 0; + EMC_SEMAPHORE_TAKE_CHECK() { + packetId = (qos > 0) ? _getNextPacketId() : 1; + if (!_addPacket(packetId, topic, callback, length, qos, retain)) { + emc_log_e("Could not create PUBLISH packet"); + EMC_SEMAPHORE_GIVE(); + _onError(packetId, Error::OUT_OF_MEMORY); + EMC_SEMAPHORE_TAKE_CHECK() { + packetId = 0; + } + } EMC_SEMAPHORE_GIVE(); - _onError(packetId, Error::OUT_OF_MEMORY); - EMC_SEMAPHORE_TAKE(); - packetId = 0; + yield(); } - EMC_SEMAPHORE_GIVE(); return packetId; } @@ -237,11 +245,13 @@ void MqttClient::loop() { case State::connectingMqtt: #if EMC_WAIT_FOR_CONNACK if (_transport->connected()) { - EMC_SEMAPHORE_TAKE(); - _sendPacket(); - _checkIncoming(); - _checkPing(); - EMC_SEMAPHORE_GIVE(); + EMC_SEMAPHORE_TAKE_CHECK() { + _sendPacket(); + _checkIncoming(); + _checkPing(); + EMC_SEMAPHORE_GIVE(); + yield(); + } } else { _setState(State::disconnectingTcp1); _disconnectReason = DisconnectReason::TCP_DISCONNECTED;