@@ -14,6 +14,7 @@ import (
1414
1515 "github.com/Arkiv-Network/sqlite-bitmap-store/store"
1616 "github.com/ethereum/go-ethereum/common"
17+ "github.com/ethereum/go-ethereum/metrics"
1718 "github.com/golang-migrate/migrate/v4"
1819 "github.com/golang-migrate/migrate/v4/database/sqlite3"
1920 "github.com/golang-migrate/migrate/v4/source/iofs"
@@ -23,6 +24,21 @@ import (
2324 "github.com/Arkiv-Network/arkiv-events/events"
2425)
2526
27+ var (
28+ // Metrics for tracking operations
29+ metricOperationStarted = metrics .NewRegisteredCounter ("arkiv_store/operations_started" , nil )
30+ metricOperationSuccessful = metrics .NewRegisteredCounter ("arkiv_store/operations_successful" , nil )
31+ metricCreates = metrics .NewRegisteredCounter ("arkiv_store/creates" , nil )
32+ metricUpdates = metrics .NewRegisteredCounter ("arkiv_store/updates" , nil )
33+ metricDeletes = metrics .NewRegisteredCounter ("arkiv_store/deletes" , nil )
34+ metricExtends = metrics .NewRegisteredCounter ("arkiv_store/extends" , nil )
35+ metricOwnerChanges = metrics .NewRegisteredCounter ("arkiv_store/owner_changes" , nil )
36+ // Tracks operation duration (ms) using an exponential decay sample so the histogram
37+ // is more responsive to recent performance by weighting newer measurements higher
38+ // (sample size 100, alpha 0.4).
39+ metricOperationTime = metrics .NewRegisteredHistogram ("arkiv_store/operation_time_ms" , nil , metrics .NewExpDecaySample (100 , 0.4 ))
40+ )
41+
2642type SQLiteStore struct {
2743 writePool * sql.DB
2844 readPool * sql.DB
@@ -99,18 +115,23 @@ func (s *SQLiteStore) GetLastBlock(ctx context.Context) (uint64, error) {
99115 return store .New (s .writePool ).GetLastBlock (ctx )
100116}
101117
118+ type blockStats struct {
119+ creates int
120+ updates int
121+ deletes int
122+ extends int
123+ ownerChanges int
124+ }
125+
102126func (s * SQLiteStore ) FollowEvents (ctx context.Context , iterator arkivevents.BatchIterator ) error {
103127
104128 for batch := range iterator {
105129 if batch .Error != nil {
106130 return fmt .Errorf ("failed to follow events: %w" , batch .Error )
107131 }
108132
109- totalCreates := 0
110- totalUpdates := 0
111- totalDeletes := 0
112- totalExtends := 0
113- totalOwnerChanges := 0
133+ // We will calculate totals for the log at the end, but track per-block for metrics
134+ stats := make (map [uint64 ]* blockStats )
114135
115136 err := func () error {
116137
@@ -138,20 +159,22 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat
138159
139160 startTime := time .Now ()
140161
162+ metricOperationStarted .Inc (1 )
163+
141164 mainLoop:
142165 for _ , block := range batch .Batch .Blocks {
143166
144- updates := 0
145- deletes := 0
146- extends := 0
147- creates := 0
148- ownerChanges := 0
149-
150167 if block .Number <= uint64 (lastBlockFromDB ) {
151168 s .log .Info ("skipping block" , "block" , block .Number , "lastBlockFromDB" , lastBlockFromDB )
152169 continue mainLoop
153170 }
154171
172+ // Initialize stats for this block
173+ if _ , ok := stats [block .Number ]; ! ok {
174+ stats [block .Number ] = & blockStats {}
175+ }
176+ blockStat := stats [block .Number ]
177+
155178 updatesMap := map [common.Hash ][]* events.OPUpdate {}
156179
157180 for _ , operation := range block .Operations {
@@ -162,15 +185,14 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat
162185 }
163186 }
164187
165- // blockNumber := block.Number
166188 operationLoop:
167189 for _ , operation := range block .Operations {
168190
169191 switch {
170192
171193 case operation .Create != nil :
172194 // expiresAtBlock := blockNumber + operation.Create.BTL
173- creates ++
195+ blockStat . creates ++
174196 key := operation .Create .Key
175197
176198 stringAttributes := maps .Clone (operation .Create .StringAttributes )
@@ -225,14 +247,14 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat
225247 }
226248 }
227249 case operation .Update != nil :
228- updates ++
229250
230251 updates := updatesMap [operation .Update .Key ]
231252 lastUpdate := updates [len (updates )- 1 ]
232253
233254 if operation .Update != lastUpdate {
234255 continue operationLoop
235256 }
257+ blockStat .updates ++
236258
237259 key := operation .Update .Key .Bytes ()
238260
@@ -319,7 +341,7 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat
319341
320342 case operation .Delete != nil || operation .Expire != nil :
321343
322- deletes ++
344+ blockStat . deletes ++
323345 var key []byte
324346 if operation .Delete != nil {
325347 key = common .Hash (* operation .Delete ).Bytes ()
@@ -363,7 +385,7 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat
363385
364386 case operation .ExtendBTL != nil :
365387
366- extends ++
388+ blockStat . extends ++
367389
368390 key := operation .ExtendBTL .Key .Bytes ()
369391
@@ -403,7 +425,7 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat
403425 }
404426
405427 case operation .ChangeOwner != nil :
406- ownerChanges ++
428+ blockStat . ownerChanges ++
407429 key := operation .ChangeOwner .Key .Bytes ()
408430
409431 latestPayload , err := st .GetPayloadForEntityKey (ctx , key )
@@ -449,12 +471,8 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat
449471
450472 }
451473
452- s .log .Info ("block updated" , "block" , block .Number , "creates" , creates , "updates" , updates , "deletes" , deletes , "extends" , extends , "ownerChanges" , ownerChanges )
453- totalCreates += creates
454- totalUpdates += updates
455- totalDeletes += deletes
456- totalExtends += extends
457- totalOwnerChanges += ownerChanges
474+ // Log per block if needed, but we can now rely on the map for totals later
475+ s .log .Info ("block updated" , "block" , block .Number , "creates" , blockStat .creates , "updates" , blockStat .updates , "deletes" , blockStat .deletes , "extends" , blockStat .extends , "ownerChanges" , blockStat .ownerChanges )
458476 }
459477
460478 err = st .UpsertLastBlock (ctx , lastBlock )
@@ -472,7 +490,55 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat
472490 return fmt .Errorf ("failed to commit transaction: %w" , err )
473491 }
474492
475- s .log .Info ("batch processed" , "firstBlock" , firstBlock , "lastBlock" , lastBlock , "processingTime" , time .Since (startTime ).Milliseconds (), "creates" , totalCreates , "updates" , totalUpdates , "deletes" , totalDeletes , "extends" , totalExtends , "ownerChanges" , totalOwnerChanges )
493+ // Calculate batch totals for logging and update metrics PER BLOCK
494+ var (
495+ totalCreates int
496+ totalUpdates int
497+ totalDeletes int
498+ totalExtends int
499+ totalOwnerChanges int
500+ )
501+
502+ // Iterate blocks again to preserve order and update metrics per block
503+ for _ , block := range batch .Batch .Blocks {
504+ if stat , ok := stats [block .Number ]; ok {
505+ totalCreates += stat .creates
506+ totalUpdates += stat .updates
507+ totalDeletes += stat .deletes
508+ totalExtends += stat .extends
509+ totalOwnerChanges += stat .ownerChanges
510+
511+ // Update metrics specifically per block
512+ if stat .creates > 0 {
513+ metricCreates .Inc (int64 (stat .creates ))
514+ }
515+ if stat .updates > 0 {
516+ metricUpdates .Inc (int64 (stat .updates ))
517+ }
518+ if stat .deletes > 0 {
519+ metricDeletes .Inc (int64 (stat .deletes ))
520+ }
521+ if stat .extends > 0 {
522+ metricExtends .Inc (int64 (stat .extends ))
523+ }
524+ if stat .ownerChanges > 0 {
525+ metricOwnerChanges .Inc (int64 (stat .ownerChanges ))
526+ }
527+ }
528+ }
529+
530+ metricOperationSuccessful .Inc (1 )
531+ metricOperationTime .Update (time .Since (startTime ).Milliseconds ())
532+
533+ s .log .Info ("batch processed" ,
534+ "firstBlock" , firstBlock ,
535+ "lastBlock" , lastBlock ,
536+ "processingTime" , time .Since (startTime ).Milliseconds (),
537+ "creates" , totalCreates ,
538+ "updates" , totalUpdates ,
539+ "deletes" , totalDeletes ,
540+ "extends" , totalExtends ,
541+ "ownerChanges" , totalOwnerChanges )
476542
477543 return nil
478544 }()
0 commit comments