.. _program_listing_file_sockets_include_sockets_detail_socket.hpp:

Program Listing for File socket.hpp
===================================

|exhale_lsh| :ref:`Return to documentation for file <file_sockets_include_sockets_detail_socket.hpp>` (``sockets/include/sockets/detail/socket.hpp``)

.. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS

.. code-block:: cpp

   // NOTE(review): this listing appears to have lost every `<...>` span during extraction: the
   // targets of the angle-bracket includes below and all template argument lists (for example
   // `std::optional os_socket_handle()`, `std::promise> promise`, the bare `template` in front of
   // `receive`). Those tokens are reproduced exactly as found; restore them from
   // sockets/include/sockets/detail/socket.hpp before compiling this text.
   #pragma once

   #include
   #include
   #include
   #include
   #include
   #include
   #include
   #include
   #include
   #include
   #include
   #include
   #include
   #include
   #include "address_family.hpp"
   #include "address_info.hpp"
   #include "message_buffer.hpp"

   namespace c2k {
       // Exception with the fixed message "operation timed out".
       class TimeoutError final : public std::runtime_error {
       public:
           TimeoutError() : std::runtime_error{ "operation timed out" } {}
       };

       // Exception for failed reads. Accepts any std::runtime_error constructor arguments;
       // default construction uses the message "error reading from socket".
       class ReadError final : public std::runtime_error {
       public:
           using std::runtime_error::runtime_error;

           ReadError() : std::runtime_error{ "error reading from socket" } {}
       };

       // Exception for failed sends. Only the inherited std::runtime_error constructors are
       // available, so a message must always be supplied.
       class SendError final : public std::runtime_error {
       public:
           using std::runtime_error::runtime_error;
       };

       class ClientSocket;

       // Common base of ServerSocket and ClientSocket: holds the socket descriptor together with the
       // local and remote address information. Only derived classes can construct it.
       class AbstractSocket {
       public:
           // Integer type of the operating system's socket handle.
   #ifdef _WIN32
           using OsSocketHandle = std::uintptr_t;
   #else
           using OsSocketHandle = int;
   #endif

       private:
           AddressInfo m_local_address_info;
           AddressInfo m_remote_address_info;

       protected:
           UniqueValue m_socket_descriptor;

           explicit AbstractSocket(OsSocketHandle os_socket_handle);

       public:
           // Returns the stored socket handle, or std::nullopt when m_socket_descriptor holds no value.
           [[nodiscard]] std::optional os_socket_handle() const {
               if (not m_socket_descriptor.has_value()) {
                   return std::nullopt;
               }
               return m_socket_descriptor.value();
           }

           // The accessors below return references to members, so the rvalue overloads are deleted:
           // calling them on a temporary would hand out a reference into an object about to die.
           AddressInfo const& local_address() && = delete;

           [[nodiscard]] AddressInfo const& local_address() const& {
               return m_local_address_info;
           }

           AddressInfo const& remote_address() && = delete;

           [[nodiscard]] AddressInfo const& remote_address() const& {
               return m_remote_address_info;
           }
       };

       namespace detail {
           // Declared here, defined elsewhere. Returns the OS handle of a server socket for the
           // given address family and port.
           [[nodiscard]] AbstractSocket::OsSocketHandle initialize_server_socket(
                   AddressFamily address_family,
                   std::uint16_t port
           );
       } // namespace detail

       // Socket that owns a listening thread. Construction is private; `Sockets` is a friend and
       // therefore the only visible way to create one. Movable, not copyable (it owns a std::jthread).
       class ServerSocket final : public AbstractSocket {
           friend class Sockets;

       private:
           std::jthread m_listen_thread;

           // `on_connect` is the callback handed over at construction.
           // NOTE(review): presumably invoked once per accepted client - confirm in the implementation.
           ServerSocket(AddressFamily address_family, std::uint16_t port, std::function on_connect);

       public:
           ServerSocket(ServerSocket&& other) noexcept = default;
           ServerSocket& operator=(ServerSocket&& other) noexcept = default;
           ~ServerSocket();

           void stop();
       };

       // Socket with one sending and one receiving thread. Work is queued as SendTask/ReceiveTask
       // objects and the outcome is reported through futures. Construction is private; `Sockets`
       // and `server_listen` are friends.
       class ClientSocket final : public AbstractSocket {
           friend class Sockets;
           friend void server_listen(
                   std::stop_token const& stop_token,
                   OsSocketHandle listen_socket,
                   std::function const& on_connect
           );

       private:
           // One queued send request: the payload plus the promise used to report the outcome.
           struct SendTask {
               std::promise promise;
               std::vector data;

               SendTask(std::promise promise, std::vector data)
                   : promise{ std::move(promise) },
                     data{ std::move(data) } {}
           };

           // One queued receive request.
           struct ReceiveTask {
               // NOTE(review): presumably `Exact` belongs to receive_exact() and `MaxBytes` to
               // receive() - confirm in receive_implementation().
               enum class Kind {
                   Exact,
                   MaxBytes,
               };

               std::promise> promise;
               std::size_t max_num_bytes;
               Kind kind;
               std::chrono::steady_clock::time_point end_time;

               ReceiveTask(
                       std::promise> promise,
                       std::size_t const max_num_bytes,
                       Kind const kind,
                       std::chrono::steady_clock::time_point const end_time
               )
                   : promise{ std::move(promise) },
                     max_num_bytes{ max_num_bytes },
                     kind{ kind },
                     end_time{ end_time } {}
           };

           // State handed by reference to keep_sending() and keep_receiving(): the two task queues,
           // the condition variables, and the running flag.
           class State {
           private:
               NonNullOwner running{ make_non_null_owner(true) };

           public:
               Synchronized> send_tasks{ std::deque{} };
               Synchronized> receive_tasks{ std::deque{} };
               std::condition_variable_any data_received_condition_variable;
               std::condition_variable_any data_sent_condition_variable;

               [[nodiscard]] bool is_running() const {
                   return *running;
               }

               // Clears the running flag and wakes one waiter on each condition variable.
               void stop_running() {
                   *running = false;
                   // we have to ensure that running is set to false while holding the lock, otherwise we have a
                   // race condition regarding the condition variables which can lead to the threads blocking
                   // indefinitely
                   // (hence the flag is written again from inside each queue's apply() callback before
                   // the corresponding notify_one())
                   receive_tasks.apply([this](auto const&) { *running = false; });
                   send_tasks.apply([this](auto const&) { *running = false; });
                   data_received_condition_variable.notify_one();
                   data_sent_condition_variable.notify_one();
               }

               void clear_queues();
           };

           // Timeout used by the templated receive() when the caller does not pass one: one second.
           static constexpr auto default_timeout = static_cast(std::chrono::seconds{ 1 });

           // Held through a unique_ptr; the defaulted move operations leave it null in the
           // moved-from object.
           std::unique_ptr m_shared_state{ std::make_unique() };
           std::jthread m_send_thread;
           std::jthread m_receive_thread;

           explicit ClientSocket(OsSocketHandle os_socket_handle);
           ClientSocket(AddressFamily address_family, std::string const& host, std::uint16_t port);

           static void keep_sending(State& state, OsSocketHandle socket);
           static void keep_receiving(State& state, OsSocketHandle socket);

           using Timeout = std::chrono::steady_clock::duration;

       public:
           ClientSocket(ClientSocket&& other) noexcept = default;
           ClientSocket& operator=(ClientSocket&& other) noexcept = default;
           ~ClientSocket();

           // Returns whether the shared state still reports "running". A moved-from socket has no
           // shared state and reports false instead of dereferencing a null pointer.
           [[nodiscard]] bool is_connected() const {
               return m_shared_state != nullptr and m_shared_state->is_running();
           }

           // clang-format off
           // Queues `data` for sending; the outcome is reported through the returned future.
           [[nodiscard("discarding the return value may lead to the data to never be transmitted")]]
           std::future send(std::vector data);

           // Streams every integral argument into a MessageBuffer (in argument order) and sends it.
           [[nodiscard("discarding the return value may lead to the data to never be transmitted")]]
           std::future send(std::integral auto... values) {
               auto package = MessageBuffer{};
               (package << ... << values);
               return send(std::move(package));
           }

           // Sends the contents of `package`, which is left untouched.
           [[nodiscard("discarding the return value may lead to the data to never be transmitted")]]
           std::future send(MessageBuffer const& package) {
               return send(package.data());
           }

           // Sends the contents of `package`, calling data() on the rvalue buffer.
           [[nodiscard("discarding the return value may lead to the data to never be transmitted")]]
           std::future send(MessageBuffer&& package) {
               return send(std::move(package).data());
           }
           // clang-format on

           [[nodiscard]] std::future> receive(std::size_t max_num_bytes);
           [[nodiscard]] std::future> receive(std::size_t max_num_bytes, Timeout timeout);
           [[nodiscard]] std::future> receive_exact(std::size_t num_bytes);
           [[nodiscard]] std::future> receive_exact(std::size_t num_bytes, Timeout timeout);

           // Requests exactly `total_size` bytes through receive_exact() and returns a deferred
           // future: the bytes are awaited and extracted from a MessageBuffer only once the caller
           // calls get() or wait() on the returned future.
           template
           [[nodiscard]] auto receive(Timeout const timeout = default_timeout) {
               static constexpr auto total_size = detail::summed_sizeof();
               auto future = receive_exact(total_size, timeout);
               return std::async(std::launch::deferred, [future = std::move(future)]() mutable {
                   auto message_buffer = MessageBuffer{ future.get() };
                   assert(message_buffer.size() == total_size);
                   return message_buffer.try_extract().value(); // should never fail since we have enough data
               });
           }

           void close();

       private:
           // clang-format off
           [[nodiscard]] std::future> receive_implementation(
               std::size_t max_num_bytes,
               ReceiveTask::Kind kind,
               std::optional end_time
           );
           // clang-format on

           [[nodiscard]] static bool process_receive_task(OsSocketHandle socket, ReceiveTask task);
           [[nodiscard]] static bool process_send_task(OsSocketHandle socket, SendTask task);
       };
   } // namespace c2k