Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions include/bitcoin/network/channels/channel_http.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ class BCT_API channel_http
/// Read request buffer (requires strand).
virtual http::flat_buffer& request_buffer() NOEXCEPT;

/// Override to change default websocket reader expectation/config.
virtual http::request_ptr create_request() const NOEXCEPT;

/// Determine if http basic authorization is satisfied if enabled.
virtual bool unauthorized(const http::request& request) NOEXCEPT;

Expand Down
18 changes: 6 additions & 12 deletions include/bitcoin/network/channels/channel_rpc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,12 @@
#include <bitcoin/network/messages/messages.hpp>
#include <bitcoin/network/net/net.hpp>

// TODO: This template can be re-based on channel_http giving it the ability to
// TODO: support RPC over both tcp and http/ws. In that case the settings type
// TODO: can be templatized and reader/writer configured at compile. Without an
// TODO: http reader the socket cannot be upgraded to ws and thus operates as a
// TODO: simple TCP socket, and TLS remains possible with all protocols. Http
// TODO: methods are dispatched from channel to base protocol and re-dispatched
// TODO: to subscribers. TCP/WS are dispatched to protocol as a custom method.
// TODO: this allows the base protocol to differentiate these as non-http for
// TODO: selection of the proper response path. As http is half duplex, notify
// TODO: is only provided to a TCP or upgraded (WS) socket.

namespace libbitcoin {
namespace network {

/// Read rpc-request and send rpc-response, dispatch to Interface.
/// read/write rpc::request/response over tcp/s, dispatch to Interface.
/// For rpc over http/s and/or ws/s use channel_http with forwarding
/// protocol dispatch (e.g. from http::post and synthetic ws::unknown).
template <typename Interface>
class channel_rpc
: public channel
Expand Down Expand Up @@ -99,6 +90,9 @@ class channel_rpc
/// Read request buffer (requires strand).
virtual inline http::flat_buffer& request_buffer() NOEXCEPT;

/// Override to change default reader config.
virtual rpc::request_ptr create_request() const NOEXCEPT;

/// Override to dispatch request to subscribers by requested method.
virtual inline void dispatch(const rpc::request_cptr& request) NOEXCEPT;

Expand Down
17 changes: 14 additions & 3 deletions include/bitcoin/network/impl/channels/channel_rpc.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ inline void CLASS::receive() NOEXCEPT
return;

reading_ = true;
const auto in = to_shared<rpc::request>();

const auto in = create_request();
read(request_buffer(), *in,
std::bind(&channel_rpc::handle_receive,
shared_from_base<channel_rpc>(), _1, _2, in));
Expand Down Expand Up @@ -128,13 +128,24 @@ inline void CLASS::dispatch(const rpc::request_cptr& request) NOEXCEPT
stop(code);
}

// request helpers
// ----------------------------------------------------------------------------

TEMPLATE
inline http::flat_buffer& CLASS::request_buffer() NOEXCEPT
{
BC_ASSERT(stranded());
return request_buffer_;
}

TEMPLATE
inline rpc::request_ptr CLASS::create_request() const NOEXCEPT
{
const auto out = system::to_shared<rpc::request>();
out->strict = true;
return out;
}

// Send.
// ----------------------------------------------------------------------------

Expand Down Expand Up @@ -183,7 +194,7 @@ inline void CLASS::handle_send(const code& ec, size_t bytes,
// Typically a noop, but handshake may pause channel here.
handler(ec);

// Restart the listener (following response to request only).
// Restart the listener (only in response to requests).
if constexpr (is_same_type<Message, rpc::response_t>)
{
receive();
Expand Down
38 changes: 25 additions & 13 deletions include/bitcoin/network/net/proxy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,8 @@ class BCT_API proxy
/// Asserts/logs stopped.
virtual ~proxy() NOEXCEPT;

/// Pause reading from the socket (requires strand).
virtual void pause() NOEXCEPT;

/// Resume reading from the socket (requires strand).
virtual void resume() NOEXCEPT;

/// Reading from the socket is paused (requires strand).
virtual bool paused() const NOEXCEPT;
/// Stop.
/// -----------------------------------------------------------------------

/// Idempotent, may be called multiple times.
/// Stop socket, no delay, called by stop notify when iocontext is closing.
Expand All @@ -74,27 +68,45 @@ class BCT_API proxy
void subscribe_stop(result_handler&& handler,
result_handler&& complete) NOEXCEPT;

/// The channel strand.
asio::strand& strand() NOEXCEPT;
/// Pause.
/// -----------------------------------------------------------------------

/// Pause reading from the socket (requires strand).
virtual void pause() NOEXCEPT;

/// Resume reading from the socket (requires strand).
virtual void resume() NOEXCEPT;

/// Reading from the socket is paused (requires strand).
virtual bool paused() const NOEXCEPT;

/// Properties.
/// -----------------------------------------------------------------------

/// Get the network threadpool iocontext.
asio::context& service() const NOEXCEPT;

/// The channel strand.
asio::strand& strand() NOEXCEPT;

/// The strand is running in this thread.
bool stranded() const NOEXCEPT;

/// The proxy (socket) is stopped.
bool stopped() const NOEXCEPT;

/// The socket was accepted (vs. connected).
bool inbound() const NOEXCEPT;

/// Connection is currently secured (TLS or comparable for socket type).
bool secure() const NOEXCEPT;

/// The socket was upgraded to a websocket (requires strand).
bool websocket() const NOEXCEPT;

/// The total number of bytes queued/sent to the remote endpoint.
uint64_t total() const NOEXCEPT;

/// The socket was accepted (vs. connected).
bool inbound() const NOEXCEPT;

/// Get the address of the outgoing endpoint passed via construct.
const config::address& address() const NOEXCEPT;

Expand Down
38 changes: 19 additions & 19 deletions include/bitcoin/network/net/socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,35 +176,35 @@ class BCT_API socket
/// Properties.
/// -----------------------------------------------------------------------

/// Get the address of the outgoing endpoint passed via construct, or the
/// resolved endpoint address for incoming connections.
virtual const config::address& address() const NOEXCEPT;
/// Get the network threadpool iocontext.
virtual asio::context& service() const NOEXCEPT;

/// Get the endpoint of the remote host. Established by connection
/// resolution for incoming and non-proxied outgoing. For a proxied
/// connection (outgoing only) this is the value passed via construct.
virtual const config::endpoint& endpoint() const NOEXCEPT;
/// Get the strand of the socket.
virtual asio::strand& strand() NOEXCEPT;

/// The strand is running in this thread.
virtual bool stranded() const NOEXCEPT;

/// Stop has been signaled, work is stopping.
virtual bool stopped() const NOEXCEPT;

/// The socket was accepted (vs. connected).
virtual bool inbound() const NOEXCEPT;

/// The socket upgrades to its secure configuration upon connect.
virtual bool secure() const NOEXCEPT;

/// Stop has been signaled, work is stopping.
virtual bool stopped() const NOEXCEPT;

/// The strand is running in this thread.
virtual bool stranded() const NOEXCEPT;

/// Get the strand of the socket.
virtual asio::strand& strand() NOEXCEPT;
/// The socket was upgraded to a websocket (requires strand).
virtual bool websocket() const NOEXCEPT;

/// Get the network threadpool iocontext.
virtual asio::context& service() const NOEXCEPT;
/// Get the address of the outgoing endpoint passed via construct, or the
/// resolved endpoint address for incoming connections.
virtual const config::address& address() const NOEXCEPT;

/// The socket was upgraded to a websocket (requires strand).
virtual bool is_websocket() const NOEXCEPT;
/// Get the endpoint of the remote host. Established by connection
/// resolution for incoming and non-proxied outgoing. For a proxied
/// connection (outgoing only) this is the value passed via construct.
virtual const config::endpoint& endpoint() const NOEXCEPT;

protected:
using ws_t = std::variant<ref<ws::socket>, ref<ws::ssl::socket>>;
Expand Down
54 changes: 35 additions & 19 deletions src/channels/channel_http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
namespace libbitcoin {
namespace network {

#define CLASS channel_http
#define CASE_REQUEST_TO_MODEL(verb_, request_, model_) \
case verb::verb_: \
model_.method = #verb_; \
Expand Down Expand Up @@ -65,8 +64,8 @@ void channel_http::resume() NOEXCEPT

// Read cycle.
// ----------------------------------------------------------------------------
// Failure to call receive() after successful message handling stalls channel.

// Failure to call after successful message handling causes stalled channel.
void channel_http::receive() NOEXCEPT
{
BC_ASSERT(stranded());
Expand All @@ -75,16 +74,9 @@ void channel_http::receive() NOEXCEPT
if (stopped() || paused() || reading_)
return;

// TODO: Extend support to batch (array of rpc).
// TODO: See notes in channel_rpc.ipp. This is the same except there must
// TODO: be an set of socket methods for incremental http-rpc. This can
// TODO: be handled by writing the header, whole rpc responses, and close.
// TODO: Boost beast provides these independent async calls for chunking.

reading_ = true;
const auto in = to_shared<request>();
const auto in = create_request();

// Post handle_read to strand upon stop, error, or buffer full.
read(request_buffer(), *in,
std::bind(&channel_http::handle_receive,
shared_from_base<channel_http>(), _1, _2, in));
Expand Down Expand Up @@ -114,8 +106,9 @@ void channel_http::handle_receive(const code& ec, size_t bytes,
return;
}

reading_ = false;
LOGA(log_message(*request, bytes));

reading_ = false;
dispatch(request);
}

Expand Down Expand Up @@ -152,11 +145,28 @@ void channel_http::dispatch(const request_cptr& request) NOEXCEPT
stop(code);
}

// request helpers
// ----------------------------------------------------------------------------

flat_buffer& channel_http::request_buffer() NOEXCEPT
{
return request_buffer_;
}

request_ptr channel_http::create_request() const NOEXCEPT
{
const auto out = to_shared<request>();
if (websocket())
{
// out->method() will return verb::unknown (mapped in dispatch).
out->method_string("websocket");
out->body() = http::json_value{};
out->body().plain_json = true;
}

return out;
}

// Send.
// ----------------------------------------------------------------------------

Expand All @@ -179,18 +189,24 @@ void channel_http::handle_send(const code& ec, size_t bytes,
if (ec) stop(ec);
LOGA(boost::format(message) % bytes);
handler(ec);

// Restart the listener (only in response to requests).
// TODO: use new ::send(rresponse, handler) method to differentiate.
receive();
}

// private
void channel_http::assign_json_buffer(response& ) NOEXCEPT
void channel_http::assign_json_buffer(response& response) NOEXCEPT
{
// TODO: limit to http (not full duplex safe).
////if (const auto& body = response.body();
//// body.contains<json_body::value_type>())
////{
//// auto& value = body.get<json_body::value_type>();
//// value.buffer = response_buffer_;
////}
// websocket is full duplex, so cannot use shared json repsonse buffer.
if (!websocket())
{
const auto& body = response.body();
if (body.contains<json_body::value_type>())
body.get<json_body::value_type>().buffer = response_buffer_;
else if (body.contains<rpc::request>())
body.get<rpc::request>().buffer = response_buffer_;
}
}

// unauthorized helpers
Expand Down
31 changes: 18 additions & 13 deletions src/net/proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,6 @@ proxy::~proxy() NOEXCEPT
// ----------------------------------------------------------------------------
// The proxy does not (must not) stop itself.

bool proxy::stopped() const NOEXCEPT
{
return socket_->stopped();
}

void proxy::stop(const code& ec) NOEXCEPT
{
if (stopped())
Expand Down Expand Up @@ -168,34 +163,44 @@ void proxy::do_reading() NOEXCEPT
// Properties.
// ----------------------------------------------------------------------------

asio::strand& proxy::strand() NOEXCEPT
asio::context& proxy::service() const NOEXCEPT
{
return socket_->strand();
return socket_->service();
}

asio::context& proxy::service() const NOEXCEPT
asio::strand& proxy::strand() NOEXCEPT
{
return socket_->service();
return socket_->strand();
}

bool proxy::stranded() const NOEXCEPT
{
return socket_->stranded();
}

bool proxy::stopped() const NOEXCEPT
{
return socket_->stopped();
}

bool proxy::inbound() const NOEXCEPT
{
return socket_->inbound();
}

bool proxy::secure() const NOEXCEPT
{
return socket_->secure();
}

uint64_t proxy::total() const NOEXCEPT
bool proxy::websocket() const NOEXCEPT
{
return total_.load(std::memory_order_relaxed);
return socket_->websocket();
}

bool proxy::inbound() const NOEXCEPT
uint64_t proxy::total() const NOEXCEPT
{
return socket_->inbound();
return total_.load(std::memory_order_relaxed);
}

const config::address& proxy::address() const NOEXCEPT
Expand Down
2 changes: 1 addition & 1 deletion src/net/proxy_actions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ void proxy::read(http::flat_buffer& buffer, http::request& request,
void proxy::write(http::response&& response,
count_handler&& handler) NOEXCEPT
{
if (socket_->is_websocket())
if (socket_->websocket())
{
// Pointer ships moveable message through the send queue.
const auto out = move_shared(std::move(response));
Expand Down
Loading
Loading