Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions adapter/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func NewGRPCServer(store store.ScanStore, coordinate *kv.Coordinate) *GRPCServer
}

func (r GRPCServer) RawGet(ctx context.Context, req *pb.RawGetRequest) (*pb.RawGetResponse, error) {
opCounter.WithLabelValues("raw_get", "grpc").Inc()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

While adding metrics is great, manually incrementing the counter in every gRPC method leads to code duplication and is error-prone. If a new method is added, one might forget to add the metrics line.

A better approach is to use a custom gRPC unary interceptor to handle this logic centrally. The interceptor can extract the method name, format it, and increment the counter. This would remove boilerplate from all the service methods and make the code more maintainable.

For example, you could create an interceptor in the adapter package:

import (
	"context"
	"path"
	"strings"

	"google.golang.org/grpc"
)

// MetricsInterceptor is a gRPC unary interceptor that records operation metrics.
func MetricsInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	// Example: /RawKV/RawGet -> raw_get
	// You might need a more robust way to convert CamelCase to snake_case.
	methodName := strings.ToLower(path.Base(info.FullMethod))
	opCounter.WithLabelValues(methodName, "grpc").Inc()
	return handler(ctx, req)
}

Then, in main.go, you would chain this interceptor with the existing one from go-grpc-prometheus, for example by using a library like grpc-middleware.

v, err := r.store.Get(ctx, req.Key)
if err != nil {
switch {
Expand All @@ -60,6 +61,7 @@ func (r GRPCServer) RawGet(ctx context.Context, req *pb.RawGetRequest) (*pb.RawG
}

func (r GRPCServer) RawPut(_ context.Context, req *pb.RawPutRequest) (*pb.RawPutResponse, error) {
opCounter.WithLabelValues("raw_put", "grpc").Inc()
m, err := r.grpcTranscoder.RawPutToRequest(req)
if err != nil {
return nil, errors.WithStack(err)
Expand All @@ -80,6 +82,7 @@ func (r GRPCServer) RawPut(_ context.Context, req *pb.RawPutRequest) (*pb.RawPut
}

func (r GRPCServer) RawDelete(ctx context.Context, req *pb.RawDeleteRequest) (*pb.RawDeleteResponse, error) {
opCounter.WithLabelValues("raw_delete", "grpc").Inc()
m, err := r.grpcTranscoder.RawDeleteToRequest(req)
if err != nil {
return nil, errors.WithStack(err)
Expand Down Expand Up @@ -112,6 +115,7 @@ func (r GRPCServer) Rollback(ctx context.Context, req *pb.RollbackRequest) (*pb.
}

func (r GRPCServer) Put(ctx context.Context, req *pb.PutRequest) (*pb.PutResponse, error) {
opCounter.WithLabelValues("put", "grpc").Inc()
reqs, err := r.grpcTranscoder.TransactionalPutToRequests(req)
if err != nil {
return nil, errors.WithStack(err)
Expand All @@ -133,6 +137,7 @@ func (r GRPCServer) Put(ctx context.Context, req *pb.PutRequest) (*pb.PutRespons
}

func (r GRPCServer) Get(ctx context.Context, req *pb.GetRequest) (*pb.GetResponse, error) {
opCounter.WithLabelValues("get", "grpc").Inc()
h := murmur3.New64()
if _, err := h.Write(req.Key); err != nil {
return nil, errors.WithStack(err)
Expand All @@ -153,6 +158,7 @@ func (r GRPCServer) Get(ctx context.Context, req *pb.GetRequest) (*pb.GetRespons
}

func (r GRPCServer) Delete(ctx context.Context, req *pb.DeleteRequest) (*pb.DeleteResponse, error) {
opCounter.WithLabelValues("delete", "grpc").Inc()
reqs, err := r.grpcTranscoder.TransactionalDeleteToRequests(req)
if err != nil {
return nil, errors.WithStack(err)
Expand All @@ -174,6 +180,7 @@ func (r GRPCServer) Delete(ctx context.Context, req *pb.DeleteRequest) (*pb.Dele
}

func (r GRPCServer) Scan(ctx context.Context, req *pb.ScanRequest) (*pb.ScanResponse, error) {
opCounter.WithLabelValues("scan", "grpc").Inc()
limit, err := internal.Uint64ToInt(req.Limit)
if err != nil {
return &pb.ScanResponse{
Expand Down
14 changes: 14 additions & 0 deletions adapter/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package adapter

import "github.com/prometheus/client_golang/prometheus"

var (
opCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "elastickv_operations_total",
Help: "Total number of KV operations",
}, []string{"op", "client"})
)

func init() {
prometheus.MustRegister(opCounter)
}
5 changes: 5 additions & 0 deletions adapter/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func (r *RedisServer) ping(conn redcon.Conn, _ redcon.Command) {
}

func (r *RedisServer) set(conn redcon.Conn, cmd redcon.Command) {
opCounter.WithLabelValues("set", "redis").Inc()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Similar to the gRPC handlers, there's code duplication here for incrementing the metrics counter. This can be improved by using a higher-order function to wrap your command handlers.

This would centralize the metrics logic and make the code cleaner and easier to maintain. You could define a wrapper and use it in NewRedisServer when populating the route map.

For example:

// withMetrics wraps a redcon handler to increment an operation counter.
func (r *RedisServer) withMetrics(op string, handler func(redcon.Conn, redcon.Command)) func(redcon.Conn, redcon.Command) {
	return func(conn redcon.Conn, cmd redcon.Command) {
		opCounter.WithLabelValues(op, "redis").Inc()
		handler(conn, cmd)
	}
}

// In NewRedisServer():
r.route["SET"] = r.withMetrics("set", r.set)
r.route["GET"] = r.withMetrics("get", r.get)
// ... and so on

With this change, you would remove the manual opCounter.Inc() call from each handler function like set, get, etc.

res, err := r.redisTranscoder.SetToRequest(cmd.Args[1], cmd.Args[2])
if err != nil {
conn.WriteError(err.Error())
Expand All @@ -113,6 +114,7 @@ func (r *RedisServer) set(conn redcon.Conn, cmd redcon.Command) {
}

func (r *RedisServer) get(conn redcon.Conn, cmd redcon.Command) {
opCounter.WithLabelValues("get", "redis").Inc()
v, err := r.store.Get(context.Background(), cmd.Args[1])
if err != nil {
conn.WriteNull()
Expand All @@ -123,6 +125,7 @@ func (r *RedisServer) get(conn redcon.Conn, cmd redcon.Command) {
}

func (r *RedisServer) del(conn redcon.Conn, cmd redcon.Command) {
opCounter.WithLabelValues("del", "redis").Inc()
res, err := r.redisTranscoder.DeleteToRequest(cmd.Args[1])
if err != nil {
conn.WriteError(err.Error())
Expand All @@ -139,6 +142,7 @@ func (r *RedisServer) del(conn redcon.Conn, cmd redcon.Command) {
}

func (r *RedisServer) exists(conn redcon.Conn, cmd redcon.Command) {
opCounter.WithLabelValues("exists", "redis").Inc()
ok, err := r.store.Exists(context.Background(), cmd.Args[1])
if err != nil {
conn.WriteError(err.Error())
Expand All @@ -153,6 +157,7 @@ func (r *RedisServer) exists(conn redcon.Conn, cmd redcon.Command) {
}

func (r *RedisServer) keys(conn redcon.Conn, cmd redcon.Command) {
opCounter.WithLabelValues("keys", "redis").Inc()

// If an asterisk (*) is not included, the match will be exact,
// so check if the key exists.
Expand Down
6 changes: 6 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ require (
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.50.0
github.com/cockroachdb/errors v1.12.0
github.com/emirpasic/gods v1.18.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/hashicorp/go-hclog v1.6.3
github.com/hashicorp/raft v1.7.3
github.com/hashicorp/raft-boltdb/v2 v2.3.1
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.19.0
github.com/redis/go-redis/v9 v9.12.1
github.com/spaolacci/murmur3 v1.1.0
github.com/stretchr/testify v1.11.1
Expand All @@ -42,6 +44,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.34.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.38.1 // indirect
github.com/aws/smithy-go v1.23.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/boltdb/bolt v1.3.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
Expand All @@ -63,6 +66,9 @@ require (
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/rogpeppe/go-internal v1.13.1 // indirect
github.com/tidwall/btree v1.1.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
Expand Down
11 changes: 11 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ github.com/aws/smithy-go v1.23.0 h1:8n6I3gXzWJB2DxBDnfxgBaSX6oe0d/t10qGz7OKqMCE=
github.com/aws/smithy-go v1.23.0/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4=
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
Expand Down Expand Up @@ -153,6 +154,8 @@ github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
Expand Down Expand Up @@ -248,21 +251,29 @@ github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5Fsn
github.com/prometheus/client_golang v1.4.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU=
github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI=
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4=
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc=
github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE=
github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/redis/go-redis/v9 v9.12.1 h1:k5iquqv27aBtnTm2tIkROUDp8JBXhXZIVu1InSgvovg=
github.com/redis/go-redis/v9 v9.12.1/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
Expand Down
16 changes: 15 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"log"
"net"
"net/http"
"os"
"path/filepath"

Expand All @@ -17,8 +18,10 @@ import (
pb "github.com/bootjp/elastickv/proto"
"github.com/bootjp/elastickv/store"
"github.com/cockroachdb/errors"
"github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/hashicorp/raft"
boltdb "github.com/hashicorp/raft-boltdb/v2"
"github.com/prometheus/client_golang/prometheus/promhttp"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -31,6 +34,7 @@ var (
raftId = flag.String("raft_id", "", "Node id used by Raft")
raftDir = flag.String("raft_data_dir", "data/", "Raft data dir")
raftBootstrap = flag.Bool("raft_bootstrap", false, "Whether to bootstrap the Raft cluster")
metricsAddr = flag.String("metrics_address", ":2112", "TCP host+port for Prometheus metrics")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The test failure you noted (bind: address already in use) is likely caused by this new metrics server. When tests run in parallel, each instance of the application will try to bind to the same default port :2112, causing a conflict.

To fix this, you should configure tests to use a dynamic port. A common technique is to use ":0" as the address, which tells the OS to pick an available ephemeral port. You'll need to update your test setup to either pass a dynamic address for the -metrics_address flag or have a mechanism to override it during tests.

)

func main() {
Expand Down Expand Up @@ -62,7 +66,10 @@ func main() {
log.Fatalf("failed to start raft: %v", err)
}

gs := grpc.NewServer()
gs := grpc.NewServer(
grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
)
trx := kv.NewTransaction(r)
coordinate := kv.NewCoordinator(trx, r)
pb.RegisterRawKVServer(gs, adapter.NewGRPCServer(s, coordinate))
Expand All @@ -73,19 +80,26 @@ func main() {
leaderhealth.Setup(r, gs, []string{"RawKV", "Example"})
raftadmin.Register(gs, r)
reflection.Register(gs)
grpc_prometheus.Register(gs)

redisL, err := lc.Listen(ctx, "tcp", *redisAddr)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}

mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())

eg := errgroup.Group{}
eg.Go(func() error {
return errors.WithStack(gs.Serve(grpcSock))
})
eg.Go(func() error {
return errors.WithStack(adapter.NewRedisServer(redisL, s, coordinate).Run())
})
eg.Go(func() error {
return errors.WithStack(http.ListenAndServe(*metricsAddr, mux))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
G114: Use of net/http serve function that has no support for setting timeouts (gosec)

})

err = eg.Wait()
if err != nil {
Expand Down
Loading