Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions indexer/cmd/oapigen/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ func main() {
return nil, nil
})

huma.Register(grp, huma.Operation{
OperationID: "initiate-replay",
Method: http.MethodPost,
Path: "/initiatereplay",
Description: "Initiate a replay for messages that match a given criteria to update the index.",
}, func(ctx context.Context, input *v1.ReplayInput) (*v1.ReplayResponse, error) {
return nil, nil
})

oapi := api.OpenAPI()

// Remove $schema properties from all schemas (it was only in ErrorResponse)
Expand Down
4 changes: 4 additions & 0 deletions indexer/pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ func NewV1API(lggr logger.Logger, cfg *config.Config, storage common.IndexerStor
messagesHandler := v1.NewMessagesHandler(storage, lggr, monitoring)
v1Group.GET("/messages", messagesHandler.Handle)

// Replay endpoint.
replayHandler := v1.NewReplayHandler(lggr, monitoring)
v1Group.GET("/replay", replayHandler.Handle)

// App readiness and health endpoints
healthHandler := v1.NewHealthHandler(storage, lggr, monitoring)
router.GET("/health", healthHandler.Handle)
Expand Down
65 changes: 65 additions & 0 deletions indexer/pkg/api/handlers/v1/replay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package v1

import (
"net/http"

"github.com/gin-gonic/gin"

"github.com/smartcontractkit/chainlink-ccv/indexer/pkg/common"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
)

type ReplayInput struct {
// SourceChainSelectors []protocol.ChainSelector `doc:"Source chain selectors to filter results by. If empty, results from all source chains will be returned." query:"sourceChainSelectors"`
// DestChainSelectors []protocol.ChainSelector `doc:"Destination chain selectors to filter results by. If empty, results from all destination chains will be returned." query:"destChainSelectors"`
// Start string `doc:"Start time used to filter results. If not provided, results start from the beginning. Accepted formats: RFC3339, unix epoch time (in milliseconds)." form:"start" query:"start"`
// End string `doc:"End time used to filter results. If not provided, the current server time is used. Accepted formats: RFC3339, unix epoch time (in milliseconds)." form:"end" query:"end"`
// Limit uint64 `doc:"Maximum number of results to return. If not provided, defaults to 100." form:"limit" query:"limit"`
// Offset uint64 `doc:"Number of results to skip before starting to return results. If not provided, defaults to 0." form:"offset" query:"offset"`
}

type ReplayResponse struct {
Success bool `json:"success" doc:"Indicates whether the request was successful."`
// VerifierResults map[string][]common.VerifierResultWithMetadata `json:"verifierResults" doc:"A map of message IDs to their corresponding verifier results. Each key is a message ID, and the value is a list of verifier results associated with that message."`
}

type ReplayHandler struct {
// storage common.IndexerStorage
lggr logger.Logger
monitoring common.IndexerMonitoring
}

func NewReplayHandler(lggr logger.Logger, monitoring common.IndexerMonitoring) *ReplayHandler {
return &ReplayHandler{
lggr: lggr,
monitoring: monitoring,
}
}

func (h *ReplayHandler) Handle(c *gin.Context) {
/*
req := ReplayInput{}

if err := c.ShouldBindQuery(&req); err != nil {
c.JSON(http.StatusBadRequest, makeErrorResponse(http.StatusBadRequest, err.Error()))
return
}
verifierResponse, err := h.storage.QueryCCVData(c.Request.Context(), startTime, endTime, req.SourceChainSelectors, req.DestChainSelectors, req.Limit, req.Offset)
if err != nil {
c.JSON(http.StatusInternalServerError, makeErrorResponse(http.StatusInternalServerError, err.Error()))
return
}

h.lggr.Debugw("/v1/verifierresults", "number of messages returned", len(verifierResponse))
c.JSON(http.StatusOK, VerifierResultsResponse{
Success: true,
VerifierResults: verifierResponse,
})
*/
h.lggr.Debugw("/v1/replay")
c.JSON(http.StatusOK, VerifierResultsResponse{
Success: true,
// VerifierResults: verifierResponse,
})
}
38 changes: 37 additions & 1 deletion indexer/pkg/discovery/message_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"bytes"
"context"
"errors"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -150,7 +151,42 @@
}

func (a *AggregatorMessageDiscovery) Replay(ctx context.Context, start, end uint64) error {
return nil
// Basic validation
if end < start {
return fmt.Errorf("end must be >= start")
}

// Acquire the reader lock to prevent concurrent reads while replaying
a.readerLock.Lock()
defer a.readerLock.Unlock()

// Ensure the reader supports setting the since value
if ok := a.aggregatorReader.SetSinceValue(int64(start)); !ok {
return fmt.Errorf("underlying reader does not support SetSinceValue, cannot replay")
}

for {
select {
case <-ctx.Done():
return ctx.Err()
default:
// Check current since value
cur, _ := a.aggregatorReader.GetSinceValue()
if uint64(cur) > end {
return nil
}

found, err := a.callReader(ctx)
if err != nil {
return err
}

// If no data found, we're likely caught up; return nil
if !found {
return nil
}
}
}
}

func (a *AggregatorMessageDiscovery) run(ctx context.Context) {
Expand Down Expand Up @@ -184,8 +220,8 @@
defer ticker.Stop()

for {
select {

Check failure on line 223 in indexer/pkg/discovery/message_discovery.go

View workflow job for this annotation

GitHub Actions / test-coverage-report

a.readerLock undefined (type *AggregatorMessageDiscovery has no field or method readerLock)

Check failure on line 223 in indexer/pkg/discovery/message_discovery.go

View workflow job for this annotation

GitHub Actions / test-coverage

a.readerLock undefined (type *AggregatorMessageDiscovery has no field or method readerLock)

Check failure on line 223 in indexer/pkg/discovery/message_discovery.go

View workflow job for this annotation

GitHub Actions / lint

a.readerLock undefined (type *AggregatorMessageDiscovery has no field or method readerLock)
case <-ctx.Done():

Check failure on line 224 in indexer/pkg/discovery/message_discovery.go

View workflow job for this annotation

GitHub Actions / test-coverage-report

a.readerLock undefined (type *AggregatorMessageDiscovery has no field or method readerLock)

Check failure on line 224 in indexer/pkg/discovery/message_discovery.go

View workflow job for this annotation

GitHub Actions / test-coverage

a.readerLock undefined (type *AggregatorMessageDiscovery has no field or method readerLock)

Check failure on line 224 in indexer/pkg/discovery/message_discovery.go

View workflow job for this annotation

GitHub Actions / lint

a.readerLock undefined (type *AggregatorMessageDiscovery has no field or method readerLock) (typecheck)
a.logger.Info("updateSequenceNumber stopped due to context cancellation")
return
case <-ticker.C:
Expand Down
1 change: 1 addition & 0 deletions protocol/message_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,7 @@ type DiscoveryStorageReader interface {
GetSinceValue() int64
// SetSinceValue allows moves the cursor of the reader within the discovery service
// this happens automatically within the reader, this is an escape hatch mainly used to move the cursor backwards for replays.
// When reading from aggregator, since is a sequence number.
SetSinceValue(since int64)
}

Expand Down
Loading