Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions src/butil/iobuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,31 +323,47 @@ IOBuf::Block* share_tls_block() {
// NOTE: b MUST be non-NULL and all blocks linked SHOULD not be full.
void release_tls_block_chain(IOBuf::Block* b) {
TLSData& tls_data = g_tls_data;
size_t n = 0;
size_t n_hit_threshold = 0;
if (tls_data.num_blocks >= max_blocks_per_thread()) {
do {
++n;
IOBuf::Block* const saved_next = b->u.portal_next;
b->dec_ref();
// If b is already cached in TLS, dec_ref() would drop a reference
// still owned by the TLS list and leave a dangling pointer behind.
if (!is_in_tls_block_chain(tls_data.block_head, b)) {
b->dec_ref();
++n_hit_threshold;
}
b = saved_next;
} while (b);
g_num_hit_tls_threshold.fetch_add(n, butil::memory_order_relaxed);
g_num_hit_tls_threshold.fetch_add(
n_hit_threshold, butil::memory_order_relaxed);
return;
}
IOBuf::Block* first_b = b;
IOBuf::Block* last_b = NULL;
size_t n_to_tls = 0;
do {
++n;
CHECK(!b->full());
// Guard against overlap with any node already cached in TLS, not just
// block_head. Returning X into H -> X would create a 2-node cycle
// X -> H -> X. If the overlap happens after a unique prefix such as
// A -> X, keep the prefix by linking A directly to the old TLS head.
if (is_in_tls_block_chain(tls_data.block_head, b)) {
break;
}
++n_to_tls;
last_b = b;
if (b->u.portal_next == NULL) {
last_b = b;
break;
}
b = b->u.portal_next;
} while (true);
if (last_b == NULL) {
return;
}
last_b->u.portal_next = tls_data.block_head;
tls_data.block_head = first_b;
tls_data.num_blocks += n;
tls_data.num_blocks += n_to_tls;
if (!tls_data.registered) {
tls_data.registered = true;
butil::thread_atexit(remove_tls_block_chain);
Expand Down
16 changes: 16 additions & 0 deletions src/butil/iobuf_inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -603,12 +603,28 @@ void remove_tls_block_chain();

IOBuf::Block* acquire_tls_block();

inline bool is_in_tls_block_chain(IOBuf::Block* head, IOBuf::Block* b) {
for (IOBuf::Block* p = head; p != NULL; p = p->u.portal_next) {
if (p == b) {
return true;
}
}
return false;
}

// Return one block to TLS.
inline void release_tls_block(IOBuf::Block* b) {
if (!b) {
return;
}
TLSData *tls_data = get_g_tls_data();
// Guard against duplicate return anywhere in the TLS list. Checking only
// block_head misses cases like H -> X where returning X again would create
// a 2-node cycle X -> H -> X. The TLS list is short (soft-limited), so a
// linear scan is cheap here.
if (is_in_tls_block_chain(tls_data->block_head, b)) {
return;
}
if (b->full()) {
b->dec_ref();
} else if (tls_data->num_blocks >= max_blocks_per_thread()) {
Expand Down
304 changes: 304 additions & 0 deletions test/iobuf_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1946,4 +1946,308 @@ TEST_F(IOBufTest, single_iobuf) {
ASSERT_TRUE(p != nullptr);
}

// Regression tests for https://github.com/apache/brpc/issues/3243
//
// A duplicate return of a block that's already in the TLS free list can create
// either a self-loop (when the block is block_head) or a longer cycle (when
// the block already exists deeper in the list). Any later traversal then
// spins forever. Run these tests in a dedicated pthread so the main thread's
// TLS remains untouched.

static bool tls_block_chain_has_cycle() {
butil::IOBuf::Block* slow = butil::iobuf::get_tls_block_head();
butil::IOBuf::Block* fast = slow;
while (fast != NULL) {
fast = butil::iobuf::get_portal_next(fast);
if (fast == NULL) {
return false;
}
slow = butil::iobuf::get_portal_next(slow);
fast = butil::iobuf::get_portal_next(fast);
if (slow == fast) {
return true;
}
}
return false;
}

static void dec_ref_distinct_blocks(butil::IOBuf::Block* b1,
butil::IOBuf::Block* b2,
butil::IOBuf::Block* b3 = NULL) {
if (b1 != NULL) {
b1->dec_ref();
}
if (b2 != NULL && b2 != b1) {
b2->dec_ref();
}
if (b3 != NULL && b3 != b1 && b3 != b2) {
b3->dec_ref();
}
}

static void cleanup_corrupted_tls_chain(int drain_times) {
butil::IOBuf::Block* b1 = NULL;
butil::IOBuf::Block* b2 = NULL;
butil::IOBuf::Block* b3 = NULL;
if (drain_times >= 1) {
b1 = butil::iobuf::acquire_tls_block();
}
if (drain_times >= 2) {
b2 = butil::iobuf::acquire_tls_block();
}
if (drain_times >= 3) {
b3 = butil::iobuf::acquire_tls_block();
}
dec_ref_distinct_blocks(b1, b2, b3);
}

static void* double_return_release_tls_block_head_thread(void* arg) {
bool* has_cycle = static_cast<bool*>(arg);
butil::iobuf::remove_tls_block_chain();

butil::IOBuf::Block* b = butil::iobuf::acquire_tls_block();
if (!b) {
return NULL;
}
butil::iobuf::release_tls_block(b);
butil::iobuf::release_tls_block(b);

if (tls_block_chain_has_cycle()) {
*has_cycle = true;
cleanup_corrupted_tls_chain(2);
} else {
butil::iobuf::remove_tls_block_chain();
}
return NULL;
}

TEST_F(IOBufTest, regression_3243_release_tls_block_head_no_cycle) {
bool has_cycle = false;
pthread_t tid;
ASSERT_EQ(0, pthread_create(&tid, NULL,
double_return_release_tls_block_head_thread,
&has_cycle));
ASSERT_EQ(0, pthread_join(tid, NULL));

EXPECT_FALSE(has_cycle)
<< "release_tls_block() created a cycle when the same block was "
"returned twice while already cached in TLS. (GitHub issue #3243)";
}

static void* double_return_release_tls_block_non_head_thread(void* arg) {
bool* has_cycle = static_cast<bool*>(arg);
butil::iobuf::remove_tls_block_chain();

butil::IOBuf::Block* tail = butil::iobuf::acquire_tls_block();
butil::IOBuf::Block* head = butil::iobuf::acquire_tls_block();
if (!tail || !head) {
dec_ref_distinct_blocks(tail, head);
return NULL;
}

butil::iobuf::release_tls_block(tail);
butil::iobuf::release_tls_block(head);
// TLS: head -> tail -> NULL. Returning tail again used to create
// tail -> head -> tail.
butil::iobuf::release_tls_block(tail);

if (tls_block_chain_has_cycle()) {
*has_cycle = true;
cleanup_corrupted_tls_chain(3);
} else {
butil::iobuf::remove_tls_block_chain();
}
return NULL;
}

TEST_F(IOBufTest, regression_3243_release_tls_block_non_head_no_cycle) {
bool has_cycle = false;
pthread_t tid;
ASSERT_EQ(0, pthread_create(&tid, NULL,
double_return_release_tls_block_non_head_thread,
&has_cycle));
ASSERT_EQ(0, pthread_join(tid, NULL));

EXPECT_FALSE(has_cycle)
<< "release_tls_block() created a cycle when a block already present "
"deeper in the TLS list was returned again. "
"(GitHub issue #3243)";
}

static void* double_return_release_tls_block_chain_head_thread(void* arg) {
bool* has_cycle = static_cast<bool*>(arg);
butil::iobuf::remove_tls_block_chain();

butil::IOBuf::Block* b = butil::iobuf::acquire_tls_block();
if (!b) {
return NULL;
}
butil::iobuf::release_tls_block(b);
b->u.portal_next = NULL;
butil::iobuf::release_tls_block_chain(b);

if (tls_block_chain_has_cycle()) {
*has_cycle = true;
cleanup_corrupted_tls_chain(2);
} else {
butil::iobuf::remove_tls_block_chain();
}
return NULL;
}

TEST_F(IOBufTest, regression_3243_release_tls_block_chain_head_no_cycle) {
bool has_cycle = false;
pthread_t tid;
ASSERT_EQ(0, pthread_create(&tid, NULL,
double_return_release_tls_block_chain_head_thread,
&has_cycle));
ASSERT_EQ(0, pthread_join(tid, NULL));

EXPECT_FALSE(has_cycle)
<< "release_tls_block_chain() created a cycle when the returned chain "
"overlapped the TLS head. (GitHub issue #3243)";
}

static void* double_return_release_tls_block_chain_non_head_thread(void* arg) {
bool* has_cycle = static_cast<bool*>(arg);
butil::iobuf::remove_tls_block_chain();

butil::IOBuf::Block* tail = butil::iobuf::acquire_tls_block();
butil::IOBuf::Block* head = butil::iobuf::acquire_tls_block();
if (!tail || !head) {
dec_ref_distinct_blocks(tail, head);
return NULL;
}

butil::iobuf::release_tls_block(tail);
butil::iobuf::release_tls_block(head);
// TLS: head -> tail -> NULL. Returning the single-node chain [tail]
// again used to create tail -> head -> tail.
tail->u.portal_next = NULL;
butil::iobuf::release_tls_block_chain(tail);

if (tls_block_chain_has_cycle()) {
*has_cycle = true;
cleanup_corrupted_tls_chain(3);
} else {
butil::iobuf::remove_tls_block_chain();
}
return NULL;
}

TEST_F(IOBufTest, regression_3243_release_tls_block_chain_non_head_no_cycle) {
bool has_cycle = false;
pthread_t tid;
ASSERT_EQ(0, pthread_create(&tid, NULL,
double_return_release_tls_block_chain_non_head_thread,
&has_cycle));
ASSERT_EQ(0, pthread_join(tid, NULL));

EXPECT_FALSE(has_cycle)
<< "release_tls_block_chain() created a cycle when the returned chain "
"contained a block already present deeper in TLS. "
"(GitHub issue #3243)";
}

struct PartialOverlapResult {
bool has_cycle;
int tls_block_count;
};

static void* partial_overlap_release_tls_block_chain_thread(void* arg) {
PartialOverlapResult* result = static_cast<PartialOverlapResult*>(arg);
result->has_cycle = false;
result->tls_block_count = -1;
butil::iobuf::remove_tls_block_chain();

butil::IOBuf::Block* tail = butil::iobuf::acquire_tls_block();
butil::IOBuf::Block* head = butil::iobuf::acquire_tls_block();
butil::IOBuf::Block* prefix = butil::iobuf::acquire_tls_block();
if (!tail || !head || !prefix) {
dec_ref_distinct_blocks(tail, head, prefix);
return NULL;
}

butil::iobuf::release_tls_block(tail);
butil::iobuf::release_tls_block(head);
// TLS: head -> tail -> NULL. Returning prefix -> tail should preserve
// prefix and avoid both the tail->head->tail cycle and leaking prefix.
prefix->u.portal_next = tail;
butil::iobuf::release_tls_block_chain(prefix);

result->has_cycle = tls_block_chain_has_cycle();
result->tls_block_count = butil::iobuf::get_tls_block_count();
if (result->has_cycle) {
cleanup_corrupted_tls_chain(3);
} else {
butil::iobuf::remove_tls_block_chain();
}
return NULL;
}

TEST_F(IOBufTest, regression_3243_release_tls_block_chain_partial_overlap_keeps_prefix) {
install_debug_allocator();

PartialOverlapResult result;
result.has_cycle = false;
result.tls_block_count = -1;

pthread_t tid;
ASSERT_EQ(0, pthread_create(&tid, NULL,
partial_overlap_release_tls_block_chain_thread,
&result));
ASSERT_EQ(0, pthread_join(tid, NULL));

EXPECT_FALSE(result.has_cycle)
<< "release_tls_block_chain() created a cycle when a unique prefix "
"was returned ahead of a block already cached in TLS. "
"(GitHub issue #3243)";
EXPECT_EQ(3, result.tls_block_count)
<< "release_tls_block_chain() dropped the unique prefix when the "
"returned chain overlapped the TLS list. (GitHub issue #3243)";
}

// Reproduce the issue through IOBufAsZeroCopyOutputStream::BackUp(). BackUp()
// eagerly returns _cur_block to TLS. A subsequent release of the same pointer
// used to create a self-loop at the head.
static void* backup_double_return_thread(void* arg) {
bool* has_cycle = static_cast<bool*>(arg);
butil::iobuf::remove_tls_block_chain();

butil::IOBuf buf;
{
butil::IOBufAsZeroCopyOutputStream stream(&buf);
void* data = NULL;
int size = 0;
EXPECT_TRUE(stream.Next(&data, &size));
stream.BackUp(size);

butil::IOBuf::Block* head = butil::iobuf::get_tls_block_head();
EXPECT_TRUE(head != NULL);
butil::iobuf::release_tls_block(head);

if (tls_block_chain_has_cycle()) {
*has_cycle = true;
cleanup_corrupted_tls_chain(2);
}
}
if (!*has_cycle) {
butil::iobuf::remove_tls_block_chain();
}
return NULL;
}

TEST_F(IOBufTest, regression_3243_backup_then_double_release_no_cycle) {
bool has_cycle = false;
pthread_t tid;
ASSERT_EQ(0, pthread_create(&tid, NULL,
backup_double_return_thread, &has_cycle));
ASSERT_EQ(0, pthread_join(tid, NULL));

EXPECT_FALSE(has_cycle)
<< "After IOBufAsZeroCopyOutputStream::BackUp() returned a block to "
"TLS, a second release_tls_block() created a cycle. "
"(GitHub issue #3243)";
}

} // namespace