From 85ea4e995a75abe061f6fc375ea0481084dddd43 Mon Sep 17 00:00:00 2001 From: schererleander Date: Tue, 20 Jan 2026 08:34:54 +0100 Subject: initial commit --- .../ESP_Async_WebServer/src/AsyncEventSource.cpp | 507 +++++++++++++++++++++ 1 file changed, 507 insertions(+) create mode 100644 libraries/ESP_Async_WebServer/src/AsyncEventSource.cpp (limited to 'libraries/ESP_Async_WebServer/src/AsyncEventSource.cpp') diff --git a/libraries/ESP_Async_WebServer/src/AsyncEventSource.cpp b/libraries/ESP_Async_WebServer/src/AsyncEventSource.cpp new file mode 100644 index 0000000..2ebfa2d --- /dev/null +++ b/libraries/ESP_Async_WebServer/src/AsyncEventSource.cpp @@ -0,0 +1,507 @@ +// SPDX-License-Identifier: LGPL-3.0-or-later +// Copyright 2016-2025 Hristo Gochkov, Mathieu Carbou, Emil Muratov + +#include "Arduino.h" +#if defined(ESP32) +#include +#endif +#include "AsyncEventSource.h" + +#define ASYNC_SSE_NEW_LINE_CHAR (char)0xa + +using namespace asyncsrv; + +static String generateEventMessage(const char *message, const char *event, uint32_t id, uint32_t reconnect) { + String str; + size_t len{0}; + if (message) { + len += strlen(message); + } + + if (event) { + len += strlen(event); + } + + len += 42; // give it some overhead + + if (!str.reserve(len)) { +#ifdef ESP32 + log_e("Failed to allocate"); +#endif + return emptyString; + } + + if (reconnect) { + str += T_retry_; + str += reconnect; + str += ASYNC_SSE_NEW_LINE_CHAR; // '\n' + } + + if (id) { + str += T_id__; + str += id; + str += ASYNC_SSE_NEW_LINE_CHAR; // '\n' + } + + if (event != NULL) { + str += T_event_; + str += event; + str += ASYNC_SSE_NEW_LINE_CHAR; // '\n' + } + + if (!message) { + return str; + } + + size_t messageLen = strlen(message); + char *lineStart = (char *)message; + char *lineEnd; + do { + char *nextN = strchr(lineStart, '\n'); + char *nextR = strchr(lineStart, '\r'); + if (nextN == NULL && nextR == NULL) { + // a message is a single-line string + str += T_data_; + str += message; + str += T_nn; + return str; + } + + // a message is a multi-line string + char *nextLine = NULL; + if (nextN != NULL && nextR != NULL) { // windows line-ending \r\n + if (nextR + 1 == nextN) { + // normal \r\n sequence + lineEnd = nextR; + nextLine = nextN + 1; + } else { + // some abnormal \n \r mixed sequence + lineEnd = std::min(nextR, nextN); + nextLine = lineEnd + 1; + } + } else if (nextN != NULL) { // Unix/Mac OS X LF + lineEnd = nextN; + nextLine = nextN + 1; + } else { // some ancient garbage + lineEnd = nextR; + nextLine = nextR + 1; + } + + str += T_data_; + str.concat(lineStart, lineEnd - lineStart); + str += ASYNC_SSE_NEW_LINE_CHAR; // \n + + lineStart = nextLine; + } while (lineStart < ((char *)message + messageLen)); + + // append another \n to terminate message + str += ASYNC_SSE_NEW_LINE_CHAR; // '\n' + + return str; +} + +// Message + +size_t AsyncEventSourceMessage::ack(size_t len, __attribute__((unused)) uint32_t time) { + // If the whole message is now acked... + if (_acked + len > _data->length()) { + // Return the number of extra bytes acked (they will be carried on to the next message) + const size_t extra = _acked + len - _data->length(); + _acked = _data->length(); + return extra; + } + // Return that no extra bytes left. + _acked += len; + return 0; +} + +size_t AsyncEventSourceMessage::write(AsyncClient *client) { + if (!client) { + return 0; + } + + if (_sent >= _data->length() || !client->canSend()) { + return 0; + } + + size_t len = std::min(_data->length() - _sent, client->space()); + /* + add() would call lwip's tcp_write() under the AsyncTCP hood with apiflags argument. + By default apiflags=ASYNC_WRITE_FLAG_COPY + we could have used apiflags with this flag unset to pass data by reference and avoid copy to socket buffer, + but looks like it does not work for Arduino's lwip in ESP32/IDF + it is enforced in https://github.com/espressif/esp-lwip/blob/0606eed9d8b98a797514fdf6eabb4daf1c8c8cd9/src/core/tcp_out.c#L422C5-L422C30 + if LWIP_NETIF_TX_SINGLE_PBUF is set, and it is set indeed in IDF + https://github.com/espressif/esp-idf/blob/a0f798cfc4bbd624aab52b2c194d219e242d80c1/components/lwip/port/include/lwipopts.h#L744 + + So let's just keep it enforced ASYNC_WRITE_FLAG_COPY and keep in mind that there is no zero-copy + */ + size_t written = client->add(_data->c_str() + _sent, len, ASYNC_WRITE_FLAG_COPY); // ASYNC_WRITE_FLAG_MORE + _sent += written; + return written; +} + +size_t AsyncEventSourceMessage::send(AsyncClient *client) { + size_t sent = write(client); + return sent && client->send() ? sent : 0; +} + +// Client + +AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, AsyncEventSource *server) : _client(request->client()), _server(server) { + + if (request->hasHeader(T_Last_Event_ID)) { + _lastId = atoi(request->getHeader(T_Last_Event_ID)->value().c_str()); + } + + _client->setRxTimeout(0); + _client->onError(NULL, NULL); + _client->onAck( + [](void *r, AsyncClient *c, size_t len, uint32_t time) { + (void)c; + static_cast(r)->_onAck(len, time); + }, + this + ); + _client->onPoll( + [](void *r, AsyncClient *c) { + (void)c; + static_cast(r)->_onPoll(); + }, + this + ); + _client->onData(NULL, NULL); + _client->onTimeout( + [this](void *r, AsyncClient *c __attribute__((unused)), uint32_t time) { + static_cast(r)->_onTimeout(time); + }, + this + ); + _client->onDisconnect( + [this](void *r, AsyncClient *c) { + static_cast(r)->_onDisconnect(); + delete c; + }, + this + ); + + _server->_addClient(this); + delete request; + + _client->setNoDelay(true); +} + +AsyncEventSourceClient::~AsyncEventSourceClient() { +#ifdef ESP32 + std::lock_guard lock(_lockmq); +#endif + _messageQueue.clear(); + close(); +} + +bool AsyncEventSourceClient::_queueMessage(const char *message, size_t len) { + if (_messageQueue.size() >= SSE_MAX_QUEUED_MESSAGES) { +#ifdef ESP8266 + ets_printf(String(F("ERROR: Too many messages queued\n")).c_str()); +#elif defined(ESP32) + log_e("Event message queue overflow: discard message"); +#endif + return false; + } + +#ifdef ESP32 + // length() is not thread-safe, thus acquiring the lock before this call.. + std::lock_guard lock(_lockmq); +#endif + + _messageQueue.emplace_back(message, len); + + /* + throttle queue run + if Q is filled for >25% then network/CPU is congested, since there is no zero-copy mode for socket buff + forcing Q run will only eat more heap ram and blow the buffer, let's just keep data in our own queue + the queue will be processed at least on each onAck()/onPoll() call from AsyncTCP + */ + if (_messageQueue.size() < SSE_MAX_QUEUED_MESSAGES >> 2 && _client->canSend()) { + _runQueue(); + } + + return true; +} + +bool AsyncEventSourceClient::_queueMessage(AsyncEvent_SharedData_t &&msg) { + if (_messageQueue.size() >= SSE_MAX_QUEUED_MESSAGES) { +#ifdef ESP8266 + ets_printf(String(F("ERROR: Too many messages queued\n")).c_str()); +#elif defined(ESP32) + log_e("Event message queue overflow: discard message"); +#endif + return false; + } + +#ifdef ESP32 + // length() is not thread-safe, thus acquiring the lock before this call.. + std::lock_guard lock(_lockmq); +#endif + + _messageQueue.emplace_back(std::move(msg)); + + /* + throttle queue run + if Q is filled for >25% then network/CPU is congested, since there is no zero-copy mode for socket buff + forcing Q run will only eat more heap ram and blow the buffer, let's just keep data in our own queue + the queue will be processed at least on each onAck()/onPoll() call from AsyncTCP + */ + if (_messageQueue.size() < SSE_MAX_QUEUED_MESSAGES >> 2 && _client->canSend()) { + _runQueue(); + } + return true; +} + +void AsyncEventSourceClient::_onAck(size_t len __attribute__((unused)), uint32_t time __attribute__((unused))) { +#ifdef ESP32 + // Same here, acquiring the lock early + std::lock_guard lock(_lockmq); +#endif + + // adjust in-flight len + if (len < _inflight) { + _inflight -= len; + } else { + _inflight = 0; + } + + // acknowledge as much messages's data as we got confirmed len from a AsyncTCP + while (len && _messageQueue.size()) { + len = _messageQueue.front().ack(len); + if (_messageQueue.front().finished()) { + // now we could release full ack'ed messages, we were keeping it unless send confirmed from AsyncTCP + _messageQueue.pop_front(); + } + } + + // try to send another batch of data + if (_messageQueue.size()) { + _runQueue(); + } +} + +void AsyncEventSourceClient::_onPoll() { + if (_messageQueue.size()) { +#ifdef ESP32 + // Same here, acquiring the lock early + std::lock_guard lock(_lockmq); +#endif + _runQueue(); + } +} + +void AsyncEventSourceClient::_onTimeout(uint32_t time __attribute__((unused))) { + if (_client) { + _client->close(true); + } +} + +void AsyncEventSourceClient::_onDisconnect() { + if (!_client) { + return; + } + _client = nullptr; + _server->_handleDisconnect(this); +} + +void AsyncEventSourceClient::close() { + if (_client) { + _client->close(); + } +} + +bool AsyncEventSourceClient::send(const char *message, const char *event, uint32_t id, uint32_t reconnect) { + if (!connected()) { + return false; + } + return _queueMessage(std::make_shared(generateEventMessage(message, event, id, reconnect))); +} + +void AsyncEventSourceClient::_runQueue() { + if (!_client) { + return; + } + + // there is no need to lock the mutex here, 'cause all the calls to this method must be already lock'ed + size_t total_bytes_written = 0; + for (auto i = _messageQueue.begin(); i != _messageQueue.end(); ++i) { + if (!i->sent()) { + const size_t bytes_written = i->write(_client); + total_bytes_written += bytes_written; + _inflight += bytes_written; + if (bytes_written == 0 || _inflight > _max_inflight) { + // Serial.print("_"); + break; + } + } + } + + // flush socket + if (total_bytes_written) { + _client->send(); + } +} + +void AsyncEventSourceClient::set_max_inflight_bytes(size_t value) { + if (value >= SSE_MIN_INFLIGH && value <= SSE_MAX_INFLIGH) { + _max_inflight = value; + } +} + +/* AsyncEventSource */ + +void AsyncEventSource::authorizeConnect(ArAuthorizeConnectHandler cb) { + AsyncAuthorizationMiddleware *m = new AsyncAuthorizationMiddleware(401, cb); + m->_freeOnRemoval = true; + addMiddleware(m); +} + +void AsyncEventSource::_addClient(AsyncEventSourceClient *client) { + if (!client) { + return; + } +#ifdef ESP32 + std::lock_guard lock(_client_queue_lock); +#endif + _clients.emplace_back(client); + if (_connectcb) { + _connectcb(client); + } + + _adjust_inflight_window(); +} + +void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient *client) { + if (_disconnectcb) { + _disconnectcb(client); + } +#ifdef ESP32 + std::lock_guard lock(_client_queue_lock); +#endif + for (auto i = _clients.begin(); i != _clients.end(); ++i) { + if (i->get() == client) { + _clients.erase(i); + break; + } + } + _adjust_inflight_window(); +} + +void AsyncEventSource::close() { + // While the whole loop is not done, the linked list is locked and so the + // iterator should remain valid even when AsyncEventSource::_handleDisconnect() + // is called very early +#ifdef ESP32 + std::lock_guard lock(_client_queue_lock); +#endif + for (const auto &c : _clients) { + if (c->connected()) { + /** + * @brief: Fix self-deadlock by using recursive_mutex instead. + * Due to c->close() shall call the callback function _onDisconnect() + * The calling flow _onDisconnect() --> _handleDisconnect() --> deadlock + */ + c->close(); + } + } +} + +// pmb fix +size_t AsyncEventSource::avgPacketsWaiting() const { + size_t aql = 0; + uint32_t nConnectedClients = 0; +#ifdef ESP32 + std::lock_guard lock(_client_queue_lock); +#endif + if (!_clients.size()) { + return 0; + } + + for (const auto &c : _clients) { + if (c->connected()) { + aql += c->packetsWaiting(); + ++nConnectedClients; + } + } + return ((aql) + (nConnectedClients / 2)) / (nConnectedClients); // round up +} + +AsyncEventSource::SendStatus AsyncEventSource::send(const char *message, const char *event, uint32_t id, uint32_t reconnect) { + AsyncEvent_SharedData_t shared_msg = std::make_shared(generateEventMessage(message, event, id, reconnect)); +#ifdef ESP32 + std::lock_guard lock(_client_queue_lock); +#endif + size_t hits = 0; + size_t miss = 0; + for (const auto &c : _clients) { + if (c->write(shared_msg)) { + ++hits; + } else { + ++miss; + } + } + return hits == 0 ? DISCARDED : (miss == 0 ? ENQUEUED : PARTIALLY_ENQUEUED); +} + +size_t AsyncEventSource::count() const { +#ifdef ESP32 + std::lock_guard lock(_client_queue_lock); +#endif + size_t n_clients{0}; + for (const auto &i : _clients) { + if (i->connected()) { + ++n_clients; + } + } + + return n_clients; +} + +bool AsyncEventSource::canHandle(AsyncWebServerRequest *request) const { + return request->isSSE() && request->url().equals(_url); +} + +void AsyncEventSource::handleRequest(AsyncWebServerRequest *request) { + request->send(new AsyncEventSourceResponse(this)); +} + +void AsyncEventSource::_adjust_inflight_window() { + if (_clients.size()) { + size_t inflight = SSE_MAX_INFLIGH / _clients.size(); + for (const auto &c : _clients) { + c->set_max_inflight_bytes(inflight); + } + // Serial.printf("adjusted inflight to: %u\n", inflight); + } +} + +/* Response */ + +AsyncEventSourceResponse::AsyncEventSourceResponse(AsyncEventSource *server) { + _server = server; + _code = 200; + _contentType = T_text_event_stream; + _sendContentLength = false; + addHeader(T_Cache_Control, T_no_cache); + addHeader(T_Connection, T_keep_alive); +} + +void AsyncEventSourceResponse::_respond(AsyncWebServerRequest *request) { + String out; + _assembleHead(out, request->version()); + request->client()->write(out.c_str(), _headLength); + _state = RESPONSE_WAIT_ACK; +} + +size_t AsyncEventSourceResponse::_ack(AsyncWebServerRequest *request, size_t len, uint32_t time __attribute__((unused))) { + if (len) { + new AsyncEventSourceClient(request, _server); + } + return 0; +} -- cgit v1.3.1