Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ replace (
)

require (
cosmossdk.io/errors v1.0.2
cosmossdk.io/math v1.5.3
github.com/AlecAivazis/survey/v2 v2.3.7
github.com/DataDog/zstd v1.5.7
Expand Down Expand Up @@ -55,7 +56,6 @@ require (
cosmossdk.io/collections v1.3.1 // indirect
cosmossdk.io/core v0.11.3 // indirect
cosmossdk.io/depinject v1.2.1 // indirect
cosmossdk.io/errors v1.0.2 // indirect
cosmossdk.io/log v1.6.1 // indirect
cosmossdk.io/schema v1.1.0 // indirect
cosmossdk.io/store v1.1.2 // indirect
Expand Down
38 changes: 26 additions & 12 deletions pkg/logtrace/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"
"runtime"
"strings"
"sync/atomic"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand All @@ -20,10 +21,14 @@ const CorrelationIDKey ContextKey = "correlation_id"
const OriginKey ContextKey = "origin"

var (
logger *zap.Logger
minLevel zapcore.Level = zapcore.InfoLevel // effective minimum log level
loggerPtr atomic.Pointer[zap.Logger]
minLevel atomic.Int32 // effective minimum log level as zapcore.Level
)

func init() {
minLevel.Store(int32(zapcore.InfoLevel))
}

// Setup initializes the logger for readable output in all modes.
func Setup(serviceName string) {
var err error
Expand All @@ -42,20 +47,24 @@ func Setup(serviceName string) {
// Always respect the LOG_LEVEL environment variable.
lvl := getLogLevel()
config.Level = zap.NewAtomicLevelAt(lvl)
// Persist the effective minimum so non-core sinks (e.g., Datadog) can
// filter entries consistently with the console logger.
minLevel = lvl

// Build the logger from the customized config.
var built *zap.Logger
if tracingEnabled {
logger, err = config.Build(zap.AddCallerSkip(1), zap.AddStacktrace(zapcore.ErrorLevel))
built, err = config.Build(zap.AddCallerSkip(1), zap.AddStacktrace(zapcore.ErrorLevel))
} else {
logger, err = config.Build()
built, err = config.Build()
}
if err != nil {
panic(err)
}

// Publish atomically so concurrent Setup/log calls cannot race on package
// globals. The effective minimum is stored after the logger so a racing log
// call always sees either the old complete pair or a conservative new logger
// with the previous Datadog gate for one call.
loggerPtr.Store(built)
minLevel.Store(int32(lvl))

// Initialize Datadog forwarding (minimal integration in separate file)
SetupDatadog(serviceName)
}
Expand Down Expand Up @@ -120,12 +129,17 @@ func extractCorrelationID(ctx context.Context) string {

// logWithLevel logs a message with structured fields.
func logWithLevel(level zapcore.Level, ctx context.Context, message string, fields Fields) {
if logger == nil {
lg := loggerPtr.Load()
if lg == nil {
Setup("unknown-service") // Fallback if Setup wasn't called
lg = loggerPtr.Load()
if lg == nil {
return
}
}

// Drop early if below the configured level (keeps Datadog in sync)
if !logger.Core().Enabled(level) {
if !lg.Core().Enabled(level) {
return
}

Expand All @@ -149,7 +163,7 @@ func logWithLevel(level zapcore.Level, ctx context.Context, message string, fiel
}

// Log with the structured fields using a level check/write
if ce := logger.Check(level, message); ce != nil {
if ce := lg.Check(level, message); ce != nil {
ce.Write(zapFields...)
} else {
// Should not happen due to early Enabled check, but guard anyway
Expand All @@ -159,7 +173,7 @@ func logWithLevel(level zapcore.Level, ctx context.Context, message string, fiel
// Forward to Datadog (non-blocking, best-effort) only if level is enabled
// for the current configuration. This prevents forwarding debug entries
// when the logger is configured for info and above.
if level >= minLevel {
if int32(level) >= minLevel.Load() {
ForwardDatadog(level, ctx, message, fields)
}
}
Expand Down
27 changes: 27 additions & 0 deletions pkg/logtrace/race_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
//go:build race

package logtrace

import (
"context"
"sync"
"testing"
)

// TestSetupConcurrentWithLoggingRaceFree hammers Setup and the logging
// entry points from many goroutines at once; under the race detector this
// fails if the package globals are published without synchronization.
func TestSetupConcurrentWithLoggingRaceFree(t *testing.T) {
	const goroutines = 100
	ctx := context.Background()

	var wg sync.WaitGroup
	wg.Add(goroutines)
	for i := 0; i < goroutines; i++ {
		go func(n int) {
			defer wg.Done()
			// Every tenth goroutine re-runs Setup to race it against logging.
			if n%10 == 0 {
				Setup("race-test")
			}
			Debug(ctx, "debug", Fields{"i": n})
			Info(ctx, "info", Fields{"i": n})
			Warn(ctx, "warn", Fields{"i": n})
		}(i)
	}
	wg.Wait()
}
29 changes: 23 additions & 6 deletions pkg/lumera/chainerrors/chainerrors.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
//
// The predicates here:
//
// 1. Prefer typed sentinel matching via errors.Is.
// 2. Fall through to gRPC status codes for query-side rejections.
// 3. Keep an English-substring fallback so we remain correct against any
// currently-deployed chain build whose error path doesn't preserve the
// typed sentinel through the wire (defense-in-depth, removable once
// every chain build in production guarantees end-to-end ABCIError).
// 1. Prefer typed sentinel matching via errors.Is.
// 2. Fall through to gRPC status codes for query-side rejections.
// 3. Keep an English-substring fallback so we remain correct against any
// currently-deployed chain build whose error path doesn't preserve the
// typed sentinel through the wire (defense-in-depth, removable once
// every chain build in production guarantees end-to-end ABCIError).
//
// IsTransientGrpc is the safety valve: any path that classifies an error as
// "definitely a chain-side reject" (and would therefore destructively clean
Expand Down Expand Up @@ -69,6 +69,23 @@ func IsHealOpInvalidState(err error) bool {
// matched any error containing "not found" (gRPC "block N not found", codec
// lookup miss, key-not-found inside Cosmos SDK), which led to destructive
// cleanup on transient query failures.

// IsHealOpPastDeadline reports whether err is the chain-side invalid-state
// rejection for a heal-op whose deadline has already passed. As of Lumera
// chain x/audit/v1/types/errors.go there is no dedicated past-deadline
// sentinel — the tx path reuses ErrHealOpInvalidState for several heal-op
// rejections — so this predicate is additionally phrase-anchored, letting
// callers short-circuit deadline rejects without treating every
// invalid-state error as expired.
func IsHealOpPastDeadline(err error) bool {
	// Only invalid-state sentinels can qualify; bail out before touching
	// the message text for anything else.
	if err == nil || !errors.Is(err, audittypes.ErrHealOpInvalidState) {
		return false
	}
	msg := strings.ToLower(err.Error())
	return strings.Contains(msg, "heal op") && strings.Contains(msg, "deadline")
}

func IsHealOpNotFound(err error) bool {
if err == nil {
return false
Expand Down
13 changes: 12 additions & 1 deletion pkg/lumera/chainerrors/chainerrors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"fmt"
"testing"

audittypes "github.com/LumeraProtocol/lumera/x/audit/v1/types"
errorsmod "cosmossdk.io/errors"
audittypes "github.com/LumeraProtocol/lumera/x/audit/v1/types"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -167,3 +167,14 @@ func TestRegression_TransientNotFoundDoesNotMatchHealOpNotFound(t *testing.T) {
}
}
}

// TestIsHealOpPastDeadline checks that the deadline predicate matches only
// invalid-state errors whose message is anchored on a heal-op deadline.
func TestIsHealOpPastDeadline(t *testing.T) {
	// wrap builds a chain-style wrapped invalid-state error with the given detail.
	wrap := func(detail string) error {
		return fmt.Errorf("submit claim: %w", errorsmod.Wrap(audittypes.ErrHealOpInvalidState, detail))
	}

	if !IsHealOpPastDeadline(wrap("heal op deadline has passed")) {
		t.Fatalf("expected deadline invalid-state error to match")
	}
	if IsHealOpPastDeadline(wrap("heal op status VERIFIED does not accept healer completion claim")) {
		t.Fatalf("generic invalid-state error must not be treated as deadline")
	}
}
30 changes: 0 additions & 30 deletions pkg/lumera/modules/audit/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,36 +87,6 @@ func (m *module) GetEpochReportsByReporter(ctx context.Context, reporterAccount
return resp, nil
}

// GetNodeSuspicionState fetches the chain-side suspicion state for the
// given supernode account via the audit query client.
func (m *module) GetNodeSuspicionState(ctx context.Context, supernodeAccount string) (*types.QueryNodeSuspicionStateResponse, error) {
	req := &types.QueryNodeSuspicionStateRequest{
		SupernodeAccount: supernodeAccount,
	}
	resp, err := m.client.NodeSuspicionState(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to get node suspicion state: %w", err)
	}
	return resp, nil
}

// GetReporterReliabilityState fetches the chain-side reliability state for
// the given reporter supernode account via the audit query client.
func (m *module) GetReporterReliabilityState(ctx context.Context, reporterAccount string) (*types.QueryReporterReliabilityStateResponse, error) {
	req := &types.QueryReporterReliabilityStateRequest{
		ReporterSupernodeAccount: reporterAccount,
	}
	resp, err := m.client.ReporterReliabilityState(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to get reporter reliability state: %w", err)
	}
	return resp, nil
}

// GetTicketDeteriorationState fetches the chain-side deterioration state
// for the given ticket via the audit query client.
func (m *module) GetTicketDeteriorationState(ctx context.Context, ticketID string) (*types.QueryTicketDeteriorationStateResponse, error) {
	req := &types.QueryTicketDeteriorationStateRequest{
		TicketId: ticketID,
	}
	resp, err := m.client.TicketDeteriorationState(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to get ticket deterioration state: %w", err)
	}
	return resp, nil
}

func (m *module) GetHealOp(ctx context.Context, healOpID uint64) (*types.QueryHealOpResponse, error) {
resp, err := m.client.HealOp(ctx, &types.QueryHealOpRequest{
HealOpId: healOpID,
Expand Down
5 changes: 0 additions & 5 deletions pkg/lumera/modules/audit/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@ type Module interface {
GetEpochReport(ctx context.Context, epochID uint64, supernodeAccount string) (*types.QueryEpochReportResponse, error)
GetEpochReportsByReporter(ctx context.Context, reporterAccount string, epochID uint64) (*types.QueryEpochReportsByReporterResponse, error)

// LEP-6 storage-truth state queries.
GetNodeSuspicionState(ctx context.Context, supernodeAccount string) (*types.QueryNodeSuspicionStateResponse, error)
GetReporterReliabilityState(ctx context.Context, reporterAccount string) (*types.QueryReporterReliabilityStateResponse, error)
GetTicketDeteriorationState(ctx context.Context, ticketID string) (*types.QueryTicketDeteriorationStateResponse, error)

// LEP-6 heal-op queries.
GetHealOp(ctx context.Context, healOpID uint64) (*types.QueryHealOpResponse, error)
GetHealOpsByStatus(ctx context.Context, status types.HealOpStatus, pagination *query.PageRequest) (*types.QueryHealOpsByStatusResponse, error)
Expand Down
9 changes: 9 additions & 0 deletions pkg/lumera/modules/audit_msg/impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,12 @@ func TestSubmitStorageRecheckEvidenceValidatesInputsBeforeTxExecution(t *testing
_, err = m.SubmitStorageRecheckEvidence(context.Background(), 7, "target", "ticket", "challenged", strings.Repeat(" ", 3), audittypes.StorageProofResultClass_STORAGE_PROOF_RESULT_CLASS_RECHECK_CONFIRMED_FAIL, "")
require.ErrorContains(t, err, "recheck transcript hash cannot be empty")
}

// TestSubmitEvidenceValidatesInputsBeforeTxExecution verifies that a
// zero-value module rejects blank inputs up front, i.e. before any tx
// machinery would be touched.
func TestSubmitEvidenceValidatesInputsBeforeTxExecution(t *testing.T) {
	ctx := context.Background()
	m := &module{}

	// Whitespace-only subject address must fail fast.
	_, err := m.SubmitEvidence(ctx, " ", audittypes.EvidenceType_EVIDENCE_TYPE_UNSPECIFIED, "action", "{}")
	require.ErrorContains(t, err, "subject address cannot be empty")

	// Whitespace-only metadata must likewise fail fast.
	_, err = m.SubmitEvidence(ctx, "subject", audittypes.EvidenceType_EVIDENCE_TYPE_UNSPECIFIED, "action", " ")
	require.ErrorContains(t, err, "metadata cannot be empty")
}
9 changes: 7 additions & 2 deletions pkg/metrics/lep6/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type MetricsSnapshot struct {
HealVerificationsAlreadyExistsTotal uint64
HealFinalizePublishesTotal uint64
HealFinalizeCleanupsTotal map[string]uint64 // status
HealOrphanedStagingCleanupsTotal uint64
SelfHealingPendingClaims int64
SelfHealingStagingBytes int64

Expand Down Expand Up @@ -135,6 +136,7 @@ var metrics = struct {
healVerificationsAlreadyExist atomic.Uint64
healFinalizePublishes atomic.Uint64
healFinalizeCleanups counterMap
healOrphanedStagingCleanups atomic.Uint64
selfHealingPendingClaims atomic.Int64
selfHealingStagingBytes atomic.Int64

Expand Down Expand Up @@ -162,6 +164,7 @@ func Reset() {
metrics.healVerificationsAlreadyExist.Store(0)
metrics.healFinalizePublishes.Store(0)
metrics.healFinalizeCleanups.reset()
metrics.healOrphanedStagingCleanups.Store(0)
metrics.selfHealingPendingClaims.Store(0)
metrics.selfHealingStagingBytes.Store(0)
metrics.recheckCandidatesFound.Store(0)
Expand Down Expand Up @@ -189,6 +192,7 @@ func Snapshot() MetricsSnapshot {
HealVerificationsAlreadyExistsTotal: metrics.healVerificationsAlreadyExist.Load(),
HealFinalizePublishesTotal: metrics.healFinalizePublishes.Load(),
HealFinalizeCleanupsTotal: metrics.healFinalizeCleanups.snapshot(),
HealOrphanedStagingCleanupsTotal: metrics.healOrphanedStagingCleanups.Load(),
SelfHealingPendingClaims: metrics.selfHealingPendingClaims.Load(),
SelfHealingStagingBytes: metrics.selfHealingStagingBytes.Load(),
RecheckCandidatesFoundTotal: metrics.recheckCandidatesFound.Load(),
Expand All @@ -199,8 +203,8 @@ func Snapshot() MetricsSnapshot {
}
}

func IncDispatchResult(resultClass string) { metrics.dispatchResults.inc(resultClass, 1) }
func IncDispatchSignFailure(context string) { metrics.dispatchSignFailures.inc(context, 1) }
func IncDispatchResult(resultClass string) { metrics.dispatchResults.inc(resultClass, 1) }
func IncDispatchSignFailure(context string) { metrics.dispatchSignFailures.inc(context, 1) }
func IncDispatchInternalFailure(stage string) { metrics.dispatchInternalFailures.inc(stage, 1) }
func IncDispatchThrottled(policy string, dropped int) {
if dropped > 0 {
Expand Down Expand Up @@ -237,6 +241,7 @@ func IncHealVerification(outcome string, verified bool) {
// IncHealVerificationAlreadyExists counts heal verifications skipped because one already exists.
func IncHealVerificationAlreadyExists() { metrics.healVerificationsAlreadyExist.Add(1) }

// IncHealFinalizePublish counts heal finalize publishes.
func IncHealFinalizePublish() { metrics.healFinalizePublishes.Add(1) }

// IncHealFinalizeCleanup counts heal finalize cleanups, bucketed by status.
func IncHealFinalizeCleanup(status string) { metrics.healFinalizeCleanups.inc(status, 1) }

// IncHealOrphanedStagingCleanup counts cleanups of orphaned heal staging state.
func IncHealOrphanedStagingCleanup() { metrics.healOrphanedStagingCleanups.Add(1) }
func SetSelfHealingPendingClaims(count int) {
metrics.selfHealingPendingClaims.Store(nonNegativeInt64(count))
}
Expand Down
Loading