From 8ce61f90ba4aea3d52e92e258c3803b8b885726e Mon Sep 17 00:00:00 2001
From: Jean Boussier
Date: Wed, 31 Dec 2025 13:54:00 +0100
Subject: [PATCH 1/2] Thread::Queue use a ring buffer
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Thread::Queue spends a significant amount of time in array functions,
checking for invariants we know aren't a problem, and checking whether
the backing array needs to be reordered. By using a ring buffer we can
remove a lot of overhead (~23% faster).

```
$ hyperfine './miniruby --yjit /tmp/q.rb' './miniruby-qrb --yjit /tmp/q.rb'
Benchmark 1: ./miniruby --yjit /tmp/q.rb
  Time (mean ± σ):      1.050 s ±  0.191 s    [User: 0.988 s, System: 0.004 s]
  Range (min … max):    0.984 s …  1.595 s    10 runs

Benchmark 2: ./miniruby-qrb --yjit /tmp/q.rb
  Time (mean ± σ):     844.2 ms ±   3.1 ms    [User: 840.4 ms, System: 2.8 ms]
  Range (min … max):   838.6 ms … 848.9 ms    10 runs

Summary
  ./miniruby-qrb --yjit /tmp/q.rb ran
    1.24 ± 0.23 times faster than ./miniruby --yjit /tmp/q.rb
```

The benchmark script (/tmp/q.rb):

```
q = Queue.new([1, 2, 3, 4, 5, 6, 7, 8])

i = 2_000_000
while i > 0
  i -= 1
  q.push(q.pop)
  q.push(q.pop)
  q.push(q.pop)
  q.push(q.pop)
  q.push(q.pop)
  q.push(q.pop)
  q.push(q.pop)
  q.push(q.pop)
  q.push(q.pop)
  q.push(q.pop)
end
```
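
For review purposes, a minimal standalone sketch of the scheme follows.
It is illustrative only: the `ring_*` names and int payloads are invented
for this sketch, and it uses plain malloc/free where the real code stores
VALUEs, allocates via ALLOC_N/ruby_sized_xfree, and assigns elements
through RB_OBJ_WRITE so GC write barriers fire:

```
#include <stdlib.h>
#include <string.h>
#include <stdio.h>

/* Modulo-indexed ring buffer that grows by doubling. */
struct ring {
    long capa, len, offset;
    int *buffer;
};

static void
ring_init(struct ring *r, long capa)
{
    r->buffer = malloc(capa * sizeof(int));
    r->capa = capa;
    r->len = 0;
    r->offset = 0;
}

static void
ring_push(struct ring *r, int v)
{
    if (r->len >= r->capa) {
        /* Full: double the capacity. Copy the two wrapped segments so
         * the live elements become contiguous and offset resets to 0. */
        int *new_buffer = malloc(r->capa * 2 * sizeof(int));
        memcpy(new_buffer, r->buffer + r->offset,
               (r->capa - r->offset) * sizeof(int));
        memcpy(new_buffer + (r->capa - r->offset), r->buffer,
               r->offset * sizeof(int));
        free(r->buffer);
        r->buffer = new_buffer;
        r->offset = 0;
        r->capa *= 2;
    }
    r->buffer[(r->offset + r->len) % r->capa] = v;
    r->len++;
}

/* Caller must ensure the buffer is non-empty. */
static int
ring_shift(struct ring *r)
{
    int v = r->buffer[r->offset];
    r->len--;
    r->offset = (r->len == 0) ? 0 : (r->offset + 1) % r->capa;
    return v;
}

int
main(void)
{
    struct ring r;
    ring_init(&r, 2);
    for (int i = 0; i < 5; i++) ring_push(&r, i); /* triggers two grows */
    while (r.len > 0) printf("%d ", ring_shift(&r)); /* prints: 0 1 2 3 4 */
    printf("\n");
    free(r.buffer);
    return 0;
}
```

The two-step copy on growth un-wraps the live elements into the front of
the new buffer so the offset can reset to 0; push and shift then stay
amortized O(1), with none of the invariant or reordering checks that the
array-backed implementation pays for on every operation.
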
---
 thread_sync.c | 224 +++++++++++++++++++++++++++++++++++---------------
 1 file changed, 157 insertions(+), 67 deletions(-)

diff --git a/thread_sync.c b/thread_sync.c
index a93888fad02ae6..72341762399a8f 100644
--- a/thread_sync.c
+++ b/thread_sync.c
@@ -691,47 +691,57 @@ rb_mutex_allow_trap(VALUE self, int val)

 /* Queue */

-#define queue_waitq(q) UNALIGNED_MEMBER_PTR(q, waitq)
-#define queue_list(q) UNALIGNED_MEMBER_PTR(q, que)
-RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_BEGIN()
 struct rb_queue {
     struct ccan_list_head waitq;
     rb_serial_t fork_gen;
-    const VALUE que;
+    long capa;
+    long len;
+    long offset;
+    VALUE *buffer;
     int num_waiting;
-} RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_END();
+};
+
+#define szqueue_waitq(sq) &sq->q.waitq
+#define szqueue_pushq(sq) &sq->pushq

-#define szqueue_waitq(sq) UNALIGNED_MEMBER_PTR(sq, q.waitq)
-#define szqueue_list(sq) UNALIGNED_MEMBER_PTR(sq, q.que)
-#define szqueue_pushq(sq) UNALIGNED_MEMBER_PTR(sq, pushq)
-RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_BEGIN()
 struct rb_szqueue {
     struct rb_queue q;
     int num_waiting_push;
     struct ccan_list_head pushq;
     long max;
-} RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_END();
+};

 static void
 queue_mark_and_move(void *ptr)
 {
     struct rb_queue *q = ptr;

-    /* no need to mark threads in waitq, they are on stack */
-    rb_gc_mark_and_move((VALUE *)UNALIGNED_MEMBER_PTR(q, que));
+    for (long index = 0; index < q->len; index++) {
+        rb_gc_mark_and_move(&q->buffer[((q->offset + index) % q->capa)]);
+    }
+}
+
+static void
+queue_free(void *ptr)
+{
+    struct rb_queue *q = ptr;
+    if (q->buffer) {
+        ruby_sized_xfree(q->buffer, q->capa * sizeof(VALUE));
+    }
 }

 static size_t
 queue_memsize(const void *ptr)
 {
-    return sizeof(struct rb_queue);
+    const struct rb_queue *q = ptr;
+    return sizeof(struct rb_queue) + (q->capa * sizeof(VALUE));
 }

 static const rb_data_type_t queue_data_type = {
     .wrap_struct_name = "Thread::Queue",
     .function = {
         .dmark = queue_mark_and_move,
-        .dfree = RUBY_TYPED_DEFAULT_FREE,
+        .dfree = queue_free,
         .dsize = queue_memsize,
         .dcompact = queue_mark_and_move,
     },
@@ -745,7 +755,7 @@ queue_alloc(VALUE klass)
     struct rb_queue *q;

     obj = TypedData_Make_Struct(klass, struct rb_queue, &queue_data_type, q);
-    ccan_list_head_init(queue_waitq(q));
+    ccan_list_head_init(&q->waitq);
     return obj;
 }

@@ -759,13 +769,13 @@ queue_fork_check(struct rb_queue *q)
     }
     /* forked children can't reach into parent thread stacks */
     q->fork_gen = fork_gen;
-    ccan_list_head_init(queue_waitq(q));
+    ccan_list_head_init(&q->waitq);
     q->num_waiting = 0;
     return 1;
 }

-static struct rb_queue *
-queue_ptr(VALUE obj)
+static inline struct rb_queue *
+raw_queue_ptr(VALUE obj)
 {
     struct rb_queue *q;

@@ -775,6 +785,22 @@ queue_ptr(VALUE obj)
     return q;
 }

+static inline void
+check_queue(VALUE obj, struct rb_queue *q)
+{
+    if (RB_UNLIKELY(q->buffer == NULL)) {
+        rb_raise(rb_eTypeError, "%+"PRIsVALUE" not initialized", obj);
+    }
+}
+
+static inline struct rb_queue *
+queue_ptr(VALUE obj)
+{
+    struct rb_queue *q = raw_queue_ptr(obj);
+    check_queue(obj, q);
+    return q;
+}
+
 #define QUEUE_CLOSED FL_USER5

 static rb_hrtime_t
@@ -801,17 +827,25 @@ szqueue_mark_and_move(void *ptr)
     queue_mark_and_move(&sq->q);
 }

+static void
+szqueue_free(void *ptr)
+{
+    struct rb_szqueue *sq = ptr;
+    queue_free(&sq->q);
+}
+
 static size_t
 szqueue_memsize(const void *ptr)
 {
-    return sizeof(struct rb_szqueue);
+    const struct rb_szqueue *sq = ptr;
+    return sizeof(struct rb_szqueue) + (sq->q.capa * sizeof(VALUE));
 }

 static const rb_data_type_t szqueue_data_type = {
     .wrap_struct_name = "Thread::SizedQueue",
     .function = {
         .dmark = szqueue_mark_and_move,
-        .dfree = RUBY_TYPED_DEFAULT_FREE,
+        .dfree = szqueue_free,
         .dsize = szqueue_memsize,
         .dcompact = szqueue_mark_and_move,
     },
@@ -830,8 +864,8 @@ szqueue_alloc(VALUE klass)
     return obj;
 }

-static struct rb_szqueue *
-szqueue_ptr(VALUE obj)
+static inline struct rb_szqueue *
+raw_szqueue_ptr(VALUE obj)
 {
     struct rb_szqueue *sq;

@@ -844,25 +878,12 @@ szqueue_ptr(VALUE obj)
     return sq;
 }

-static VALUE
-ary_buf_new(void)
-{
-    return rb_ary_hidden_new(1);
-}
-
-static inline VALUE
-check_array(VALUE obj, VALUE ary)
-{
-    if (RB_LIKELY(ary)) {
-        return ary;
-    }
-    rb_raise(rb_eTypeError, "%+"PRIsVALUE" not initialized", obj);
-}
-
-static long
-queue_length(VALUE self, struct rb_queue *q)
+static inline struct rb_szqueue *
+szqueue_ptr(VALUE obj)
 {
-    return RARRAY_LEN(check_array(self, q->que));
+    struct rb_szqueue *sq = raw_szqueue_ptr(obj);
+    check_queue(obj, &sq->q);
+    return sq;
 }

 static int
@@ -889,10 +910,63 @@ raise_closed_queue_error(VALUE self)
 static VALUE
 queue_closed_result(VALUE self, struct rb_queue *q)
 {
-    RUBY_ASSERT(queue_length(self, q) == 0);
+    RUBY_ASSERT(q->len == 0);
     return Qnil;
 }

+#define QUEUE_INITIAL_CAPA 8
+
+static inline void
+ring_buffer_init(struct rb_queue *q, long initial_capa)
+{
+    q->buffer = ALLOC_N(VALUE, initial_capa);
+    q->capa = initial_capa;
+}
+
+static inline void
+ring_buffer_expand(struct rb_queue *q)
+{
+    RUBY_ASSERT(q->capa > 0);
+    VALUE *new_buffer = ALLOC_N(VALUE, q->capa * 2);
+    MEMCPY(new_buffer, q->buffer + q->offset, VALUE, q->capa - q->offset);
+    MEMCPY(new_buffer + (q->capa - q->offset), q->buffer, VALUE, q->offset);
+    VALUE *old_buffer = q->buffer;
+    q->buffer = new_buffer;
+    q->offset = 0;
+    ruby_sized_xfree(old_buffer, q->capa * sizeof(VALUE));
+    q->capa *= 2;
+}
+
+static void
+ring_buffer_push(VALUE self, struct rb_queue *q, VALUE obj)
+{
+    if (RB_UNLIKELY(q->len >= q->capa)) {
+        ring_buffer_expand(q);
+    }
+    RUBY_ASSERT(q->capa > q->len);
+    long index = (q->offset + q->len) % q->capa;
+    q->len++;
+    RB_OBJ_WRITE(self, &q->buffer[index], obj);
+}
+
+static VALUE
+ring_buffer_shift(struct rb_queue *q)
+{
+    if (!q->len) {
+        return Qnil;
+    }
+
+    VALUE obj = q->buffer[q->offset];
+    q->len--;
+    if (q->len == 0) {
+        q->offset = 0;
+    }
+    else {
+        q->offset = (q->offset + 1) % q->capa;
+    }
+    return obj;
+}
+
 /*
  *  Document-class: Thread::Queue
  *
@@ -957,14 +1031,27 @@ static VALUE
 rb_queue_initialize(int argc, VALUE *argv, VALUE self)
 {
     VALUE initial;
-    struct rb_queue *q = queue_ptr(self);
+    struct rb_queue *q = raw_queue_ptr(self);
     if ((argc = rb_scan_args(argc, argv, "01", &initial)) == 1) {
         initial = rb_to_array(initial);
     }
-    RB_OBJ_WRITE(self, queue_list(q), ary_buf_new());
-    ccan_list_head_init(queue_waitq(q));
+    ccan_list_head_init(&q->waitq);
     if (argc == 1) {
-        rb_ary_concat(q->que, initial);
+        long len = RARRAY_LEN(initial);
+        long initial_capa = QUEUE_INITIAL_CAPA;
+        while (initial_capa < len) {
+            initial_capa *= 2;
+        }
+        ring_buffer_init(q, initial_capa);
+
+        const VALUE *initial_ptr = RARRAY_CONST_PTR(initial);
+
+        for (long index = 0; index < len; index++) {
+            ring_buffer_push(self, q, initial_ptr[index]);
+        }
+    }
+    else {
+        ring_buffer_init(q, QUEUE_INITIAL_CAPA);
     }
     return self;
 }
@@ -972,11 +1059,12 @@ rb_queue_initialize(int argc, VALUE *argv, VALUE self)
 static VALUE
 queue_do_push(VALUE self, struct rb_queue *q, VALUE obj)
 {
+    check_queue(self, q);
     if (queue_closed_p(self)) {
         raise_closed_queue_error(self);
     }
-    rb_ary_push(check_array(self, q->que), obj);
-    wakeup_one(queue_waitq(q));
+    ring_buffer_push(self, q, obj);
+    wakeup_one(&q->waitq);
     return self;
 }

@@ -1021,7 +1109,7 @@ rb_queue_close(VALUE self)

     if (!queue_closed_p(self)) {
         FL_SET(self, QUEUE_CLOSED);
-        wakeup_all(queue_waitq(q));
+        wakeup_all(&q->waitq);
     }

     return self;
@@ -1097,8 +1185,7 @@ szqueue_sleep_done(VALUE p)
 static inline VALUE
 queue_do_pop(rb_execution_context_t *ec, VALUE self, struct rb_queue *q, VALUE non_block, VALUE timeout)
 {
-    check_array(self, q->que);
-    if (RARRAY_LEN(q->que) == 0) {
+    if (q->len == 0) {
         if (RTEST(non_block)) {
             rb_raise(rb_eThreadError, "queue empty");
         }
@@ -1109,12 +1196,12 @@ queue_do_pop(rb_execution_context_t *ec, VALUE self, struct rb_queue *q, VALUE n
         }

         rb_hrtime_t end = queue_timeout2hrtime(timeout);
-        while (RARRAY_LEN(q->que) == 0) {
+        while (q->len == 0) {
             if (queue_closed_p(self)) {
                 return queue_closed_result(self, q);
             }
             else {
-                RUBY_ASSERT(RARRAY_LEN(q->que) == 0);
+                RUBY_ASSERT(q->len == 0);
                 RUBY_ASSERT(queue_closed_p(self) == 0);

                 struct queue_waiter queue_waiter = {
@@ -1122,7 +1209,7 @@ queue_do_pop(rb_execution_context_t *ec, VALUE self, struct rb_queue *q, VALUE n
                     .as = {.q = q}
                 };

-                struct ccan_list_head *waitq = queue_waitq(q);
+                struct ccan_list_head *waitq = &q->waitq;

                 ccan_list_add_tail(waitq, &queue_waiter.w.node);
                 queue_waiter.as.q->num_waiting++;
@@ -1139,7 +1226,7 @@ queue_do_pop(rb_execution_context_t *ec, VALUE self, struct rb_queue *q, VALUE n
         }
     }

-    return rb_ary_shift(q->que);
+    return ring_buffer_shift(q);
 }

 static VALUE
@@ -1158,7 +1245,14 @@ rb_queue_pop(rb_execution_context_t *ec, VALUE self, VALUE non_block, VALUE time
 static VALUE
 rb_queue_empty_p(VALUE self)
 {
-    return RBOOL(queue_length(self, queue_ptr(self)) == 0);
+    return RBOOL(queue_ptr(self)->len == 0);
+}
+
+static void
+queue_clear(struct rb_queue *q)
+{
+    q->len = 0;
+    q->offset = 0;
 }

 /*
@@ -1170,9 +1264,7 @@ rb_queue_empty_p(VALUE self)
 static VALUE
 rb_queue_clear(VALUE self)
 {
-    struct rb_queue *q = queue_ptr(self);
-
-    rb_ary_clear(check_array(self, q->que));
+    queue_clear(queue_ptr(self));
     return self;
 }

@@ -1188,7 +1280,7 @@ rb_queue_clear(VALUE self)
 static VALUE
 rb_queue_length(VALUE self)
 {
-    return LONG2NUM(queue_length(self, queue_ptr(self)));
+    return LONG2NUM(queue_ptr(self)->len);
 }

 NORETURN(static VALUE rb_queue_freeze(VALUE self));
@@ -1241,14 +1333,13 @@ static VALUE
 rb_szqueue_initialize(VALUE self, VALUE vmax)
 {
     long max;
-    struct rb_szqueue *sq = szqueue_ptr(self);
+    struct rb_szqueue *sq = raw_szqueue_ptr(self);

     max = NUM2LONG(vmax);
     if (max <= 0) {
         rb_raise(rb_eArgError, "queue size must be positive");
     }
-
-    RB_OBJ_WRITE(self, szqueue_list(sq), ary_buf_new());
+    ring_buffer_init(&sq->q, QUEUE_INITIAL_CAPA);
     ccan_list_head_init(szqueue_waitq(sq));
     ccan_list_head_init(szqueue_pushq(sq));
     sq->max = max;
@@ -1323,7 +1414,7 @@ rb_szqueue_push(rb_execution_context_t *ec, VALUE self, VALUE object, VALUE non_
 {
     struct rb_szqueue *sq = szqueue_ptr(self);

-    if (queue_length(self, &sq->q) >= sq->max) {
+    if (sq->q.len >= sq->max) {
         if (RTEST(non_block)) {
             rb_raise(rb_eThreadError, "queue full");
         }
@@ -1334,7 +1425,7 @@ rb_szqueue_push(rb_execution_context_t *ec, VALUE self, VALUE object, VALUE non_
         }

         rb_hrtime_t end = queue_timeout2hrtime(timeout);
-        while (queue_length(self, &sq->q) >= sq->max) {
+        while (sq->q.len >= sq->max) {
             if (queue_closed_p(self)) {
                 raise_closed_queue_error(self);
             }
@@ -1370,7 +1461,7 @@ rb_szqueue_pop(rb_execution_context_t *ec, VALUE self, VALUE non_block, VALUE ti
     struct rb_szqueue *sq = szqueue_ptr(self);
     VALUE retval = queue_do_pop(ec, self, &sq->q, non_block, timeout);

-    if (queue_length(self, &sq->q) < sq->max) {
+    if (sq->q.len < sq->max) {
         wakeup_one(szqueue_pushq(sq));
     }

@@ -1387,8 +1478,7 @@ static VALUE
 rb_szqueue_clear(VALUE self)
 {
     struct rb_szqueue *sq = szqueue_ptr(self);
-
-    rb_ary_clear(check_array(self, sq->q.que));
+    queue_clear(&sq->q);
     wakeup_all(szqueue_pushq(sq));
     return self;
 }

From 26a5bcd6de806fa460cafd0906651a66cac33e7e Mon Sep 17 00:00:00 2001
From: Benoit Daloze
Date: Wed, 31 Dec 2025 18:29:01 +0100
Subject: [PATCH 2/2] [ruby/prism] Fix spacing in the generated #each_child_node

https://github.com/ruby/prism/commit/91f60cb736
---
 prism/templates/lib/prism/node.rb.erb | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/prism/templates/lib/prism/node.rb.erb b/prism/templates/lib/prism/node.rb.erb
index c97c029d3bf410..066a0cea1b508f 100644
--- a/prism/templates/lib/prism/node.rb.erb
+++ b/prism/templates/lib/prism/node.rb.erb
@@ -353,7 +353,7 @@ module Prism
       <%- when Prism::Template::OptionalNodeField -%>
       yield <%= field.name %> if <%= field.name %>
       <%- when Prism::Template::NodeListField -%>
-      <%= field.name %>.each {|node| yield node }
+      <%= field.name %>.each { |node| yield node }
       <%- end -%>
       <%- end -%>
     end