Skip to content

Commit 1a336ea

Browse files
committed
perf: cache inspector views to avoid redundant JSON parsing
When a message does not match the remembered "last match" source and matching falls back to matchAll, the same raw bytes were parsed more than once by the same inspector. Views are now cached per inspector for the duration of a single Process call. This reduces JSON parsing from O(sources * groups) to O(inspectors) in the worst case, when messages switch between sources.
1 parent c73a5bf commit 1a336ea

2 files changed

Lines changed: 308 additions & 12 deletions

File tree

router.go

Lines changed: 47 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -221,32 +221,68 @@ func (r *Router) Process(ctx context.Context, raw []byte) error {
221221
return err
222222
}
223223

224+
// viewCache caches parsed views per inspector to avoid re-parsing the same
225+
// raw bytes multiple times during source matching.
226+
type viewCache struct {
227+
raw []byte
228+
views map[Inspector]viewResult
229+
}
230+
231+
type viewResult struct {
232+
view View
233+
ok bool
234+
}
235+
236+
func newViewCache(raw []byte) *viewCache {
237+
return &viewCache{
238+
raw: raw,
239+
views: make(map[Inspector]viewResult),
240+
}
241+
}
242+
243+
// get returns a cached view or parses and caches it.
244+
func (c *viewCache) get(insp Inspector) (View, bool) {
245+
if result, ok := c.views[insp]; ok {
246+
return result.view, result.ok
247+
}
248+
249+
view, err := insp.Inspect(c.raw)
250+
if err != nil {
251+
c.views[insp] = viewResult{ok: false}
252+
return nil, false
253+
}
254+
255+
c.views[insp] = viewResult{view: view, ok: true}
256+
return view, true
257+
}
258+
224259
// match finds a source whose discriminator matches the raw message.
225260
// Uses adaptive ordering to try the last successful source first.
226261
func (r *Router) match(raw []byte) Source {
262+
cache := newViewCache(raw)
263+
227264
// Try last successful source first (fast path)
228265
if v := r.lastMatch.Load(); v != nil {
229266
if lastMatch := v.(string); lastMatch != "" {
230-
if src := r.trySource(raw, lastMatch); src != nil {
267+
if src := r.trySource(cache, lastMatch); src != nil {
231268
return src
232269
}
233270
}
234271
}
235272

236273
// Full search through all groups
237-
src := r.matchAll(raw)
274+
src := r.matchAll(cache)
238275
if src != nil {
239276
r.lastMatch.Store(src.Name())
240277
}
241278
return src
242279
}
243280

244281
// trySource attempts to match a specific source by name.
245-
func (r *Router) trySource(raw []byte, name string) Source {
282+
func (r *Router) trySource(cache *viewCache, name string) Source {
246283
// Check default sources
247284
if len(r.defaultSources) > 0 {
248-
view, err := r.defaultInspector.Inspect(raw)
249-
if err == nil {
285+
if view, ok := cache.get(r.defaultInspector); ok {
250286
for _, src := range r.defaultSources {
251287
if src.Name() == name && src.Discriminator().Match(view) {
252288
return src
@@ -257,8 +293,8 @@ func (r *Router) trySource(raw []byte, name string) Source {
257293

258294
// Check custom groups
259295
for _, g := range r.groups {
260-
view, err := g.inspector.Inspect(raw)
261-
if err != nil {
296+
view, ok := cache.get(g.inspector)
297+
if !ok {
262298
continue
263299
}
264300
for _, src := range g.sources {
@@ -272,11 +308,10 @@ func (r *Router) trySource(raw []byte, name string) Source {
272308
}
273309

274310
// matchAll searches all groups for a matching source.
275-
func (r *Router) matchAll(raw []byte) Source {
311+
func (r *Router) matchAll(cache *viewCache) Source {
276312
// Try default group first
277313
if len(r.defaultSources) > 0 {
278-
view, err := r.defaultInspector.Inspect(raw)
279-
if err == nil {
314+
if view, ok := cache.get(r.defaultInspector); ok {
280315
for _, src := range r.defaultSources {
281316
if src.Discriminator().Match(view) {
282317
return src
@@ -287,8 +322,8 @@ func (r *Router) matchAll(raw []byte) Source {
287322

288323
// Try custom groups in order
289324
for _, g := range r.groups {
290-
view, err := g.inspector.Inspect(raw)
291-
if err != nil {
325+
view, ok := cache.get(g.inspector)
326+
if !ok {
292327
continue
293328
}
294329
for _, src := range g.sources {

router_test.go

Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1137,3 +1137,264 @@ func TestRouter_TrySourceInspectorFailsInCustomGroup(t *testing.T) {
11371137

11381138
assert.Error(t, err)
11391139
}
1140+
1141+
// countingInspector tracks how many times Inspect is called.
1142+
type countingInspector struct {
1143+
count int
1144+
}
1145+
1146+
func (c *countingInspector) Inspect(raw []byte) (View, error) {
1147+
c.count++
1148+
return JSONInspector().Inspect(raw)
1149+
}
1150+
1151+
func (c *countingInspector) reset() {
1152+
c.count = 0
1153+
}
1154+
1155+
type ViewCachingSuite struct {
1156+
suite.Suite
1157+
}
1158+
1159+
func TestViewCachingSuite(t *testing.T) {
1160+
suite.Run(t, new(ViewCachingSuite))
1161+
}
1162+
1163+
func (s *ViewCachingSuite) TestInspectorCalledOnceWhenTrySourceSucceeds() {
1164+
inspector := &countingInspector{}
1165+
1166+
r := New(WithInspector(inspector))
1167+
1168+
source := SourceFunc("test-source", HasFields("type"), func(raw []byte) (Parsed, bool) {
1169+
var env struct {
1170+
Type string `json:"type"`
1171+
Payload json.RawMessage `json:"payload"`
1172+
}
1173+
if json.Unmarshal(raw, &env) != nil || env.Type == "" {
1174+
return Parsed{}, false
1175+
}
1176+
return Parsed{Key: env.Type, Payload: env.Payload}, true
1177+
})
1178+
r.AddSource(source)
1179+
Register(r, "test", &testHandler{})
1180+
1181+
// First message - no lastMatch, goes through matchAll
1182+
msg := []byte(`{"type": "test", "payload": {}}`)
1183+
err := r.Process(context.Background(), msg)
1184+
s.Require().NoError(err)
1185+
1186+
// Inspector should be called exactly once
1187+
s.Assert().Equal(1, inspector.count)
1188+
}
1189+
1190+
func (s *ViewCachingSuite) TestInspectorCalledOnceWhenTrySourceFailsAndMatchAllSucceeds() {
1191+
inspector := &countingInspector{}
1192+
1193+
r := New(WithInspector(inspector))
1194+
1195+
// Source A - matches "a" field
1196+
sourceA := SourceFunc("source-a", HasFields("a"), func(raw []byte) (Parsed, bool) {
1197+
var env struct {
1198+
A bool `json:"a"`
1199+
Type string `json:"type"`
1200+
Payload json.RawMessage `json:"payload"`
1201+
}
1202+
if json.Unmarshal(raw, &env) != nil || !env.A {
1203+
return Parsed{}, false
1204+
}
1205+
return Parsed{Key: env.Type, Payload: env.Payload}, true
1206+
})
1207+
1208+
// Source B - matches "b" field
1209+
sourceB := SourceFunc("source-b", HasFields("b"), func(raw []byte) (Parsed, bool) {
1210+
var env struct {
1211+
B bool `json:"b"`
1212+
Type string `json:"type"`
1213+
Payload json.RawMessage `json:"payload"`
1214+
}
1215+
if json.Unmarshal(raw, &env) != nil || !env.B {
1216+
return Parsed{}, false
1217+
}
1218+
return Parsed{Key: env.Type, Payload: env.Payload}, true
1219+
})
1220+
1221+
r.AddSource(sourceA)
1222+
r.AddSource(sourceB)
1223+
Register(r, "test", &testHandler{})
1224+
1225+
// Prime with source-a
1226+
msg1 := []byte(`{"a": true, "type": "test", "payload": {}}`)
1227+
err := r.Process(context.Background(), msg1)
1228+
s.Require().NoError(err)
1229+
inspector.reset()
1230+
1231+
// Now send message that matches source-b
1232+
// trySource will try "source-a" first (lastMatch), fail
1233+
// matchAll will find "source-b"
1234+
// With caching, inspector should only be called ONCE
1235+
msg2 := []byte(`{"b": true, "type": "test", "payload": {}}`)
1236+
err = r.Process(context.Background(), msg2)
1237+
1238+
s.Require().NoError(err)
1239+
s.Assert().Equal(1, inspector.count, "inspector should only be called once due to view caching")
1240+
}
1241+
1242+
func (s *ViewCachingSuite) TestInspectorCalledOncePerGroupWithMultipleGroups() {
1243+
defaultInspector := &countingInspector{}
1244+
group1Inspector := &countingInspector{}
1245+
group2Inspector := &countingInspector{}
1246+
1247+
r := New(WithInspector(defaultInspector))
1248+
1249+
// Default source - won't match
1250+
defaultSource := SourceFunc("default", HasFields("default_field"), func(raw []byte) (Parsed, bool) {
1251+
return Parsed{}, false
1252+
})
1253+
r.AddSource(defaultSource)
1254+
1255+
// Group 1 source - won't match
1256+
group1Source := SourceFunc("group1", HasFields("group1_field"), func(raw []byte) (Parsed, bool) {
1257+
return Parsed{}, false
1258+
})
1259+
r.AddGroup(group1Inspector, group1Source)
1260+
1261+
// Group 2 source - will match
1262+
group2Source := SourceFunc("group2", HasFields("group2_field"), func(raw []byte) (Parsed, bool) {
1263+
var env struct {
1264+
Type string `json:"type"`
1265+
Payload json.RawMessage `json:"payload"`
1266+
}
1267+
if json.Unmarshal(raw, &env) != nil {
1268+
return Parsed{}, false
1269+
}
1270+
return Parsed{Key: env.Type, Payload: env.Payload}, true
1271+
})
1272+
r.AddGroup(group2Inspector, group2Source)
1273+
1274+
Register(r, "test", &testHandler{})
1275+
1276+
msg := []byte(`{"group2_field": true, "type": "test", "payload": {}}`)
1277+
err := r.Process(context.Background(), msg)
1278+
1279+
s.Require().NoError(err)
1280+
s.Assert().Equal(1, defaultInspector.count, "default inspector should be called once")
1281+
s.Assert().Equal(1, group1Inspector.count, "group1 inspector should be called once")
1282+
s.Assert().Equal(1, group2Inspector.count, "group2 inspector should be called once")
1283+
}
1284+
1285+
func (s *ViewCachingSuite) TestSameInspectorSharedAcrossGroupsCalledOnce() {
1286+
sharedInspector := &countingInspector{}
1287+
1288+
r := New(WithInspector(sharedInspector))
1289+
1290+
// Default source - won't match
1291+
defaultSource := SourceFunc("default", HasFields("default_field"), func(raw []byte) (Parsed, bool) {
1292+
return Parsed{}, false
1293+
})
1294+
r.AddSource(defaultSource)
1295+
1296+
// Custom group using the SAME inspector
1297+
customSource := SourceFunc("custom", HasFields("custom_field"), func(raw []byte) (Parsed, bool) {
1298+
var env struct {
1299+
Type string `json:"type"`
1300+
Payload json.RawMessage `json:"payload"`
1301+
}
1302+
if json.Unmarshal(raw, &env) != nil {
1303+
return Parsed{}, false
1304+
}
1305+
return Parsed{Key: env.Type, Payload: env.Payload}, true
1306+
})
1307+
r.AddGroup(sharedInspector, customSource)
1308+
1309+
Register(r, "test", &testHandler{})
1310+
1311+
msg := []byte(`{"custom_field": true, "type": "test", "payload": {}}`)
1312+
err := r.Process(context.Background(), msg)
1313+
1314+
s.Require().NoError(err)
1315+
// Same inspector used for default group and custom group - should only be called once
1316+
s.Assert().Equal(1, sharedInspector.count, "shared inspector should only be called once")
1317+
}
1318+
1319+
func (s *ViewCachingSuite) TestViewCacheHandlesInspectorError() {
1320+
failingInspector := &mockInspector{err: ErrInvalidJSON}
1321+
workingInspector := &countingInspector{}
1322+
1323+
r := New(WithInspector(failingInspector))
1324+
1325+
// Default source with failing inspector
1326+
defaultSource := SourceFunc("default", HasFields("x"), func(raw []byte) (Parsed, bool) {
1327+
return Parsed{}, false
1328+
})
1329+
r.AddSource(defaultSource)
1330+
1331+
// Custom group with working inspector
1332+
customSource := SourceFunc("custom", HasFields("type"), func(raw []byte) (Parsed, bool) {
1333+
var env struct {
1334+
Type string `json:"type"`
1335+
Payload json.RawMessage `json:"payload"`
1336+
}
1337+
if json.Unmarshal(raw, &env) != nil {
1338+
return Parsed{}, false
1339+
}
1340+
return Parsed{Key: env.Type, Payload: env.Payload}, true
1341+
})
1342+
r.AddGroup(workingInspector, customSource)
1343+
1344+
Register(r, "test", &testHandler{})
1345+
1346+
msg := []byte(`{"type": "test", "payload": {}}`)
1347+
err := r.Process(context.Background(), msg)
1348+
1349+
s.Require().NoError(err)
1350+
s.Assert().Equal(1, workingInspector.count)
1351+
}
1352+
1353+
func (s *ViewCachingSuite) TestViewCacheCachesFailureResult() {
1354+
// Inspector that fails but counts calls
1355+
failingInspector := &failingCountingInspector{}
1356+
workingInspector := &countingInspector{}
1357+
1358+
r := New(WithInspector(failingInspector))
1359+
1360+
// Add multiple sources to default group to force multiple discriminator checks
1361+
r.AddSource(SourceFunc("src1", HasFields("a"), func(raw []byte) (Parsed, bool) {
1362+
return Parsed{}, false
1363+
}))
1364+
r.AddSource(SourceFunc("src2", HasFields("b"), func(raw []byte) (Parsed, bool) {
1365+
return Parsed{}, false
1366+
}))
1367+
1368+
// Custom group with working inspector
1369+
customSource := SourceFunc("custom", HasFields("type"), func(raw []byte) (Parsed, bool) {
1370+
var env struct {
1371+
Type string `json:"type"`
1372+
Payload json.RawMessage `json:"payload"`
1373+
}
1374+
if json.Unmarshal(raw, &env) != nil {
1375+
return Parsed{}, false
1376+
}
1377+
return Parsed{Key: env.Type, Payload: env.Payload}, true
1378+
})
1379+
r.AddGroup(workingInspector, customSource)
1380+
1381+
Register(r, "test", &testHandler{})
1382+
1383+
msg := []byte(`{"type": "test", "payload": {}}`)
1384+
err := r.Process(context.Background(), msg)
1385+
1386+
s.Require().NoError(err)
1387+
// Failing inspector should only be called once, even with multiple sources
1388+
s.Assert().Equal(1, failingInspector.count, "failing inspector should only be called once (result cached)")
1389+
s.Assert().Equal(1, workingInspector.count)
1390+
}
1391+
1392+
// failingCountingInspector always fails but counts calls.
1393+
type failingCountingInspector struct {
1394+
count int
1395+
}
1396+
1397+
func (f *failingCountingInspector) Inspect(raw []byte) (View, error) {
1398+
f.count++
1399+
return nil, ErrInvalidJSON
1400+
}

0 commit comments

Comments
 (0)