From 3f68f5fd93366561716206b9c45a013876e35611 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Wed, 10 Sep 2025 16:08:20 -0700 Subject: [PATCH 01/11] Use pebble DB backend for consensus follower --- state/state.go | 113 +++++++++++++++++++++++++------------------------ 1 file changed, 57 insertions(+), 56 deletions(-) diff --git a/state/state.go b/state/state.go index c5e04bf..f098b66 100644 --- a/state/state.go +++ b/state/state.go @@ -15,7 +15,6 @@ import ( "sync" "time" - "github.com/dgraph-io/badger/v2" flowcrypto "github.com/onflow/crypto" "github.com/onflow/flow-go/cmd/bootstrap/utils" hotstuff "github.com/onflow/flow-go/consensus/hotstuff/model" @@ -25,13 +24,14 @@ import ( "github.com/onflow/flow-go/module/compliance" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/storage/operation" - "github.com/onflow/flow-go/storage/operation/badgerimpl" + "github.com/onflow/flow-go/storage/operation/pebbleimpl" + "github.com/onflow/flow-go/storage/pebble" "github.com/onflow/rosetta/cache" "github.com/onflow/rosetta/config" "github.com/onflow/rosetta/indexdb" - "github.com/onflow/rosetta/log" "github.com/onflow/rosetta/model" "github.com/onflow/rosetta/process" + "github.com/rs/zerolog/log" "golang.org/x/crypto/openpgp" "golang.org/x/crypto/openpgp/armor" "google.golang.org/grpc/codes" @@ -98,18 +98,18 @@ func (i *Indexer) downloadRootState(ctx context.Context, spork *config.Spork, sp bootstrapDir := filepath.Join(sporkDir, "public-root-information") err := os.MkdirAll(bootstrapDir, 0o744) if err != nil { - log.Fatalf("Failed to create the spork bootstrap directory: %s", err) + log.Fatal().Msgf("Failed to create the spork bootstrap directory: %s", err) } data := download(ctx, spork.Consensus.RootProtocolStateURL) dst := filepath.Join(bootstrapDir, "root-protocol-state-snapshot.json") err = os.WriteFile(dst, data, 0o600) if err != nil { - log.Fatalf("Failed to write to %s: %s", dst, err) + log.Fatal().Msgf("Failed to write to %s: %s", dst, err) } i.root = &stateSnapshot{} err = json.Unmarshal(data, i.root) if err != nil { - log.Fatalf("Failed to decode root protocol state snapshot: %s", err) + log.Fatal().Msgf("Failed to decode root protocol state snapshot: %s", err) } if spork.Consensus.DisableSignatureCheck { return @@ -117,24 +117,24 @@ func (i *Indexer) downloadRootState(ctx context.Context, spork *config.Spork, sp signer := bytes.NewReader([]byte(spork.Consensus.SigningKey)) keyring, err := openpgp.ReadArmoredKeyRing(signer) if err != nil { - log.Fatalf("Failed to read PGP keyring from the configured signing_key: %s", err) + log.Fatal().Msgf("Failed to read PGP keyring from the configured signing_key: %s", err) } sig := download(ctx, spork.Consensus.RootProtocolStateSignatureURL) block, err := armor.Decode(bytes.NewReader(sig)) if err != nil { - log.Fatalf( + log.Fatal().Msgf( "Failed to decode PGP signature block from %s: %s", spork.Consensus.RootProtocolStateSignatureURL, err, ) } if block.Type != openpgp.SignatureType { - log.Fatalf("Failed to get PGP signature block: got %q instead", block.Type) + log.Fatal().Msgf("Failed to get PGP signature block: got %q instead", block.Type) } _, err = openpgp.CheckDetachedSignature( keyring, bytes.NewReader(data), block.Body, ) if err != nil { - log.Fatalf("Failed to get valid PGP signature for the root protocol state: %s", err) + log.Fatal().Msgf("Failed to get valid PGP signature for the root protocol state: %s", err) } } @@ -154,7 +154,7 @@ func (i *Indexer) findRootBlock(ctx context.Context, spork *config.Spork) *model client := spork.AccessNodes.Client() block, err := client.BlockByHeight(ctx, spork.RootBlock) if err != nil { - log.Errorf( + log.Error().Msgf( "Failed to fetch the %s root block: %s", spork, err, ) @@ -162,7 +162,7 @@ func (i *Indexer) findRootBlock(ctx context.Context, spork *config.Spork) *model continue } if block.Height != spork.RootBlock { - log.Errorf( + log.Error().Msgf( "Unexpected block height (%d) when fetching the root block of %s at height %d", block.Height, spork, spork.RootBlock, ) @@ -198,7 +198,7 @@ func (i *Indexer) getVerifiedParent(rctx context.Context, hash []byte, height ui if skipCache { ctx = cache.Skip(rctx) if !skipCacheWarned { - log.Warnf( + log.Warn().Msgf( "Skipping cache for getting verified parent for block %x at height %d", hash, height, ) @@ -210,14 +210,14 @@ func (i *Indexer) getVerifiedParent(rctx context.Context, hash []byte, height ui client := spork.AccessNodes.Client() hdr, err := client.BlockHeaderByHeight(ctx, height) if err != nil { - log.Errorf( + log.Error().Msgf( "Failed to fetch header for block %x at height %d: %s", hash, height, err, ) continue } if !bytes.Equal(hdr.Id, hash) { - log.Errorf( + log.Error().Msgf( "Got unexpected header value for block at height %d: expected %x, got %x", height, hash, hdr.Id, ) @@ -226,14 +226,14 @@ func (i *Indexer) getVerifiedParent(rctx context.Context, hash []byte, height ui } block, err := client.BlockByHeight(ctx, height) if err != nil { - log.Errorf( + log.Error().Msgf( "Failed to fetch block %x at height %d: %s", hash, height, err, ) continue } if !bytes.Equal(block.Id, hash) { - log.Errorf( + log.Error().Msgf( "Got unexpected block value at height %d: expected %x, got %x", height, hash, block.Id, ) @@ -264,17 +264,17 @@ func (i *Indexer) handleResyncFrom() { } genesis := i.Store.Genesis().Height if height < genesis { - log.Fatalf( + log.Fatal().Msgf( "The resync_from value (%d) cannot be less than the height of the genesis block (%d)", height, genesis, ) } err := i.Store.ResetTo(height) if err != nil { - log.Fatalf("Failed to reset data for resync_from: %s", err) + log.Fatal().Msgf("Failed to reset data for resync_from: %s", err) } i.lastIndexed = i.Store.Latest() - log.Infof( + log.Info().Msgf( "Successfully reset indexed data to block %x at height %d", i.lastIndexed.Hash, i.lastIndexed.Height, ) @@ -339,7 +339,7 @@ func (i *Indexer) indexLiveSpork(rctx context.Context, spork *config.Spork, last if skipCache { ctx = cache.Skip(rctx) if !skipCacheWarned { - log.Warnf( + log.Warn().Msgf( "Skipping cache for retrieving block at height %d", height, ) @@ -359,13 +359,13 @@ func (i *Indexer) indexLiveSpork(rctx context.Context, spork *config.Spork, last if synced { time.Sleep(time.Second) } else { - log.Errorf( + log.Error().Msgf( "Failed to fetch header for block at height %d: %s", height, err, ) } default: - log.Errorf( + log.Error().Msgf( "Failed to fetch header for block at height %d: %s", height, err, ) @@ -374,11 +374,11 @@ func (i *Indexer) indexLiveSpork(rctx context.Context, spork *config.Spork, last } block, err := client.BlockByHeight(ctx, height) if err != nil { - log.Errorf("Failed to fetch block at height %d: %s", height, err) + log.Error().Msgf("Failed to fetch block at height %d: %s", height, err) continue } if !bytes.Equal(block.ParentId, parent) { - log.Errorf( + log.Error().Msgf( "Unexpected parent hash found for block %x at height %d: expected %x, got %x", block.Id, height, parent, block.ParentId, ) @@ -393,7 +393,7 @@ func (i *Indexer) indexLiveSpork(rctx context.Context, spork *config.Spork, last blockID := flow.Identifier{} err := operation.LookupBlockHeight(i.consensus.Reader(), height, &blockID) if err != nil { - log.Errorf( + log.Error().Msgf( "Failed to get block ID for height %d from consensus: %s", height, err, ) @@ -403,7 +403,7 @@ func (i *Indexer) indexLiveSpork(rctx context.Context, spork *config.Spork, last continue } if !bytes.Equal(blockID[:], block.Id) { - log.Errorf( + log.Error().Msgf( "Mismatching block ID found for height %d: %x from Access API, %x from consensus", height, block.Id, blockID[:], ) @@ -411,14 +411,14 @@ func (i *Indexer) indexLiveSpork(rctx context.Context, spork *config.Spork, last continue } } - log.Infof("Retrieved block %x at height %d", block.Id, block.Height) + log.Info().Msgf("Retrieved block %x at height %d", block.Id, block.Height) blocks[string(block.Id)] = height // NOTE(tav): We assume that block seals will only ever be seen in // block height order. for _, seal := range block.BlockSeals { blockHeight, ok := blocks[string(seal.BlockId)] if !ok { - log.Warnf( + log.Warn().Msgf( "Skipping seal for block %x in block %x at height %d", seal.BlockId, block.Id, height, ) @@ -430,7 +430,7 @@ func (i *Indexer) indexLiveSpork(rctx context.Context, spork *config.Spork, last sealID := flow.Identifier{} err := operation.LookupBySealedBlockID(i.consensus.Reader(), blockID, &sealID) if err != nil { - log.Errorf( + log.Error().Msgf( "Failed to get seal ID for block %x from consensus: %s", seal.BlockId, err, ) @@ -439,21 +439,21 @@ func (i *Indexer) indexLiveSpork(rctx context.Context, spork *config.Spork, last blockSeal := &flow.Seal{} err = operation.RetrieveSeal(i.consensus.Reader(), sealID, blockSeal) if err != nil { - log.Errorf( + log.Error().Msgf( "Failed to get seal %x for block %x from consensus: %s", sealID[:], seal.BlockId, err, ) continue inner } if !bytes.Equal(seal.BlockId, blockSeal.BlockID[:]) { - log.Errorf( + log.Error().Msgf( "Unverifiable seal found in block %x at height %d: got seal for block %x via Acccess API, found seal for block %x via consensus", block.Id, height, seal.BlockId, blockSeal.BlockID[:], ) continue inner } if !bytes.Equal(seal.ResultId, blockSeal.ResultID[:]) { - log.Errorf( + log.Error().Msgf( "Unverifiable execution result for block %x found in block %x at height %d: got %x via Acccess API, got %x via consensus", seal.BlockId, block.Id, height, seal.ResultId, blockSeal.ResultID[:], ) @@ -468,14 +468,14 @@ func (i *Indexer) indexLiveSpork(rctx context.Context, spork *config.Spork, last parent = block.Id if time.Since(block.Timestamp.AsTime()) < syncWindow { if !synced { - log.Infof("Indexer is close to tip") + log.Info().Msgf("Indexer is close to tip") synced = true i.mu.Lock() i.synced = true i.mu.Unlock() } } else if synced { - log.Errorf("Indexer is not close to tip") + log.Error().Msgf("Indexer is not close to tip") synced = false i.mu.Lock() i.synced = false @@ -501,7 +501,7 @@ func (i *Indexer) indexPastSporks(ctx context.Context, lastIndexed *model.BlockM return } hashes = append(hashes, hash) - log.Infof("Retrieved block %x at height %d", hash, height) + log.Info().Msgf("Retrieved block %x at height %d", hash, height) } if !bytes.Equal(lastIndexed.Hash, parent) { // NOTE(tav): If we've arrived at this state, it's effectively a fatal @@ -512,7 +512,7 @@ func (i *Indexer) indexPastSporks(ctx context.Context, lastIndexed *model.BlockM // This will most likely only happen if we got corrupted block data when // we looked up the "genesis" block, i.e. the "root" block of the first // spork amongst the configured sporks. - log.Errorf( + log.Error().Msgf( "Unable to establish a hash chain from live spork root block to last indexed block %x at height %d: found %x as parent instead", lastIndexed.Hash, lastIndexed.Height, parent, ) @@ -534,7 +534,7 @@ func (i *Indexer) indexPastSporks(ctx context.Context, lastIndexed *model.BlockM func (i *Indexer) initState() { accts, err := i.Store.Accounts() if err != nil { - log.Fatalf("Failed to load accounts from the index database: %s", err) + log.Fatal().Msgf("Failed to load accounts from the index database: %s", err) } i.accts = map[string]bool{} for acct, isProxy := range accts { @@ -542,7 +542,7 @@ func (i *Indexer) initState() { } i.feeAddr, err = hex.DecodeString(i.Chain.Contracts.FlowFees) if err != nil { - log.Fatalf( + log.Fatal().Msgf( "Invalid FlowFees contract address %q: %s", i.Chain.Contracts.FlowFees, err, ) @@ -571,9 +571,9 @@ func (i *Indexer) initStore(ctx context.Context) bool { return false } if err := i.Store.SetGenesis(genesis); err != nil { - log.Fatalf("Couldn't set the genesis block on the index database: %s", err) + log.Fatal().Msgf("Couldn't set the genesis block on the index database: %s", err) } - log.Infof("Genesis block set to block %x at height %d", genesis.Hash, genesis.Height) + log.Info().Msgf("Genesis block set to block %x at height %d", genesis.Hash, genesis.Height) i.lastIndexed = genesis i.liveRoot = i.findRootBlock(ctx, i.Chain.LatestSpork()) return i.liveRoot != nil @@ -625,7 +625,7 @@ func (i *Indexer) nextBackoff(d time.Duration) time.Duration { } func (i *Indexer) onBlockFinalized(f *hotstuff.Block) { - log.Infof( + log.Info().Msgf( "Got finalized block via consensus follower: %x (block timestamp: %s)", f.BlockID[:], time.UnixMilli(int64(f.Timestamp)).Format(time.RFC3339), ) @@ -636,32 +636,33 @@ func (i *Indexer) runConsensusFollower(ctx context.Context) { sporkDir := i.Chain.PathFor(spork.String()) i.downloadRootState(ctx, spork, sporkDir) dbDir := filepath.Join(sporkDir, "consensus") - opts := badger.DefaultOptions(dbDir).WithLogger(log.Badger{Prefix: "consensus"}) - db, err := badger.Open(opts) + + dbLog := log.With().Str("pebbledb", "consensusFollower").Logger() + db, err := pebble.SafeOpen(dbLog, dbDir) if err != nil { - log.Fatalf("Failed to open consensus database at %s: %s", dbDir, err) + log.Fatal().Msgf("Failed to open consensus database at %s: %s", dbDir, err) } - protocolDB := badgerimpl.ToDB(db) + protocolDB := pebbleimpl.ToDB(db) // Initialize a private key for joining the unstaked peer-to-peer network. // This can be ephemeral, so we generate a new one each time we start. seed := make([]byte, flowcrypto.KeyGenSeedMinLen) n, err := rand.Read(seed) if err != nil || n != flowcrypto.KeyGenSeedMinLen { - log.Fatalf("Could not generate seed for the consensus follower private key") + log.Fatal().Msgf("Could not generate seed for the consensus follower private key") } key, err := utils.GeneratePublicNetworkingKey(seed) if err != nil { - log.Fatalf("Could not generate the consensus follower private key") + log.Fatal().Msgf("Could not generate the consensus follower private key") } nodes := []follower.BootstrapNodeInfo{} for _, node := range spork.Consensus.SeedNodes { rawkey, err := hex.DecodeString(node.PublicKey) if err != nil { - log.Fatalf("Failed to hex decode the seed node key %q: %s", key, err) + log.Fatal().Msgf("Failed to hex decode the seed node key %q: %s", key, err) } pubkey, err := flowcrypto.DecodePublicKey(flowcrypto.ECDSAP256, rawkey) if err != nil { - log.Fatalf("Failed to decode the seed node key %q: %s", key, err) + log.Fatal().Msgf("Failed to decode the seed node key %q: %s", key, err) } nodes = append(nodes, follower.BootstrapNodeInfo{ Host: node.Host, @@ -688,7 +689,7 @@ func (i *Indexer) runConsensusFollower(ctx context.Context) { }), ) if err != nil { - log.Fatalf("Failed to create the consensus follower: %s", err) + log.Fatal().Msgf("Failed to create the consensus follower: %s", err) } i.consensus = protocolDB ctx, cancel := context.WithCancel(ctx) @@ -708,7 +709,7 @@ func (i *Indexer) scheduleJobs(ctx context.Context, startHeight uint64) { client := pool.Client() block, err := client.LatestFinalizedBlockHeader(ctx) if err != nil { - log.Errorf("Failed to fetch latest sealed block: %s", err) + log.Error().Msgf("Failed to fetch latest sealed block: %s", err) time.Sleep(time.Second) continue } @@ -722,7 +723,7 @@ func (i *Indexer) scheduleJobs(ctx context.Context, startHeight uint64) { for height := startHeight + 1; height <= block.Height; height++ { i.jobs <- height if height%200 == 0 { - log.Warnf("Added job to prefetch block %d", height) + log.Warn().Msgf("Added job to prefetch block %d", height) } } startHeight = block.Height @@ -733,18 +734,18 @@ func download(ctx context.Context, src string) []byte { for { req, err := http.NewRequestWithContext(ctx, "GET", src, nil) if err != nil { - log.Fatalf("Failed to create HTTP request to %s: %s", src, err) + log.Fatal().Msgf("Failed to create HTTP request to %s: %s", src, err) } resp, err := httpClient.Do(req) if err != nil { - log.Errorf("Failed to download %s: %s", src, err) + log.Error().Msgf("Failed to download %s: %s", src, err) time.Sleep(time.Second) continue } data, err := io.ReadAll(resp.Body) _ = resp.Body.Close() if err != nil { - log.Errorf("Failed to read data from %s: %s", src, err) + log.Error().Msgf("Failed to read data from %s: %s", src, err) time.Sleep(time.Second) continue } From 606b922cffaa84de519f983c42450d0e5d0db6ed Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Fri, 12 Sep 2025 12:15:35 -0700 Subject: [PATCH 02/11] Use pebble DB backend for indexDB --- indexdb/indexdb.go | 463 ++++++++++++++++----------------------------- 1 file changed, 159 insertions(+), 304 deletions(-) diff --git a/indexdb/indexdb.go b/indexdb/indexdb.go index 2b1a3d4..0e3b76a 100644 --- a/indexdb/indexdb.go +++ b/indexdb/indexdb.go @@ -13,11 +13,15 @@ import ( "sort" "sync" - "github.com/dgraph-io/badger/v3" + "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/storage/operation" + "github.com/onflow/flow-go/storage/operation/pebbleimpl" + "github.com/onflow/flow-go/storage/pebble" "github.com/onflow/rosetta/log" "github.com/onflow/rosetta/model" "github.com/onflow/rosetta/process" "github.com/onflow/rosetta/trace" + zerolog "github.com/rs/zerolog/log" "google.golang.org/protobuf/proto" ) @@ -61,7 +65,7 @@ type BlockData struct { // Store aggregates the blockchain data for Rosetta API calls. type Store struct { - db *badger.DB + db storage.DB genesis *model.BlockMeta latest *model.BlockMeta mu sync.RWMutex // protects genesis and latest @@ -77,45 +81,33 @@ func (s *Store) Accounts() (map[[8]byte]bool, error) { // If this ever becomes a bottleneck, we could add a method that returns // batches of accounts instead. accts := map[[8]byte]bool{} - err := s.db.View(func(txn *badger.Txn) error { - it := txn.NewIterator(badger.IteratorOptions{}) - defer it.Close() - prefix := []byte("a") - it.Seek(prefix) - for { - if !it.ValidForPrefix(prefix) { - break - } - key := it.Item().Key() + accountPrefix := []byte("a") + err := operation.IterateKeys(s.db.Reader(), accountPrefix, accountPrefix, + operation.KeyOnlyIterateFunc(func(keyCopy []byte) error { acct := [8]byte{} - copy(acct[:], key[1:9]) - accts[acct] = false - it.Next() - } - it = txn.NewIterator(badger.IteratorOptions{}) - defer it.Close() - prefix = []byte("p") - it.Seek(prefix) - for { - if !it.ValidForPrefix(prefix) { - break - } - key := it.Item().Key() + copy(acct[:], keyCopy[1:9]) + accts[acct] = false // not a proxy account + return nil + }), storage.IteratorOption{}) + if err != nil { + return nil, fmt.Errorf("indexdb: failed to get all accounts: %w", err) + } + proxyAccountPrefix := []byte("p") + err = operation.IterateKeys(s.db.Reader(), proxyAccountPrefix, proxyAccountPrefix, + operation.KeyOnlyIterateFunc(func(keyCopy []byte) error { acct := [8]byte{} - copy(acct[:], key[1:9]) + copy(acct[:], keyCopy[1:9]) if _, ok := accts[acct]; !ok { return fmt.Errorf( "indexdb: found proxy account %x that doesn't have an account balance", acct, ) } - accts[acct] = true - it.Next() - } - return nil - }) + accts[acct] = true // is a proxy account + return nil + }), storage.IteratorOption{}) if err != nil { - return nil, fmt.Errorf("indexdb: failed to get all accounts: %s", err) + return nil, fmt.Errorf("indexdb: failed to get all accounts: %w", err) } return accts, nil } @@ -124,53 +116,35 @@ func (s *Store) Accounts() (map[[8]byte]bool, error) { // corresponding account info. func (s *Store) AccountsInfo() (map[string]*AccountInfo, error) { data := map[string]*AccountInfo{} - err := s.db.View(func(txn *badger.Txn) error { - it := txn.NewIterator(badger.IteratorOptions{}) - defer it.Close() - prefix := []byte("a") - it.Seek(prefix) - for { - if !it.ValidForPrefix(prefix) { - break - } - item := it.Item() - key := item.Key() - acct := hex.EncodeToString(key[1:9]) + accountPrefix := []byte("a") + err := operation.IterateKeys(s.db.Reader(), accountPrefix, accountPrefix, + func(keyCopy []byte, getValue func(destVal any) error) (bail bool, err error) { + acct := hex.EncodeToString(keyCopy[1:9]) info, ok := data[acct] if !ok { info = &AccountInfo{} data[acct] = info } - balance := uint64(0) - err := item.Value(func(val []byte) error { - balance = binary.BigEndian.Uint64(val) - return nil - }) + var balanceData []byte + err = getValue(&balanceData) if err != nil { - return err + return true, err } + balance := binary.BigEndian.Uint64(balanceData) info.Changes = append(info.Changes, [2]uint64{ - binary.BigEndian.Uint64(key[9:]), + binary.BigEndian.Uint64(keyCopy[9:]), balance, }) - it.Next() - } - return nil - }) + return false, nil + }, storage.IteratorOption{}) if err != nil { return nil, fmt.Errorf("indexdb: failed to process indexed accounts: %s", err) } - err = s.db.View(func(txn *badger.Txn) error { - it := txn.NewIterator(badger.IteratorOptions{}) - defer it.Close() - prefix := []byte("p") - it.Seek(prefix) - for { - if !it.ValidForPrefix(prefix) { - break - } - key := it.Item().Key() - acct := hex.EncodeToString(key[1:9]) + + proxyAccountPrefix := []byte("p") + err = operation.IterateKeys(s.db.Reader(), proxyAccountPrefix, proxyAccountPrefix, + operation.KeyOnlyIterateFunc(func(keyCopy []byte) error { + acct := hex.EncodeToString(keyCopy[1:9]) info, ok := data[acct] if !ok { return fmt.Errorf( @@ -179,10 +153,8 @@ func (s *Store) AccountsInfo() (map[string]*AccountInfo, error) { ) } info.Proxy = true - it.Next() - } - return nil - }) + return nil + }), storage.IteratorOption{}) if err != nil { return nil, fmt.Errorf("indexdb: failed to process proxy accounts: %s", err) } @@ -283,21 +255,18 @@ func (s *Store) Genesis() *model.BlockMeta { return genesis.Clone() } genesis = &model.BlockMeta{} - err := s.db.View(func(txn *badger.Txn) error { - item, err := txn.Get([]byte("genesis")) - if err != nil { - return err - } - return item.Value(func(val []byte) error { - return proto.Unmarshal(val, genesis) - }) - }) + value, closer, err := s.db.Reader().Get([]byte("genesis")) if err != nil { - if err != badger.ErrKeyNotFound { + if err != storage.ErrNotFound { log.Fatalf("Failed to get genesis block from the index database: %s", err) } return nil } + defer closer.Close() + err = proto.Unmarshal(value, genesis) + if err != nil { + log.Fatalf("Failed to unmarshal genesis block from the index database: %s", err) + } s.mu.Lock() s.genesis = genesis s.mu.Unlock() @@ -311,25 +280,16 @@ func (s *Store) HasBalance(acct []byte, height uint64) (bool, error) { key[0] = 'a' copy(key[1:9], acct) binary.BigEndian.PutUint64(key[9:], height) - ok := false - err := s.db.View(func(txn *badger.Txn) error { - it := txn.NewIterator(badger.IteratorOptions{ - Reverse: true, - }) - defer it.Close() - it.Seek(key) - if it.ValidForPrefix(key[:9]) { - ok = true - } - return nil - }) + r := s.db.Reader() + seeker := r.NewSeeker() + _, err := seeker.SeekLE(key[:9], key) if err != nil { return false, fmt.Errorf( "indexdb: failed to check balance existence for account %x at height %d: %s", acct, height, err, ) } - return ok, nil + return true, nil } // HashForHeight returns the block hash at the given height. @@ -338,51 +298,35 @@ func (s *Store) HashForHeight(height uint64) ([]byte, error) { heightEnc := make([]byte, 8) binary.BigEndian.PutUint64(heightEnc, height) key := append([]byte("d"), heightEnc...) - err := s.db.View(func(txn *badger.Txn) error { - item, err := txn.Get(key) - if err != nil { - return err - } - return item.Value(func(val []byte) error { - hash = make([]byte, len(val)) - copy(hash, val) - return nil - }) - }) + value, closer, err := s.db.Reader().Get(key) if err != nil { - if err == badger.ErrKeyNotFound { + if err == storage.ErrNotFound { return nil, ErrBlockNotIndexed } return nil, fmt.Errorf( "indexdb: failed to get hash for height %d: %s", height, err, ) } + defer closer.Close() + hash = make([]byte, len(value)) + copy(hash, value) return hash, nil } // HeightForHash returns the block height for the given hash. func (s *Store) HeightForHash(hash []byte) (uint64, error) { - height := uint64(0) key := append([]byte("c"), hash...) - err := s.db.View(func(txn *badger.Txn) error { - item, err := txn.Get(key) - if err != nil { - return err - } - return item.Value(func(val []byte) error { - height = binary.BigEndian.Uint64(val) - return nil - }) - }) + value, closer, err := s.db.Reader().Get(key) if err != nil { - if err == badger.ErrKeyNotFound { + if err == storage.ErrNotFound { return 0, ErrBlockNotIndexed } return 0, fmt.Errorf( "indexdb: failed to get height for hash %x: %s", hash, err, ) } - return height, nil + defer closer.Close() + return binary.BigEndian.Uint64(value), nil } // Index indexes the given block data. @@ -465,65 +409,45 @@ func (s *Store) Index(ctx context.Context, height uint64, hash []byte, block *mo if err != nil { log.Fatalf("Failed to encode model.BlockMeta: %s", err) } - err = s.db.Update(func(txn *badger.Txn) error { + err = s.db.WithReaderBatchWriter(func(rbw storage.ReaderBatchWriter) error { for _, update := range updates { - it := txn.NewIterator(badger.IteratorOptions{ - Reverse: true, - }) - defer it.Close() - cur := int64(0) - it.Seek(update.key) - if it.ValidForPrefix(update.key[:9]) { - item := it.Item() - if bytes.Equal(item.Key(), update.key) { - it.Next() - if it.ValidForPrefix(update.key[:9]) { - err := item.Value(func(val []byte) error { - cur = int64(binary.BigEndian.Uint64(val)) - return nil - }) - if err != nil { - return err - } - } - } else { - err := item.Value(func(val []byte) error { - cur = int64(binary.BigEndian.Uint64(val)) - return nil - }) - if err != nil { - return err - } + newVal := int64(update.diff) + key, err := rbw.GlobalReader().NewSeeker().SeekLE(update.key[:9], update.key) + if err == nil { + value, closer, err := rbw.GlobalReader().Get(key) + if err != nil { } + existing := int64(binary.BigEndian.Uint64(value)) + closer.Close() + newVal += existing } - val := cur + update.diff - if val < 0 { + if newVal < 0 { return fmt.Errorf( "indexdb: invalid balance found for account %x: %d", - update.key[1:9], val, + update.key[1:9], newVal, ) } balance := make([]byte, 8) - binary.BigEndian.PutUint64(balance, uint64(val)) - if err := txn.Set(update.key, balance); err != nil { + binary.BigEndian.PutUint64(balance, uint64(newVal)) + if err := rbw.Writer().Set(update.key, balance); err != nil { return err } } for _, acct := range proxyAccts { - if err := txn.Set(acct, []byte("1")); err != nil { + if err := rbw.Writer().Set(acct, []byte("1")); err != nil { return err } } - if err := txn.Set(blockKey, blockValue); err != nil { + if err := rbw.Writer().Set(blockKey, blockValue); err != nil { return err } - if err := txn.Set(hash2heightKey, hval); err != nil { + if err := rbw.Writer().Set(hash2heightKey, hval); err != nil { return err } - if err := txn.Set(height2hashKey, hash); err != nil { + if err := rbw.Writer().Set(height2hashKey, hash); err != nil { return err } - if err := txn.Set([]byte("latest"), latestEnc); err != nil { + if err := rbw.Writer().Set([]byte("latest"), latestEnc); err != nil { return err } return nil @@ -548,21 +472,19 @@ func (s *Store) Latest() *model.BlockMeta { return latest.Clone() } latest = &model.BlockMeta{} - err := s.db.View(func(txn *badger.Txn) error { - item, err := txn.Get([]byte("latest")) - if err != nil { - return err - } - return item.Value(func(val []byte) error { - return proto.Unmarshal(val, latest) - }) - }) + + value, closer, err := s.db.Reader().Get([]byte("latest")) if err != nil { - if err != badger.ErrKeyNotFound { + if err != storage.ErrNotFound { log.Fatalf("Failed to get latest block from the index database: %s", err) } return nil } + defer closer.Close() + err = proto.Unmarshal(value, latest) + if err != nil { + log.Fatalf("Failed to unmarshal latest block from the index database: %s", err) + } s.mu.Lock() s.latest = latest s.mu.Unlock() @@ -573,54 +495,34 @@ func (s *Store) Latest() *model.BlockMeta { // creation heights from the index database. func (s *Store) PurgeProxyAccounts() { accts := map[string][]byte{} - err := s.db.View(func(txn *badger.Txn) error { - it := txn.NewIterator(badger.IteratorOptions{}) - defer it.Close() - prefix := []byte("p") - it.Seek(prefix) - for { - if !it.ValidForPrefix(prefix) { - break - } - key := it.Item().KeyCopy(nil) - accts[string(key[1:9])] = key - it.Next() - } - return nil - }) + err := operation.IterateKeys(s.db.Reader(), []byte("p"), []byte("p"), operation.KeyOnlyIterateFunc( + func(keyCopy []byte) error { + accts[string(keyCopy[1:9])] = keyCopy + return nil + }), storage.IteratorOption{}) if err != nil { log.Fatalf("Failed to iterate over proxy keys: %s", err) } accountBalanceKeys := [][]byte{} - err = s.db.View(func(txn *badger.Txn) error { - it := txn.NewIterator(badger.IteratorOptions{}) - defer it.Close() - prefix := []byte("a") - it.Seek(prefix) - for { - if !it.ValidForPrefix(prefix) { - break - } - key := it.Item().KeyCopy(nil) - _, isProxy := accts[string(key[1:9])] + err = operation.IterateKeys(s.db.Reader(), []byte("a"), []byte("a"), operation.KeyOnlyIterateFunc( + func(keyCopy []byte) error { + _, isProxy := accts[string(keyCopy[1:9])] if isProxy { - accountBalanceKeys = append(accountBalanceKeys, key) + accountBalanceKeys = append(accountBalanceKeys, keyCopy) } - it.Next() - } - return nil - }) + return nil + }), storage.IteratorOption{}) if err != nil { log.Fatalf("Failed to iterate over account balance keys: %s", err) } - err = s.db.Update(func(txn *badger.Txn) error { + err = s.db.WithReaderBatchWriter(func(rbw storage.ReaderBatchWriter) error { for _, key := range accts { - if err := txn.Delete(key); err != nil { + if err := rbw.Writer().Delete(key); err != nil { return err } } for _, key := range accountBalanceKeys { - if err := txn.Delete(key); err != nil { + if err := rbw.Writer().Delete(key); err != nil { return err } } @@ -638,73 +540,42 @@ func (s *Store) ResetTo(base uint64) error { return nil } delKeys := [][]byte{} - err := s.db.View(func(txn *badger.Txn) error { - it := txn.NewIterator(badger.IteratorOptions{}) - prefix := []byte("a") - it.Seek(prefix) - for { - if !it.ValidForPrefix(prefix) { - break - } - item := it.Item() - key := item.Key() - height := binary.BigEndian.Uint64(key[9:]) + err := operation.IterateKeys(s.db.Reader(), []byte("a"), []byte("a"), operation.KeyOnlyIterateFunc( + func(keyCopy []byte) error { + height := binary.BigEndian.Uint64(keyCopy[9:]) if height > base { - delKeys = append(delKeys, item.KeyCopy(nil)) + delKeys = append(delKeys, keyCopy) } - it.Next() - } - it.Close() - return nil - }) + return nil + }), storage.IteratorOption{}) if err != nil { return fmt.Errorf("indexdb: failed to get account balance keys to delete: %s", err) } - err = s.db.View(func(txn *badger.Txn) error { - it := txn.NewIterator(badger.IteratorOptions{}) - prefix := []byte("p") - it.Seek(prefix) - for { - if !it.ValidForPrefix(prefix) { - break - } - item := it.Item() - key := item.Key() - height := binary.BigEndian.Uint64(key[9:]) + err = operation.IterateKeys(s.db.Reader(), []byte("p"), []byte("p"), operation.KeyOnlyIterateFunc( + func(keyCopy []byte) error { + height := binary.BigEndian.Uint64(keyCopy[9:]) if height > base { - delKeys = append(delKeys, item.KeyCopy(nil)) + delKeys = append(delKeys, keyCopy) } - it.Next() - } - it.Close() - return nil - }) + return nil + }), storage.IteratorOption{}) if err != nil { return fmt.Errorf("indexdb: failed to get proxy account keys to delete: %s", err) } last := uint64(0) - err = s.db.View(func(txn *badger.Txn) error { - it := txn.NewIterator(badger.IteratorOptions{}) - prefix := []byte("d") - it.Seek(prefix) - for { - if !it.ValidForPrefix(prefix) { - break - } - item := it.Item() - key := item.Key() - height := binary.BigEndian.Uint64(key[1:]) + err = operation.IterateKeys(s.db.Reader(), []byte("d"), []byte("d"), + func(keyCopy []byte, getValue func(destVal any) error) (bail bool, err error) { + height := binary.BigEndian.Uint64(keyCopy[1:]) if height > base { - key = item.KeyCopy(nil) - delKeys = append(delKeys, key) - key = make([]byte, 9) + delKeys = append(delKeys, keyCopy) + key := make([]byte, 9) key[0] = 'b' binary.BigEndian.PutUint64(key[1:], height) delKeys = append(delKeys, key) - hash, err := item.ValueCopy(nil) + var hash []byte + err := getValue(&hash) if err != nil { - it.Close() - return err + return true, err } key = make([]byte, len(hash)+1) key[0] = 'c' @@ -713,27 +584,22 @@ func (s *Store) ResetTo(base uint64) error { } else { last = height } - it.Next() - } - it.Close() - return nil - }) + return false, nil + }, storage.IteratorOption{}) if err != nil { return fmt.Errorf("indexdb: failed to get keys to delete: %s", err) } sort.Slice(delKeys, func(i, j int) bool { return bytes.Compare(delKeys[i], delKeys[j]) == -1 }) - batch := s.db.NewWriteBatch() - batch.SetMaxPendingTxns(1024) - for _, key := range delKeys { - err := batch.Delete(key) - if err != nil { - batch.Cancel() - return fmt.Errorf("indexdb: failed to delete keys: %s", err) + err = s.db.WithReaderBatchWriter(func(rbw storage.ReaderBatchWriter) error { + for _, key := range delKeys { + if err := rbw.Writer().Delete(key); err != nil { + return fmt.Errorf("indexdb: failed to delete keys: %s", err) + } } - } - err = batch.Flush() + return nil + }) if err != nil { return fmt.Errorf("indexdb: failed to flush the batched deletion: %s", err) } @@ -754,8 +620,8 @@ func (s *Store) ResetTo(base uint64) error { if err != nil { log.Fatalf("Failed to encode model.BlockMeta: %s", err) } - err = s.db.Update(func(txn *badger.Txn) error { - return txn.Set([]byte("latest"), latestEnc) + err = s.db.WithReaderBatchWriter(func(rbw storage.ReaderBatchWriter) error { + return rbw.Writer().Set([]byte("latest"), latestEnc) }) if err != nil { return fmt.Errorf("indexdb: failed to set latest value: %s", err) @@ -784,20 +650,20 @@ func (s *Store) SetGenesis(val *model.BlockMeta) error { } hash2heightKey := append([]byte("c"), val.Hash...) height2hashKey := append([]byte("d"), hval...) - err = s.db.Update(func(txn *badger.Txn) error { - if err := txn.Set([]byte("genesis"), genesis); err != nil { + err = s.db.WithReaderBatchWriter(func(rbw storage.ReaderBatchWriter) error { + if err := rbw.Writer().Set([]byte("genesis"), genesis); err != nil { return err } - if err := txn.Set(blockKey, blockValue); err != nil { + if err := rbw.Writer().Set(blockKey, blockValue); err != nil { return err } - if err := txn.Set(hash2heightKey, hval); err != nil { + if err := rbw.Writer().Set(hash2heightKey, hval); err != nil { return err } - if err := txn.Set(height2hashKey, val.Hash); err != nil { + if err := rbw.Writer().Set(height2hashKey, val.Hash); err != nil { return err } - return txn.Set([]byte("latest"), genesis) + return rbw.Writer().Set([]byte("latest"), genesis) }) if err != nil { return fmt.Errorf("indexdb: failed to set genesis value: %s", err) @@ -815,25 +681,17 @@ func (s *Store) balanceByHeight(acct []byte, height uint64) (uint64, error) { key[0] = 'a' copy(key[1:9], acct) binary.BigEndian.PutUint64(key[9:], height) - err := s.db.View(func(txn *badger.Txn) error { - it := txn.NewIterator(badger.IteratorOptions{ - Reverse: true, - }) - defer it.Close() - it.Seek(key) - if !it.ValidForPrefix(key[:9]) { - return nil + latestAsOfHeight, err := s.db.Reader().NewSeeker().SeekLE(key[:9], key) + if err == nil { + value, closer, err := s.db.Reader().Get(latestAsOfHeight) + if err != nil { + return 0, fmt.Errorf( + "indexdb: failed to get balance for account %x at height %d: %s", + acct, height, err, + ) } - return it.Item().Value(func(val []byte) error { - balance = binary.BigEndian.Uint64(val) - return nil - }) - }) - if err != nil { - return 0, fmt.Errorf( - "indexdb: failed to get balance for account %x at height %d: %s", - acct, height, err, - ) + defer closer.Close() + balance = binary.BigEndian.Uint64(value) } return balance, nil } @@ -843,18 +701,15 @@ func (s *Store) blockByHeight(height uint64) (*model.IndexedBlock, error) { key := make([]byte, 9) key[0] = 'b' binary.BigEndian.PutUint64(key[1:], height) - err := s.db.View(func(txn *badger.Txn) error { - item, err := txn.Get(key) - if err != nil { - return err + value, closer, err := s.db.Reader().Get(key) + if err != nil { + if err == storage.ErrNotFound { + return nil, ErrBlockNotIndexed } - return item.Value(func(val []byte) error { - return proto.Unmarshal(val, block) - }) - }) - if err == badger.ErrKeyNotFound { - return nil, ErrBlockNotIndexed + return nil, fmt.Errorf("indexdb: failed to get block at height %d: %s", height, err) } + defer closer.Close() + err = proto.Unmarshal(value, block) return block, err } @@ -880,8 +735,8 @@ type accountUpdate struct { // New opens the database at the given directory and returns the corresponding // Store. func New(dir string) *Store { - opts := badger.DefaultOptions(dir).WithLogger(log.Badger{Prefix: "index"}) - db, err := badger.Open(opts) + dbLog := zerolog.With().Str("pebbledb", "index").Logger() + db, err := pebble.SafeOpen(dbLog, dir) if err != nil { log.Fatalf("Failed to open the index database at %s: %s", dir, err) } @@ -892,6 +747,6 @@ func New(dir string) *Store { } }) return &Store{ - db: db, + db: pebbleimpl.ToDB(db), } } From 334ae84df4e2d779a7237aa0779f402593435c78 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Fri, 12 Sep 2025 12:18:17 -0700 Subject: [PATCH 03/11] Use zerolog in indexdb --- indexdb/indexdb.go | 45 ++++++++++++++++++++++----------------------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/indexdb/indexdb.go b/indexdb/indexdb.go index 0e3b76a..3b9fa46 100644 --- a/indexdb/indexdb.go +++ b/indexdb/indexdb.go @@ -17,11 +17,10 @@ import ( "github.com/onflow/flow-go/storage/operation" "github.com/onflow/flow-go/storage/operation/pebbleimpl" "github.com/onflow/flow-go/storage/pebble" - "github.com/onflow/rosetta/log" "github.com/onflow/rosetta/model" "github.com/onflow/rosetta/process" "github.com/onflow/rosetta/trace" - zerolog "github.com/rs/zerolog/log" + "github.com/rs/zerolog/log" "google.golang.org/protobuf/proto" ) @@ -234,15 +233,15 @@ func (s *Store) BlockByHeight(height uint64) (*BlockData, error) { func (s *Store) ExportAccounts(filename string) { data, err := s.AccountsInfo() if err != nil { - log.Fatalf("Failed to export accounts: %s", err) + log.Fatal().Msgf("Failed to export accounts: %s", err) } enc, err := json.Marshal(data) if err != nil { - log.Fatalf("Failed to encode data for export: %s", err) + log.Fatal().Msgf("Failed to encode data for export: %s", err) } err = os.WriteFile(filename, enc, 0o600) if err != nil { - log.Fatalf("Failed to write to %s: %s", filename, err) + log.Fatal().Msgf("Failed to write to %s: %s", filename, err) } } @@ -258,14 +257,14 @@ func (s *Store) Genesis() *model.BlockMeta { value, closer, err := s.db.Reader().Get([]byte("genesis")) if err != nil { if err != storage.ErrNotFound { - log.Fatalf("Failed to get genesis block from the index database: %s", err) + log.Fatal().Msgf("Failed to get genesis block from the index database: %s", err) } return nil } defer closer.Close() err = proto.Unmarshal(value, genesis) if err != nil { - log.Fatalf("Failed to unmarshal genesis block from the index database: %s", err) + log.Fatal().Msgf("Failed to unmarshal genesis block from the index database: %s", err) } s.mu.Lock() s.genesis = genesis @@ -335,11 +334,11 @@ func (s *Store) Index(ctx context.Context, height uint64, hash []byte, block *mo latest := s.latest s.mu.RUnlock() if latest.Height >= height { - log.Infof("Skipping re-indexing of previously indexed block at height %d", height) + log.Info().Msgf("Skipping re-indexing of previously indexed block at height %d", height) return nil } if height != latest.Height+1 { - log.Fatalf( + log.Fatal().Msgf( "Invalid index request for height %d (current height is %d)", height, latest.Height, ) @@ -373,7 +372,7 @@ func (s *Store) Index(ctx context.Context, height uint64, hash []byte, block *mo proxyAccts = append(proxyAccts, key) } default: - log.Fatalf( + log.Fatal().Msgf( "Unknown transaction operation type: %d (%s)", op.Type, op.Type, ) @@ -396,7 +395,7 @@ func (s *Store) Index(ctx context.Context, height uint64, hash []byte, block *mo blockKey := append([]byte("b"), hval...) blockValue, err := proto.Marshal(block) if err != nil { - log.Fatalf("Failed to encode model.IndexedBlock: %s", err) + log.Fatal().Msgf("Failed to encode model.IndexedBlock: %s", err) } hash2heightKey := append([]byte("c"), hash...) height2hashKey := append([]byte("d"), hval...) @@ -407,7 +406,7 @@ func (s *Store) Index(ctx context.Context, height uint64, hash []byte, block *mo } latestEnc, err := proto.Marshal(latest) if err != nil { - log.Fatalf("Failed to encode model.BlockMeta: %s", err) + log.Fatal().Msgf("Failed to encode model.BlockMeta: %s", err) } err = s.db.WithReaderBatchWriter(func(rbw storage.ReaderBatchWriter) error { for _, update := range updates { @@ -476,14 +475,14 @@ func (s *Store) Latest() *model.BlockMeta { value, closer, err := s.db.Reader().Get([]byte("latest")) if err != nil { if err != storage.ErrNotFound { - log.Fatalf("Failed to get latest block from the index database: %s", err) + log.Fatal().Msgf("Failed to get latest block from the index database: %s", err) } return nil } defer closer.Close() err = proto.Unmarshal(value, latest) if err != nil { - log.Fatalf("Failed to unmarshal latest block from the index database: %s", err) + log.Fatal().Msgf("Failed to unmarshal latest block from the index database: %s", err) } s.mu.Lock() s.latest = latest @@ -501,7 +500,7 @@ func (s *Store) PurgeProxyAccounts() { return nil }), storage.IteratorOption{}) if err != nil { - log.Fatalf("Failed to iterate over proxy keys: %s", err) + log.Fatal().Msgf("Failed to iterate over proxy keys: %s", err) } accountBalanceKeys := [][]byte{} err = operation.IterateKeys(s.db.Reader(), []byte("a"), []byte("a"), operation.KeyOnlyIterateFunc( @@ -513,7 +512,7 @@ func (s *Store) PurgeProxyAccounts() { return nil }), storage.IteratorOption{}) if err != nil { - log.Fatalf("Failed to iterate over account balance keys: %s", err) + log.Fatal().Msgf("Failed to iterate over account balance keys: %s", err) } err = s.db.WithReaderBatchWriter(func(rbw storage.ReaderBatchWriter) error { for _, key := range accts { @@ -529,7 +528,7 @@ func (s *Store) PurgeProxyAccounts() { return nil }) if err != nil { - log.Fatalf("Failed to purge proxy accounts data: %s", err) + log.Fatal().Msgf("Failed to purge proxy accounts data: %s", err) } } @@ -618,7 +617,7 @@ func (s *Store) ResetTo(base uint64) error { } latestEnc, err := proto.Marshal(latest) if err != nil { - log.Fatalf("Failed to encode model.BlockMeta: %s", err) + log.Fatal().Msgf("Failed to encode model.BlockMeta: %s", err) } err = s.db.WithReaderBatchWriter(func(rbw storage.ReaderBatchWriter) error { return rbw.Writer().Set([]byte("latest"), latestEnc) @@ -646,7 +645,7 @@ func (s *Store) SetGenesis(val *model.BlockMeta) error { Timestamp: val.Timestamp, }) if err != nil { - log.Fatalf("Failed to encode model.IndexedBlock: %s", err) + log.Fatal().Msgf("Failed to encode model.IndexedBlock: %s", err) } hash2heightKey := append([]byte("c"), val.Hash...) height2hashKey := append([]byte("d"), hval...) @@ -735,15 +734,15 @@ type accountUpdate struct { // New opens the database at the given directory and returns the corresponding // Store. func New(dir string) *Store { - dbLog := zerolog.With().Str("pebbledb", "index").Logger() + dbLog := log.With().Str("pebbledb", "index").Logger() db, err := pebble.SafeOpen(dbLog, dir) if err != nil { - log.Fatalf("Failed to open the index database at %s: %s", dir, err) + log.Fatal().Msgf("Failed to open the index database at %s: %s", dir, err) } process.SetExitHandler(func() { - log.Infof("Closing the index database") + log.Info().Msgf("Closing the index database") if err := db.Close(); err != nil { - log.Errorf("Got error closing the index database: %s", err) + log.Error().Msgf("Got error closing the index database: %s", err) } }) return &Store{ From f08396e367e7fc13889b280fd2c4d9264a338adc Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Fri, 12 Sep 2025 12:22:56 -0700 Subject: [PATCH 04/11] Use storage with badger backend for cache Pebble does not support the `DropAll` method. --- cache/cache.go | 55 +++++++++++++++++++++++++------------------------- 1 file changed, 28 insertions(+), 27 deletions(-) diff --git a/cache/cache.go b/cache/cache.go index 13b077c..eb78e19 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -3,12 +3,17 @@ package cache import ( "context" + "errors" + + "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/storage/operation/badgerimpl" + "github.com/rs/zerolog/log" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "github.com/dgraph-io/badger/v3" "github.com/golang/protobuf/proto" - "github.com/onflow/rosetta/log" + rosettalog "github.com/onflow/rosetta/log" "github.com/onflow/rosetta/process" "github.com/onflow/rosetta/trace" "google.golang.org/grpc" @@ -43,12 +48,13 @@ var nonIdempotent = map[string]bool{ // The key for each entry is made up by hashing together the request method and // message using BLAKE3. And the value is the protobuf-encoded response value. type Store struct { - db *badger.DB + db storage.DB + badger *badger.DB // the underlying badger DB is retained only for the DropAll method. } // DropAll drops all data stored in the underlying cache database. func (s *Store) DropAll() error { - return s.db.DropAll() + return s.badger.DropAll() } // InterceptUnary implements the gRPC middleware for caching certain Access API @@ -80,13 +86,13 @@ func (s *Store) InterceptUnary(ctx context.Context, method string, req, res inte // different binary versions. enc, err := proto.Marshal(req.(proto.Message)) if err != nil { - log.Errorf("Failed to encode the gRPC request for caching: %s", err) + log.Error().Msgf("Failed to encode the gRPC request for caching: %s", err) cacheMiss.Add(ctx, 1, mOpt) return invoker(ctx, method, req, res, cc, opts...) } hash, err := getHash(method, enc) if err != nil { - log.Errorf("Failed to hash the gRPC request for caching: %s", err) + log.Error().Msgf("Failed to hash the gRPC request for caching: %s", err) cacheMiss.Add(ctx, 1, mOpt) return invoker(ctx, method, req, res, cc, opts...) } @@ -96,22 +102,16 @@ func (s *Store) InterceptUnary(ctx context.Context, method string, req, res inte default: } _, span := trace.NewSpan(ctx, "flow.access_api.cache.Lookup") - err = s.db.View(func(txn *badger.Txn) error { - item, err := txn.Get(hash) - if err != nil { - return err - } - return item.Value(func(val []byte) error { - return proto.Unmarshal(val, res.(proto.Message)) - }) - }) + item, closer, err := s.db.Reader().Get(hash) if err == nil { + err = proto.Unmarshal(item, res.(proto.Message)) + closer.Close() trace.EndSpanOk(span) if debug { if callerID == "" { - log.Infof("+ Using cached Access API response for %s", method) + log.Info().Msgf("+ Using cached Access API response for %s", method) } else { - log.Infof( + log.Info().Msgf( "+ Using cached Access API response for %s (%s)", method, callerID, ) @@ -120,8 +120,8 @@ func (s *Store) InterceptUnary(ctx context.Context, method string, req, res inte cacheHit.Add(ctx, 1, mOpt) return nil } - if err != badger.ErrKeyNotFound { - log.Errorf("Got unexpected error when decoding gRPC response for caching: %s", err) + if err != storage.ErrNotFound { + log.Error().Msgf("Got unexpected error when decoding gRPC response for caching: %s", err) trace.EndSpanErr(span, err) } else { span.End() @@ -138,7 +138,7 @@ func (s *Store) InterceptUnary(ctx context.Context, method string, req, res inte trace.EndSpanOk(span) val, err := proto.Marshal(res.(proto.Message)) if err != nil { - log.Fatalf("Failed to encode gRPC response for caching: %s", err) + log.Fatal().Msgf("Failed to encode gRPC response for caching: %s", err) } select { case <-ctx.Done(): @@ -146,11 +146,11 @@ func (s *Store) InterceptUnary(ctx context.Context, method string, req, res inte default: } _, span = trace.NewSpan(ctx, "flow.access_api.cache.Store") - err = s.db.Update(func(txn *badger.Txn) error { - return txn.Set(hash, val) + err = s.db.WithReaderBatchWriter(func(rbw storage.ReaderBatchWriter) error { + return rbw.Writer().Set(hash, val) }) if err != nil { - log.Errorf("Got unexpected error when persisting gRPC response for caching: %s", err) + log.Error().Msgf("Got unexpected error when persisting gRPC response for caching: %s", err) trace.EndSpanErr(span, err) } else { trace.EndSpanOk(span) @@ -170,19 +170,20 @@ func Context(parent context.Context, callerID string) context.Context { // New opens the database at the given directory and returns the corresponding // Store. func New(dir string) *Store { - opts := badger.DefaultOptions(dir).WithLogger(log.Badger{Prefix: "cache"}) + opts := badger.DefaultOptions(dir).WithLogger(rosettalog.Badger{Prefix: "cache"}) db, err := badger.Open(opts) if err != nil { - log.Fatalf("Failed to open the cache database at %s: %s", dir, err) + log.Fatal().Msgf("Failed to open the cache database at %s: %s", dir, err) } process.SetExitHandler(func() { - log.Infof("Closing the cache database") + log.Info().Msgf("Closing the cache database") if err := db.Close(); err != nil { - log.Errorf("Got error closing the cache database: %s", err) + log.Error().Msgf("Got error closing the cache database: %s", err) } }) return &Store{ - db: db, + db: badgerimpl.ToDB(db), + badger: db, } } From 381da8478b9fae9fd39b0b05c2148cedef3d2ffd Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Mon, 15 Sep 2025 10:25:21 -0700 Subject: [PATCH 05/11] Revert storage layer updates for cache and indexDB --- cache/cache.go | 55 +++-- indexdb/indexdb.go | 500 +++++++++++++++++++++++++++++---------------- 2 files changed, 350 insertions(+), 205 deletions(-) diff --git a/cache/cache.go b/cache/cache.go index eb78e19..13b077c 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -3,17 +3,12 @@ package cache import ( "context" - "errors" - - "github.com/onflow/flow-go/storage" - "github.com/onflow/flow-go/storage/operation/badgerimpl" - "github.com/rs/zerolog/log" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "github.com/dgraph-io/badger/v3" "github.com/golang/protobuf/proto" - rosettalog "github.com/onflow/rosetta/log" + "github.com/onflow/rosetta/log" "github.com/onflow/rosetta/process" "github.com/onflow/rosetta/trace" "google.golang.org/grpc" @@ -48,13 +43,12 @@ var nonIdempotent = map[string]bool{ // The key for each entry is made up by hashing together the request method and // message using BLAKE3. And the value is the protobuf-encoded response value. type Store struct { - db storage.DB - badger *badger.DB // the underlying badger DB is retained only for the DropAll method. + db *badger.DB } // DropAll drops all data stored in the underlying cache database. func (s *Store) DropAll() error { - return s.badger.DropAll() + return s.db.DropAll() } // InterceptUnary implements the gRPC middleware for caching certain Access API @@ -86,13 +80,13 @@ func (s *Store) InterceptUnary(ctx context.Context, method string, req, res inte // different binary versions. enc, err := proto.Marshal(req.(proto.Message)) if err != nil { - log.Error().Msgf("Failed to encode the gRPC request for caching: %s", err) + log.Errorf("Failed to encode the gRPC request for caching: %s", err) cacheMiss.Add(ctx, 1, mOpt) return invoker(ctx, method, req, res, cc, opts...) } hash, err := getHash(method, enc) if err != nil { - log.Error().Msgf("Failed to hash the gRPC request for caching: %s", err) + log.Errorf("Failed to hash the gRPC request for caching: %s", err) cacheMiss.Add(ctx, 1, mOpt) return invoker(ctx, method, req, res, cc, opts...) } @@ -102,16 +96,22 @@ func (s *Store) InterceptUnary(ctx context.Context, method string, req, res inte default: } _, span := trace.NewSpan(ctx, "flow.access_api.cache.Lookup") - item, closer, err := s.db.Reader().Get(hash) + err = s.db.View(func(txn *badger.Txn) error { + item, err := txn.Get(hash) + if err != nil { + return err + } + return item.Value(func(val []byte) error { + return proto.Unmarshal(val, res.(proto.Message)) + }) + }) if err == nil { - err = proto.Unmarshal(item, res.(proto.Message)) - closer.Close() trace.EndSpanOk(span) if debug { if callerID == "" { - log.Info().Msgf("+ Using cached Access API response for %s", method) + log.Infof("+ Using cached Access API response for %s", method) } else { - log.Info().Msgf( + log.Infof( "+ Using cached Access API response for %s (%s)", method, callerID, ) @@ -120,8 +120,8 @@ func (s *Store) InterceptUnary(ctx context.Context, method string, req, res inte cacheHit.Add(ctx, 1, mOpt) return nil } - if err != storage.ErrNotFound { - log.Error().Msgf("Got unexpected error when decoding gRPC response for caching: %s", err) + if err != badger.ErrKeyNotFound { + log.Errorf("Got unexpected error when decoding gRPC response for caching: %s", err) trace.EndSpanErr(span, err) } else { span.End() @@ -138,7 +138,7 @@ func (s *Store) InterceptUnary(ctx context.Context, method string, req, res inte trace.EndSpanOk(span) val, err := proto.Marshal(res.(proto.Message)) if err != nil { - log.Fatal().Msgf("Failed to encode gRPC response for caching: %s", err) + log.Fatalf("Failed to encode gRPC response for caching: %s", err) } select { case <-ctx.Done(): @@ -146,11 +146,11 @@ func (s *Store) InterceptUnary(ctx context.Context, method string, req, res inte default: } _, span = trace.NewSpan(ctx, "flow.access_api.cache.Store") - err = s.db.WithReaderBatchWriter(func(rbw storage.ReaderBatchWriter) error { - return rbw.Writer().Set(hash, val) + err = s.db.Update(func(txn *badger.Txn) error { + return txn.Set(hash, val) }) if err != nil { - log.Error().Msgf("Got unexpected error when persisting gRPC response for caching: %s", err) + log.Errorf("Got unexpected error when persisting gRPC response for caching: %s", err) trace.EndSpanErr(span, err) } else { trace.EndSpanOk(span) @@ -170,20 +170,19 @@ func Context(parent context.Context, callerID string) context.Context { // New opens the database at the given directory and returns the corresponding // Store. func New(dir string) *Store { - opts := badger.DefaultOptions(dir).WithLogger(rosettalog.Badger{Prefix: "cache"}) + opts := badger.DefaultOptions(dir).WithLogger(log.Badger{Prefix: "cache"}) db, err := badger.Open(opts) if err != nil { - log.Fatal().Msgf("Failed to open the cache database at %s: %s", dir, err) + log.Fatalf("Failed to open the cache database at %s: %s", dir, err) } process.SetExitHandler(func() { - log.Info().Msgf("Closing the cache database") + log.Infof("Closing the cache database") if err := db.Close(); err != nil { - log.Error().Msgf("Got error closing the cache database: %s", err) + log.Errorf("Got error closing the cache database: %s", err) } }) return &Store{ - db: badgerimpl.ToDB(db), - badger: db, + db: db, } } diff --git a/indexdb/indexdb.go b/indexdb/indexdb.go index 3b9fa46..2b1a3d4 100644 --- a/indexdb/indexdb.go +++ b/indexdb/indexdb.go @@ -13,14 +13,11 @@ import ( "sort" "sync" - "github.com/onflow/flow-go/storage" - "github.com/onflow/flow-go/storage/operation" - "github.com/onflow/flow-go/storage/operation/pebbleimpl" - "github.com/onflow/flow-go/storage/pebble" + "github.com/dgraph-io/badger/v3" + "github.com/onflow/rosetta/log" "github.com/onflow/rosetta/model" "github.com/onflow/rosetta/process" "github.com/onflow/rosetta/trace" - "github.com/rs/zerolog/log" "google.golang.org/protobuf/proto" ) @@ -64,7 +61,7 @@ type BlockData struct { // Store aggregates the blockchain data for Rosetta API calls. type Store struct { - db storage.DB + db *badger.DB genesis *model.BlockMeta latest *model.BlockMeta mu sync.RWMutex // protects genesis and latest @@ -80,33 +77,45 @@ func (s *Store) Accounts() (map[[8]byte]bool, error) { // If this ever becomes a bottleneck, we could add a method that returns // batches of accounts instead. accts := map[[8]byte]bool{} - accountPrefix := []byte("a") - err := operation.IterateKeys(s.db.Reader(), accountPrefix, accountPrefix, - operation.KeyOnlyIterateFunc(func(keyCopy []byte) error { + err := s.db.View(func(txn *badger.Txn) error { + it := txn.NewIterator(badger.IteratorOptions{}) + defer it.Close() + prefix := []byte("a") + it.Seek(prefix) + for { + if !it.ValidForPrefix(prefix) { + break + } + key := it.Item().Key() acct := [8]byte{} - copy(acct[:], keyCopy[1:9]) - accts[acct] = false // not a proxy account - return nil - }), storage.IteratorOption{}) - if err != nil { - return nil, fmt.Errorf("indexdb: failed to get all accounts: %w", err) - } - proxyAccountPrefix := []byte("p") - err = operation.IterateKeys(s.db.Reader(), proxyAccountPrefix, proxyAccountPrefix, - operation.KeyOnlyIterateFunc(func(keyCopy []byte) error { + copy(acct[:], key[1:9]) + accts[acct] = false + it.Next() + } + it = txn.NewIterator(badger.IteratorOptions{}) + defer it.Close() + prefix = []byte("p") + it.Seek(prefix) + for { + if !it.ValidForPrefix(prefix) { + break + } + key := it.Item().Key() acct := [8]byte{} - copy(acct[:], keyCopy[1:9]) + copy(acct[:], key[1:9]) if _, ok := accts[acct]; !ok { return fmt.Errorf( "indexdb: found proxy account %x that doesn't have an account balance", acct, ) } - accts[acct] = true // is a proxy account - return nil - }), storage.IteratorOption{}) + accts[acct] = true + it.Next() + } + return nil + }) if err != nil { - return nil, fmt.Errorf("indexdb: failed to get all accounts: %w", err) + return nil, fmt.Errorf("indexdb: failed to get all accounts: %s", err) } return accts, nil } @@ -115,35 +124,53 @@ func (s *Store) Accounts() (map[[8]byte]bool, error) { // corresponding account info. func (s *Store) AccountsInfo() (map[string]*AccountInfo, error) { data := map[string]*AccountInfo{} - accountPrefix := []byte("a") - err := operation.IterateKeys(s.db.Reader(), accountPrefix, accountPrefix, - func(keyCopy []byte, getValue func(destVal any) error) (bail bool, err error) { - acct := hex.EncodeToString(keyCopy[1:9]) + err := s.db.View(func(txn *badger.Txn) error { + it := txn.NewIterator(badger.IteratorOptions{}) + defer it.Close() + prefix := []byte("a") + it.Seek(prefix) + for { + if !it.ValidForPrefix(prefix) { + break + } + item := it.Item() + key := item.Key() + acct := hex.EncodeToString(key[1:9]) info, ok := data[acct] if !ok { info = &AccountInfo{} data[acct] = info } - var balanceData []byte - err = getValue(&balanceData) + balance := uint64(0) + err := item.Value(func(val []byte) error { + balance = binary.BigEndian.Uint64(val) + return nil + }) if err != nil { - return true, err + return err } - balance := binary.BigEndian.Uint64(balanceData) info.Changes = append(info.Changes, [2]uint64{ - binary.BigEndian.Uint64(keyCopy[9:]), + binary.BigEndian.Uint64(key[9:]), balance, }) - return false, nil - }, storage.IteratorOption{}) + it.Next() + } + return nil + }) if err != nil { return nil, fmt.Errorf("indexdb: failed to process indexed accounts: %s", err) } - - proxyAccountPrefix := []byte("p") - err = operation.IterateKeys(s.db.Reader(), proxyAccountPrefix, proxyAccountPrefix, - operation.KeyOnlyIterateFunc(func(keyCopy []byte) error { - acct := hex.EncodeToString(keyCopy[1:9]) + err = s.db.View(func(txn *badger.Txn) error { + it := txn.NewIterator(badger.IteratorOptions{}) + defer it.Close() + prefix := []byte("p") + it.Seek(prefix) + for { + if !it.ValidForPrefix(prefix) { + break + } + key := it.Item().Key() + acct := hex.EncodeToString(key[1:9]) info, ok := data[acct] if !ok { return fmt.Errorf( @@ -152,8 +179,10 @@ func (s *Store) AccountsInfo() (map[string]*AccountInfo, error) { ) } info.Proxy = true - return nil - }), storage.IteratorOption{}) + it.Next() + } + return nil + }) if err != nil { return nil, fmt.Errorf("indexdb: failed to process proxy accounts: %s", err) } @@ -233,15 +262,15 @@ func (s *Store) BlockByHeight(height uint64) (*BlockData, error) { func (s *Store) ExportAccounts(filename string) { data, err := s.AccountsInfo() if err != nil { - log.Fatal().Msgf("Failed to export accounts: %s", err) + log.Fatalf("Failed to export accounts: %s", err) } enc, err := json.Marshal(data) if err != nil { - log.Fatal().Msgf("Failed to encode data for export: %s", err) + log.Fatalf("Failed to encode data for export: %s", err) } err = os.WriteFile(filename, enc, 0o600) if err != nil { - log.Fatal().Msgf("Failed to write to %s: %s", filename, err) + log.Fatalf("Failed to write to %s: %s", filename, err) } } @@ -254,18 +283,21 @@ func (s *Store) Genesis() *model.BlockMeta { return genesis.Clone() } genesis = &model.BlockMeta{} - value, closer, err := s.db.Reader().Get([]byte("genesis")) + err := s.db.View(func(txn *badger.Txn) error { + item, err := txn.Get([]byte("genesis")) + if err != nil { + return err + } + return item.Value(func(val []byte) error { + return proto.Unmarshal(val, genesis) + }) + }) if err != nil { - if err != storage.ErrNotFound { - log.Fatal().Msgf("Failed to get genesis block from the index database: %s", err) + if err != badger.ErrKeyNotFound { + log.Fatalf("Failed to get genesis block from the index database: %s", err) } return nil } - defer closer.Close() - err = proto.Unmarshal(value, genesis) - if err != nil { - log.Fatal().Msgf("Failed to unmarshal genesis block from the index database: %s", err) - } s.mu.Lock() s.genesis = genesis s.mu.Unlock() @@ -279,16 +311,25 @@ func (s *Store) HasBalance(acct []byte, height uint64) (bool, error) { key[0] = 'a' copy(key[1:9], acct) binary.BigEndian.PutUint64(key[9:], height) - r := s.db.Reader() - seeker := r.NewSeeker() - _, err := seeker.SeekLE(key[:9], key) + ok := false + err := s.db.View(func(txn *badger.Txn) error { + it := txn.NewIterator(badger.IteratorOptions{ + Reverse: true, + }) + defer it.Close() + it.Seek(key) + if it.ValidForPrefix(key[:9]) { + ok = true + } + return nil + }) if err != nil { return false, fmt.Errorf( "indexdb: failed to check balance existence for account %x at height %d: %s", acct, height, err, ) } - return true, nil + return ok, nil } // HashForHeight returns the block hash at the given height. @@ -297,35 +338,51 @@ func (s *Store) HashForHeight(height uint64) ([]byte, error) { heightEnc := make([]byte, 8) binary.BigEndian.PutUint64(heightEnc, height) key := append([]byte("d"), heightEnc...) - value, closer, err := s.db.Reader().Get(key) + err := s.db.View(func(txn *badger.Txn) error { + item, err := txn.Get(key) + if err != nil { + return err + } + return item.Value(func(val []byte) error { + hash = make([]byte, len(val)) + copy(hash, val) + return nil + }) + }) if err != nil { - if err == storage.ErrNotFound { + if err == badger.ErrKeyNotFound { return nil, ErrBlockNotIndexed } return nil, fmt.Errorf( "indexdb: failed to get hash for height %d: %s", height, err, ) } - defer closer.Close() - hash = make([]byte, len(value)) - copy(hash, value) return hash, nil } // HeightForHash returns the block height for the given hash. func (s *Store) HeightForHash(hash []byte) (uint64, error) { + height := uint64(0) key := append([]byte("c"), hash...) - value, closer, err := s.db.Reader().Get(key) + err := s.db.View(func(txn *badger.Txn) error { + item, err := txn.Get(key) + if err != nil { + return err + } + return item.Value(func(val []byte) error { + height = binary.BigEndian.Uint64(val) + return nil + }) + }) if err != nil { - if err == storage.ErrNotFound { + if err == badger.ErrKeyNotFound { return 0, ErrBlockNotIndexed } return 0, fmt.Errorf( "indexdb: failed to get height for hash %x: %s", hash, err, ) } - defer closer.Close() - return binary.BigEndian.Uint64(value), nil + return height, nil } // Index indexes the given block data. @@ -334,11 +391,11 @@ func (s *Store) Index(ctx context.Context, height uint64, hash []byte, block *mo latest := s.latest s.mu.RUnlock() if latest.Height >= height { - log.Info().Msgf("Skipping re-indexing of previously indexed block at height %d", height) + log.Infof("Skipping re-indexing of previously indexed block at height %d", height) return nil } if height != latest.Height+1 { - log.Fatal().Msgf( + log.Fatalf( "Invalid index request for height %d (current height is %d)", height, latest.Height, ) @@ -372,7 +429,7 @@ func (s *Store) Index(ctx context.Context, height uint64, hash []byte, block *mo proxyAccts = append(proxyAccts, key) } default: - log.Fatal().Msgf( + log.Fatalf( "Unknown transaction operation type: %d (%s)", op.Type, op.Type, ) @@ -395,7 +452,7 @@ func (s *Store) Index(ctx context.Context, height uint64, hash []byte, block *mo blockKey := append([]byte("b"), hval...) blockValue, err := proto.Marshal(block) if err != nil { - log.Fatal().Msgf("Failed to encode model.IndexedBlock: %s", err) + log.Fatalf("Failed to encode model.IndexedBlock: %s", err) } hash2heightKey := append([]byte("c"), hash...) height2hashKey := append([]byte("d"), hval...) @@ -406,47 +463,67 @@ func (s *Store) Index(ctx context.Context, height uint64, hash []byte, block *mo } latestEnc, err := proto.Marshal(latest) if err != nil { - log.Fatal().Msgf("Failed to encode model.BlockMeta: %s", err) + log.Fatalf("Failed to encode model.BlockMeta: %s", err) } - err = s.db.WithReaderBatchWriter(func(rbw storage.ReaderBatchWriter) error { + err = s.db.Update(func(txn *badger.Txn) error { for _, update := range updates { - newVal := int64(update.diff) - key, err := rbw.GlobalReader().NewSeeker().SeekLE(update.key[:9], update.key) - if err == nil { - value, closer, err := rbw.GlobalReader().Get(key) - if err != nil { + it := txn.NewIterator(badger.IteratorOptions{ + Reverse: true, + }) + defer it.Close() + cur := int64(0) + it.Seek(update.key) + if it.ValidForPrefix(update.key[:9]) { + item := it.Item() + if bytes.Equal(item.Key(), update.key) { + it.Next() + if it.ValidForPrefix(update.key[:9]) { + err := item.Value(func(val []byte) error { + cur = int64(binary.BigEndian.Uint64(val)) + return nil + }) + if err != nil { + return err + } + } + } else { + err := item.Value(func(val []byte) error { + cur = int64(binary.BigEndian.Uint64(val)) + return nil + }) + if err != nil { + return err + } } - existing := int64(binary.BigEndian.Uint64(value)) - closer.Close() - newVal += existing } - if newVal < 0 { + val := cur + update.diff + if val < 0 { return fmt.Errorf( "indexdb: invalid balance found for account %x: %d", - update.key[1:9], newVal, + update.key[1:9], val, ) } balance := make([]byte, 8) - binary.BigEndian.PutUint64(balance, uint64(newVal)) - if err := rbw.Writer().Set(update.key, balance); err != nil { + binary.BigEndian.PutUint64(balance, uint64(val)) + if err := txn.Set(update.key, balance); err != nil { return err } } for _, acct := range proxyAccts { - if err := rbw.Writer().Set(acct, []byte("1")); err != nil { + if err := txn.Set(acct, []byte("1")); err != nil { return err } } - if err := rbw.Writer().Set(blockKey, blockValue); err != nil { + if err := txn.Set(blockKey, blockValue); err != nil { return err } - if err := rbw.Writer().Set(hash2heightKey, hval); err != nil { + if err := txn.Set(hash2heightKey, hval); err != nil { return err } - if err := rbw.Writer().Set(height2hashKey, hash); err != nil { + if err := txn.Set(height2hashKey, hash); err != nil { return err } - if err := rbw.Writer().Set([]byte("latest"), latestEnc); err != nil { + if err := txn.Set([]byte("latest"), latestEnc); err != nil { return err } return nil @@ -471,19 +548,21 @@ func (s *Store) Latest() *model.BlockMeta { return latest.Clone() } latest = &model.BlockMeta{} - - value, closer, err := s.db.Reader().Get([]byte("latest")) + err := s.db.View(func(txn *badger.Txn) error { + item, err := txn.Get([]byte("latest")) + if err != nil { + return err + } + return item.Value(func(val []byte) error { + return proto.Unmarshal(val, latest) + }) + }) if err != nil { - if err != storage.ErrNotFound { - log.Fatal().Msgf("Failed to get latest block from the index database: %s", err) + if err != badger.ErrKeyNotFound { + log.Fatalf("Failed to get latest block from the index database: %s", err) } return nil } - defer closer.Close() - err = proto.Unmarshal(value, latest) - if err != nil { - log.Fatal().Msgf("Failed to unmarshal latest block from the index database: %s", err) - } s.mu.Lock() s.latest = latest s.mu.Unlock() @@ -494,41 +573,61 @@ func (s *Store) Latest() *model.BlockMeta { // creation heights from the index database. func (s *Store) PurgeProxyAccounts() { accts := map[string][]byte{} - err := operation.IterateKeys(s.db.Reader(), []byte("p"), []byte("p"), operation.KeyOnlyIterateFunc( - func(keyCopy []byte) error { - accts[string(keyCopy[1:9])] = keyCopy - return nil - }), storage.IteratorOption{}) + err := s.db.View(func(txn *badger.Txn) error { + it := txn.NewIterator(badger.IteratorOptions{}) + defer it.Close() + prefix := []byte("p") + it.Seek(prefix) + for { + if !it.ValidForPrefix(prefix) { + break + } + key := it.Item().KeyCopy(nil) + accts[string(key[1:9])] = key + it.Next() + } + return nil + }) if err != nil { - log.Fatal().Msgf("Failed to iterate over proxy keys: %s", err) + log.Fatalf("Failed to iterate over proxy keys: %s", err) } accountBalanceKeys := [][]byte{} - err = operation.IterateKeys(s.db.Reader(), []byte("a"), []byte("a"), operation.KeyOnlyIterateFunc( - func(keyCopy []byte) error { - _, isProxy := accts[string(keyCopy[1:9])] + err = s.db.View(func(txn *badger.Txn) error { + it := txn.NewIterator(badger.IteratorOptions{}) + defer it.Close() + prefix := []byte("a") + it.Seek(prefix) + for { + if !it.ValidForPrefix(prefix) { + break + } + key := it.Item().KeyCopy(nil) + _, isProxy := accts[string(key[1:9])] if isProxy { - accountBalanceKeys = append(accountBalanceKeys, keyCopy) + accountBalanceKeys = append(accountBalanceKeys, key) } - return nil - }), storage.IteratorOption{}) + it.Next() + } + return nil + }) if err != nil { - log.Fatal().Msgf("Failed to iterate over account balance keys: %s", err) + log.Fatalf("Failed to iterate over account balance keys: %s", err) } - err = s.db.WithReaderBatchWriter(func(rbw storage.ReaderBatchWriter) error { + err = s.db.Update(func(txn *badger.Txn) error { for _, key := range accts { - if err := rbw.Writer().Delete(key); err != nil { + if err := txn.Delete(key); err != nil { return err } } for _, key := range accountBalanceKeys { - if err := rbw.Writer().Delete(key); err != nil { + if err := txn.Delete(key); err != nil { return err } } return nil }) if err != nil { - log.Fatal().Msgf("Failed to purge proxy accounts data: %s", err) + log.Fatalf("Failed to purge proxy accounts data: %s", err) } } @@ -539,42 +638,73 @@ func (s *Store) ResetTo(base uint64) error { return nil } delKeys := [][]byte{} - err := operation.IterateKeys(s.db.Reader(), []byte("a"), []byte("a"), operation.KeyOnlyIterateFunc( - func(keyCopy []byte) error { - height := binary.BigEndian.Uint64(keyCopy[9:]) + err := s.db.View(func(txn *badger.Txn) error { + it := txn.NewIterator(badger.IteratorOptions{}) + prefix := []byte("a") + it.Seek(prefix) + for { + if !it.ValidForPrefix(prefix) { + break + } + item := it.Item() + key := item.Key() + height := binary.BigEndian.Uint64(key[9:]) if height > base { - delKeys = append(delKeys, keyCopy) + delKeys = append(delKeys, item.KeyCopy(nil)) } - return nil - }), storage.IteratorOption{}) + it.Next() + } + it.Close() + return nil + }) if err != nil { return fmt.Errorf("indexdb: failed to get account balance keys to delete: %s", err) } - err = operation.IterateKeys(s.db.Reader(), []byte("p"), []byte("p"), operation.KeyOnlyIterateFunc( - func(keyCopy []byte) error { - height := binary.BigEndian.Uint64(keyCopy[9:]) + err = s.db.View(func(txn *badger.Txn) error { + it := txn.NewIterator(badger.IteratorOptions{}) + prefix := []byte("p") + it.Seek(prefix) + for { + if !it.ValidForPrefix(prefix) { + break + } + item := it.Item() + key := item.Key() + height := binary.BigEndian.Uint64(key[9:]) if height > base { - delKeys = append(delKeys, keyCopy) + delKeys = append(delKeys, item.KeyCopy(nil)) } - return nil - }), storage.IteratorOption{}) + it.Next() + } + it.Close() + return nil + }) if err != nil { return fmt.Errorf("indexdb: failed to get proxy account keys to delete: %s", err) } last := uint64(0) - err = operation.IterateKeys(s.db.Reader(), []byte("d"), []byte("d"), - func(keyCopy []byte, getValue func(destVal any) error) (bail bool, err error) { - height := binary.BigEndian.Uint64(keyCopy[1:]) + err = s.db.View(func(txn *badger.Txn) error { + it := txn.NewIterator(badger.IteratorOptions{}) + prefix := []byte("d") + it.Seek(prefix) + for { + if !it.ValidForPrefix(prefix) { + break + } + item := it.Item() + key := item.Key() + height := binary.BigEndian.Uint64(key[1:]) if height > base { - delKeys = append(delKeys, keyCopy) - key := make([]byte, 9) + key = item.KeyCopy(nil) + delKeys = append(delKeys, key) + key = make([]byte, 9) key[0] = 'b' binary.BigEndian.PutUint64(key[1:], height) delKeys = append(delKeys, key) - var hash []byte - err := getValue(&hash) + hash, err := item.ValueCopy(nil) if err != nil { - return true, err + it.Close() + return err } key = make([]byte, len(hash)+1) key[0] = 'c' @@ -583,22 +713,27 @@ func (s *Store) ResetTo(base uint64) error { } else { last = height } - return false, nil - }, storage.IteratorOption{}) + it.Next() + } + it.Close() + return nil + }) if err != nil { return fmt.Errorf("indexdb: failed to get keys to delete: %s", err) } sort.Slice(delKeys, func(i, j int) bool { return bytes.Compare(delKeys[i], delKeys[j]) == -1 }) - err = s.db.WithReaderBatchWriter(func(rbw storage.ReaderBatchWriter) error { - for _, key := range delKeys { - if err := rbw.Writer().Delete(key); err != nil { - return fmt.Errorf("indexdb: failed to delete keys: %s", err) - } + batch := s.db.NewWriteBatch() + batch.SetMaxPendingTxns(1024) + for _, key := range delKeys { + err := batch.Delete(key) + if err != nil { + batch.Cancel() + return fmt.Errorf("indexdb: failed to delete keys: %s", err) } - return nil - }) + } + err = batch.Flush() if err != nil { return fmt.Errorf("indexdb: failed to flush the batched deletion: %s", err) } @@ -617,10 +752,10 @@ func (s *Store) ResetTo(base uint64) error { } latestEnc, err := proto.Marshal(latest) if err != nil { - log.Fatal().Msgf("Failed to encode model.BlockMeta: %s", err) + log.Fatalf("Failed to encode model.BlockMeta: %s", err) } - err = s.db.WithReaderBatchWriter(func(rbw storage.ReaderBatchWriter) error { - return rbw.Writer().Set([]byte("latest"), latestEnc) + err = s.db.Update(func(txn *badger.Txn) error { + return txn.Set([]byte("latest"), latestEnc) }) if err != nil { return fmt.Errorf("indexdb: failed to set latest value: %s", err) @@ -645,24 +780,24 @@ func (s *Store) SetGenesis(val *model.BlockMeta) error { Timestamp: val.Timestamp, }) if err != nil { - log.Fatal().Msgf("Failed to encode model.IndexedBlock: %s", err) + log.Fatalf("Failed to encode model.IndexedBlock: %s", err) } hash2heightKey := append([]byte("c"), val.Hash...) height2hashKey := append([]byte("d"), hval...) - err = s.db.WithReaderBatchWriter(func(rbw storage.ReaderBatchWriter) error { - if err := rbw.Writer().Set([]byte("genesis"), genesis); err != nil { + err = s.db.Update(func(txn *badger.Txn) error { + if err := txn.Set([]byte("genesis"), genesis); err != nil { return err } - if err := rbw.Writer().Set(blockKey, blockValue); err != nil { + if err := txn.Set(blockKey, blockValue); err != nil { return err } - if err := rbw.Writer().Set(hash2heightKey, hval); err != nil { + if err := txn.Set(hash2heightKey, hval); err != nil { return err } - if err := rbw.Writer().Set(height2hashKey, val.Hash); err != nil { + if err := txn.Set(height2hashKey, val.Hash); err != nil { return err } - return rbw.Writer().Set([]byte("latest"), genesis) + return txn.Set([]byte("latest"), genesis) }) if err != nil { return fmt.Errorf("indexdb: failed to set genesis value: %s", err) @@ -680,17 +815,25 @@ func (s *Store) balanceByHeight(acct []byte, height uint64) (uint64, error) { key[0] = 'a' copy(key[1:9], acct) binary.BigEndian.PutUint64(key[9:], height) - latestAsOfHeight, err := s.db.Reader().NewSeeker().SeekLE(key[:9], key) - if err == nil { - value, closer, err := s.db.Reader().Get(latestAsOfHeight) - if err != nil { - return 0, fmt.Errorf( - "indexdb: failed to get balance for account %x at height %d: %s", - acct, height, err, - ) + err := s.db.View(func(txn *badger.Txn) error { + it := txn.NewIterator(badger.IteratorOptions{ + Reverse: true, + }) + defer it.Close() + it.Seek(key) + if !it.ValidForPrefix(key[:9]) { + return nil } - defer closer.Close() - balance = binary.BigEndian.Uint64(value) + return it.Item().Value(func(val []byte) error { + balance = binary.BigEndian.Uint64(val) + return nil + }) + }) + if err != nil { + return 0, fmt.Errorf( + "indexdb: failed to get balance for account %x at height %d: %s", + acct, height, err, + ) } return balance, nil } @@ -700,15 +843,18 @@ func (s *Store) blockByHeight(height uint64) (*model.IndexedBlock, error) { key := make([]byte, 9) key[0] = 'b' binary.BigEndian.PutUint64(key[1:], height) - value, closer, err := s.db.Reader().Get(key) - if err != nil { - if err == storage.ErrNotFound { - return nil, ErrBlockNotIndexed + err := s.db.View(func(txn *badger.Txn) error { + item, err := txn.Get(key) + if err != nil { + return err } - return nil, fmt.Errorf("indexdb: failed to get block at height %d: %s", height, err) + return item.Value(func(val []byte) error { + return proto.Unmarshal(val, block) + }) + }) + if err == badger.ErrKeyNotFound { + return nil, ErrBlockNotIndexed } - defer closer.Close() - err = proto.Unmarshal(value, block) return block, err } @@ -734,18 +880,18 @@ type accountUpdate struct { // New opens the database at the given directory and returns the corresponding // Store. func New(dir string) *Store { - dbLog := log.With().Str("pebbledb", "index").Logger() - db, err := pebble.SafeOpen(dbLog, dir) + opts := badger.DefaultOptions(dir).WithLogger(log.Badger{Prefix: "index"}) + db, err := badger.Open(opts) if err != nil { - log.Fatal().Msgf("Failed to open the index database at %s: %s", dir, err) + log.Fatalf("Failed to open the index database at %s: %s", dir, err) } process.SetExitHandler(func() { - log.Info().Msgf("Closing the index database") + log.Infof("Closing the index database") if err := db.Close(); err != nil { - log.Error().Msgf("Got error closing the index database: %s", err) + log.Errorf("Got error closing the index database: %s", err) } }) return &Store{ - db: pebbleimpl.ToDB(db), + db: db, } } From 0ea0bc777d6cd422b41d1de499c9b68e29931012 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Mon, 15 Sep 2025 10:49:36 -0700 Subject: [PATCH 06/11] update flow-go to v0.43.0-rc.1 --- go.mod | 11 +++++------ go.sum | 18 ++++++++---------- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/go.mod b/go.mod index 7ca0c0d..8d09d32 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,6 @@ require ( github.com/coinbase/rosetta-sdk-go v0.8.9 github.com/coinbase/rosetta-sdk-go/types v1.0.0 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 - github.com/dgraph-io/badger/v2 v2.2007.4 github.com/dgraph-io/badger/v3 v3.2103.2 github.com/ethereum/go-ethereum v1.16.3 github.com/golang/protobuf v1.5.4 @@ -14,8 +13,8 @@ require ( github.com/libp2p/go-libp2p v0.38.2 github.com/onflow/cadence v1.7.0 github.com/onflow/crypto v0.25.3 - github.com/onflow/flow-go v0.43.0-dev-pebble.1.0.20250910132853-12699a150fd9 - github.com/onflow/flow/protobuf/go/flow v0.4.12 + github.com/onflow/flow-go v0.43.0-rc.1 + github.com/onflow/flow/protobuf/go/flow v0.4.15 github.com/rs/zerolog v1.29.0 github.com/stretchr/testify v1.11.1 go.opentelemetry.io/otel v1.37.0 @@ -49,6 +48,7 @@ require ( github.com/cockroachdb/swiss v0.0.0-20250624142022-d6e517c1d961 // indirect github.com/crate-crypto/go-eth-kzg v1.3.0 // indirect github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a // indirect + github.com/dgraph-io/badger/v2 v2.2007.4 // indirect github.com/emicklei/dot v1.6.2 // indirect github.com/envoyproxy/go-control-plane/envoy v1.32.4 // indirect github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect @@ -203,7 +203,6 @@ require ( github.com/ipfs/go-cid v0.4.1 // indirect github.com/ipfs/go-cidutil v0.1.0 // indirect github.com/ipfs/go-datastore v0.8.2 // indirect - github.com/ipfs/go-ds-badger2 v0.1.4 // indirect github.com/ipfs/go-ipfs-delay v0.0.1 // indirect github.com/ipfs/go-ipfs-pq v0.0.3 // indirect github.com/ipfs/go-ipfs-util v0.0.3 // indirect @@ -263,8 +262,8 @@ require ( github.com/multiformats/go-varint v0.0.7 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect github.com/onflow/atree v0.10.1 // indirect - github.com/onflow/flow-core-contracts/lib/go/contracts v1.7.4-0.20250825173510-91e6f28b0224 // indirect; v1.2.4-0.20230703193002-53362441b57d // indirect - github.com/onflow/flow-core-contracts/lib/go/templates v1.7.2-0.20250825173510-91e6f28b0224 // indirect; v1.2.3 // indirect + github.com/onflow/flow-core-contracts/lib/go/contracts v1.7.4-0.20250911124549-394bdee30b81 // indirect; v1.2.4-0.20230703193002-53362441b57d // indirect + github.com/onflow/flow-core-contracts/lib/go/templates v1.7.2-0.20250911124549-394bdee30b81 // indirect; v1.2.3 // indirect github.com/onflow/flow-ft/lib/go/contracts v1.0.1 // indirect github.com/onflow/flow-go-sdk v1.8.1 // indirect github.com/onflow/flow-nft/lib/go/contracts v1.2.4 // indirect diff --git a/go.sum b/go.sum index 54e82f5..9095c8f 100644 --- a/go.sum +++ b/go.sum @@ -540,8 +540,6 @@ github.com/ipfs/go-datastore v0.8.2 h1:Jy3wjqQR6sg/LhyY0NIePZC3Vux19nLtg7dx0TVqr github.com/ipfs/go-datastore v0.8.2/go.mod h1:W+pI1NsUsz3tcsAACMtfC+IZdnQTnC/7VfPoJBQuts0= github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk= github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps= -github.com/ipfs/go-ds-badger2 v0.1.4 h1:4EDTEDV/Ft/zr5AaJXp2IojvApwevlUD9uahMDONWTE= -github.com/ipfs/go-ds-badger2 v0.1.4/go.mod h1:6WOt9PzJ98Tu7gizJ35NuXDORsYxQ3c4/3gjqF+kq0c= github.com/ipfs/go-ds-pebble v0.5.0 h1:lXffYCAKVD7nLLPqwJ9D8IxgO7Kz8woiX021tezdsIM= github.com/ipfs/go-ds-pebble v0.5.0/go.mod h1:aiCRVcj3K60sxc6k5C+HO9C6rouqiSkjR/WKnbTcMfQ= github.com/ipfs/go-ipfs-blocksutil v0.0.1 h1:Eh/H4pc1hsvhzsQoMEP3Bke/aW5P5rVM1IWFJMcGIPQ= @@ -739,26 +737,26 @@ github.com/onflow/crypto v0.25.3 h1:XQ3HtLsw8h1+pBN+NQ1JYM9mS2mVXTyg55OldaAIF7U= github.com/onflow/crypto v0.25.3/go.mod h1:+1igaXiK6Tjm9wQOBD1EGwW7bYWMUGKtwKJ/2QL/OWs= github.com/onflow/fixed-point v0.1.1 h1:j0jYZVO8VGyk1476alGudEg7XqCkeTVxb5ElRJRKS90= github.com/onflow/fixed-point v0.1.1/go.mod h1:gJdoHqKtToKdOZbvryJvDZfcpzC7d2fyWuo3ZmLtcGY= -github.com/onflow/flow-core-contracts/lib/go/contracts v1.7.4-0.20250825173510-91e6f28b0224 h1:YKdTSG+6uu+dZpKCB2fmNWOmKGLn7+0MxFbtUFZdBjc= -github.com/onflow/flow-core-contracts/lib/go/contracts v1.7.4-0.20250825173510-91e6f28b0224/go.mod h1:/Yne6g7V2Fy1sm/vE78us221bYvVvL5cA8cOzN/uTCI= -github.com/onflow/flow-core-contracts/lib/go/templates v1.7.2-0.20250825173510-91e6f28b0224 h1:6ZBTrqPQIbnek7mdtJwbQux87qcJPTz82DDgGhpsHzI= -github.com/onflow/flow-core-contracts/lib/go/templates v1.7.2-0.20250825173510-91e6f28b0224/go.mod h1:yBkysayvSKZ/yFO3fEX4YQ/FEZtV6Tnov8ix0lBeiqM= +github.com/onflow/flow-core-contracts/lib/go/contracts v1.7.4-0.20250911124549-394bdee30b81 h1:DNJHSGCvnkYsMHt69qhAtFNrmIWEZAWi2Rl5Rh3+hQQ= +github.com/onflow/flow-core-contracts/lib/go/contracts v1.7.4-0.20250911124549-394bdee30b81/go.mod h1:Hqg7WnS6t/26Fn2aryEvF8A4FHmUIlDYahR0G2Pu7/U= +github.com/onflow/flow-core-contracts/lib/go/templates v1.7.2-0.20250911124549-394bdee30b81 h1:JlmB4a6/sOi1R/phjpZaVB5cgh/4k5UdV2vetT62JXo= +github.com/onflow/flow-core-contracts/lib/go/templates v1.7.2-0.20250911124549-394bdee30b81/go.mod h1:yBkysayvSKZ/yFO3fEX4YQ/FEZtV6Tnov8ix0lBeiqM= github.com/onflow/flow-evm-bridge v0.1.0 h1:7X2osvo4NnQgHj8aERUmbYtv9FateX8liotoLnPL9nM= github.com/onflow/flow-evm-bridge v0.1.0/go.mod h1:5UYwsnu6WcBNrwitGFxphCl5yq7fbWYGYuiCSTVF6pk= github.com/onflow/flow-ft/lib/go/contracts v1.0.1 h1:Ts5ob+CoCY2EjEd0W6vdLJ7hLL3SsEftzXG2JlmSe24= github.com/onflow/flow-ft/lib/go/contracts v1.0.1/go.mod h1:PwsL8fC81cjnUnTfmyL/HOIyHnyaw/JA474Wfj2tl6A= github.com/onflow/flow-ft/lib/go/templates v1.0.1 h1:FDYKAiGowABtoMNusLuRCILIZDtVqJ/5tYI4VkF5zfM= github.com/onflow/flow-ft/lib/go/templates v1.0.1/go.mod h1:uQ8XFqmMK2jxyBSVrmyuwdWjTEb+6zGjRYotfDJ5pAE= -github.com/onflow/flow-go v0.43.0-dev-pebble.1.0.20250910132853-12699a150fd9 h1:LzgHQI7A8rINyfzKEF6x2gsrzx7zGBQpmgdHJjJtQqM= -github.com/onflow/flow-go v0.43.0-dev-pebble.1.0.20250910132853-12699a150fd9/go.mod h1:VkvpX4p4imUpPR+FhL0Qw7Qx32zQ/QOQRz2Vl2uu50Y= +github.com/onflow/flow-go v0.43.0-rc.1 h1:m2YcTEleyjFJYebVL/cLAI7naNzvHA1qBP4fvUdizts= +github.com/onflow/flow-go v0.43.0-rc.1/go.mod h1:pItY/Cv+SVS9yccx+8jFmYAWfEum9zEcdQGBzMuBDrw= github.com/onflow/flow-go-sdk v1.8.1 h1:BPp7p10RrpOdezQ3RJ+nheOqpalHlTB9bRocVkLsGNU= github.com/onflow/flow-go-sdk v1.8.1/go.mod h1:w6bxCznDhJJCDybn1jCUAz3rEO4/7XY9EgWRFrj0zoo= github.com/onflow/flow-nft/lib/go/contracts v1.2.4 h1:gWJgSSgIGo0qWOqr90+khQ69VoYF9vNlqzF+Yh6YYy4= github.com/onflow/flow-nft/lib/go/contracts v1.2.4/go.mod h1:eZ9VMMNfCq0ho6kV25xJn1kXeCfxnkhj3MwF3ed08gY= github.com/onflow/flow-nft/lib/go/templates v1.2.1 h1:SAALMZPDw9Eb9p5kSLnmnFxjyig1MLiT4JUlLp0/bSE= github.com/onflow/flow-nft/lib/go/templates v1.2.1/go.mod h1:W6hOWU0xltPqNpv9gQX8Pj8Jtf0OmRxc1XX2V0kzJaI= -github.com/onflow/flow/protobuf/go/flow v0.4.12 h1:nMJHVuz2iRQnzEwvmruCaMrQvm/dfdWtbKroi3o/42M= -github.com/onflow/flow/protobuf/go/flow v0.4.12/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= +github.com/onflow/flow/protobuf/go/flow v0.4.15 h1:7Xt7kkqeeygWMw/S327uKu11FPJghNm1pvam2HXbC7g= +github.com/onflow/flow/protobuf/go/flow v0.4.15/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= github.com/onflow/go-ethereum v1.16.2 h1:yhC3DA5PTNmUmu7ziq8GmWyQ23KNjle4jCabxpKYyNk= github.com/onflow/go-ethereum v1.16.2/go.mod h1:1vsrG/9APHPqt+mVFni60hIXkqkVdU9WQayNjYi/Ah4= github.com/onflow/nft-storefront/lib/go/contracts v1.0.0 h1:sxyWLqGm/p4EKT6DUlQESDG1ZNMN9GjPCm1gTq7NGfc= From bdd7d6082731636ae4902695483ad67504e5226e Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Mon, 15 Sep 2025 14:53:09 -0700 Subject: [PATCH 07/11] Revert logging changes for consistency --- state/state.go | 105 +++++++++++++++++++++++++------------------------ 1 file changed, 53 insertions(+), 52 deletions(-) diff --git a/state/state.go b/state/state.go index f098b66..7832f2d 100644 --- a/state/state.go +++ b/state/state.go @@ -29,9 +29,10 @@ import ( "github.com/onflow/rosetta/cache" "github.com/onflow/rosetta/config" "github.com/onflow/rosetta/indexdb" + "github.com/onflow/rosetta/log" "github.com/onflow/rosetta/model" "github.com/onflow/rosetta/process" - "github.com/rs/zerolog/log" + zerolog "github.com/rs/zerolog/log" "golang.org/x/crypto/openpgp" "golang.org/x/crypto/openpgp/armor" "google.golang.org/grpc/codes" @@ -98,18 +99,18 @@ func (i *Indexer) downloadRootState(ctx context.Context, spork *config.Spork, sp bootstrapDir := filepath.Join(sporkDir, "public-root-information") err := os.MkdirAll(bootstrapDir, 0o744) if err != nil { - log.Fatal().Msgf("Failed to create the spork bootstrap directory: %s", err) + log.Fatalf("Failed to create the spork bootstrap directory: %s", err) } data := download(ctx, spork.Consensus.RootProtocolStateURL) dst := filepath.Join(bootstrapDir, "root-protocol-state-snapshot.json") err = os.WriteFile(dst, data, 0o600) if err != nil { - log.Fatal().Msgf("Failed to write to %s: %s", dst, err) + log.Fatalf("Failed to write to %s: %s", dst, err) } i.root = &stateSnapshot{} err = json.Unmarshal(data, i.root) if err != nil { - log.Fatal().Msgf("Failed to decode root protocol state snapshot: %s", err) + log.Fatalf("Failed to decode root protocol state snapshot: %s", err) } if spork.Consensus.DisableSignatureCheck { return @@ -117,24 +118,24 @@ func (i *Indexer) downloadRootState(ctx context.Context, spork *config.Spork, sp signer := bytes.NewReader([]byte(spork.Consensus.SigningKey)) keyring, err := openpgp.ReadArmoredKeyRing(signer) if err != nil { - log.Fatal().Msgf("Failed to read PGP keyring from the configured signing_key: %s", err) + log.Fatalf("Failed to read PGP keyring from the configured signing_key: %s", err) } sig := download(ctx, spork.Consensus.RootProtocolStateSignatureURL) block, err := armor.Decode(bytes.NewReader(sig)) if err != nil { - log.Fatal().Msgf( + log.Fatalf( "Failed to decode PGP signature block from %s: %s", spork.Consensus.RootProtocolStateSignatureURL, err, ) } if block.Type != openpgp.SignatureType { - log.Fatal().Msgf("Failed to get PGP signature block: got %q instead", block.Type) + log.Fatalf("Failed to get PGP signature block: got %q instead", block.Type) } _, err = openpgp.CheckDetachedSignature( keyring, bytes.NewReader(data), block.Body, ) if err != nil { - log.Fatal().Msgf("Failed to get valid PGP signature for the root protocol state: %s", err) + log.Fatalf("Failed to get valid PGP signature for the root protocol state: %s", err) } } @@ -154,7 +155,7 @@ func (i *Indexer) findRootBlock(ctx context.Context, spork *config.Spork) *model client := spork.AccessNodes.Client() block, err := client.BlockByHeight(ctx, spork.RootBlock) if err != nil { - log.Error().Msgf( + log.Errorf( "Failed to fetch the %s root block: %s", spork, err, ) @@ -162,7 +163,7 @@ func (i *Indexer) findRootBlock(ctx context.Context, spork *config.Spork) *model continue } if block.Height != spork.RootBlock { - log.Error().Msgf( + log.Errorf( "Unexpected block height (%d) when fetching the root block of %s at height %d", block.Height, spork, spork.RootBlock, ) @@ -198,7 +199,7 @@ func (i *Indexer) getVerifiedParent(rctx context.Context, hash []byte, height ui if skipCache { ctx = cache.Skip(rctx) if !skipCacheWarned { - log.Warn().Msgf( + log.Warnf( "Skipping cache for getting verified parent for block %x at height %d", hash, height, ) @@ -210,14 +211,14 @@ func (i *Indexer) getVerifiedParent(rctx context.Context, hash []byte, height ui client := spork.AccessNodes.Client() hdr, err := client.BlockHeaderByHeight(ctx, height) if err != nil { - log.Error().Msgf( + log.Errorf( "Failed to fetch header for block %x at height %d: %s", hash, height, err, ) continue } if !bytes.Equal(hdr.Id, hash) { - log.Error().Msgf( + log.Errorf( "Got unexpected header value for block at height %d: expected %x, got %x", height, hash, hdr.Id, ) @@ -226,14 +227,14 @@ func (i *Indexer) getVerifiedParent(rctx context.Context, hash []byte, height ui } block, err := client.BlockByHeight(ctx, height) if err != nil { - log.Error().Msgf( + log.Errorf( "Failed to fetch block %x at height %d: %s", hash, height, err, ) continue } if !bytes.Equal(block.Id, hash) { - log.Error().Msgf( + log.Errorf( "Got unexpected block value at height %d: expected %x, got %x", height, hash, block.Id, ) @@ -264,17 +265,17 @@ func (i *Indexer) handleResyncFrom() { } genesis := i.Store.Genesis().Height if height < genesis { - log.Fatal().Msgf( + log.Fatalf( "The resync_from value (%d) cannot be less than the height of the genesis block (%d)", height, genesis, ) } err := i.Store.ResetTo(height) if err != nil { - log.Fatal().Msgf("Failed to reset data for resync_from: %s", err) + log.Fatalf("Failed to reset data for resync_from: %s", err) } i.lastIndexed = i.Store.Latest() - log.Info().Msgf( + log.Infof( "Successfully reset indexed data to block %x at height %d", i.lastIndexed.Hash, i.lastIndexed.Height, ) @@ -339,7 +340,7 @@ func (i *Indexer) indexLiveSpork(rctx context.Context, spork *config.Spork, last if skipCache { ctx = cache.Skip(rctx) if !skipCacheWarned { - log.Warn().Msgf( + log.Warnf( "Skipping cache for retrieving block at height %d", height, ) @@ -359,13 +360,13 @@ func (i *Indexer) indexLiveSpork(rctx context.Context, spork *config.Spork, last if synced { time.Sleep(time.Second) } else { - log.Error().Msgf( + log.Errorf( "Failed to fetch header for block at height %d: %s", height, err, ) } default: - log.Error().Msgf( + log.Errorf( "Failed to fetch header for block at height %d: %s", height, err, ) @@ -374,11 +375,11 @@ func (i *Indexer) indexLiveSpork(rctx context.Context, spork *config.Spork, last } block, err := client.BlockByHeight(ctx, height) if err != nil { - log.Error().Msgf("Failed to fetch block at height %d: %s", height, err) + log.Errorf("Failed to fetch block at height %d: %s", height, err) continue } if !bytes.Equal(block.ParentId, parent) { - log.Error().Msgf( + log.Errorf( "Unexpected parent hash found for block %x at height %d: expected %x, got %x", block.Id, height, parent, block.ParentId, ) @@ -393,7 +394,7 @@ func (i *Indexer) indexLiveSpork(rctx context.Context, spork *config.Spork, last blockID := flow.Identifier{} err := operation.LookupBlockHeight(i.consensus.Reader(), height, &blockID) if err != nil { - log.Error().Msgf( + log.Errorf( "Failed to get block ID for height %d from consensus: %s", height, err, ) @@ -403,7 +404,7 @@ func (i *Indexer) indexLiveSpork(rctx context.Context, spork *config.Spork, last continue } if !bytes.Equal(blockID[:], block.Id) { - log.Error().Msgf( + log.Errorf( "Mismatching block ID found for height %d: %x from Access API, %x from consensus", height, block.Id, blockID[:], ) @@ -411,14 +412,14 @@ func (i *Indexer) indexLiveSpork(rctx context.Context, spork *config.Spork, last continue } } - log.Info().Msgf("Retrieved block %x at height %d", block.Id, block.Height) + log.Infof("Retrieved block %x at height %d", block.Id, block.Height) blocks[string(block.Id)] = height // NOTE(tav): We assume that block seals will only ever be seen in // block height order. for _, seal := range block.BlockSeals { blockHeight, ok := blocks[string(seal.BlockId)] if !ok { - log.Warn().Msgf( + log.Warnf( "Skipping seal for block %x in block %x at height %d", seal.BlockId, block.Id, height, ) @@ -430,7 +431,7 @@ func (i *Indexer) indexLiveSpork(rctx context.Context, spork *config.Spork, last sealID := flow.Identifier{} err := operation.LookupBySealedBlockID(i.consensus.Reader(), blockID, &sealID) if err != nil { - log.Error().Msgf( + log.Errorf( "Failed to get seal ID for block %x from consensus: %s", seal.BlockId, err, ) @@ -439,21 +440,21 @@ func (i *Indexer) indexLiveSpork(rctx context.Context, spork *config.Spork, last blockSeal := &flow.Seal{} err = operation.RetrieveSeal(i.consensus.Reader(), sealID, blockSeal) if err != nil { - log.Error().Msgf( + log.Errorf( "Failed to get seal %x for block %x from consensus: %s", sealID[:], seal.BlockId, err, ) continue inner } if !bytes.Equal(seal.BlockId, blockSeal.BlockID[:]) { - log.Error().Msgf( + log.Errorf( "Unverifiable seal found in block %x at height %d: got seal for block %x via Acccess API, found seal for block %x via consensus", block.Id, height, seal.BlockId, blockSeal.BlockID[:], ) continue inner } if !bytes.Equal(seal.ResultId, blockSeal.ResultID[:]) { - log.Error().Msgf( + log.Errorf( "Unverifiable execution result for block %x found in block %x at height %d: got %x via Acccess API, got %x via consensus", seal.BlockId, block.Id, height, seal.ResultId, blockSeal.ResultID[:], ) @@ -468,14 +469,14 @@ func (i *Indexer) indexLiveSpork(rctx context.Context, spork *config.Spork, last parent = block.Id if time.Since(block.Timestamp.AsTime()) < syncWindow { if !synced { - log.Info().Msgf("Indexer is close to tip") + log.Infof("Indexer is close to tip") synced = true i.mu.Lock() i.synced = true i.mu.Unlock() } } else if synced { - log.Error().Msgf("Indexer is not close to tip") + log.Errorf("Indexer is not close to tip") synced = false i.mu.Lock() i.synced = false @@ -501,7 +502,7 @@ func (i *Indexer) indexPastSporks(ctx context.Context, lastIndexed *model.BlockM return } hashes = append(hashes, hash) - log.Info().Msgf("Retrieved block %x at height %d", hash, height) + log.Infof("Retrieved block %x at height %d", hash, height) } if !bytes.Equal(lastIndexed.Hash, parent) { // NOTE(tav): If we've arrived at this state, it's effectively a fatal @@ -512,7 +513,7 @@ func (i *Indexer) indexPastSporks(ctx context.Context, lastIndexed *model.BlockM // This will most likely only happen if we got corrupted block data when // we looked up the "genesis" block, i.e. the "root" block of the first // spork amongst the configured sporks. - log.Error().Msgf( + log.Errorf( "Unable to establish a hash chain from live spork root block to last indexed block %x at height %d: found %x as parent instead", lastIndexed.Hash, lastIndexed.Height, parent, ) @@ -534,7 +535,7 @@ func (i *Indexer) indexPastSporks(ctx context.Context, lastIndexed *model.BlockM func (i *Indexer) initState() { accts, err := i.Store.Accounts() if err != nil { - log.Fatal().Msgf("Failed to load accounts from the index database: %s", err) + log.Fatalf("Failed to load accounts from the index database: %s", err) } i.accts = map[string]bool{} for acct, isProxy := range accts { @@ -542,7 +543,7 @@ func (i *Indexer) initState() { } i.feeAddr, err = hex.DecodeString(i.Chain.Contracts.FlowFees) if err != nil { - log.Fatal().Msgf( + log.Fatalf( "Invalid FlowFees contract address %q: %s", i.Chain.Contracts.FlowFees, err, ) @@ -571,9 +572,9 @@ func (i *Indexer) initStore(ctx context.Context) bool { return false } if err := i.Store.SetGenesis(genesis); err != nil { - log.Fatal().Msgf("Couldn't set the genesis block on the index database: %s", err) + log.Fatalf("Couldn't set the genesis block on the index database: %s", err) } - log.Info().Msgf("Genesis block set to block %x at height %d", genesis.Hash, genesis.Height) + log.Infof("Genesis block set to block %x at height %d", genesis.Hash, genesis.Height) i.lastIndexed = genesis i.liveRoot = i.findRootBlock(ctx, i.Chain.LatestSpork()) return i.liveRoot != nil @@ -625,7 +626,7 @@ func (i *Indexer) nextBackoff(d time.Duration) time.Duration { } func (i *Indexer) onBlockFinalized(f *hotstuff.Block) { - log.Info().Msgf( + log.Infof( "Got finalized block via consensus follower: %x (block timestamp: %s)", f.BlockID[:], time.UnixMilli(int64(f.Timestamp)).Format(time.RFC3339), ) @@ -637,10 +638,10 @@ func (i *Indexer) runConsensusFollower(ctx context.Context) { i.downloadRootState(ctx, spork, sporkDir) dbDir := filepath.Join(sporkDir, "consensus") - dbLog := log.With().Str("pebbledb", "consensusFollower").Logger() + dbLog := zerolog.With().Str("pebbledb", "consensusFollower").Logger() db, err := pebble.SafeOpen(dbLog, dbDir) if err != nil { - log.Fatal().Msgf("Failed to open consensus database at %s: %s", dbDir, err) + log.Fatalf("Failed to open consensus database at %s: %s", dbDir, err) } protocolDB := pebbleimpl.ToDB(db) // Initialize a private key for joining the unstaked peer-to-peer network. @@ -648,21 +649,21 @@ func (i *Indexer) runConsensusFollower(ctx context.Context) { seed := make([]byte, flowcrypto.KeyGenSeedMinLen) n, err := rand.Read(seed) if err != nil || n != flowcrypto.KeyGenSeedMinLen { - log.Fatal().Msgf("Could not generate seed for the consensus follower private key") + log.Fatalf("Could not generate seed for the consensus follower private key") } key, err := utils.GeneratePublicNetworkingKey(seed) if err != nil { - log.Fatal().Msgf("Could not generate the consensus follower private key") + log.Fatalf("Could not generate the consensus follower private key") } nodes := []follower.BootstrapNodeInfo{} for _, node := range spork.Consensus.SeedNodes { rawkey, err := hex.DecodeString(node.PublicKey) if err != nil { - log.Fatal().Msgf("Failed to hex decode the seed node key %q: %s", key, err) + log.Fatalf("Failed to hex decode the seed node key %q: %s", key, err) } pubkey, err := flowcrypto.DecodePublicKey(flowcrypto.ECDSAP256, rawkey) if err != nil { - log.Fatal().Msgf("Failed to decode the seed node key %q: %s", key, err) + log.Fatalf("Failed to decode the seed node key %q: %s", key, err) } nodes = append(nodes, follower.BootstrapNodeInfo{ Host: node.Host, @@ -689,7 +690,7 @@ func (i *Indexer) runConsensusFollower(ctx context.Context) { }), ) if err != nil { - log.Fatal().Msgf("Failed to create the consensus follower: %s", err) + log.Fatalf("Failed to create the consensus follower: %s", err) } i.consensus = protocolDB ctx, cancel := context.WithCancel(ctx) @@ -709,7 +710,7 @@ func (i *Indexer) scheduleJobs(ctx context.Context, startHeight uint64) { client := pool.Client() block, err := client.LatestFinalizedBlockHeader(ctx) if err != nil { - log.Error().Msgf("Failed to fetch latest sealed block: %s", err) + log.Errorf("Failed to fetch latest sealed block: %s", err) time.Sleep(time.Second) continue } @@ -723,7 +724,7 @@ func (i *Indexer) scheduleJobs(ctx context.Context, startHeight uint64) { for height := startHeight + 1; height <= block.Height; height++ { i.jobs <- height if height%200 == 0 { - log.Warn().Msgf("Added job to prefetch block %d", height) + log.Warnf("Added job to prefetch block %d", height) } } startHeight = block.Height @@ -734,18 +735,18 @@ func download(ctx context.Context, src string) []byte { for { req, err := http.NewRequestWithContext(ctx, "GET", src, nil) if err != nil { - log.Fatal().Msgf("Failed to create HTTP request to %s: %s", src, err) + log.Fatalf("Failed to create HTTP request to %s: %s", src, err) } resp, err := httpClient.Do(req) if err != nil { - log.Error().Msgf("Failed to download %s: %s", src, err) + log.Errorf("Failed to download %s: %s", src, err) time.Sleep(time.Second) continue } data, err := io.ReadAll(resp.Body) _ = resp.Body.Close() if err != nil { - log.Error().Msgf("Failed to read data from %s: %s", src, err) + log.Errorf("Failed to read data from %s: %s", src, err) time.Sleep(time.Second) continue } From a23735d6bf6a51fc4ff2b9325af555eeb0fd17f6 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Mon, 15 Sep 2025 15:38:21 -0700 Subject: [PATCH 08/11] clean up whitespace in Makefile --- Makefile | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/Makefile b/Makefile index d23cf6b..169fc42 100644 --- a/Makefile +++ b/Makefile @@ -43,7 +43,7 @@ gen-originator-account: } \ }' "${FLOW_JSON}" > flow.json.tmp && mv flow.json.tmp "${FLOW_JSON}" || { echo "Failed to update ${FLOW_JSON} with jq"; exit 1; }; \ jq --arg address "$$address" '.originators += [$$address]' "${ROSETTA_ENV}.json" > env.json.tmp && mv env.json.tmp "${ROSETTA_ENV}.json"; \ - echo "$(ACCOUNT_NAME),$$KEYS,0x$$address" >> $(ACCOUNT_KEYS_FILENAME); \ + echo "$(ACCOUNT_NAME),$$KEYS,0x$$address" >> $(ACCOUNT_KEYS_FILENAME); \ echo "Updated $(FLOW_JSON), $(ROSETTA_ENV).json and $(ACCOUNT_KEYS_FILENAME)"; .PHONY: fund-accounts @@ -67,22 +67,22 @@ create-originator-derived-account: ROOT_ORIGINATOR_PUBLIC_KEY=$$(grep '$(ORIGINATOR_NAME)' $(ACCOUNT_KEYS_FILENAME) | cut -d ',' -f2); \ ROOT_ORIGINATOR_PRIVATE_KEY=$$(grep '$(ORIGINATOR_NAME)' $(ACCOUNT_KEYS_FILENAME) | cut -d ',' -f4 ); \ ROOT_ORIGINATOR_ADDRESS=$$(grep '$(ORIGINATOR_NAME)' $(ACCOUNT_KEYS_FILENAME) | cut -d ',' -f5); \ - echo "Originator address: $$ROOT_ORIGINATOR_ADDRESS"; \ - TX_HASH=$$(python3 rosetta_handler.py rosetta-create-derived-account $(ROSETTA_HOST_URL) $$ROOT_ORIGINATOR_ADDRESS $$ROOT_ORIGINATOR_PUBLIC_KEY $$ROOT_ORIGINATOR_PRIVATE_KEY $$NEW_ACCOUNT_PUBLIC_ROSETTA_KEY); \ + echo "Originator address: $$ROOT_ORIGINATOR_ADDRESS"; \ + TX_HASH=$$(python3 rosetta_handler.py rosetta-create-derived-account $(ROSETTA_HOST_URL) $$ROOT_ORIGINATOR_ADDRESS $$ROOT_ORIGINATOR_PUBLIC_KEY $$ROOT_ORIGINATOR_PRIVATE_KEY $$NEW_ACCOUNT_PUBLIC_ROSETTA_KEY); \ ADDRESS=$$(flow transactions get $$TX_HASH -f $(FLOW_JSON) -n $(ROSETTA_ENV) -o json | jq -r '.events[] | select(.type == "flow.AccountCreated") | .values.value.fields[] | select(.name == "address") | .value.value'); \ echo "TX_HASH: $$TX_HASH , ADDRESS: $$ADDRESS"; \ - echo "$(NEW_ACCOUNT_NAME),$$NEW_ACCOUNT_PUBLIC_FLOW_KEY,$$NEW_ACCOUNT_PUBLIC_ROSETTA_KEY,$$NEW_ACCOUNT_PRIVATE_KEY,$$ADDRESS" >> $(ACCOUNT_KEYS_FILENAME); + echo "$(NEW_ACCOUNT_NAME),$$NEW_ACCOUNT_PUBLIC_FLOW_KEY,$$NEW_ACCOUNT_PUBLIC_ROSETTA_KEY,$$NEW_ACCOUNT_PRIVATE_KEY,$$ADDRESS" >> $(ACCOUNT_KEYS_FILENAME); .PHONY: rosetta-transfer-funds rosetta-transfer-funds: PAYER_PUBLIC_KEY=$$(grep '$(PAYER_NAME)' $(ACCOUNT_KEYS_FILENAME) | cut -d ',' -f2); \ PAYER_PRIVATE_KEY=$$(grep '$(PAYER_NAME)' $(ACCOUNT_KEYS_FILENAME) | cut -d ',' -f4 ); \ PAYER_ADDRESS=$$(grep '$(PAYER_NAME)' $(ACCOUNT_KEYS_FILENAME) | cut -d ',' -f5); \ - echo "Payer address: $$PAYER_ADDRESS"; \ + echo "Payer address: $$PAYER_ADDRESS"; \ RECIPIENT_ADDRESS=$$(grep '$(RECIPIENT_NAME)' $(ACCOUNT_KEYS_FILENAME) | cut -d ',' -f5); \ - echo "Recipient address: $$RECIPIENT_ADDRESS"; \ - TX_HASH=$$(python3 rosetta_handler.py rosetta-transfer-funds $(ROSETTA_HOST_URL) $$PAYER_ADDRESS $$PAYER_PUBLIC_KEY $$PAYER_PRIVATE_KEY $$RECIPIENT_ADDRESS $$AMOUNT); \ - echo "Funding sent: $$TX_HASH"; + echo "Recipient address: $$RECIPIENT_ADDRESS"; \ + TX_HASH=$$(python3 rosetta_handler.py rosetta-transfer-funds $(ROSETTA_HOST_URL) $$PAYER_ADDRESS $$PAYER_PUBLIC_KEY $$PAYER_PRIVATE_KEY $$RECIPIENT_ADDRESS $$AMOUNT); \ + echo "Funding sent: $$TX_HASH"; # Use this target to verify that the accounts configured in the Rosetta environment JSON have the specified contracts deployed .PHONY: verify-configured-contract-addresses @@ -92,22 +92,22 @@ verify-configured-contract-addresses: KEY=$$(echo $$contract | cut -d= -f1); \ VALUE=$$(echo $$contract | cut -d= -f2); \ if [ "$$VALUE" = "0000000000000000" ]; then \ - continue; \ - fi; \ + continue; \ + fi; \ CONTRACTS_FOUND=$$(flow accounts get $$VALUE -f $(FLOW_JSON) -n $(ROSETTA_ENV) -o json | \ jq -r '.contracts | join(",") '); \ found=false ; \ for contract in $$(echo $$CONTRACTS_FOUND | tr ',' ' '); do \ - lowercase_contract_name=$$(echo $$contract | tr '[:upper:]' '[:lower:]'); \ - if [ "$$KEY" = "$$lowercase_contract_name" ]; then \ - found=true ; \ - break; \ - fi; \ - done; \ - if [ "$$found" = "false" ]; then \ - echo "Contract $$KEY configured in $(ROSETTA_ENV).json is not deployed to configured address $$VALUE" ;\ + lowercase_contract_name=$$(echo $$contract | tr '[:upper:]' '[:lower:]'); \ + if [ "$$KEY" = "$$lowercase_contract_name" ]; then \ + found=true ; \ + break; \ + fi; \ + done; \ + if [ "$$found" = "false" ]; then \ + echo "Contract $$KEY configured in $(ROSETTA_ENV).json is not deployed to configured address $$VALUE" ;\ fi; \ - done ; \ + done ; .PHONY: build build: go-test go-build @@ -139,4 +139,4 @@ test-reset: test-cleanup: test-reset rm -f flow.json rm -f account-keys.csv - rm -rf flow-go \ No newline at end of file + rm -rf flow-go From 98aa5c836ca7697abaebfce18eacb9e174a5b9b5 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Mon, 15 Sep 2025 15:40:50 -0700 Subject: [PATCH 09/11] Update rosetta_handler.py - Single definition of the network identifier - Slightly improve error displayed if preprocess_transaction fails --- rosetta_handler.py | 34 +++++++++++++--------------------- 1 file changed, 13 insertions(+), 21 deletions(-) diff --git a/rosetta_handler.py b/rosetta_handler.py index 50e7377..6250b10 100644 --- a/rosetta_handler.py +++ b/rosetta_handler.py @@ -11,6 +11,10 @@ def cli(): pass +network_identifier = { + "blockchain": "flow", + "network": "localnet", +} # Shared function used internally but not standalone from CLI @@ -24,10 +28,7 @@ def preprocess_transaction(rosetta_host_url, root_originator, operations, metada endpoint = "/construction/preprocess" target_url = rosetta_host_url + endpoint data = { - "network_identifier": { - "blockchain": "flow", - "network": "localnet" - }, + "network_identifier": network_identifier, "operations": operations, "metadata": { "payer": root_originator @@ -36,17 +37,17 @@ def preprocess_transaction(rosetta_host_url, root_originator, operations, metada if metadata: for key in metadata: data["metadata"][key] = metadata[key] - return request_router(target_url, data) + result = request_router(target_url, data) + if "options" not in result: + raise RuntimeError(result) + return result def metadata_transaction(rosetta_host_url, options): endpoint = "/construction/metadata" target_url = rosetta_host_url + endpoint data = { - "network_identifier": { - "blockchain": "flow", - "network": "localnet" - }, + "network_identifier": network_identifier, "options": options } return request_router(target_url, data) @@ -56,10 +57,7 @@ def payloads_transaction(rosetta_host_url, operations, protobuf): endpoint = "/construction/payloads" target_url = rosetta_host_url + endpoint data = { - "network_identifier": { - "blockchain": "flow", - "network": "localnet" - }, + "network_identifier": network_identifier, "operations": operations, "metadata": { "protobuf": protobuf @@ -72,10 +70,7 @@ def combine_transaction(rosetta_host_url, unsigned_tx, root_originator, hex_byte endpoint = "/construction/combine" target_url = rosetta_host_url + endpoint data = { - "network_identifier": { - "blockchain": "flow", - "network": "localnet" - }, + "network_identifier": network_identifier, "unsigned_transaction": unsigned_tx, "signatures": [ { @@ -103,10 +98,7 @@ def submit_transaction(rosetta_host_url, signed_tx): endpoint = "/construction/submit" target_url = rosetta_host_url + endpoint data = { - "network_identifier": { - "blockchain": "flow", - "network": "localnet" - }, + "network_identifier": network_identifier, "signed_transaction": signed_tx } return request_router(target_url, data) From c4c43a5b80ad9fcf22d5296facf9694388d05181 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Mon, 15 Sep 2025 16:01:44 -0700 Subject: [PATCH 10/11] apply suggestions from AI --- state/state.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/state/state.go b/state/state.go index 7832f2d..6019aa5 100644 --- a/state/state.go +++ b/state/state.go @@ -643,6 +643,12 @@ func (i *Indexer) runConsensusFollower(ctx context.Context) { if err != nil { log.Fatalf("Failed to open consensus database at %s: %s", dbDir, err) } + process.SetExitHandler(func() { + log.Infof("Closing the consensus follower database") + if err := db.Close(); err != nil { + log.Errorf("Got error closing the consensus follower database: %s", err) + } + }) protocolDB := pebbleimpl.ToDB(db) // Initialize a private key for joining the unstaked peer-to-peer network. // This can be ephemeral, so we generate a new one each time we start. @@ -659,11 +665,11 @@ func (i *Indexer) runConsensusFollower(ctx context.Context) { for _, node := range spork.Consensus.SeedNodes { rawkey, err := hex.DecodeString(node.PublicKey) if err != nil { - log.Fatalf("Failed to hex decode the seed node key %q: %s", key, err) + log.Fatalf("Failed to hex decode the seed node key %q: %s", node.PublicKey, err) } pubkey, err := flowcrypto.DecodePublicKey(flowcrypto.ECDSAP256, rawkey) if err != nil { - log.Fatalf("Failed to decode the seed node key %q: %s", key, err) + log.Fatalf("Failed to decode the seed node key %q: %s", rawkey, err) } nodes = append(nodes, follower.BootstrapNodeInfo{ Host: node.Host, From 8207d578a0558460d74ebbaff6a24725f357bf92 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Mon, 15 Sep 2025 16:49:07 -0700 Subject: [PATCH 11/11] update flow-go to v0.43.0-rc.2 --- go.mod | 10 +++++----- go.sum | 20 ++++++++++---------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/go.mod b/go.mod index 8d09d32..d773cee 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/libp2p/go-libp2p v0.38.2 github.com/onflow/cadence v1.7.0 github.com/onflow/crypto v0.25.3 - github.com/onflow/flow-go v0.43.0-rc.1 + github.com/onflow/flow-go v0.43.0-rc.2 github.com/onflow/flow/protobuf/go/flow v0.4.15 github.com/rs/zerolog v1.29.0 github.com/stretchr/testify v1.11.1 @@ -69,7 +69,7 @@ require ( github.com/onflow/fixed-point v0.1.1 // indirect github.com/onflow/flow-evm-bridge v0.1.0 // indirect github.com/onflow/flow-ft/lib/go/templates v1.0.1 // indirect - github.com/onflow/flow-nft/lib/go/templates v1.2.1 // indirect + github.com/onflow/flow-nft/lib/go/templates v1.3.0 // indirect github.com/onflow/nft-storefront/lib/go/contracts v1.0.0 // indirect github.com/pion/datachannel v1.5.10 // indirect github.com/pion/dtls/v2 v2.2.12 // indirect @@ -262,11 +262,11 @@ require ( github.com/multiformats/go-varint v0.0.7 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect github.com/onflow/atree v0.10.1 // indirect - github.com/onflow/flow-core-contracts/lib/go/contracts v1.7.4-0.20250911124549-394bdee30b81 // indirect; v1.2.4-0.20230703193002-53362441b57d // indirect - github.com/onflow/flow-core-contracts/lib/go/templates v1.7.2-0.20250911124549-394bdee30b81 // indirect; v1.2.3 // indirect + github.com/onflow/flow-core-contracts/lib/go/contracts v1.8.0 // indirect; v1.2.4-0.20230703193002-53362441b57d // indirect + github.com/onflow/flow-core-contracts/lib/go/templates v1.8.0 // indirect; v1.2.3 // indirect github.com/onflow/flow-ft/lib/go/contracts v1.0.1 // indirect github.com/onflow/flow-go-sdk v1.8.1 // indirect - github.com/onflow/flow-nft/lib/go/contracts v1.2.4 // indirect + github.com/onflow/flow-nft/lib/go/contracts v1.3.0 // indirect github.com/onflow/go-ethereum v1.16.2 // indirect github.com/onflow/sdks v0.6.0-preview.1 // indirect github.com/onflow/wal v1.0.2 // indirect diff --git a/go.sum b/go.sum index 9095c8f..1ac5021 100644 --- a/go.sum +++ b/go.sum @@ -737,24 +737,24 @@ github.com/onflow/crypto v0.25.3 h1:XQ3HtLsw8h1+pBN+NQ1JYM9mS2mVXTyg55OldaAIF7U= github.com/onflow/crypto v0.25.3/go.mod h1:+1igaXiK6Tjm9wQOBD1EGwW7bYWMUGKtwKJ/2QL/OWs= github.com/onflow/fixed-point v0.1.1 h1:j0jYZVO8VGyk1476alGudEg7XqCkeTVxb5ElRJRKS90= github.com/onflow/fixed-point v0.1.1/go.mod h1:gJdoHqKtToKdOZbvryJvDZfcpzC7d2fyWuo3ZmLtcGY= -github.com/onflow/flow-core-contracts/lib/go/contracts v1.7.4-0.20250911124549-394bdee30b81 h1:DNJHSGCvnkYsMHt69qhAtFNrmIWEZAWi2Rl5Rh3+hQQ= -github.com/onflow/flow-core-contracts/lib/go/contracts v1.7.4-0.20250911124549-394bdee30b81/go.mod h1:Hqg7WnS6t/26Fn2aryEvF8A4FHmUIlDYahR0G2Pu7/U= -github.com/onflow/flow-core-contracts/lib/go/templates v1.7.2-0.20250911124549-394bdee30b81 h1:JlmB4a6/sOi1R/phjpZaVB5cgh/4k5UdV2vetT62JXo= -github.com/onflow/flow-core-contracts/lib/go/templates v1.7.2-0.20250911124549-394bdee30b81/go.mod h1:yBkysayvSKZ/yFO3fEX4YQ/FEZtV6Tnov8ix0lBeiqM= +github.com/onflow/flow-core-contracts/lib/go/contracts v1.8.0 h1:Kq6MRKwmHfoTTDty/Ib7is1peaSMBQ2GHcnhDgsRH4c= +github.com/onflow/flow-core-contracts/lib/go/contracts v1.8.0/go.mod h1:h15J7unHnlTARjudbb0m5KO3EoHIucoBgShAqE6NoNA= +github.com/onflow/flow-core-contracts/lib/go/templates v1.8.0 h1:vcCIJP8vYuo0t77Hbxh2IEpGSZoEudB58Qpci11LSCE= +github.com/onflow/flow-core-contracts/lib/go/templates v1.8.0/go.mod h1:twSVyUt3rNrgzAmxtBX+1Gw64QlPemy17cyvnXYy1Ug= github.com/onflow/flow-evm-bridge v0.1.0 h1:7X2osvo4NnQgHj8aERUmbYtv9FateX8liotoLnPL9nM= github.com/onflow/flow-evm-bridge v0.1.0/go.mod h1:5UYwsnu6WcBNrwitGFxphCl5yq7fbWYGYuiCSTVF6pk= github.com/onflow/flow-ft/lib/go/contracts v1.0.1 h1:Ts5ob+CoCY2EjEd0W6vdLJ7hLL3SsEftzXG2JlmSe24= github.com/onflow/flow-ft/lib/go/contracts v1.0.1/go.mod h1:PwsL8fC81cjnUnTfmyL/HOIyHnyaw/JA474Wfj2tl6A= github.com/onflow/flow-ft/lib/go/templates v1.0.1 h1:FDYKAiGowABtoMNusLuRCILIZDtVqJ/5tYI4VkF5zfM= github.com/onflow/flow-ft/lib/go/templates v1.0.1/go.mod h1:uQ8XFqmMK2jxyBSVrmyuwdWjTEb+6zGjRYotfDJ5pAE= -github.com/onflow/flow-go v0.43.0-rc.1 h1:m2YcTEleyjFJYebVL/cLAI7naNzvHA1qBP4fvUdizts= -github.com/onflow/flow-go v0.43.0-rc.1/go.mod h1:pItY/Cv+SVS9yccx+8jFmYAWfEum9zEcdQGBzMuBDrw= +github.com/onflow/flow-go v0.43.0-rc.2 h1:U0yhR9asL/4tTIbGq7vrBEwUr6JFupNRySSRU4EM5jc= +github.com/onflow/flow-go v0.43.0-rc.2/go.mod h1:zz8rd/tod51kd33im9x6uDmGiyeb5qCZhKr7g01/wgI= github.com/onflow/flow-go-sdk v1.8.1 h1:BPp7p10RrpOdezQ3RJ+nheOqpalHlTB9bRocVkLsGNU= github.com/onflow/flow-go-sdk v1.8.1/go.mod h1:w6bxCznDhJJCDybn1jCUAz3rEO4/7XY9EgWRFrj0zoo= -github.com/onflow/flow-nft/lib/go/contracts v1.2.4 h1:gWJgSSgIGo0qWOqr90+khQ69VoYF9vNlqzF+Yh6YYy4= -github.com/onflow/flow-nft/lib/go/contracts v1.2.4/go.mod h1:eZ9VMMNfCq0ho6kV25xJn1kXeCfxnkhj3MwF3ed08gY= -github.com/onflow/flow-nft/lib/go/templates v1.2.1 h1:SAALMZPDw9Eb9p5kSLnmnFxjyig1MLiT4JUlLp0/bSE= -github.com/onflow/flow-nft/lib/go/templates v1.2.1/go.mod h1:W6hOWU0xltPqNpv9gQX8Pj8Jtf0OmRxc1XX2V0kzJaI= +github.com/onflow/flow-nft/lib/go/contracts v1.3.0 h1:DmNop+O0EMyicZvhgdWboFG57xz5t9Qp81FKlfKyqJc= +github.com/onflow/flow-nft/lib/go/contracts v1.3.0/go.mod h1:eZ9VMMNfCq0ho6kV25xJn1kXeCfxnkhj3MwF3ed08gY= +github.com/onflow/flow-nft/lib/go/templates v1.3.0 h1:uGIBy4GEY6Z9hKP7sm5nA5kwvbvLWW4nWx5NN9Wg0II= +github.com/onflow/flow-nft/lib/go/templates v1.3.0/go.mod h1:gVbb5fElaOwKhV5UEUjM+JQTjlsguHg2jwRupfM/nng= github.com/onflow/flow/protobuf/go/flow v0.4.15 h1:7Xt7kkqeeygWMw/S327uKu11FPJghNm1pvam2HXbC7g= github.com/onflow/flow/protobuf/go/flow v0.4.15/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= github.com/onflow/go-ethereum v1.16.2 h1:yhC3DA5PTNmUmu7ziq8GmWyQ23KNjle4jCabxpKYyNk=