diff --git a/cmd/neofs-ir/defaults_test.go b/cmd/neofs-ir/defaults_test.go index f9ceb982a8..12df39e28f 100644 --- a/cmd/neofs-ir/defaults_test.go +++ b/cmd/neofs-ir/defaults_test.go @@ -85,5 +85,10 @@ func TestValidateDefaultConfig(t *testing.T) { Settlement: config.Settlement{ BasicIncomeRate: 0, }, - Experimental: config.Experimental{ChainMetaData: false, AllowEC: false}}) + Experimental: config.Experimental{ChainMetaData: config.Metadata{ + Enabled: false, + SeedPort: 0, + RPCPort: 0, + P2PPort: 0, + }, AllowEC: false}}) } diff --git a/cmd/neofs-ir/validate_test.go b/cmd/neofs-ir/validate_test.go index 62a41ceb01..5a37df5662 100644 --- a/cmd/neofs-ir/validate_test.go +++ b/cmd/neofs-ir/validate_test.go @@ -321,8 +321,13 @@ func TestCheckForUnknownFieldsExample(t *testing.T) { BasicIncomeRate: 100, }, Experimental: config.Experimental{ - ChainMetaData: false, - AllowEC: true, + ChainMetaData: config.Metadata{ + Enabled: false, + SeedPort: 20334, + RPCPort: 30334, + P2PPort: 20334, + }, + AllowEC: true, }, Validator: config.Validator{ Enabled: true, diff --git a/config/example/ir.yaml b/config/example/ir.yaml index 110c53db31..29197fdbe2 100644 --- a/config/example/ir.yaml +++ b/config/example/ir.yaml @@ -179,7 +179,11 @@ settlement: basic_income_rate: 100 # Optional: override basic income rate value from network config; applied only in debug mode experimental: - chain_meta_data: false # Optional: allows creating containers with meta data handled via FS chain + chain_meta_data: # Optional: allows creating containers with meta data handled via FS chain, only supported with fschain.consensus configured + enabled: false + seed_port: 20334 # List of existing nodes to communicate with over Neo P2P protocol (addresses are taken from fschain.consensus.seed_nodes, but ports are changed to this value) + p2p_port: 20334 # Port for network addresses to listen Neo P2P on (addresses are taken from fschain.consensus.p2p, but ports are changed to this value) + rpc_port: 30334 # Port for network addresses to listen Neo RPC on (addresses are taken from fschain.consensus.rpc, but ports are changed to this value) allow_ec: true # Optional: allows creating containers with EC rules sn_validator: diff --git a/pkg/innerring/config/config.go b/pkg/innerring/config/config.go index 0c7ff38e85..f0c32f52df 100644 --- a/pkg/innerring/config/config.go +++ b/pkg/innerring/config/config.go @@ -106,8 +106,16 @@ type Settlement struct { // Experimental configures experimental features. type Experimental struct { - ChainMetaData bool `mapstructure:"chain_meta_data"` - AllowEC bool `mapstructure:"allow_ec"` + ChainMetaData Metadata `mapstructure:"chain_meta_data"` + AllowEC bool `mapstructure:"allow_ec"` +} + +// Metadata configures metadata chain. +type Metadata struct { + Enabled bool `mapstructure:"enabled"` + SeedPort uint16 `mapstructure:"seed_port"` + RPCPort uint16 `mapstructure:"rpc_port"` + P2PPort uint16 `mapstructure:"p2p_port"` } // Mainnet configures mainnet chain settings. diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index a9229f39c0..4aca8f100f 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -2,21 +2,37 @@ package innerring import ( "context" + "errors" "fmt" + "math" + "math/big" "net" + "path" + "slices" + "strconv" "sync/atomic" + "time" "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/core/storage/dbconfig" + "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/encoding/fixedn" + "github.com/nspcc-dev/neo-go/pkg/neorpc" "github.com/nspcc-dev/neo-go/pkg/rpcclient" + "github.com/nspcc-dev/neo-go/pkg/rpcclient/actor" + "github.com/nspcc-dev/neo-go/pkg/rpcclient/gas" "github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker" + "github.com/nspcc-dev/neo-go/pkg/rpcclient/notary" + sc "github.com/nspcc-dev/neo-go/pkg/smartcontract" "github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/nspcc-dev/neofs-contract/deploy" "github.com/nspcc-dev/neofs-node/internal/chaintime" "github.com/nspcc-dev/neofs-node/misc" + metachaingas "github.com/nspcc-dev/neofs-node/pkg/core/metachain/gas" "github.com/nspcc-dev/neofs-node/pkg/innerring/config" "github.com/nspcc-dev/neofs-node/pkg/innerring/internal/blockchain" + "github.com/nspcc-dev/neofs-node/pkg/innerring/internal/metachain" "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/alphabet" "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/balance" "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/container" @@ -56,7 +72,8 @@ type ( Server struct { log *zap.Logger - bc *blockchain.Blockchain + bc *blockchain.Blockchain + metaChain *blockchain.Blockchain // event producers fsChainListener event.Listener @@ -94,7 +111,7 @@ type ( netmapProcessor *netmap.Processor containerProcessor *container.Processor - workers []func(context.Context) + workers []func(context.Context) error // Set of local resources that must be // initialized at the very beginning of @@ -242,7 +259,7 @@ func (s *Server) Start(ctx context.Context, intError chan<- error) (err error) { }() } - s.startWorkers(ctx) + s.startWorkers(ctx, intError) return nil } @@ -273,9 +290,14 @@ func (s *Server) resetEpochTimer(lastTickHeight uint32) (uint64, error) { return lastTickH.Timestamp + epochDuration*msInS, nil } -func (s *Server) startWorkers(ctx context.Context) { +func (s *Server) startWorkers(ctx context.Context, intError chan<- error) { for _, w := range s.workers { - go w(ctx) + go func() { + err := w(ctx) + if err != nil { + intError <- err + } + }() } } @@ -756,10 +778,143 @@ func New(ctx context.Context, log *zap.Logger, cfg *config.Config, errChan chan< nodeValidators = append(nodeValidators, external.New(cfg.Validator.URL, server.key)) } + var metaActor *notary.Actor + if cfg.Experimental.ChainMetaData.Enabled { + if !isLocalConsensus { + return nil, errors.New("experimental meta-on-chain is not supported for non-consensus IR nodes") + } + + v, err := server.fsChainClient.GetVersion() + if err != nil { + return nil, fmt.Errorf("fetching FS chain version: %w", err) + } + + metaSeeds, err := changePort(cfg.FSChain.Consensus.SeedNodes, cfg.Experimental.ChainMetaData.SeedPort) + if err != nil { + return nil, fmt.Errorf("parsing consensus seed nodes: %w", err) + } + metaRPCs, err := changePort(cfg.FSChain.Consensus.RPC.Listen, cfg.Experimental.ChainMetaData.RPCPort) + if err != nil { + return nil, fmt.Errorf("parsing consensus RPCs: %w", err) + } + metaP2Ps, err := changePort(cfg.FSChain.Consensus.P2P.Listen, cfg.Experimental.ChainMetaData.P2PPort) + if err != nil { + return nil, fmt.Errorf("parsing consensus P2Ps: %w", err) + } + + fsChainProtocol := v.Protocol + metaChainCfg := config.Consensus{ + Magic: uint32(fsChainProtocol.Network) + 1, + Committee: fsChainProtocol.StandbyCommittee, + Storage: config.Storage{ + Path: path.Join(path.Dir(cfg.FSChain.Consensus.Storage.Path), "meta_db.bolt"), + Type: dbconfig.BoltDB, + }, + TimePerBlock: 50 * time.Millisecond, + MaxTimePerBlock: 20 * time.Second, + MaxTraceableBlocks: fsChainProtocol.MaxTraceableBlocks, + MaxValidUntilBlockIncrement: fsChainProtocol.MaxValidUntilBlockIncrement, + SeedNodes: metaSeeds, + Hardforks: config.Hardforks{}, + ValidatorsHistory: config.ValidatorsHistory{}, + RPC: config.RPC{ + Listen: metaRPCs, + MaxWebSocketClients: cfg.FSChain.Consensus.RPC.MaxWebSocketClients, + SessionPoolSize: cfg.FSChain.Consensus.RPC.SessionPoolSize, + MaxGasInvoke: cfg.FSChain.Consensus.RPC.MaxGasInvoke, + }, + P2P: config.P2P{ + DialTimeout: cfg.FSChain.Consensus.P2P.DialTimeout, + ProtoTickInterval: cfg.FSChain.Consensus.P2P.ProtoTickInterval, + Listen: metaP2Ps, + Peers: cfg.FSChain.Consensus.P2P.Peers, + Ping: cfg.FSChain.Consensus.P2P.Ping, + }, + SetRolesInGenesis: true, + KeepOnlyLatestState: false, + RemoveUntraceableBlocks: false, + P2PNotaryRequestPayloadPoolSize: 1000, // default for blockchain.New() + } + + server.metaChain, err = metachain.NewMetaChain(&metaChainCfg, &cfg.Wallet, errChan, log.With(zap.String("component", "metadata chain (IR)"))) + if err != nil { + return nil, fmt.Errorf("init meta sidechain blockchain: %w", err) + } + server.workers = append(server.workers, func(ctx context.Context) error { + defer server.metaChain.Stop() + return server.metaChain.Run(ctx) + }) + + alphabetList, err := server.fsChainClient.NeoFSAlphabetList() + if err != nil { + return nil, fmt.Errorf("fetching FS chain Alphabet: %w", err) + } + metaCli, err := server.metaChain.BuildWSClient(ctx) + if err != nil { + return nil, fmt.Errorf("build meta chain client: %w", err) + } + alphaAcc := wallet.NewAccountFromPrivateKey(server.key) + err = alphaAcc.ConvertMultisig(sc.GetMajorityHonestNodeCount(len(alphabetList)), alphabetList) + if err != nil { + return nil, fmt.Errorf("build meta committee acc: %w", err) + } + metaActor, err = notary.NewActor(metaCli, []actor.SignerAccount{ + { + Signer: transaction.Signer{ + Account: alphaAcc.ScriptHash(), + Scopes: transaction.CalledByEntry, + }, + Account: alphaAcc, + }, + }, wallet.NewAccountFromPrivateKey(server.key)) + if err != nil { + return nil, fmt.Errorf("build meta committee actor: %w", err) + } + + server.workers = append(server.workers, func(ctx context.Context) error { + simpleAcc := wallet.NewAccountFromPrivateKey(server.key) + simpleAccHash := simpleAcc.ScriptHash() + act, err := actor.New(metaCli, []actor.SignerAccount{{ + Signer: transaction.Signer{ + Account: simpleAccHash, + Scopes: transaction.CalledByEntry, + }, + Account: simpleAcc, + }}) + if err != nil { + return fmt.Errorf("new meta notary actor: %w", err) + } + + gasAct := gas.New(act) + txHash, vub, err := gasAct.Transfer( + simpleAccHash, + notary.Hash, + big.NewInt(metachaingas.DefaultBalance*9/10), // default metadata chain balance but a little bit less + ¬ary.OnNEP17PaymentData{Account: &simpleAccHash, Till: math.MaxUint32}) + if err != nil { + if !errors.Is(err, neorpc.ErrAlreadyExists) { + return fmt.Errorf("can't make notary deposit in meta chain: %w", err) + } + } + + server.log.Debug("made meta chain notary deposit, awaiting...", zap.String("txHash", txHash.StringLE()), zap.Uint32("vub", vub)) + + _, err = act.WaitSuccess(ctx, txHash, vub, nil) + if err != nil { + return fmt.Errorf("waiting for meta chain notary deposit %s TX to be persisted: %w", txHash.StringLE(), err) + } + + server.log.Debug("meta chain notary deposit successful", zap.String("tx_hash", txHash.StringLE())) + + return nil + }) + } + // create netmap processor server.netmapProcessor, err = netmap.New(&netmap.Params{ Log: log, PoolSize: cfg.Workers.Netmap, + MetaClient: metaActor, NetmapClient: server.netmapClient, EpochTimer: server, EpochState: server, @@ -786,8 +941,9 @@ func New(ctx context.Context, log *zap.Logger, cfg *config.Config, errChan chan< PoolSize: cfg.Workers.Container, AlphabetState: server, ContainerClient: cnrClient, + MetaClient: metaActor, NetworkState: server.netmapClient, - MetaEnabled: cfg.Experimental.ChainMetaData, + MetaEnabled: cfg.Experimental.ChainMetaData.Enabled, AllowEC: cfg.Experimental.AllowEC, ChainTime: &server.chainTime, }) @@ -894,6 +1050,19 @@ func New(ctx context.Context, log *zap.Logger, cfg *config.Config, errChan chan< return server, nil } +func changePort(addrs []string, port uint16) ([]string, error) { + res := slices.Clone(addrs) + for i := range res { + host, _, err := net.SplitHostPort(res[i]) + if err != nil { + return nil, fmt.Errorf("[%d] address ('%s') cannot be parsed: %w", i, res[i], err) + } + res[i] = net.JoinHostPort(host, strconv.FormatUint(uint64(port), 10)) + } + + return res, nil +} + func initTimers(server *Server, cfg *config.Config, paymentProcessor *settlement.Processor) { basicIncomeTick, stopBasicIncomeFunc := util.SingleAsyncExecutingInstance(func() { epochN := server.EpochCounter() diff --git a/pkg/innerring/internal/metachain/chain.go b/pkg/innerring/internal/metachain/chain.go new file mode 100644 index 0000000000..61f545be27 --- /dev/null +++ b/pkg/innerring/internal/metachain/chain.go @@ -0,0 +1,14 @@ +package metachain + +import ( + metacontracts "github.com/nspcc-dev/neofs-node/pkg/core/metachain" + "github.com/nspcc-dev/neofs-node/pkg/innerring/config" + "github.com/nspcc-dev/neofs-node/pkg/innerring/internal/blockchain" + "go.uber.org/zap" +) + +// NewMetaChain returns side chain with redefined/custom native contracts. +// See [contracts.NewCustomNatives] for details. +func NewMetaChain(cfg *config.Consensus, wallet *config.Wallet, errChan chan<- error, log *zap.Logger) (*blockchain.Blockchain, error) { + return blockchain.New(cfg, wallet, errChan, log, metacontracts.NewCustomNatives) +} diff --git a/pkg/innerring/processors/container/process_container.go b/pkg/innerring/processors/container/process_container.go index 249a6106f9..cb866ebffd 100644 --- a/pkg/innerring/processors/container/process_container.go +++ b/pkg/innerring/processors/container/process_container.go @@ -7,7 +7,11 @@ import ( "time" "github.com/nspcc-dev/neo-go/pkg/core/transaction" + "github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/network/payload" + "github.com/nspcc-dev/neo-go/pkg/vm/vmstate" + "github.com/nspcc-dev/neofs-node/pkg/core/metachain/meta" + "github.com/nspcc-dev/neofs-node/pkg/innerring/processors" cntClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container" "github.com/nspcc-dev/neofs-node/pkg/morph/event" containerEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/container" @@ -192,6 +196,55 @@ func (cp *Processor) approvePutContainer(mainTx transaction.Transaction, cnr con return } + if cp.metaEnabled { + var metaPolicy string + for k, v := range cnr.Attributes() { + if k == sysAttrChainMeta && (v == "optimistic" || v == "strict") { + metaPolicy = v + break + } + } + if metaPolicy != "" { + l.Debug("sending registration transaction for metadata container...", zap.Stringer("cid", id), zap.String("metaPolicy", metaPolicy)) + + _, err = cp.metaClient.WaitSuccess(cp.metaClient.Notarize(cp.metaClient.MakeTunedCall(meta.Hash, "registerMetaContainer", nil, func(r *result.Invoke, t *transaction.Transaction) error { + if r.State != vmstate.Halt.String() { + return fmt.Errorf("script failed (%s state) due to an error: %s", r.State, r.FaultException) + } + + vub, err := processors.CalculateVUB(cp.metaClient) + if err != nil { + return fmt.Errorf("could not calculate vub: %w", err) + } + + t.ValidUntilBlock = vub + t.Nonce = mainTx.Nonce + + // Add 10% GAS to prevent this errors: + // "at instruction 1689 (SYSCALL): System.Runtime.Log failed: insufficient amount of gas" + t.SystemFee += t.SystemFee / 10 + + return nil + }, id[:]))) + if err != nil { + l.Error("could not register meta container", zap.Stringer("cid", id), zap.Error(err)) + return + } + + l.Debug("registered meta container", zap.Stringer("cid", id), zap.String("metaPolicy", metaPolicy)) + + l.Debug("sending placement update transaction for metadata container...", zap.Stringer("cid", id), zap.String("metaPolicy", metaPolicy)) + + err := processors.UpdateMetaPlacement(cp.metaClient, id, vectors, policy, mainTx.Nonce) + if err != nil { + l.Error("could not update placement", zap.Stringer("cid", id), zap.Error(err)) + return + } + + l.Debug("updated meta container placement successfully...", zap.Stringer("cid", id), zap.String("metaPolicy", metaPolicy)) + } + } + l.Debug("container successfully approved") } diff --git a/pkg/innerring/processors/container/processor.go b/pkg/innerring/processors/container/processor.go index 39b4e81707..de6122acc3 100644 --- a/pkg/innerring/processors/container/processor.go +++ b/pkg/innerring/processors/container/processor.go @@ -7,6 +7,7 @@ import ( "time" "github.com/nspcc-dev/neo-go/pkg/neorpc" + "github.com/nspcc-dev/neo-go/pkg/rpcclient/notary" "github.com/nspcc-dev/neo-go/pkg/vm/vmstate" "github.com/nspcc-dev/neofs-node/pkg/core/nns" "github.com/nspcc-dev/neofs-node/pkg/morph/client/container" @@ -32,6 +33,7 @@ type ( objectPool *ants.Pool alphabetState AlphabetState cnrClient *container.Client // notary must be enabled + metaClient *notary.Actor netState NetworkState metaEnabled bool allowEC bool @@ -45,6 +47,7 @@ type ( PoolSize int AlphabetState AlphabetState ContainerClient *container.Client + MetaClient *notary.Actor NetworkState NetworkState MetaEnabled bool AllowEC bool @@ -88,6 +91,8 @@ func New(p *Params) (*Processor, error) { return nil, errors.New("ir/container: global state is not set") case p.ContainerClient == nil: return nil, errors.New("ir/container: Container client is not set") + case p.MetaEnabled && p.MetaClient == nil: + return nil, errors.New("ir/container: MetaClient is not set") case p.NetworkState == nil: return nil, errors.New("ir/container: network state is not set") case p.ChainTime == nil: @@ -111,6 +116,7 @@ func New(p *Params) (*Processor, error) { alphabetState: p.AlphabetState, cnrClient: p.ContainerClient, netState: p.NetworkState, + metaClient: p.MetaClient, metaEnabled: p.MetaEnabled, allowEC: p.AllowEC, chainTime: p.ChainTime, diff --git a/pkg/innerring/processors/meta_placement.go b/pkg/innerring/processors/meta_placement.go new file mode 100644 index 0000000000..ef90a7368d --- /dev/null +++ b/pkg/innerring/processors/meta_placement.go @@ -0,0 +1,93 @@ +package processors + +import ( + "bytes" + "crypto/elliptic" + "fmt" + "slices" + "sort" + + "github.com/nspcc-dev/neo-go/pkg/core/transaction" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neo-go/pkg/neorpc/result" + "github.com/nspcc-dev/neo-go/pkg/rpcclient/notary" + "github.com/nspcc-dev/neo-go/pkg/vm/vmstate" + "github.com/nspcc-dev/neofs-node/pkg/core/metachain/meta" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/netmap" +) + +// UpdateMetaPlacement sends notary request using provided actor to update +// container's placement in metadata sidechain. +func UpdateMetaPlacement(metaClient *notary.Actor, cid cid.ID, vectors [][]netmap.NodeInfo, policy netmap.PlacementPolicy, nonce uint32) error { + var ( + placement meta.Placement + replicas = policy.Replicas() + ) + for i, v := range vectors { + var cnrVector meta.PlacementVector + rep := uint8(1) + if i < len(replicas) { + rep = uint8(policy.ReplicaNumberByIndex(i)) + } + cnrVector.REP = rep + + slices.SortFunc(v, func(a, b netmap.NodeInfo) int { + return bytes.Compare(a.PublicKey(), b.PublicKey()) + }) + + kk := make(keys.PublicKeys, 0, len(v)) + for _, n := range v { + k, err := keys.NewPublicKeyFromBytes(n.PublicKey(), elliptic.P256()) + if err != nil { + return fmt.Errorf("could not parse public key for %s meta container placement: %w", cid, err) + } + kk = append(kk, k) + } + + sort.Sort(kk) + cnrVector.Nodes = kk + + placement = append(placement, cnrVector) + } + + _, err := metaClient.WaitSuccess(metaClient.Notarize(metaClient.MakeTunedCall(meta.Hash, "updateContainerList", nil, func(r *result.Invoke, t *transaction.Transaction) error { + if r.State != vmstate.Halt.String() { + return fmt.Errorf("script failed (%s state) due to an error: %s", r.State, r.FaultException) + } + + vub, err := CalculateVUB(metaClient) + if err != nil { + return fmt.Errorf("could not calculate vub: %w", err) + } + + t.ValidUntilBlock = vub + t.Nonce = nonce + + // Add 10% GAS to prevent this errors: + // "at instruction 1689 (SYSCALL): System.Runtime.Log failed: insufficient amount of gas" + t.SystemFee += t.SystemFee / 10 + + return nil + }, cid[:], &placement))) + return err +} + +// CalculateVUB returns a suitable Valid Until Block value so that consensus +// nodes should have enough time to send their signatures for notary requests. +func CalculateVUB(cli *notary.Actor) (uint32, error) { + bc, err := cli.GetBlockCount() + if err != nil { + return 0, fmt.Errorf("can't get current blockchain height: %w", err) + } + + const ( + defaultNotaryValidTime = 50 + defaultNotaryRoundTime = 100 + ) + + minIndex := bc + defaultNotaryValidTime + rounded := (minIndex/defaultNotaryRoundTime + 1) * defaultNotaryRoundTime + + return rounded, nil +} diff --git a/pkg/innerring/processors/netmap/process_epoch.go b/pkg/innerring/processors/netmap/process_epoch.go index cc053bc32f..5e598bfa2f 100644 --- a/pkg/innerring/processors/netmap/process_epoch.go +++ b/pkg/innerring/processors/netmap/process_epoch.go @@ -7,6 +7,7 @@ import ( "time" "github.com/cenkalti/backoff/v4" + "github.com/nspcc-dev/neofs-node/pkg/innerring/processors" "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/governance" netmapEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap" "github.com/nspcc-dev/neofs-sdk-go/netmap" @@ -56,7 +57,7 @@ func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) { if mapChanged { l.Debug("updating placements in Container contract...") - err = np.updatePlacementInContract(*networkMap, l) + err = np.updatePlacementInContract(*networkMap, epoch, l) if err != nil { l.Error("can't update placements in Container contract", zap.Error(err)) } else { @@ -67,7 +68,7 @@ func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) { np.handleNotaryDeposit(ev) } -func (np *Processor) updatePlacementInContract(nm netmap.NetMap, l *zap.Logger) error { +func (np *Processor) updatePlacementInContract(nm netmap.NetMap, epoch uint64, l *zap.Logger) error { cids, err := np.containerWrp.List(nil) if err != nil { return fmt.Errorf("can't get containers list: %w", err) @@ -100,6 +101,26 @@ func (np *Processor) updatePlacementInContract(nm netmap.NetMap, l *zap.Logger) replicas[i] = 1 // each EC part is stored in a single copy } + if np.metaClient != nil { + var metaPolicy string + for k, v := range cnr.Attributes() { + if k == "__NEOFS__METAINFO_CONSISTENCY" && (v == "optimistic" || v == "strict") { + metaPolicy = v + break + } + } + if metaPolicy != "" { + np.log.Debug("updating meta container placements...", zap.Stringer("cid", cID)) + + err = processors.UpdateMetaPlacement(np.metaClient, cID, vectors, policy, uint32(epoch)) + if err != nil { + np.log.Error("could not update meta container placement", zap.Stringer("cid", cID), zap.Error(err)) + } else { + np.log.Debug("updated meta container placements...", zap.Stringer("cid", cID)) + } + } + } + err = np.containerWrp.UpdateContainerPlacement(cID, vectors, replicas) if err != nil { blockTimeMs, err := np.netmapClient.Morph().MsPerBlock() diff --git a/pkg/innerring/processors/netmap/processor.go b/pkg/innerring/processors/netmap/processor.go index eb99c0d469..ba8d734be2 100644 --- a/pkg/innerring/processors/netmap/processor.go +++ b/pkg/innerring/processors/netmap/processor.go @@ -6,6 +6,7 @@ import ( "sync/atomic" "time" + "github.com/nspcc-dev/neo-go/pkg/rpcclient/notary" "github.com/nspcc-dev/neofs-node/pkg/morph/client/container" nmClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap" "github.com/nspcc-dev/neofs-node/pkg/morph/event" @@ -54,6 +55,7 @@ type ( alphabetState AlphabetState curMap atomic.Value + metaClient *notary.Actor netmapClient *nmClient.Client containerWrp *container.Client @@ -67,6 +69,7 @@ type ( Params struct { Log *zap.Logger PoolSize int + MetaClient *notary.Actor NetmapClient *nmClient.Client EpochTimer EpochTimerReseter EpochState EpochState @@ -123,6 +126,7 @@ func New(p *Params) (*Processor, error) { alphabetState: p.AlphabetState, netmapClient: p.NetmapClient, containerWrp: p.ContainerWrapper, + metaClient: p.MetaClient, handleAlphabetSync: p.AlphabetSyncHandler,