@@ -56,31 +56,71 @@ def record_warning(type, attributes)
5656 redis . rpush ( key ( 'warnings' ) , Marshal . dump ( [ type , attributes ] ) )
5757 end
5858
59- def record_error ( id , payload , stats : nil )
59+ def record_error ( id , payload , stat_delta : nil )
60+ # Run acknowledge first so we know whether we're the first to ack
6061 acknowledged = @queue . acknowledge ( id , error : payload )
6162
6263 if acknowledged
63- # if another worker already acknowledged the test, we don't need to update the global stats or increment the test failed count
64- record_stats ( stats )
64+ # We were the first to ack; another worker already ack'd would get falsy from SADD
6565 @queue . increment_test_failed
66+ # Only the acknowledging worker's stats include this failure (others skip increment when ack=false).
67+ # Store so we can subtract it if another worker records success later.
68+ store_error_report_delta ( id , stat_delta ) if stat_delta && stat_delta . any?
6669 end
67- nil
70+ # Return so caller can roll back local counter when not acknowledged
71+ !!acknowledged
6872 end
6973
70- def record_success ( id , stats : nil , skip_flaky_record : false )
71- _ , error_reports_deleted_count , requeued_count , _ = redis . multi do |transaction |
74+ def record_success ( id , skip_flaky_record : false )
75+ acknowledged , error_reports_deleted_count , requeued_count , delta_json = redis . multi do |transaction |
7276 @queue . acknowledge ( id , pipeline : transaction )
7377 transaction . hdel ( key ( 'error-reports' ) , id )
7478 transaction . hget ( key ( 'requeues-count' ) , id )
75- record_stats ( stats , pipeline : transaction )
79+ transaction . hget ( key ( 'error-report-deltas' ) , id )
80+ end
81+ # When we're replacing a failure, subtract the (single) acknowledging worker's stat contribution
82+ if error_reports_deleted_count . to_i > 0 && delta_json
83+ apply_error_report_delta_correction ( delta_json )
84+ redis . hdel ( key ( 'error-report-deltas' ) , id )
7685 end
7786 record_flaky ( id ) if !skip_flaky_record && ( error_reports_deleted_count . to_i > 0 || requeued_count . to_i > 0 )
78- nil
87+ # Count this run when we ack'd or when we replaced a failure (so stats delta is applied)
88+ !!( acknowledged || error_reports_deleted_count . to_i > 0 )
7989 end
8090
81- def record_requeue ( id , stats : nil )
82- redis . pipelined do |pipeline |
83- record_stats ( stats , pipeline : pipeline )
91+ def record_requeue ( id )
92+ true
93+ end
94+
95+ def record_stats ( stats = nil , pipeline : nil )
96+ return unless stats
97+ if pipeline
98+ stats . each do |stat_name , stat_value |
99+ pipeline . hset ( key ( stat_name ) , config . worker_id , stat_value )
100+ pipeline . expire ( key ( stat_name ) , config . redis_ttl )
101+ end
102+ else
103+ redis . pipelined do |p |
104+ record_stats ( stats , pipeline : p )
105+ end
106+ end
107+ end
108+
109+ # Apply a delta to this worker's stats in Redis (HINCRBY). Use this instead of
110+ # record_stats when recording per-test so we never overwrite and correction sticks.
111+ def record_stats_delta ( delta , pipeline : nil )
112+ return if delta . nil? || delta . empty?
113+ apply_delta = lambda do |p |
114+ delta . each do |stat_name , value |
115+ next unless value . is_a? ( Numeric ) || value . to_s . match? ( /\A -?\d +\. ?\d *\z / )
116+ p . hincrbyfloat ( key ( stat_name ) , config . worker_id . to_s , value . to_f )
117+ p . expire ( key ( stat_name ) , config . redis_ttl )
118+ end
119+ end
120+ if pipeline
121+ apply_delta . call ( pipeline )
122+ else
123+ redis . pipelined { |p | apply_delta . call ( p ) }
84124 end
85125 end
86126
@@ -131,17 +171,31 @@ def reset_stats(stat_names)
131171
132172 attr_reader :config , :redis
133173
134- def record_stats ( stats , pipeline : redis )
135- return unless stats
136- stats . each do |stat_name , stat_value |
137- pipeline . hset ( key ( stat_name ) , config . worker_id , stat_value )
138- pipeline . expire ( key ( stat_name ) , config . redis_ttl )
139- end
140- end
141-
142174 def key ( *args )
143175 KeyShortener . key ( config . build_id , *args )
144176 end
177+
178+ def store_error_report_delta ( test_id , stat_delta )
179+ # Only the acknowledging worker's stats include this test; store their delta for correction on success
180+ payload = { 'worker_id' => config . worker_id . to_s } . merge ( stat_delta )
181+ redis . hset ( key ( 'error-report-deltas' ) , test_id , JSON . generate ( payload ) )
182+ redis . expire ( key ( 'error-report-deltas' ) , config . redis_ttl )
183+ end
184+
185+ def apply_error_report_delta_correction ( delta_json )
186+ delta = JSON . parse ( delta_json )
187+ worker_id = delta . delete ( 'worker_id' ) &.to_s
188+ return if worker_id . nil? || worker_id . empty? || delta . empty?
189+
190+ redis . pipelined do |pipeline |
191+ delta . each do |stat_name , value |
192+ next unless value . is_a? ( Numeric ) || value . to_s . match? ( /\A -?\d +\. ?\d *\z / )
193+
194+ pipeline . hincrbyfloat ( key ( stat_name ) , worker_id , -value . to_f )
195+ pipeline . expire ( key ( stat_name ) , config . redis_ttl )
196+ end
197+ end
198+ end
145199 end
146200 end
147201 end
0 commit comments