Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions include/bitcoin/network/channels/channel_http.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,12 @@ class BCT_API channel_http
virtual void handle_receive(const code& ec, size_t bytes,
const http::request_cptr& request) NOEXCEPT;
virtual void handle_send(const code& ec, size_t bytes,
const http::response_cptr& response,
const result_handler& handler) NOEXCEPT;
const std::string& message, const result_handler& handler) NOEXCEPT;

private:
void handle_unauthorized(const code& ec) NOEXCEPT;
void log_message(const http::request& request,
size_t bytes) const NOEXCEPT;
void log_message(const http::response& response,
std::string log_message(const http::response& response) const NOEXCEPT;
std::string log_message(const http::request& request,
size_t bytes) const NOEXCEPT;

// This is thread safe.
Expand Down
17 changes: 17 additions & 0 deletions include/bitcoin/network/messages/http_body.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,23 @@ struct BCT_API body
{
}

inline bool binary() const NOEXCEPT
{
return std::visit(overload
{
[&](const std::monostate&) NOEXCEPT { return false; },
[&](const empty_writer&) NOEXCEPT { return false; },
[&](const data_writer&) NOEXCEPT { return true; },
[&](const file_writer&) NOEXCEPT { return true; },
[&](const span_writer&) NOEXCEPT { return true; },
[&](const buffer_writer&) NOEXCEPT { return true; },
[&](const string_writer&) NOEXCEPT { return false; },
[&](const json_writer&) NOEXCEPT { return false; },
[&](const rpc::writer&) NOEXCEPT { return false; },
[&](const rpc::notifier&) NOEXCEPT { return false; }
}, writer_);
}

void init(boost_code& ec) NOEXCEPT;
out_buffer get(boost_code& ec) NOEXCEPT;

Expand Down
8 changes: 6 additions & 2 deletions include/bitcoin/network/net/proxy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,22 +161,26 @@ class BCT_API proxy
virtual void write(rpc::request&& notification,
count_handler&& handler) NOEXCEPT;

/// HTTP (generic/rpc).
/// HTTP/WS (generic/rpc).
/// -----------------------------------------------------------------------

/// Read http request from the socket, using provided buffer.
/// If socket is websocket request body type must have been set by caller.
virtual void read(http::flat_buffer& buffer, http::request& request,
count_handler&& handler) NOEXCEPT;

/// Write http response to the socket (json buffer in body).
virtual void write(http::response& response,
/// If socket is websocket body is written (headers ignored).
virtual void write(http::response&& response,
count_handler&& handler) NOEXCEPT;

private:
typedef std::function<void()> writer;
typedef std::deque<writer> queue;

// For write buffering.
void do_http_write(const http::response_ptr& response,
const count_handler& handler) NOEXCEPT;
void do_ws_write(const asio::const_buffer& payload, bool binary,
const count_handler& handler) NOEXCEPT;
void do_tcp_write(const asio::const_buffer& payload,
Expand Down
23 changes: 12 additions & 11 deletions include/bitcoin/network/net/socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,15 @@ class BCT_API socket
virtual void body_notify(http::request&& notification,
count_handler&& handler) NOEXCEPT;

/// HTTP (generic/rpc).
/// HTTP/WS (generic/rpc).
/// -----------------------------------------------------------------------

/// Read http request from the socket, handler posted to socket strand.
virtual void http_read(http::flat_buffer& buffer, http::request& request,
count_handler&& handler) NOEXCEPT;

/// Write http response to the socket, handler posted to socket strand.
virtual void http_write(http::response& response,
virtual void http_write(http::response&& response,
count_handler&& handler) NOEXCEPT;

/// Properties.
Expand Down Expand Up @@ -203,6 +203,9 @@ class BCT_API socket
/// Get the network threadpool iocontext.
virtual asio::context& service() const NOEXCEPT;

/// The socket was upgraded to a websocket (requires strand).
virtual bool is_websocket() const NOEXCEPT;

protected:
using ws_t = std::variant<ref<ws::socket>, ref<ws::ssl::socket>>;
using tcp_t = std::variant<ref<asio::socket>, ref<asio::ssl::socket>>;
Expand All @@ -215,16 +218,12 @@ class BCT_API socket
const config::address& address, const config::endpoint& endpoint,
bool proxied, bool inbound) NOEXCEPT;


/// Context.
/// -----------------------------------------------------------------------

/// The socket was upgraded to ssl (requires strand).
bool is_secure() const NOEXCEPT;

/// The socket was upgraded to a websocket (requires strand).
bool is_websocket() const NOEXCEPT;

/// The socket is not upgraded (asio::socket).
bool is_base() const NOEXCEPT;

Expand All @@ -245,6 +244,10 @@ class BCT_API socket
const count_handler& handler) NOEXCEPT;
void async_write(const asio::const_buffer& buffer, bool binary,
const count_handler& handler) NOEXCEPT;
void async_read_http(http::flat_buffer& buffer, http::request& request,
const count_handler& handler) NOEXCEPT;
void async_write_http(http::response&& response,
const count_handler& handler) NOEXCEPT;

private:
using http_parser = boost::beast::http::request_parser<http::body>;
Expand Down Expand Up @@ -339,7 +342,7 @@ class BCT_API socket
void do_http_read(ref<http::flat_buffer> buffer,
const ref<http::request>& request,
const count_handler& handler) NOEXCEPT;
void do_http_write(const ref<http::response>& response,
void do_http_write(const http::response_ptr& response,
const count_handler& handler) NOEXCEPT;

// handle
Expand Down Expand Up @@ -368,9 +371,6 @@ class BCT_API socket
// read/write (tcp/ws)
void handle_async(const boost_code& ec, size_t size,
const count_handler& handler,const std::string& operation) NOEXCEPT;
void handle_async_read(const boost_code& ec, size_t size,
const asio::mutable_buffer& buffer, const http::flat_buffer_ptr& flat,
const count_handler& handler) NOEXCEPT;

// rpc
void handle_rpc_read(const code& ec, size_t bytes,
Expand All @@ -383,11 +383,12 @@ class BCT_API socket
void handle_body_notify(boost_code ec, size_t size, size_t total,
const notify_state::ptr& out, const count_handler& handler) NOEXCEPT;

// http (generic/rpc)
// http/ws (native/rpc)
void handle_http_read(const boost_code& ec, size_t size,
const ref<http::request>& request, const http_parser_ptr& parser,
const count_handler& handler) NOEXCEPT;
void handle_http_write(const boost_code& ec, size_t size,
const http::response_ptr& request,
const count_handler& handler) NOEXCEPT;

// utility
Expand Down
92 changes: 42 additions & 50 deletions src/channels/channel_http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ void channel_http::handle_receive(const code& ec, size_t bytes,
}

reading_ = false;
log_message(*request, bytes);
LOGA(log_message(*request, bytes));
dispatch(request);
}

Expand Down Expand Up @@ -165,45 +165,38 @@ void channel_http::send(response&& response, result_handler&& handler) NOEXCEPT
BC_ASSERT(stranded());

assign_json_buffer(response);
const auto out = system::move_shared(std::move(response));
count_handler complete = std::bind(&channel_http::handle_send,
shared_from_base<channel_http>(), _1, _2, out, std::move(handler));
auto message = log_message(response);

if (!out)
{
complete(error::bad_alloc, {});
return;
}

// response has been moved to out.
write(*out, std::move(complete));
write(std::move(response),
std::bind(&channel_http::handle_send,
shared_from_base<channel_http>(), _1, _2, std::move(message),
std::move(handler)));
}

void channel_http::handle_send(const code& ec, size_t bytes,
const response_cptr& response, const result_handler& handler) NOEXCEPT
const std::string& message, const result_handler& handler) NOEXCEPT
{
if (ec)
stop(ec);

log_message(*response, bytes);
if (ec) stop(ec);
LOGA(boost::format(message) % bytes);
handler(ec);
}

// private
void channel_http::assign_json_buffer(response& response) NOEXCEPT
void channel_http::assign_json_buffer(response& ) NOEXCEPT
{
if (const auto& body = response.body();
body.contains<json_body::value_type>())
{
auto& value = body.get<json_body::value_type>();
value.buffer = response_buffer_;
}
// TODO: limit to http (not full duplex safe).
////if (const auto& body = response.body();
//// body.contains<json_body::value_type>())
////{
//// auto& value = body.get<json_body::value_type>();
//// value.buffer = response_buffer_;
////}
}

// unauthorized helpers
// ----------------------------------------------------------------------------

bool channel_http::unauthorized(const http::request& request) NOEXCEPT
bool channel_http::unauthorized(const request& request) NOEXCEPT
{
return options_.authorize() &&
(options_.credential() != request[field::authorization]);
Expand All @@ -218,35 +211,34 @@ void channel_http::handle_unauthorized(const code& ec) NOEXCEPT
// log helpers
// ----------------------------------------------------------------------------

void channel_http::log_message(const request& LOG_ONLY(request),
size_t LOG_ONLY(bytes)) const NOEXCEPT
std::string channel_http::log_message(const request& request,
size_t bytes) const NOEXCEPT
{
LOG_ONLY(const auto scheme = secure() ? "https" : "http";)
LOG_ONLY(const auto version = "http/" + serialize(request.version() / 10) +
"." + serialize(request.version() % 10);)

LOGA(scheme << " [" << request.method_string()
<< "] " << version
<< " (" << (request.chunked() ? "c" : serialize(bytes))
<< ") " << (request.keep_alive() ? "keep" : "drop")
<< " [" << endpoint() << "]"
<< " {" << (split(request[field::accept], ",").front()) << "...}"
<< " " << request.target());
const std::string scheme = secure() ? "https" : "http";
const std::string method = request.method_string();
const std::string keep = request.keep_alive() ? "keep" : "drop";
const std::string size = request.chunked() ? "c" : serialize(bytes);
const std::string accept = split(request[field::accept], ",").front();
const std::string version = "http/" + serialize(request.version() / 10) +
"." + serialize(request.version() % 10);

return scheme + " [" + method + "] " + version + " (" + size + ") " +
keep + " [" + endpoint().to_string() + "] " +
"{" + accept + "...} " + std::string(request.target());
}

void channel_http::log_message(const response& LOG_ONLY(response),
size_t LOG_ONLY(bytes)) const NOEXCEPT
std::string channel_http::log_message(const response& response) const NOEXCEPT
{
LOG_ONLY(const auto scheme = secure() ? "https" : "http";)
LOG_ONLY(const auto version = "http/" + serialize(response.version() / 10)
+ "." + serialize(response.version() % 10);)

LOGA(scheme << " [" << status_string(response.result())
<< "] " << version
<< " (" << (response.chunked() ? "c" : serialize(bytes))
<< ") " << (response.keep_alive() ? "keep" : "drop")
<< " [" << endpoint() << "]"
<< " {" << (response[field::content_type]) << "}");
const std::string scheme = secure() ? "https" : "http";
const std::string status = status_string(response.result());
const std::string keep = response.keep_alive() ? "keep" : "drop";
const std::string version = "http/" + serialize(response.version() / 10) +
"." + serialize(response.version() % 10);

// %1% is required placeholder for response size (filled by caller).
return scheme + " [" + status + "] " + version + " (%1%) " +
keep + " [" + endpoint().to_string() + "] " +
"{" + std::string(response[field::content_type]) + "}";
}

BC_POP_WARNING()
Expand Down
36 changes: 32 additions & 4 deletions src/net/proxy_actions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
namespace libbitcoin {
namespace network {

// Shared pointers required in handler parameters so closures control lifetime.
BC_PUSH_WARNING(NO_VALUE_OR_CONST_REF_SHARED_PTR)
BC_PUSH_WARNING(SMART_PTR_NOT_NEEDED)
BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT)

using namespace system;
Expand Down Expand Up @@ -155,7 +158,7 @@ void proxy::do_notification_write(const rpc::request_ptr& notification,
shared_from_this(), _1, _2, handler));
}

// HTTP (generic/rpc).
// HTTP/WS (generic/rpc).
// ----------------------------------------------------------------------------

// Method reading() is invoked directly if read() is called from strand().
Expand All @@ -166,13 +169,38 @@ void proxy::read(http::flat_buffer& buffer, http::request& request,
socket_->http_read(buffer, request, std::move(handler));
}

// Writes are composed but http is half duplex so there is no interleave risk.
void proxy::write(http::response& response,
void proxy::write(http::response&& response,
count_handler&& handler) NOEXCEPT
{
socket_->http_write(response, std::move(handler));
if (socket_->is_websocket())
{
// Pointer ships moveable message through the send queue.
const auto out = move_shared(std::move(response));
writer call = std::bind(&proxy::do_http_write,
shared_from_this(), out, std::move(handler));

boost::asio::dispatch(strand(),
std::bind(&proxy::do_write,
shared_from_this(), std::move(call)));
}
else
{
// http is half duplex so there is no interleave risk.
socket_->http_write(std::move(response), std::move(handler));
}
}

// private
void proxy::do_http_write(const http::response_ptr& response,
const count_handler& handler) NOEXCEPT
{
socket_->http_write(std::move(*response),
std::bind(&proxy::handle_write,
shared_from_this(), _1, _2, handler));
}

BC_POP_WARNING()
BC_POP_WARNING()
BC_POP_WARNING()

} // namespace network
Expand Down
Loading
Loading