diff --git a/collector/receiver/telemetryapireceiver/README.md b/collector/receiver/telemetryapireceiver/README.md index c412c8f4f7..f31974f126 100644 --- a/collector/receiver/telemetryapireceiver/README.md +++ b/collector/receiver/telemetryapireceiver/README.md @@ -17,7 +17,7 @@ Supported events: | Field | Default | Description | |---------|---------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------| -| `port` | 4325 | HTTP server port to receive Telemetry API data. | +| `port` | 0 (dynamically determined by OS) | HTTP server port to receive Telemetry API data. | | `types` | ["platform", "function", "extension"] | [Types](https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api-reference.html#telemetry-subscribe-api) of telemetry to subscribe to | diff --git a/collector/receiver/telemetryapireceiver/factory.go b/collector/receiver/telemetryapireceiver/factory.go index 3f4f4f177d..9a68951cd1 100644 --- a/collector/receiver/telemetryapireceiver/factory.go +++ b/collector/receiver/telemetryapireceiver/factory.go @@ -27,7 +27,7 @@ import ( const ( typeStr = "telemetryapi" stability = component.StabilityLevelDevelopment - defaultPort = 4325 + defaultPort = 0 platform = "platform" function = "function" extension = "extension" diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index e3b932723b..eff197782f 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -18,8 +18,10 @@ import ( "context" crand "crypto/rand" "encoding/json" + "errors" "fmt" "io" + "net" "net/http" "os" "strconv" @@ -84,29 +86,55 @@ type telemetryAPIReceiver struct { logReport bool } +func (r *telemetryAPIReceiver) bindListener() (net.Listener, string, error) { + listenerAddr := listenOnAddress() + l, err := net.Listen("tcp", listenerAddr+":"+strconv.Itoa(r.port)) + if err != nil { + return nil, "", err + } + addr := fmt.Sprintf("%s:%d", listenerAddr, l.Addr().(*net.TCPAddr).Port) + return l, addr, nil +} + func (r *telemetryAPIReceiver) Start(ctx context.Context, host component.Host) error { - address := listenOnAddress(r.port) - r.logger.Info("Listening for requests", zap.String("address", address)) + if len(r.types) == 0 { + return fmt.Errorf("no telemetry event types provided") + } + + listener, address, err := r.bindListener() + if err != nil { + return fmt.Errorf("failed to find available port: %w", err) + } + r.logger.Info("Starting telemetry API listener", zap.String("address", address)) mux := http.NewServeMux() mux.HandleFunc("/", r.httpHandler) r.httpServer = &http.Server{Addr: address, Handler: mux} go func() { - _ = r.httpServer.ListenAndServe() + err := r.httpServer.Serve(listener) + if !errors.Is(err, http.ErrServerClosed) { + r.logger.Error("Unexpected stop on HTTP Server", zap.Error(err)) + } else { + r.logger.Info("HTTP server closed", zap.Error(err)) + } }() telemetryClient := telemetryapi.NewClient(r.logger) - if len(r.types) > 0 { - _, err := telemetryClient.Subscribe(ctx, r.types, r.extensionID, fmt.Sprintf("http://%s/", address)) - if err != nil { - r.logger.Info("Listening for requests", zap.String("address", address), zap.String("extensionID", r.extensionID)) - return err - } + if _, err := telemetryClient.Subscribe(ctx, r.types, r.extensionID, fmt.Sprintf("http://%s/", address)); err != nil { + r.logger.Error("Failed to subscribe to telemetry", zap.Error(err)) + _ = r.Shutdown(ctx) + return err } + r.logger.Info("Successfully subscribed to telemetry", zap.String("address", address)) return nil } func (r *telemetryAPIReceiver) Shutdown(ctx context.Context) error { + if r.httpServer != nil { + if err := r.httpServer.Shutdown(ctx); err != nil { + return err + } + } return nil } @@ -727,14 +755,10 @@ func newTelemetryAPIReceiver( }, nil } -func listenOnAddress(port int) string { +func listenOnAddress() string { envAwsLocal, ok := os.LookupEnv("AWS_SAM_LOCAL") - var addr string if ok && envAwsLocal == "true" { - addr = ":" + strconv.Itoa(port) - } else { - addr = "sandbox.localdomain:" + strconv.Itoa(port) + return "" } - - return addr + return "sandbox.localdomain" } diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index 39cca22312..fd9a7cd49c 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -38,16 +38,16 @@ func TestListenOnAddress(t *testing.T) { { desc: "listen on address without AWS_SAM_LOCAL env variable", testFunc: func(t *testing.T) { - addr := listenOnAddress(4325) - require.EqualValues(t, "sandbox.localdomain:4325", addr) + addr := listenOnAddress() + require.EqualValues(t, "sandbox.localdomain", addr) }, }, { desc: "listen on address with AWS_SAM_LOCAL env variable", testFunc: func(t *testing.T) { t.Setenv("AWS_SAM_LOCAL", "true") - addr := listenOnAddress(4325) - require.EqualValues(t, ":4325", addr) + addr := listenOnAddress() + require.EqualValues(t, "", addr) }, }, } @@ -56,6 +56,31 @@ func TestListenOnAddress(t *testing.T) { } } +func TestBindListener(t *testing.T) { + t.Setenv("AWS_SAM_LOCAL", "true") + + t.Run("dynamic port allocation", func(t *testing.T) { + r, err := newTelemetryAPIReceiver(&Config{Port: 0}, receivertest.NewNopSettings(Type)) + require.NoError(t, err) + listener, addr, err := r.bindListener() + require.NoError(t, err) + require.NotEmpty(t, addr) + require.NotNil(t, listener) + t.Cleanup(func() { require.NoError(t, listener.Close()) }) + require.Contains(t, addr, ":") + }) + + t.Run("specific port", func(t *testing.T) { + r, err := newTelemetryAPIReceiver(&Config{Port: 4325}, receivertest.NewNopSettings(Type)) + require.NoError(t, err) + listener, addr, err := r.bindListener() + require.NoError(t, err) + require.NotNil(t, listener) + t.Cleanup(func() { require.NoError(t, listener.Close()) }) + require.Contains(t, addr, ":4325") + }) +} + type mockConsumer struct { consumed int }