diff --git a/esphome/components/api/__init__.py b/esphome/components/api/__init__.py index 559f8f649c..fc140dc7d2 100644 --- a/esphome/components/api/__init__.py +++ b/esphome/components/api/__init__.py @@ -19,7 +19,7 @@ from esphome.const import ( from esphome.core import coroutine_with_priority DEPENDENCIES = ["network"] -AUTO_LOAD = ["async_tcp"] +AUTO_LOAD = ["socket"] CODEOWNERS = ["@OttoWinter"] api_ns = cg.esphome_ns.namespace("api") diff --git a/esphome/components/api/api_connection.cpp b/esphome/components/api/api_connection.cpp index 99e611be10..bce0b0bab8 100644 --- a/esphome/components/api/api_connection.cpp +++ b/esphome/components/api/api_connection.cpp @@ -2,6 +2,7 @@ #include "esphome/core/log.h" #include "esphome/core/util.h" #include "esphome/core/version.h" +#include #ifdef USE_DEEP_SLEEP #include "esphome/components/deep_sleep/deep_sleep_component.h" @@ -18,74 +19,27 @@ namespace api { static const char *const TAG = "api.connection"; -APIConnection::APIConnection(AsyncClient *client, APIServer *parent) - : client_(client), parent_(parent), initial_state_iterator_(parent, this), list_entities_iterator_(parent, this) { - this->client_->onError([](void *s, AsyncClient *c, int8_t error) { ((APIConnection *) s)->on_error_(error); }, this); - this->client_->onDisconnect([](void *s, AsyncClient *c) { ((APIConnection *) s)->on_disconnect_(); }, this); - this->client_->onTimeout([](void *s, AsyncClient *c, uint32_t time) { ((APIConnection *) s)->on_timeout_(time); }, - this); - this->client_->onData([](void *s, AsyncClient *c, void *buf, - size_t len) { ((APIConnection *) s)->on_data_(reinterpret_cast(buf), len); }, - this); +APIConnection::APIConnection(std::unique_ptr sock, APIServer *parent) + : parent_(parent), initial_state_iterator_(parent, this), list_entities_iterator_(parent, this) { + this->proto_write_buffer_.reserve(64); - this->send_buffer_.reserve(64); - this->recv_buffer_.reserve(32); - this->client_info_ = this->client_->remoteIP().toString().c_str(); + helper_ = std::unique_ptr{new APIPlaintextFrameHelper(std::move(sock))}; +} +void APIConnection::start() { this->last_traffic_ = millis(); -} -APIConnection::~APIConnection() { delete this->client_; } -void APIConnection::on_error_(int8_t error) { this->remove_ = true; } -void APIConnection::on_disconnect_() { this->remove_ = true; } -void APIConnection::on_timeout_(uint32_t time) { this->on_fatal_error(); } -void APIConnection::on_data_(uint8_t *buf, size_t len) { - if (len == 0 || buf == nullptr) + + APIError err = helper_->init(); + if (err != APIError::OK) { + ESP_LOGW(TAG, "Helper init failed: %d errno=%d", (int) err, errno); + remove_ = true; return; - this->recv_buffer_.insert(this->recv_buffer_.end(), buf, buf + len); -} -void APIConnection::parse_recv_buffer_() { - if (this->recv_buffer_.empty() || this->remove_) - return; - - while (!this->recv_buffer_.empty()) { - if (this->recv_buffer_[0] != 0x00) { - ESP_LOGW(TAG, "Invalid preamble from %s", this->client_info_.c_str()); - this->on_fatal_error(); - return; - } - uint32_t i = 1; - const uint32_t size = this->recv_buffer_.size(); - uint32_t consumed; - auto msg_size_varint = ProtoVarInt::parse(&this->recv_buffer_[i], size - i, &consumed); - if (!msg_size_varint.has_value()) - // not enough data there yet - return; - i += consumed; - uint32_t msg_size = msg_size_varint->as_uint32(); - - auto msg_type_varint = ProtoVarInt::parse(&this->recv_buffer_[i], size - i, &consumed); - if (!msg_type_varint.has_value()) - // not enough data there yet - return; - i += consumed; - uint32_t msg_type = msg_type_varint->as_uint32(); - - if (size - i < msg_size) - // message body not fully received - return; - - uint8_t *msg = &this->recv_buffer_[i]; - this->read_message(msg_size, msg_type, msg); - if (this->remove_) - return; - // pop front - uint32_t total = i + msg_size; - this->recv_buffer_.erase(this->recv_buffer_.begin(), this->recv_buffer_.begin() + total); - this->last_traffic_ = millis(); } + client_info_ = helper_->getpeername(); + helper_->set_log_info(client_info_); } -void APIConnection::disconnect_client() { - this->client_->close(); +void APIConnection::force_disconnect_client() { + this->helper_->close(); this->remove_ = true; } @@ -93,61 +47,74 @@ void APIConnection::loop() { if (this->remove_) return; - if (this->next_close_) { - this->disconnect_client(); - return; - } - if (!network_is_connected()) { // when network is disconnected force disconnect immediately // don't wait for timeout this->on_fatal_error(); return; } - if (this->client_->disconnected()) { - // failsafe for disconnect logic - this->on_disconnect_(); + if (this->next_close_) { + this->helper_->close(); + this->remove_ = true; return; } - this->parse_recv_buffer_(); + + APIError err = helper_->loop(); + if (err != APIError::OK) { + on_fatal_error(); + ESP_LOGW(TAG, "%s: Socket operation failed: %d", client_info_.c_str(), (int) err); + return; + } + ReadPacketBuffer buffer; + err = helper_->read_packet(&buffer); + if (err == APIError::WOULD_BLOCK) { + // pass + } else if (err != APIError::OK) { + on_fatal_error(); + ESP_LOGW(TAG, "%s: Reading failed: %d", client_info_.c_str(), (int) err); + return; + } else { + this->last_traffic_ = millis(); + // read a packet + this->read_message(buffer.data_len, buffer.type, &buffer.container[buffer.data_offset]); + if (this->remove_) + return; + } this->list_entities_iterator_.advance(); this->initial_state_iterator_.advance(); const uint32_t keepalive = 60000; + const uint32_t now = millis(); if (this->sent_ping_) { // Disconnect if not responded within 2.5*keepalive - if (millis() - this->last_traffic_ > (keepalive * 5) / 2) { + if (now - this->last_traffic_ > (keepalive * 5) / 2) { + this->force_disconnect_client(); ESP_LOGW(TAG, "'%s' didn't respond to ping request in time. Disconnecting...", this->client_info_.c_str()); - this->disconnect_client(); } - } else if (millis() - this->last_traffic_ > keepalive) { + } else if (now - this->last_traffic_ > keepalive) { this->sent_ping_ = true; this->send_ping_request(PingRequest()); } #ifdef USE_ESP32_CAMERA - if (this->image_reader_.available()) { - uint32_t space = this->client_->space(); - // reserve 15 bytes for metadata, and at least 64 bytes of data - if (space >= 15 + 64) { - uint32_t to_send = std::min(space - 15, this->image_reader_.available()); - auto buffer = this->create_buffer(); - // fixed32 key = 1; - buffer.encode_fixed32(1, esp32_camera::global_esp32_camera->get_object_id_hash()); - // bytes data = 2; - buffer.encode_bytes(2, this->image_reader_.peek_data_buffer(), to_send); - // bool done = 3; - bool done = this->image_reader_.available() == to_send; - buffer.encode_bool(3, done); - bool success = this->send_buffer(buffer, 44); + if (this->image_reader_.available() && this->helper_->can_write_without_blocking()) { + uint32_t to_send = std::min((size_t) 1024, this->image_reader_.available()); + auto buffer = this->create_buffer(); + // fixed32 key = 1; + buffer.encode_fixed32(1, esp32_camera::global_esp32_camera->get_object_id_hash()); + // bytes data = 2; + buffer.encode_bytes(2, this->image_reader_.peek_data_buffer(), to_send); + // bool done = 3; + bool done = this->image_reader_.available() == to_send; + buffer.encode_bool(3, done); + bool success = this->send_buffer(buffer, 44); - if (success) { - this->image_reader_.consume_data(to_send); - } - if (success && done) { - this->image_reader_.return_image(); - } + if (success) { + this->image_reader_.consume_data(to_send); + } + if (success && done) { + this->image_reader_.return_image(); } } #endif @@ -709,8 +676,8 @@ bool APIConnection::send_log_message(int level, const char *tag, const char *lin } HelloResponse APIConnection::hello(const HelloRequest &msg) { - this->client_info_ = msg.client_info + " (" + this->client_->remoteIP().toString().c_str(); - this->client_info_ += ")"; + this->client_info_ = msg.client_info + " (" + this->helper_->getpeername() + ")"; + this->helper_->set_log_info(client_info_); ESP_LOGV(TAG, "Hello from client: '%s'", this->client_info_.c_str()); HelloResponse resp; @@ -786,44 +753,31 @@ void APIConnection::subscribe_home_assistant_states(const SubscribeHomeAssistant bool APIConnection::send_buffer(ProtoWriteBuffer buffer, uint32_t message_type) { if (this->remove_) return false; + if (!this->helper_->can_write_without_blocking()) + return false; - std::vector header; - header.push_back(0x00); - ProtoVarInt(buffer.get_buffer()->size()).encode(header); - ProtoVarInt(message_type).encode(header); - - size_t needed_space = buffer.get_buffer()->size() + header.size(); - - if (needed_space > this->client_->space()) { - delay(0); - if (needed_space > this->client_->space()) { - // SubscribeLogsResponse - if (message_type != 29) { - ESP_LOGV(TAG, "Cannot send message because of TCP buffer space"); - } - delay(0); - return false; - } + APIError err = this->helper_->write_packet(message_type, buffer.get_buffer()->data(), buffer.get_buffer()->size()); + if (err == APIError::WOULD_BLOCK) + return false; + if (err != APIError::OK) { + on_fatal_error(); + ESP_LOGW(TAG, "%s: Packet write failed %d errno=%d", client_info_.c_str(), (int) err, errno); + return false; } - - this->client_->add(reinterpret_cast(header.data()), header.size(), - ASYNC_WRITE_FLAG_COPY | ASYNC_WRITE_FLAG_MORE); - this->client_->add(reinterpret_cast(buffer.get_buffer()->data()), buffer.get_buffer()->size(), - ASYNC_WRITE_FLAG_COPY); - bool ret = this->client_->send(); - return ret; + this->last_traffic_ = millis(); + return true; } void APIConnection::on_unauthenticated_access() { - ESP_LOGD(TAG, "'%s' tried to access without authentication.", this->client_info_.c_str()); this->on_fatal_error(); + ESP_LOGD(TAG, "'%s' tried to access without authentication.", this->client_info_.c_str()); } void APIConnection::on_no_setup_connection() { - ESP_LOGD(TAG, "'%s' tried to access without full connection.", this->client_info_.c_str()); this->on_fatal_error(); + ESP_LOGD(TAG, "'%s' tried to access without full connection.", this->client_info_.c_str()); } void APIConnection::on_fatal_error() { ESP_LOGV(TAG, "Error: Disconnecting %s", this->client_info_.c_str()); - this->client_->close(); + this->helper_->close(); this->remove_ = true; } diff --git a/esphome/components/api/api_connection.h b/esphome/components/api/api_connection.h index bc9839a423..a1788bbede 100644 --- a/esphome/components/api/api_connection.h +++ b/esphome/components/api/api_connection.h @@ -5,16 +5,18 @@ #include "api_pb2.h" #include "api_pb2_service.h" #include "api_server.h" +#include "api_frame_helper.h" namespace esphome { namespace api { class APIConnection : public APIServerConnection { public: - APIConnection(AsyncClient *client, APIServer *parent); - virtual ~APIConnection(); + APIConnection(std::unique_ptr socket, APIServer *parent); + virtual ~APIConnection() = default; - void disconnect_client(); + void start(); + void force_disconnect_client(); void loop(); bool send_list_info_done() { @@ -87,8 +89,8 @@ class APIConnection : public APIServerConnection { #endif void on_disconnect_response(const DisconnectResponse &value) override { - // we initiated disconnect_client - this->next_close_ = true; + this->helper_->close(); + this->remove_ = true; } void on_ping_response(const PingResponse &value) override { // we initiated ping @@ -102,6 +104,8 @@ class APIConnection : public APIServerConnection { ConnectResponse connect(const ConnectRequest &msg) override; DisconnectResponse disconnect(const DisconnectRequest &msg) override { // remote initiated disconnect_client + // don't close yet, we still need to send the disconnect response + // close will happen on next loop this->next_close_ = true; DisconnectResponse resp; return resp; @@ -135,19 +139,16 @@ class APIConnection : public APIServerConnection { void on_unauthenticated_access() override; void on_no_setup_connection() override; ProtoWriteBuffer create_buffer() override { - this->send_buffer_.clear(); - return {&this->send_buffer_}; + // FIXME: ensure no recursive writes can happen + this->proto_write_buffer_.clear(); + return {&this->proto_write_buffer_}; } bool send_buffer(ProtoWriteBuffer buffer, uint32_t message_type) override; protected: friend APIServer; - void on_error_(int8_t error); - void on_disconnect_(); - void on_timeout_(uint32_t time); - void on_data_(uint8_t *buf, size_t len); - void parse_recv_buffer_(); + bool send_(const void *buf, size_t len, bool force); enum class ConnectionState { WAITING_FOR_HELLO, @@ -157,8 +158,10 @@ class APIConnection : public APIServerConnection { bool remove_{false}; - std::vector send_buffer_; - std::vector recv_buffer_; + // Buffer used to encode proto messages + // Re-use to prevent allocations + std::vector proto_write_buffer_; + std::unique_ptr helper_; std::string client_info_; #ifdef USE_ESP32_CAMERA @@ -170,9 +173,7 @@ class APIConnection : public APIServerConnection { uint32_t last_traffic_; bool sent_ping_{false}; bool service_call_subscription_{false}; - bool current_nodelay_{false}; - bool next_close_{false}; - AsyncClient *client_; + bool next_close_ = false; APIServer *parent_; InitialStateIterator initial_state_iterator_; ListEntitiesIterator list_entities_iterator_; diff --git a/esphome/components/api/api_frame_helper.cpp b/esphome/components/api/api_frame_helper.cpp new file mode 100644 index 0000000000..f903ab8656 --- /dev/null +++ b/esphome/components/api/api_frame_helper.cpp @@ -0,0 +1,294 @@ +#include "api_frame_helper.h" + +#include "esphome/core/log.h" +#include "esphome/core/helpers.h" +#include "proto.h" + +namespace esphome { +namespace api { + +static const char *const TAG = "api.socket"; + +/// Is the given return value (from read/write syscalls) a wouldblock error? +bool is_would_block(ssize_t ret) { + if (ret == -1) { + return errno == EWOULDBLOCK || errno == EAGAIN; + } + return ret == 0; +} + +#define HELPER_LOG(msg, ...) ESP_LOGVV(TAG, "%s: " msg, info_.c_str(), ##__VA_ARGS__) + +/// Initialize the frame helper, returns OK if successful. +APIError APIPlaintextFrameHelper::init() { + if (state_ != State::INITIALIZE || socket_ == nullptr) { + HELPER_LOG("Bad state for init %d", (int) state_); + return APIError::BAD_STATE; + } + int err = socket_->setblocking(false); + if (err != 0) { + state_ = State::FAILED; + HELPER_LOG("Setting nonblocking failed with errno %d", errno); + return APIError::TCP_NONBLOCKING_FAILED; + } + int enable = 1; + err = socket_->setsockopt(IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(int)); + if (err != 0) { + state_ = State::FAILED; + HELPER_LOG("Setting nodelay failed with errno %d", errno); + return APIError::TCP_NODELAY_FAILED; + } + + state_ = State::DATA; + return APIError::OK; +} +/// Not used for plaintext +APIError APIPlaintextFrameHelper::loop() { + if (state_ != State::DATA) { + return APIError::BAD_STATE; + } + // try send pending TX data + if (!tx_buf_.empty()) { + APIError err = try_send_tx_buf_(); + if (err != APIError::OK) { + return err; + } + } + return APIError::OK; +} + +/** Read a packet into the rx_buf_. If successful, stores frame data in the frame parameter + * + * @param frame: The struct to hold the frame information in. + * msg: store the parsed frame in that struct + * + * @return See APIError + * + * error API_ERROR_BAD_INDICATOR: Bad indicator byte at start of frame. + */ +APIError APIPlaintextFrameHelper::try_read_frame_(ParsedFrame *frame) { + int err; + APIError aerr; + + if (frame == nullptr) { + HELPER_LOG("Bad argument for try_read_frame_"); + return APIError::BAD_ARG; + } + + // read header + while (!rx_header_parsed_) { + uint8_t data; + ssize_t received = socket_->read(&data, 1); + if (is_would_block(received)) { + return APIError::WOULD_BLOCK; + } else if (received == -1) { + state_ = State::FAILED; + HELPER_LOG("Socket read failed with errno %d", errno); + return APIError::SOCKET_READ_FAILED; + } + rx_header_buf_.push_back(data); + + // try parse header + if (rx_header_buf_[0] != 0x00) { + state_ = State::FAILED; + HELPER_LOG("Bad indicator byte %u", rx_header_buf_[0]); + return APIError::BAD_INDICATOR; + } + + size_t i = 1; + size_t consumed = 0; + auto msg_size_varint = ProtoVarInt::parse(&rx_header_buf_[i], rx_header_buf_.size() - i, &consumed); + if (!msg_size_varint.has_value()) { + // not enough data there yet + continue; + } + + i += consumed; + rx_header_parsed_len_ = msg_size_varint->as_uint32(); + + auto msg_type_varint = ProtoVarInt::parse(&rx_header_buf_[i], rx_header_buf_.size() - i, &consumed); + if (!msg_type_varint.has_value()) { + // not enough data there yet + continue; + } + rx_header_parsed_type_ = msg_type_varint->as_uint32(); + rx_header_parsed_ = true; + } + // header reading done + + // reserve space for body + if (rx_buf_.size() != rx_header_parsed_len_) { + rx_buf_.resize(rx_header_parsed_len_); + } + + if (rx_buf_len_ < rx_header_parsed_len_) { + // more data to read + size_t to_read = rx_header_parsed_len_ - rx_buf_len_; + ssize_t received = socket_->read(&rx_buf_[rx_buf_len_], to_read); + if (is_would_block(received)) { + return APIError::WOULD_BLOCK; + } else if (received == -1) { + state_ = State::FAILED; + HELPER_LOG("Socket read failed with errno %d", errno); + return APIError::SOCKET_READ_FAILED; + } + rx_buf_len_ += received; + if (received != to_read) { + // not all read + return APIError::WOULD_BLOCK; + } + } + + // uncomment for even more debugging + // ESP_LOGVV(TAG, "Received frame: %s", hexencode(rx_buf_).c_str()); + frame->msg = std::move(rx_buf_); + // consume msg + rx_buf_ = {}; + rx_buf_len_ = 0; + rx_header_buf_.clear(); + rx_header_parsed_ = false; + return APIError::OK; +} + +APIError APIPlaintextFrameHelper::read_packet(ReadPacketBuffer *buffer) { + int err; + APIError aerr; + + if (state_ != State::DATA) { + return APIError::WOULD_BLOCK; + } + + ParsedFrame frame; + aerr = try_read_frame_(&frame); + if (aerr != APIError::OK) + return aerr; + + buffer->container = std::move(frame.msg); + buffer->data_offset = 0; + buffer->data_len = rx_header_parsed_len_; + buffer->type = rx_header_parsed_type_; + return APIError::OK; +} +bool APIPlaintextFrameHelper::can_write_without_blocking() { return state_ == State::DATA && tx_buf_.empty(); } +APIError APIPlaintextFrameHelper::write_packet(uint16_t type, const uint8_t *payload, size_t payload_len) { + int err; + APIError aerr; + + if (state_ != State::DATA) { + return APIError::BAD_STATE; + } + + std::vector header; + header.push_back(0x00); + ProtoVarInt(payload_len).encode(header); + ProtoVarInt(type).encode(header); + + aerr = write_raw_(&header[0], header.size()); + if (aerr != APIError::OK) { + return aerr; + } + aerr = write_raw_(payload, payload_len); + if (aerr != APIError::OK) { + return aerr; + } + return APIError::OK; +} +APIError APIPlaintextFrameHelper::try_send_tx_buf_() { + // try send from tx_buf + while (state_ != State::CLOSED && !tx_buf_.empty()) { + ssize_t sent = socket_->write(tx_buf_.data(), tx_buf_.size()); + if (sent == -1) { + if (errno == EWOULDBLOCK || errno == EAGAIN) + break; + state_ = State::FAILED; + HELPER_LOG("Socket write failed with errno %d", errno); + return APIError::SOCKET_WRITE_FAILED; + } else if (sent == 0) { + break; + } + // TODO: inefficient if multiple packets in txbuf + // replace with deque of buffers + tx_buf_.erase(tx_buf_.begin(), tx_buf_.begin() + sent); + } + + return APIError::OK; +} +/** Write the data to the socket, or buffer it a write would block + * + * @param data The data to write + * @param len The length of data + */ +APIError APIPlaintextFrameHelper::write_raw_(const uint8_t *data, size_t len) { + if (len == 0) + return APIError::OK; + int err; + APIError aerr; + + // uncomment for even more debugging + // ESP_LOGVV(TAG, "Sending raw: %s", hexencode(data, len).c_str()); + + if (!tx_buf_.empty()) { + // try to empty tx_buf_ first + aerr = try_send_tx_buf_(); + if (aerr != APIError::OK && aerr != APIError::WOULD_BLOCK) + return aerr; + } + + if (!tx_buf_.empty()) { + // tx buf not empty, can't write now because then stream would be inconsistent + tx_buf_.insert(tx_buf_.end(), data, data + len); + return APIError::OK; + } + + ssize_t sent = socket_->write(data, len); + if (is_would_block(sent)) { + // operation would block, add buffer to tx_buf + tx_buf_.insert(tx_buf_.end(), data, data + len); + return APIError::OK; + } else if (sent == -1) { + // an error occured + state_ = State::FAILED; + HELPER_LOG("Socket write failed with errno %d", errno); + return APIError::SOCKET_WRITE_FAILED; + } else if (sent != len) { + // partially sent, add end to tx_buf + tx_buf_.insert(tx_buf_.end(), data + sent, data + len); + return APIError::OK; + } + // fully sent + return APIError::OK; +} +APIError APIPlaintextFrameHelper::write_frame_(const uint8_t *data, size_t len) { + APIError aerr; + + uint8_t header[3]; + header[0] = 0x01; // indicator + header[1] = (uint8_t)(len >> 8); + header[2] = (uint8_t) len; + + aerr = write_raw_(header, 3); + if (aerr != APIError::OK) + return aerr; + aerr = write_raw_(data, len); + return aerr; +} + +APIError APIPlaintextFrameHelper::close() { + state_ = State::CLOSED; + int err = socket_->close(); + if (err == -1) + return APIError::CLOSE_FAILED; + return APIError::OK; +} +APIError APIPlaintextFrameHelper::shutdown(int how) { + int err = socket_->shutdown(how); + if (err == -1) + return APIError::SHUTDOWN_FAILED; + if (how == SHUT_RDWR) { + state_ = State::CLOSED; + } + return APIError::OK; +} + +} // namespace api +} // namespace esphome diff --git a/esphome/components/api/api_frame_helper.h b/esphome/components/api/api_frame_helper.h new file mode 100644 index 0000000000..14a0760c25 --- /dev/null +++ b/esphome/components/api/api_frame_helper.h @@ -0,0 +1,103 @@ +#pragma once +#include +#include +#include + +#include "esphome/core/defines.h" + +#include "esphome/components/socket/socket.h" + +namespace esphome { +namespace api { + +struct ReadPacketBuffer { + std::vector container; + uint16_t type; + size_t data_offset; + size_t data_len; +}; + +struct PacketBuffer { + const std::vector container; + uint16_t type; + uint8_t data_offset; + uint8_t data_len; +}; + +enum class APIError : int { + OK = 0, + WOULD_BLOCK = 1001, + BAD_INDICATOR = 1003, + BAD_DATA_PACKET = 1004, + TCP_NODELAY_FAILED = 1005, + TCP_NONBLOCKING_FAILED = 1006, + CLOSE_FAILED = 1007, + SHUTDOWN_FAILED = 1008, + BAD_STATE = 1009, + BAD_ARG = 1010, + SOCKET_READ_FAILED = 1011, + SOCKET_WRITE_FAILED = 1012, + OUT_OF_MEMORY = 1018, +}; + +class APIFrameHelper { + public: + virtual APIError init() = 0; + virtual APIError loop() = 0; + virtual APIError read_packet(ReadPacketBuffer *buffer) = 0; + virtual bool can_write_without_blocking() = 0; + virtual APIError write_packet(uint16_t type, const uint8_t *data, size_t len) = 0; + virtual std::string getpeername() = 0; + virtual APIError close() = 0; + virtual APIError shutdown(int how) = 0; + // Give this helper a name for logging + virtual void set_log_info(std::string info) = 0; +}; +class APIPlaintextFrameHelper : public APIFrameHelper { + public: + APIPlaintextFrameHelper(std::unique_ptr socket) : socket_(std::move(socket)) {} + ~APIPlaintextFrameHelper() = default; + APIError init() override; + APIError loop() override; + APIError read_packet(ReadPacketBuffer *buffer) override; + bool can_write_without_blocking() override; + APIError write_packet(uint16_t type, const uint8_t *payload, size_t len) override; + std::string getpeername() override { return socket_->getpeername(); } + APIError close() override; + APIError shutdown(int how) override; + // Give this helper a name for logging + void set_log_info(std::string info) override { info_ = std::move(info); } + + protected: + struct ParsedFrame { + std::vector msg; + }; + + APIError try_read_frame_(ParsedFrame *frame); + APIError try_send_tx_buf_(); + APIError write_frame_(const uint8_t *data, size_t len); + APIError write_raw_(const uint8_t *data, size_t len); + + std::unique_ptr socket_; + + std::string info_; + std::vector rx_header_buf_; + bool rx_header_parsed_ = false; + uint32_t rx_header_parsed_type_ = 0; + uint32_t rx_header_parsed_len_ = 0; + + std::vector rx_buf_; + size_t rx_buf_len_ = 0; + + std::vector tx_buf_; + + enum class State { + INITIALIZE = 1, + DATA = 2, + CLOSED = 3, + FAILED = 4, + } state_ = State::INITIALIZE; +}; + +} // namespace api +} // namespace esphome diff --git a/esphome/components/api/api_server.cpp b/esphome/components/api/api_server.cpp index d48c0a4fd8..c4c193b389 100644 --- a/esphome/components/api/api_server.cpp +++ b/esphome/components/api/api_server.cpp @@ -1,10 +1,11 @@ #include "api_server.h" #include "api_connection.h" -#include "esphome/core/log.h" #include "esphome/core/application.h" -#include "esphome/core/util.h" #include "esphome/core/defines.h" +#include "esphome/core/log.h" +#include "esphome/core/util.h" #include "esphome/core/version.h" +#include #ifdef USE_LOGGER #include "esphome/components/logger/logger.h" @@ -21,20 +22,45 @@ static const char *const TAG = "api"; void APIServer::setup() { ESP_LOGCONFIG(TAG, "Setting up Home Assistant API server..."); this->setup_controller(); - this->server_ = AsyncServer(this->port_); - this->server_.setNoDelay(false); - this->server_.begin(); - this->server_.onClient( - [](void *s, AsyncClient *client) { - if (client == nullptr) - return; + socket_ = socket::socket(AF_INET, SOCK_STREAM, 0); + if (socket_ == nullptr) { + ESP_LOGW(TAG, "Could not create socket."); + this->mark_failed(); + return; + } + int enable = 1; + int err = socket_->setsockopt(SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)); + if (err != 0) { + ESP_LOGW(TAG, "Socket unable to set reuseaddr: errno %d", err); + // we can still continue + } + err = socket_->setblocking(false); + if (err != 0) { + ESP_LOGW(TAG, "Socket unable to set nonblocking mode: errno %d", err); + this->mark_failed(); + return; + } + + struct sockaddr_in server; + memset(&server, 0, sizeof(server)); + server.sin_family = AF_INET; + server.sin_addr.s_addr = ESPHOME_INADDR_ANY; + server.sin_port = htons(this->port_); + + err = socket_->bind((struct sockaddr *) &server, sizeof(server)); + if (err != 0) { + ESP_LOGW(TAG, "Socket unable to bind: errno %d", errno); + this->mark_failed(); + return; + } + + err = socket_->listen(4); + if (err != 0) { + ESP_LOGW(TAG, "Socket unable to listen: errno %d", errno); + this->mark_failed(); + return; + } - // can't print here because in lwIP thread - // ESP_LOGD(TAG, "New client connected from %s", client->remoteIP().toString().c_str()); - auto *a_this = (APIServer *) s; - a_this->clients_.push_back(new APIConnection(client, a_this)); - }, - this); #ifdef USE_LOGGER if (logger::global_logger != nullptr) { logger::global_logger->add_on_log_callback([this](int level, const char *tag, const char *message) { @@ -59,6 +85,20 @@ void APIServer::setup() { #endif } void APIServer::loop() { + // Accept new clients + while (true) { + struct sockaddr_storage source_addr; + socklen_t addr_len = sizeof(source_addr); + auto sock = socket_->accept((struct sockaddr *) &source_addr, &addr_len); + if (!sock) + break; + ESP_LOGD(TAG, "Accepted %s", sock->getpeername().c_str()); + + auto *conn = new APIConnection(std::move(sock), this); + clients_.push_back(conn); + conn->start(); + } + // Partition clients into remove and active auto new_end = std::partition(this->clients_.begin(), this->clients_.end(), [](APIConnection *conn) { return !conn->remove_; }); diff --git a/esphome/components/api/api_server.h b/esphome/components/api/api_server.h index 96b3192e9e..7c42fe7dd5 100644 --- a/esphome/components/api/api_server.h +++ b/esphome/components/api/api_server.h @@ -4,6 +4,7 @@ #include "esphome/core/controller.h" #include "esphome/core/defines.h" #include "esphome/core/log.h" +#include "esphome/components/socket/socket.h" #include "api_pb2.h" #include "api_pb2_service.h" #include "util.h" @@ -11,13 +12,6 @@ #include "subscribe_state.h" #include "user_services.h" -#ifdef ARDUINO_ARCH_ESP32 -#include -#endif -#ifdef ARDUINO_ARCH_ESP8266 -#include -#endif - namespace esphome { namespace api { @@ -35,6 +29,7 @@ class APIServer : public Component, public Controller { void set_port(uint16_t port); void set_password(const std::string &password); void set_reboot_timeout(uint32_t reboot_timeout); + void handle_disconnect(APIConnection *conn); #ifdef USE_BINARY_SENSOR void on_binary_sensor_update(binary_sensor::BinarySensor *obj, bool state) override; @@ -86,7 +81,7 @@ class APIServer : public Component, public Controller { const std::vector &get_user_services() const { return this->user_services_; } protected: - AsyncServer server_{0}; + std::unique_ptr socket_ = nullptr; uint16_t port_{6053}; uint32_t reboot_timeout_{300000}; uint32_t last_connected_{0}; diff --git a/esphome/components/socket/headers.h b/esphome/components/socket/headers.h index a084823bdf..da710b760e 100644 --- a/esphome/components/socket/headers.h +++ b/esphome/components/socket/headers.h @@ -90,7 +90,11 @@ typedef uint32_t socklen_t; #undef INADDR_NONE #endif -#define INADDR_ANY ((uint32_t) 0x00000000UL) +#define ESPHOME_INADDR_ANY ((uint32_t) 0x00000000UL) +#define ESPHOME_INADDR_NONE ((uint32_t) 0xFFFFFFFFUL) +#else // !ARDUINO_ARCH_ESP8266 +#define ESPHOME_INADDR_ANY INADDR_ANY +#define ESPHOME_INADDR_NONE INADDR_NONE #endif #endif // USE_SOCKET_IMPL_LWIP_TCP @@ -112,6 +116,12 @@ typedef uint32_t socklen_t; #endif // not defined for ESP32 typedef uint32_t socklen_t; -#endif // ARDUINO_ARCH_ESP32 + +#define ESPHOME_INADDR_ANY ((uint32_t) 0x00000000UL) +#define ESPHOME_INADDR_NONE ((uint32_t) 0xFFFFFFFFUL) +#else // !ARDUINO_ARCH_ESP32 +#define ESPHOME_INADDR_ANY INADDR_ANY +#define ESPHOME_INADDR_NONE INADDR_NONE +#endif #endif // USE_SOCKET_IMPL_BSD_SOCKETS