Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ new_features:
change: |
Added filter state read/write support for dynamic module cert validators, allowing modules to set and
get string values in the connection's filter state during certificate chain verification.
- area: dynamic_modules
change: |
Added HTTP callout support for dynamic module listener filters, enabling listener filters to initiate
asynchronous HTTP requests to upstream clusters and receive responses via the ``send_http_callout`` ABI
callback and ``on_listener_filter_http_callout_done`` event hook.
- area: stats
change: |
Added support to limit the number of stats stored in each stats scope in the stats library.
Expand Down
52 changes: 52 additions & 0 deletions source/extensions/dynamic_modules/abi/abi.h
Original file line number Diff line number Diff line change
Expand Up @@ -4018,6 +4018,32 @@ void envoy_dynamic_module_on_listener_filter_config_scheduled(
envoy_dynamic_module_type_listener_filter_config_module_ptr filter_config_module_ptr,
uint64_t event_id);

/**
* envoy_dynamic_module_on_listener_filter_http_callout_done is called when the HTTP callout
* response is received initiated by a listener filter.
*
* @param filter_envoy_ptr is the pointer to the DynamicModuleListenerFilter object of the
* corresponding listener filter.
* @param filter_module_ptr is the pointer to the in-module listener filter created by
* envoy_dynamic_module_on_listener_filter_new.
* @param callout_id is the ID of the callout. This is used to differentiate between multiple
* calls.
* @param result is the result of the callout.
* @param headers is the headers of the response.
* @param headers_size is the size of the headers.
* @param body_chunks is the body of the response.
* @param body_chunks_size is the size of the body.
*
* headers and body_chunks are owned by Envoy, and they are guaranteed to be valid until the end of
* this event hook. They may be null if the callout fails or the response is empty.
*/
void envoy_dynamic_module_on_listener_filter_http_callout_done(
envoy_dynamic_module_type_listener_filter_envoy_ptr filter_envoy_ptr,
envoy_dynamic_module_type_listener_filter_module_ptr filter_module_ptr, uint64_t callout_id,
envoy_dynamic_module_type_http_callout_result result,
envoy_dynamic_module_type_envoy_http_header* headers, size_t headers_size,
envoy_dynamic_module_type_envoy_buffer* body_chunks, size_t body_chunks_size);

// =============================================================================
// Listener Filter Callbacks
// =============================================================================
Expand Down Expand Up @@ -4545,6 +4571,32 @@ envoy_dynamic_module_callback_listener_filter_record_histogram_value(
envoy_dynamic_module_type_listener_filter_envoy_ptr filter_envoy_ptr, size_t id,
uint64_t value);

// ---------------------- Listener Filter Callbacks - HTTP Callout ---------------

/**
* envoy_dynamic_module_callback_listener_filter_http_callout is called by the module to initiate an
* HTTP callout. The callout is initiated by the listener filter and the response is received in
* envoy_dynamic_module_on_listener_filter_http_callout_done.
*
* @param filter_envoy_ptr is the pointer to the DynamicModuleListenerFilter object of the
* corresponding listener filter.
* @param callout_id_out is a pointer to a variable where the callout ID will be stored. This can be
* arbitrary and is used to differentiate between multiple calls from the same filter.
* @param cluster_name is the name of the cluster to which the callout is sent.
* @param headers is the headers of the request. It must contain :method, :path and host headers.
* @param headers_size is the size of the headers.
* @param body is the body of the request.
* @param timeout_milliseconds is the timeout for the callout in milliseconds.
* @return envoy_dynamic_module_type_http_callout_init_result is the result of the callout
* initialization.
*/
envoy_dynamic_module_type_http_callout_init_result
envoy_dynamic_module_callback_listener_filter_http_callout(
envoy_dynamic_module_type_listener_filter_envoy_ptr filter_envoy_ptr, uint64_t* callout_id_out,
envoy_dynamic_module_type_module_buffer cluster_name,
envoy_dynamic_module_type_module_http_header* headers, size_t headers_size,
envoy_dynamic_module_type_module_buffer body, uint64_t timeout_milliseconds);

// ---------------------- Listener filter scheduler callbacks -----------------

/**
Expand Down
144 changes: 144 additions & 0 deletions source/extensions/dynamic_modules/sdk/rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5950,6 +5950,23 @@ pub trait ListenerFilter<ELF: EnvoyListenerFilter> {
/// This is called when the listener filter is destroyed for each accepted connection.
fn on_close(&mut self, _envoy_filter: &mut ELF) {}

/// This is called when the HTTP callout response is received initiated by a listener filter.
///
/// * `envoy_filter` can be used to interact with the underlying Envoy filter object.
/// * `callout_id` is the ID of the callout. This is used to differentiate between multiple calls.
/// * `result` is the result of the callout.
/// * `response_headers` is a list of key-value pairs of the response headers. This is optional.
/// * `response_body` is the response body. This is optional.
fn on_http_callout_done(
&mut self,
_envoy_filter: &mut ELF,
_callout_id: u64,
_result: abi::envoy_dynamic_module_type_http_callout_result,
_response_headers: Vec<(EnvoyBuffer, EnvoyBuffer)>,
_response_body: Vec<EnvoyBuffer>,
) {
}

/// This is called when the new event is scheduled via the
/// [`EnvoyListenerFilterScheduler::commit`] for this [`ListenerFilter`].
///
Expand Down Expand Up @@ -6076,6 +6093,33 @@ pub trait EnvoyListenerFilter {
value: u64,
) -> Result<(), abi::envoy_dynamic_module_type_metrics_result>;

/// Send an HTTP callout to the given cluster with the given headers and optional body.
///
/// Headers must contain the `:method`, `:path`, and `host` headers.
///
/// This returns the status and callout id of the callout. The id is used to
/// distinguish different callouts made from the same filter and is generated by Envoy.
/// The meaning of the status is:
///
/// * Success: The callout was sent successfully.
/// * MissingRequiredHeaders: One of the required headers is missing: `:method`, `:path`, or
/// `host`.
/// * ClusterNotFound: The cluster with the given name was not found.
/// * CannotCreateRequest: The request could not be created. This happens when, for example,
/// there's no healthy upstream host in the cluster.
///
/// The callout result will be delivered to the [`ListenerFilter::on_http_callout_done`] method.
fn send_http_callout<'a>(
&mut self,
_cluster_name: &'a str,
_headers: Vec<(&'a str, &'a [u8])>,
_body: Option<&'a [u8]>,
_timeout_milliseconds: u64,
) -> (
abi::envoy_dynamic_module_type_http_callout_init_result,
u64, // callout handle
);

/// Create a new implementation of the [`EnvoyListenerFilterScheduler`] trait.
///
/// ## Example Usage
Expand Down Expand Up @@ -6656,6 +6700,52 @@ impl EnvoyListenerFilter for EnvoyListenerFilterImpl {
}
}

fn send_http_callout<'a>(
&mut self,
cluster_name: &'a str,
headers: Vec<(&'a str, &'a [u8])>,
body: Option<&'a [u8]>,
timeout_milliseconds: u64,
) -> (abi::envoy_dynamic_module_type_http_callout_init_result, u64) {
let mut callout_id: u64 = 0;

// Convert headers to module HTTP headers.
let module_headers: Vec<abi::envoy_dynamic_module_type_module_http_header> = headers
.iter()
.map(|(k, v)| abi::envoy_dynamic_module_type_module_http_header {
key_ptr: k.as_ptr() as *const _,
key_length: k.len(),
value_ptr: v.as_ptr() as *const _,
value_length: v.len(),
})
.collect();

let body_buffer = match body {
Some(b) => abi::envoy_dynamic_module_type_module_buffer {
ptr: b.as_ptr() as *const _,
length: b.len(),
},
None => abi::envoy_dynamic_module_type_module_buffer {
ptr: std::ptr::null(),
length: 0,
},
};

let result = unsafe {
abi::envoy_dynamic_module_callback_listener_filter_http_callout(
self.raw,
&mut callout_id,
str_to_module_buffer(cluster_name),
module_headers.as_ptr() as *mut _,
module_headers.len(),
body_buffer,
timeout_milliseconds,
)
};

(result, callout_id)
}

fn new_scheduler(&self) -> impl EnvoyListenerFilterScheduler + 'static {
unsafe {
let scheduler_ptr =
Expand Down Expand Up @@ -6802,6 +6892,60 @@ pub extern "C" fn envoy_dynamic_module_on_listener_filter_config_scheduled(
filter_config.on_config_scheduled(event_id);
}

/// # Safety
///
/// Caller must ensure `filter_ptr`, `headers`, and `body_chunks` point to valid memory for the
/// provided sizes, and that the pointed-to data lives for the duration of this call.
#[no_mangle]
pub unsafe extern "C" fn envoy_dynamic_module_on_listener_filter_http_callout_done(
envoy_ptr: abi::envoy_dynamic_module_type_listener_filter_envoy_ptr,
filter_ptr: abi::envoy_dynamic_module_type_listener_filter_module_ptr,
callout_id: u64,
result: abi::envoy_dynamic_module_type_http_callout_result,
headers: *const abi::envoy_dynamic_module_type_envoy_http_header,
headers_size: usize,
body_chunks: *const abi::envoy_dynamic_module_type_envoy_buffer,
body_chunks_size: usize,
) {
let filter = filter_ptr as *mut Box<dyn ListenerFilter<EnvoyListenerFilterImpl>>;
let filter = unsafe { &mut *filter };

// Convert headers to Vec<(EnvoyBuffer, EnvoyBuffer)>.
let header_vec = if headers.is_null() || headers_size == 0 {
Vec::new()
} else {
let headers_slice = unsafe { std::slice::from_raw_parts(headers, headers_size) };
headers_slice
.iter()
.map(|h| {
(
unsafe { EnvoyBuffer::new_from_raw(h.key_ptr as *const _, h.key_length) },
unsafe { EnvoyBuffer::new_from_raw(h.value_ptr as *const _, h.value_length) },
)
})
.collect()
};

// Convert body chunks to Vec<EnvoyBuffer>.
let body_vec = if body_chunks.is_null() || body_chunks_size == 0 {
Vec::new()
} else {
let chunks_slice = unsafe { std::slice::from_raw_parts(body_chunks, body_chunks_size) };
chunks_slice
.iter()
.map(|c| unsafe { EnvoyBuffer::new_from_raw(c.ptr as *const _, c.length) })
.collect()
};

filter.on_http_callout_done(
&mut EnvoyListenerFilterImpl::new(envoy_ptr),
callout_id,
result,
header_vec,
body_vec,
);
}

// =============================================================================
// UDP Listener Filter Support
// =============================================================================
Expand Down
3 changes: 3 additions & 0 deletions source/extensions/filters/listener/dynamic_modules/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ envoy_cc_library(
hdrs = ["filter_config.h"],
deps = [
"//envoy/stats:stats_interface",
"//envoy/upstream:cluster_manager_interface",
"//source/common/config:utility_lib",
"//source/common/stats:utility_lib",
"//source/extensions/dynamic_modules:dynamic_modules_lib",
Expand All @@ -30,10 +31,12 @@ envoy_cc_library(
hdrs = ["filter.h"],
deps = [
":filter_config_lib",
"//envoy/http:async_client_interface",
"//envoy/network:filter_interface",
"//envoy/network:listen_socket_interface",
"//envoy/network:listener_filter_buffer_interface",
"//source/common/common:logger_lib",
"//source/common/http:message_lib",
"//source/common/network:address_lib",
"//source/common/network:utility_lib",
"//source/common/router:string_accessor_lib",
Expand Down
40 changes: 40 additions & 0 deletions source/extensions/filters/listener/dynamic_modules/abi_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "envoy/network/listen_socket.h"
#include "envoy/stream_info/stream_info.h"

#include "source/common/http/message_impl.h"
#include "source/common/network/address_impl.h"
#include "source/common/network/utility.h"
#include "source/common/protobuf/utility.h"
Expand Down Expand Up @@ -746,6 +747,45 @@ envoy_dynamic_module_callback_listener_filter_record_histogram_value(
return envoy_dynamic_module_type_metrics_result_Success;
}

// -----------------------------------------------------------------------------
// HTTP Callout Callbacks
// -----------------------------------------------------------------------------

envoy_dynamic_module_type_http_callout_init_result
envoy_dynamic_module_callback_listener_filter_http_callout(
envoy_dynamic_module_type_listener_filter_envoy_ptr filter_envoy_ptr, uint64_t* callout_id_out,
envoy_dynamic_module_type_module_buffer cluster_name,
envoy_dynamic_module_type_module_http_header* headers, size_t headers_size,
envoy_dynamic_module_type_module_buffer body, uint64_t timeout_milliseconds) {
auto* filter = static_cast<DynamicModuleListenerFilter*>(filter_envoy_ptr);

// Build the request message.
Http::RequestMessagePtr message = std::make_unique<Http::RequestMessageImpl>();

// Add headers.
for (size_t i = 0; i < headers_size; i++) {
const auto& header = headers[i];
message->headers().addCopy(
Http::LowerCaseString(std::string(header.key_ptr, header.key_length)),
std::string(header.value_ptr, header.value_length));
}

// Add body if present.
if (body.length > 0 && body.ptr != nullptr) {
message->body().add(body.ptr, body.length);
}

// Validate required headers.
if (message->headers().Method() == nullptr || message->headers().Path() == nullptr ||
message->headers().Host() == nullptr) {
return envoy_dynamic_module_type_http_callout_init_result_MissingRequiredHeaders;
}

// Send the callout.
return filter->sendHttpCallout(callout_id_out, std::string(cluster_name.ptr, cluster_name.length),
std::move(message), timeout_milliseconds);
}

// -----------------------------------------------------------------------------
// Scheduler Callbacks
// -----------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ DynamicModuleListenerFilterConfigFactory::createListenerFilterFactoryFromProto(
auto filter_config =
Extensions::DynamicModules::ListenerFilters::newDynamicModuleListenerFilterConfig(
proto_config.filter_name(), filter_config_str, metrics_namespace,
std::move(dynamic_module.value()), context.listenerScope(),
context.serverFactoryContext().mainThreadDispatcher());
std::move(dynamic_module.value()), context.serverFactoryContext().clusterManager(),
context.listenerScope(), context.serverFactoryContext().mainThreadDispatcher());

if (!filter_config.ok()) {
throw EnvoyException("Failed to create filter config: " +
Expand Down
Loading
Loading