Skip to content

Interconnecting with the UBShmTransport Based on the LD/ST Shared Memory Semantics.#3290

Open
zchuango wants to merge 21 commits intoapache:masterfrom
zchuango:ubshm_transport_dev
Open

Interconnecting with the UBShmTransport Based on the LD/ST Shared Memory Semantics.#3290
zchuango wants to merge 21 commits intoapache:masterfrom
zchuango:ubshm_transport_dev

Conversation

@zchuango
Copy link
Copy Markdown
Contributor

@zchuango zchuango commented May 9, 2026

What problem does this PR solve?

Issue Number: #3226 #3167 #3217

Problem Summary:
After recent efforts, the UB-Ring framework has been successfully integrated with the BRPC transport framework. Currently, high-performance and low-latency communication based on the load/store (LD/ST) semantics is supported. I feel happy be able to contribute this to the community and look forward to receiving feedback and reviews. @wwbmmm @chenBright

What is changed and the side effects?

Changed:

  1. The ubring framework is added. This framework implements low-latency data communication based on the shared memory LD/ST semantics.
  2. Currently, the ubring framework supports two modes: POSIX IPC shared memory and ubs-mem remote shared memory.
  3. The ub_shm_type parameter is used to control whether to use the IPC or ubs-mem capability. Currently, ubs-mem can run on the Kunpeng 950 supernode that supports the ub protocol.
    Side effects:
  • Performance effects: NAN

  • Breaking backward compatibility:


Check List:

#include "brpc/transport.h"

namespace brpc {
class UBShmTransport : public Transport {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

class no need to indent

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a new UBRing-based shared-memory transport mode to brpc (IPC + optional ubs-mem backend) and wires it into the Socket/Transport framework, along with docs and a performance example.

Changes:

  • Introduce UBRing transport (SOCKET_MODE_UBRING) with endpoint handshake, polling, and ring manager infrastructure.
  • Add shared-memory backend abstraction (POSIX IPC + ubs-mem via dlopen’d SDK stubs/headers) plus timer utilities.
  • Update build/docs/examples to expose the feature and provide a basic performance harness.

Reviewed changes

Copilot reviewed 43 out of 43 changed files in this pull request and generated 15 comments.

Show a summary per file
File Description
src/brpc/ubshm/ubs_mem/ubshmem_stub.cpp Adds stub implementations of ubs-mem APIs for non-ubs environments/UT.
src/brpc/ubshm/ubs_mem/ubs_mem.h Introduces ubs-mem C API header used by the UBS backend integration.
src/brpc/ubshm/ubs_mem/ubs_mem_def.h Defines ubs-mem types/constants used by the UBS backend integration.
src/brpc/ubshm/ubs_mem/declare_shm_ubs.h Declares the dynamically loaded ubs-mem function pointer table.
src/brpc/ubshm/ubr_trx.h Defines core UBR transaction structures and states.
src/brpc/ubshm/ubr_msg.h Defines UBR message chunk format used by the ring transport.
src/brpc/ubshm/ub_ring.h Declares UBRing read/write and lifecycle APIs used by the endpoint.
src/brpc/ubshm/ub_ring_manager.h Declares global manager for UBR transactions and link bookkeeping.
src/brpc/ubshm/ub_ring_manager.cpp Implements UBR transaction manager and UB event callback plumbing.
src/brpc/ubshm/ub_helper.h Declares UBRing global init/availability helpers.
src/brpc/ubshm/ub_helper.cpp Implements global init/fini, availability flags, and polling init.
src/brpc/ubshm/ub_endpoint.h Declares UB shared-memory endpoint and polling infrastructure.
src/brpc/ubshm/ub_endpoint.cpp Implements handshake, polling loop, and I/O integration with Socket/InputMessenger.
src/brpc/ubshm/timer/timer_mgr.h Declares timer module used by UBS cleanup/recovery flows.
src/brpc/ubshm/timer/timer_mgr.cpp Implements epoll/kqueue-based timer dispatch for UBRing subsystems.
src/brpc/ubshm/shm/shm_ubs.h Declares UBS backend shared-memory operations.
src/brpc/ubshm/shm/shm_ubs.cpp Implements UBS backend via dynamically loaded ubs-mem SDK.
src/brpc/ubshm/shm/shm_mgr.h Declares backend-agnostic SHM manager interface.
src/brpc/ubshm/shm/shm_mgr.cpp Implements SHM manager selecting IPC vs UBS backend via flag.
src/brpc/ubshm/shm/shm_ipc.h Declares POSIX IPC SHM backend operations.
src/brpc/ubshm/shm/shm_ipc.cpp Implements POSIX IPC SHM backend operations.
src/brpc/ubshm/shm/shm_def.h Adds SHM structs/constants used across SHM backends and UBRing.
src/brpc/ubshm/common/thread_lock.h Adds RAII-style mutex/spin/rwlock/semaphore guard macros.
src/brpc/ubshm/common/common.h Adds common macros/types/constants used throughout UBRing code.
src/brpc/ubshm_transport.h Declares UBShmTransport implementing the Transport interface.
src/brpc/ubshm_transport.cpp Implements transport selection between UBRing and TCP fallback paths.
src/brpc/transport_factory.cpp Wires SOCKET_MODE_UBRING into transport creation/context init.
src/brpc/socket.h Adds UB endpoint/connect friend declarations for Socket integration.
src/brpc/socket_mode.h Adds SOCKET_MODE_UBRING enum value.
src/brpc/rdma_transport.cpp Adjusts RDMA transport’s TCP fallback member initialization (currently broken).
src/brpc/input_messenger.h Adds UB endpoint friend declaration to support message processing hooks.
src/brpc/input_messenger.cpp Extends RDMA-special message queuing behavior to UBRing sockets.
src/brpc/controller.h Guards latency_us() against unset begin time.
README.md Adds docs link for UBRing.
README_cn.md Adds docs link for UBRing (CN).
example/ubring_performance/test.proto Adds proto for UBRing performance test example.
example/ubring_performance/server.cpp Adds UBRing-capable perf test server example.
example/ubring_performance/client.cpp Adds UBRing-capable perf test client example.
example/ubring_performance/CMakeLists.txt Adds standalone CMake build for the performance example.
docs/en/ubring.md Documents build/run/configuration and backend selection for UBRing.
docs/cn/ubring.md Chinese documentation for UBRing build/run/configuration.
CMakeLists.txt Adds WITH_UBRING option and compile definition wiring.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

_on_edge_trigger = rdma::RdmaEndpoint::OnNewDataFromTcp;
}
_tcp_transport = std::make_shared<TcpTransport>();
_tcp_transport = std::unique_ptr<TcpTransport>();
Comment on lines +49 to +54
if (options.need_on_edge_trigger && _on_edge_trigger == NULL) {
_on_edge_trigger = ubring::UBShmEndpoint::OnNewDataFromTcp;
}
_tcp_transport = std::unique_ptr<TcpTransport>(new TcpTransport());
_tcp_transport->Init(socket, options);
}
Comment on lines +98 to +114
memcpy(shm_name, current_pos, SHM_MAX_NAME_BUFF_LEN);
}

std::string HelloMessage::toString() const {
constexpr size_t MAX_LEN = 16 + 6 + 16 + 6 + 16 + 6 + 20 + 6 + SHM_MAX_NAME_BUFF_LEN + 32;
std::array<char, MAX_LEN> buf;
int n = snprintf(buf.data(), buf.size(),
"msg_len=%u, hello_ver=%u, impl_ver=%u, len=%lu, shm_name=%.*s",
msg_len,
hello_ver,
impl_ver,
static_cast<unsigned long>(len), // 兼容32/64位
static_cast<int>(SHM_MAX_NAME_BUFF_LEN), // 限制最大输出长度
shm_name
);
return std::string(buf.data(), static_cast<size_t>(n));
}
Comment on lines +81 to +97
uint16_t* current_pos = (uint16_t*)data;
*(current_pos++) = butil::HostToNet16(msg_len);
*(current_pos++) = butil::HostToNet16(hello_ver);
*(current_pos++) = butil::HostToNet16(impl_ver);
uint64_t* len_pos = (uint64_t*)current_pos;
*len_pos = butil::HostToNet64(len);
current_pos += 4;
memcpy(current_pos, shm_name, SHM_MAX_NAME_BUFF_LEN);
}

void HelloMessage::Deserialize(void* data) {
uint16_t* current_pos = (uint16_t*)data;
msg_len = butil::NetToHost16(*current_pos++);
hello_ver = butil::NetToHost16(*current_pos++);
impl_ver = butil::NetToHost16(*current_pos++);
len = butil::NetToHost64(*(uint64_t*)current_pos);
current_pos += 4; // move forward 4 Bytes
auto* ub_transport = static_cast<UBShmTransport*>(s->_transport.get());
size_t local_shm_len = (size_t)(FLAGS_data_queue_size) * MB_TO_BYTE;
SHM local_trx_shm = {NULL, local_shm_len, 0, {0}, (uint32_t)s->fd()};
const char* shm_name = butil::endpoint2str(s->local_side()).c_str();
Comment on lines +210 to +218
uint32_t ubrIndex = trx->trxMgrIndex;
char* connectName = trx->localShm.name;
if (g_linkInfoMgr.linkMgrUnitStatus[ubrIndex] == UBR_MGR_UNIT_FREE) {
strncpy(g_linkInfoMgr.allLinkInfo[ubrIndex].connectName,
connectName, SHM_MAX_NAME_BUFF_LEN);
strncpy(g_linkInfoMgr.allLinkInfo[ubrIndex].listenerName,
listenerName, SHM_MAX_NAME_BUFF_LEN);
g_linkInfoMgr.linkMgrUnitStatus[ubrIndex] = UBR_MGR_UNIT_USED;
g_linkInfoMgr.linkNum++;
Comment on lines +515 to +519
LOCK_GUARD(shmList->shmLock);
if (UNLIKELY(shmList == NULL)) {
LOG(ERROR) << "Shm list is null.";
return UBRING_ERR;
}
Comment on lines +30 to +67
DEFINE_int32(ub_shm_type, 1, "shm type: 1-ipc; 2-ub_ring");
static SHM_TYPE g_shmType;

static bool CheckInputShmParam(SHM *shm) {
if (shm == NULL) {
LOG(ERROR) << "Input Param shm is NULL.";
return false;
}

size_t nameLen = strlen(shm->name);
if (nameLen <= 0 || nameLen > SHM_MAX_NAME_LEN) {
LOG(ERROR) << "Shm name=" << shm->name << ", length=" << shm->len
<< ", which is not between 1 and " << SHM_MAX_NAME_LEN;
return false;
}

if (shm->len <= 0) {
LOG(ERROR) << "Shm length=" << shm->len << " is invalid.";
return false;
}

if (shm->len < SHM_ALLOC_UNIT_SIZE || (shm->len & (SHM_ALLOC_UNIT_SIZE - 1)) != 0) {
LOG(ERROR) << "Shm length=" << shm->len << " need to be (1..n) * 4MB.";
return false;
}

return true;
}

RETURN_CODE ShmMgrInit(void) {
if (UNLIKELY(FLAGS_ub_shm_type >= (uint32_t)SHM_TYPE_UNSUPPORT)) {
LOG(ERROR) << "Shm type config=" << FLAGS_ub_shm_type << " is not supported.";
return UBRING_ERR;
}

g_shmType = (SHM_TYPE)FLAGS_ub_shm_type;
if (g_shmType == SHM_TYPE_UBS) {
if (UbsShmInit() != UBRING_OK) {
Comment thread CMakeLists.txt
${MCPACK2PB_SOURCES}
${BRPC_SOURCES}
${THRIFT_SOURCES}
${BRPC_C_SOURCES}
Comment on lines +31 to +54
void UBShmTransport::Init(Socket *socket, const SocketOptions &options) {
CHECK(_ub_ep == NULL);
if (options.socket_mode == SOCKET_MODE_UBRING) {
_ub_ep = new(std::nothrow)ubring::UBShmEndpoint(socket);
if (!_ub_ep) {
const int saved_errno = errno;
PLOG(ERROR) << "Fail to create UBShmEndpoint";
socket->SetFailed(
saved_errno, "Fail to create UBShmEndpoint: %s", berror(saved_errno));
}
_ub_state = UB_UNKNOWN;
} else {
_ub_state = UB_OFF;
socket->_socket_mode = SOCKET_MODE_TCP;
}
_socket = socket;
_default_connect = options.app_connect;
_on_edge_trigger = options.on_edge_triggered_events;
if (options.need_on_edge_trigger && _on_edge_trigger == NULL) {
_on_edge_trigger = ubring::UBShmEndpoint::OnNewDataFromTcp;
}
_tcp_transport = std::unique_ptr<TcpTransport>(new TcpTransport());
_tcp_transport->Init(socket, options);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants