Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/api/grpc/blob_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *BlobServiceServer) GetAll(ctx context.Context, req *pb.GetAllRequest) (
nsList[i] = ns
}

var allBlobs []types.Blob
allBlobs := make([]types.Blob, 0, len(nsList)*8)
for _, ns := range nsList {
blobs, err := s.svc.Store().GetBlobs(ctx, ns, req.Height, req.Height, 0, 0)
if err != nil {
Expand Down
28 changes: 23 additions & 5 deletions pkg/api/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,13 @@ func (n *Notifier) Subscribe(namespaces []types.Namespace) (*Subscription, error
}

id := n.nextID.Add(1)
nsSet := make(map[types.Namespace]struct{}, len(namespaces))
for _, ns := range namespaces {
nsSet[ns] = struct{}{}

var nsSet map[types.Namespace]struct{}
if len(namespaces) > 0 {
nsSet = make(map[types.Namespace]struct{}, len(namespaces))
for _, ns := range namespaces {
nsSet[ns] = struct{}{}
}
}

sub := &Subscription{
Expand Down Expand Up @@ -161,13 +165,27 @@ func (n *Notifier) Publish(event HeightEvent) {

// filterEvent returns an event with blobs filtered to the subscriber's
// namespace set. If the subscriber watches all namespaces, the event is
// returned as-is.
// returned as-is. Avoids allocation when no blobs match.
func (n *Notifier) filterEvent(event HeightEvent, sub *Subscription) HeightEvent {
if len(sub.namespaces) == 0 {
return event
}

filtered := make([]types.Blob, 0, len(event.Blobs))
// Count matches first to avoid allocating when nothing matches.
count := 0
for i := range event.Blobs {
if _, ok := sub.namespaces[event.Blobs[i].Namespace]; ok {
count++
}
}
if count == len(event.Blobs) {
return event // all match — no copy needed
}
if count == 0 {
return HeightEvent{Height: event.Height, Header: event.Header}
}

filtered := make([]types.Blob, 0, count)
for i := range event.Blobs {
if _, ok := sub.namespaces[event.Blobs[i].Namespace]; ok {
filtered = append(filtered, event.Blobs[i])
Expand Down
28 changes: 19 additions & 9 deletions pkg/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (s *Service) BlobGetByCommitment(ctx context.Context, commitment []byte) (j
// limit=0 means no limit; offset=0 means no offset.
// Pagination is applied to the aggregate result across all namespaces.
func (s *Service) BlobGetAll(ctx context.Context, height uint64, namespaces []types.Namespace, limit, offset int) (json.RawMessage, error) {
var allBlobs []types.Blob
allBlobs := make([]types.Blob, 0, len(namespaces)*8) // preallocate for typical workload
for _, ns := range namespaces {
blobs, err := s.store.GetBlobs(ctx, ns, height, height, 0, 0)
if err != nil {
Expand Down Expand Up @@ -179,15 +179,25 @@ func (s *Service) Fetcher() fetch.DataFetcher {
return s.fetcher
}

// blobJSON is a struct-based representation for celestia-node compatible JSON.
// Using a struct avoids the per-call map[string]any allocation that json.Marshal
// requires for maps.
//
// NOTE(review): encoding/json emits struct fields in declaration order, so keep
// this field order if byte-for-byte output stability matters to any consumer —
// confirm against celestia-node's expected layout.
type blobJSON struct {
	Namespace    []byte `json:"namespace"`     // raw namespace bytes; base64-encoded by encoding/json
	Data         []byte `json:"data"`          // blob payload; base64-encoded in the JSON output
	ShareVersion uint32 `json:"share_version"` // presumably the share encoding version — confirm
	Commitment   []byte `json:"commitment"`    // blob commitment; base64-encoded in the JSON output
	Index        int    `json:"index"`         // assumed to be the blob's share index — TODO confirm semantics
}

// MarshalBlob converts a stored blob into celestia-node compatible JSON.
//
// The json.Marshal error is deliberately ignored: blobJSON contains only
// byte slices and integers, which cannot fail to marshal.
func MarshalBlob(b *types.Blob) json.RawMessage {
	raw, _ := json.Marshal(blobJSON{ //nolint:errcheck
		Namespace:    b.Namespace[:],
		Data:         b.Data,
		ShareVersion: b.ShareVersion,
		Commitment:   b.Commitment,
		Index:        b.Index,
	})
	return raw
}
10 changes: 7 additions & 3 deletions pkg/fetch/celestia_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,15 @@ type CelestiaAppFetcher struct {

// bearerCreds implements grpc.PerRPCCredentials for bearer token auth.
type bearerCreds struct {
token string
metadata map[string]string // cached; avoids allocation per RPC
}

func newBearerCreds(token string) bearerCreds {
return bearerCreds{metadata: map[string]string{"authorization": "Bearer " + token}}
}

func (b bearerCreds) GetRequestMetadata(_ context.Context, _ ...string) (map[string]string, error) {
return map[string]string{"authorization": "Bearer " + b.token}, nil
return b.metadata, nil
}

func (b bearerCreds) RequireTransportSecurity() bool { return false }
Expand All @@ -45,7 +49,7 @@ func NewCelestiaAppFetcher(grpcAddr, authToken string, log zerolog.Logger) (*Cel
grpc.WithTransportCredentials(insecure.NewCredentials()),
}
if authToken != "" {
opts = append(opts, grpc.WithPerRPCCredentials(bearerCreds{token: authToken}))
opts = append(opts, grpc.WithPerRPCCredentials(newBearerCreds(authToken)))
}

conn, err := grpc.NewClient(grpcAddr, opts...)
Expand Down
31 changes: 17 additions & 14 deletions pkg/fetch/celestia_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ func NewCelestiaNodeFetcher(ctx context.Context, addr, token string, log zerolog
// httpToWS rewrites an HTTP(S) address to its WebSocket equivalent:
// "http://" becomes "ws://" and "https://" becomes "wss://". Any other
// address is returned unchanged.
func httpToWS(addr string) string {
	switch {
	case strings.HasPrefix(addr, "http://"):
		// Slice by prefix length: HasPrefix already verified the prefix,
		// so TrimPrefix's second scan is unnecessary.
		return "ws://" + addr[len("http://"):]
	case strings.HasPrefix(addr, "https://"):
		return "wss://" + addr[len("https://"):]
	default:
		return addr
	}
}
Expand Down Expand Up @@ -474,25 +474,28 @@ func isTransientRPCError(err error) bool {
return true
}
msg := strings.ToLower(err.Error())
for _, needle := range []string{
"eof",
"connection reset by peer",
"broken pipe",
"i/o timeout",
"timeout",
"temporarily unavailable",
"connection refused",
"503",
"504",
"502",
} {
for _, needle := range transientNeedles {
if strings.Contains(msg, needle) {
return true
}
}
return false
}

// transientNeedles is allocated once at package init to avoid per-call slice allocation.
// These substrings are matched against lowercased RPC error text to classify an
// error as transient (retryable).
//
// NOTE(review): "timeout" already subsumes "i/o timeout" for substring matching,
// and very short needles like "eof" can false-positive on unrelated message text
// (e.g. any message containing "eof" as a fragment) — confirm the match set is
// intentional before tightening it.
var transientNeedles = []string{
	"eof",
	"connection reset by peer",
	"broken pipe",
	"i/o timeout",
	"timeout",
	"temporarily unavailable",
	"connection refused",
	"503", // Service Unavailable
	"504", // Gateway Timeout
	"502", // Bad Gateway
}

// jsonInt64 handles CometBFT's int64 fields encoded as JSON strings.
// Presumably paired with custom (Un)MarshalJSON methods defined below this
// view — confirm before relying on plain json round-tripping of this type.
type jsonInt64 int64

Expand Down
Loading