Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions bdd/go/tests/basic_messaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ func (s basicMessagingSteps) givenAuthenticationAsRoot(ctx context.Context) erro
if err != nil {
return fmt.Errorf("error creating client: %w", err)
}
if err = cli.Ping(); err != nil {
if err = cli.Ping(ctx); err != nil {
return fmt.Errorf("error pinging client: %w", err)
}

if _, err = cli.LoginUser("iggy", "iggy"); err != nil {
if _, err = cli.LoginUser(ctx, "iggy", "iggy"); err != nil {
return fmt.Errorf("error logging in: %v", err)
}

Expand All @@ -99,7 +99,7 @@ func (s basicMessagingSteps) whenSendMessages(
streamIdentifier, _ := iggcon.NewIdentifier(streamID)
topicIdentifier, _ := iggcon.NewIdentifier(topicID)
partitioning := iggcon.PartitionId(partitionID)
if err = c.client.SendMessages(streamIdentifier, topicIdentifier, partitioning, messages); err != nil {
if err = c.client.SendMessages(ctx, streamIdentifier, topicIdentifier, partitioning, messages); err != nil {
return fmt.Errorf("failed to sending messages: %w", err)
}

Expand Down Expand Up @@ -133,6 +133,7 @@ func (s basicMessagingSteps) whenPollMessages(
topicIdentifier, _ := iggcon.NewIdentifier(topicID)
uint32PartitionID := partitionID
polledMessages, err := c.client.PollMessages(
ctx,
streamIdentifier,
topicIdentifier,
consumer,
Expand Down Expand Up @@ -211,7 +212,7 @@ func (s basicMessagingSteps) thenLastPolledMessageMatchesSent(ctx context.Contex

func (s basicMessagingSteps) givenNoStreams(ctx context.Context) error {
client := getBasicMessagingCtx(ctx).client
streams, err := client.GetStreams()
streams, err := client.GetStreams(ctx)
if err != nil {
return fmt.Errorf("failed to get streams: %w", err)
}
Expand All @@ -225,7 +226,7 @@ func (s basicMessagingSteps) givenNoStreams(ctx context.Context) error {

func (s basicMessagingSteps) whenCreateStream(ctx context.Context, streamName string) error {
c := getBasicMessagingCtx(ctx)
stream, err := c.client.CreateStream(streamName)
stream, err := c.client.CreateStream(ctx, streamName)
if err != nil {
return fmt.Errorf("failed to create stream: %w", err)
}
Expand Down Expand Up @@ -257,6 +258,7 @@ func (s basicMessagingSteps) whenCreateTopic(ctx context.Context,
c := getBasicMessagingCtx(ctx)
streamIdentifier, _ := iggcon.NewIdentifier(streamID)
topic, err := c.client.CreateTopic(
ctx,
streamIdentifier,
topicName,
partitionsCount,
Expand Down
32 changes: 16 additions & 16 deletions bdd/go/tests/leader_redirection.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ func serverTypeFromPort(port uint16) string {
}
}

func verifyLeaderInMetadata(client iggcon.Client) (*iggcon.ClusterNode, error) {
metadata, err := client.GetClusterMetadata()
func verifyLeaderInMetadata(ctx context.Context, client iggcon.Client) (*iggcon.ClusterNode, error) {
metadata, err := client.GetClusterMetadata(ctx)
if err != nil {
if isClusteringUnavailable(err) {
// Clustering not enabled, this is OK
Expand All @@ -142,7 +142,7 @@ func verifyLeaderInMetadata(client iggcon.Client) (*iggcon.ClusterNode, error) {
return nil, nil
}

func verifyClientConnection(client iggcon.Client, expectedPort uint16) (string, error) {
func verifyClientConnection(ctx context.Context, client iggcon.Client, expectedPort uint16) (string, error) {
connInfo := client.GetConnectionInfo()

expectedSuffix := fmt.Sprintf(":%d", expectedPort)
Expand All @@ -155,7 +155,7 @@ func verifyClientConnection(client iggcon.Client, expectedPort uint16) (string,
}

// Verify client can communicate
if err := client.Ping(); err != nil {
if err := client.Ping(ctx); err != nil {
return "", fmt.Errorf("client cannot ping server: %w", err)
}

Expand Down Expand Up @@ -324,7 +324,7 @@ func (s leaderSteps) whenAuthenticateRoot(ctx context.Context) error {
if !ok {
return fmt.Errorf("client %s should be created", name)
}
if _, err := cli.LoginUser(defaultRootUsername, defaultRootPassword); err != nil {
if _, err := cli.LoginUser(ctx, defaultRootUsername, defaultRootPassword); err != nil {
return err
}
// Small delay between multiple authentications to avoid race conditions
Expand All @@ -342,7 +342,7 @@ func (s leaderSteps) whenCreateStream(ctx context.Context, streamName string) er
return errors.New("client should be available")
}

stream, err := cli.CreateStream(streamName)
stream, err := cli.CreateStream(ctx, streamName)
if err != nil {
return err
}
Expand All @@ -368,10 +368,10 @@ func (s leaderSteps) thenVerifyClientPort(ctx context.Context, expectedPort uint
return errors.New("client should exist")
}

if _, err := verifyClientConnection(cli, expectedPort); err != nil {
if _, err := verifyClientConnection(ctx, cli, expectedPort); err != nil {
return err
}
leader, err := verifyLeaderInMetadata(cli)
leader, err := verifyLeaderInMetadata(ctx, cli)
if err != nil {
return err
}
Expand All @@ -390,10 +390,10 @@ func (s leaderSteps) thenVerifyNamedClientPort(ctx context.Context, clientName s
return fmt.Errorf("client %s should exist", clientName)
}

if _, err := verifyClientConnection(cli, port); err != nil {
if _, err := verifyClientConnection(ctx, cli, port); err != nil {
return fmt.Errorf("client %s connection should succeed: %w", clientName, err)
}
leader, err := verifyLeaderInMetadata(cli)
leader, err := verifyLeaderInMetadata(ctx, cli)
if err != nil {
return err
}
Expand Down Expand Up @@ -421,7 +421,7 @@ func (s leaderSteps) thenConnectionRemains(ctx context.Context, port uint16) err
return errors.New("client should exist")
}

if _, err := verifyClientConnection(cli, port); err != nil {
if _, err := verifyClientConnection(ctx, cli, port); err != nil {
return fmt.Errorf("should remain on original port: %w", err)
}
if c.TestState.RedirectionOccurred {
Expand All @@ -437,7 +437,7 @@ func (s leaderSteps) thenConnectWithoutRedirection(ctx context.Context) error {
if !ok {
return errors.New("client should exist")
}
if err := cli.Ping(); err != nil {
if err := cli.Ping(ctx); err != nil {
return fmt.Errorf("client should be able to ping server: %v", err)
}
if c.TestState.RedirectionOccurred {
Expand All @@ -460,18 +460,18 @@ func (s leaderSteps) thenBothUseSameServer(ctx context.Context) error {
if a.GetConnectionInfo().ServerAddress != b.GetConnectionInfo().ServerAddress {
return errors.New("both clients should be connected to the same server")
}
if err := a.Ping(); err != nil {
if err := a.Ping(ctx); err != nil {
return errors.New("client A should be able to ping")
}
if err := b.Ping(); err != nil {
if err := b.Ping(ctx); err != nil {
return errors.New("client B should be able to ping")
}

leaderA, err := verifyLeaderInMetadata(a)
leaderA, err := verifyLeaderInMetadata(ctx, a)
if err != nil {
return err
}
leaderB, err := verifyLeaderInMetadata(b)
leaderB, err := verifyLeaderInMetadata(ctx, b)
if err != nil {
return err
}
Expand Down
6 changes: 4 additions & 2 deletions bdd/go/tests/tcp_test/client_feature_get_all.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package tcp_test

import (
"context"

"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
)
Expand All @@ -26,7 +28,7 @@ var _ = ginkgo.Describe("GET ALL CLIENT FEATURE:", func() {
ginkgo.When("user is logged in", func() {
ginkgo.Context("and tries to log with correct data", func() {
client := createAuthorizedConnection()
clients, err := client.GetClients()
clients, err := client.GetClients(context.Background())

itShouldNotReturnError(err)
ginkgo.It("should return stats", func() {
Expand All @@ -42,7 +44,7 @@ var _ = ginkgo.Describe("GET ALL CLIENT FEATURE:", func() {
ginkgo.When("user is not logged in", func() {
ginkgo.Context("and tries get all clients", func() {
client := createClient()
clients, err := client.GetClients()
clients, err := client.GetClients(context.Background())

itShouldReturnUnauthenticatedError(err)
ginkgo.It("should not return clients", func() {
Expand Down
8 changes: 8 additions & 0 deletions bdd/go/tests/tcp_test/consumers_feature_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package tcp_test

import (
"context"

iggcon "github.com/apache/iggy/foreign/go/contracts"
ierror "github.com/apache/iggy/foreign/go/errors"
"github.com/onsi/ginkgo/v2"
Expand All @@ -35,6 +37,7 @@ var _ = ginkgo.Describe("CREATE CONSUMER GROUP:", func() {
topicIdentifier, _ := iggcon.NewIdentifier(topicId)
name := createRandomString(16)
group, err := client.CreateConsumerGroup(
context.Background(),
streamIdentifier,
topicIdentifier,
name,
Expand All @@ -47,6 +50,7 @@ var _ = ginkgo.Describe("CREATE CONSUMER GROUP:", func() {
ginkgo.Context("and tries to create consumer group for a non existing stream", func() {
client := createAuthorizedConnection()
_, err := client.CreateConsumerGroup(
context.Background(),
randomU32Identifier(),
randomU32Identifier(),
createRandomString(16),
Expand All @@ -61,6 +65,7 @@ var _ = ginkgo.Describe("CREATE CONSUMER GROUP:", func() {
defer deleteStreamAfterTests(streamId, client)
streamIdentifier, _ := iggcon.NewIdentifier(streamId)
_, err := client.CreateConsumerGroup(
context.Background(),
streamIdentifier,
randomU32Identifier(),
createRandomString(16),
Expand All @@ -79,6 +84,7 @@ var _ = ginkgo.Describe("CREATE CONSUMER GROUP:", func() {
streamIdentifier, _ := iggcon.NewIdentifier(streamId)
topicIdentifier, _ := iggcon.NewIdentifier(topicId)
_, err := client.CreateConsumerGroup(
context.Background(),
streamIdentifier,
topicIdentifier,
name,
Expand All @@ -95,6 +101,7 @@ var _ = ginkgo.Describe("CREATE CONSUMER GROUP:", func() {
streamIdentifier, _ := iggcon.NewIdentifier(streamId)
topicIdentifier, _ := iggcon.NewIdentifier(topicId)
_, err := client.CreateConsumerGroup(
context.Background(),
streamIdentifier,
topicIdentifier,
createRandomString(256),
Expand All @@ -108,6 +115,7 @@ var _ = ginkgo.Describe("CREATE CONSUMER GROUP:", func() {
ginkgo.Context("and tries to create consumer group", func() {
client := createClient()
_, err := client.CreateConsumerGroup(
context.Background(),
randomU32Identifier(),
randomU32Identifier(),
createRandomString(16),
Expand Down
7 changes: 7 additions & 0 deletions bdd/go/tests/tcp_test/consumers_feature_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package tcp_test

import (
"context"

iggcon "github.com/apache/iggy/foreign/go/contracts"
ierror "github.com/apache/iggy/foreign/go/errors"
"github.com/onsi/ginkgo/v2"
Expand All @@ -37,6 +39,7 @@ var _ = ginkgo.Describe("DELETE CONSUMER GROUP:", func() {
topicIdentifier, _ := iggcon.NewIdentifier(topicId)
groupIdentifier, _ := iggcon.NewIdentifier(groupId)
err := client.DeleteConsumerGroup(
context.Background(),
streamIdentifier,
topicIdentifier,
groupIdentifier,
Expand All @@ -55,6 +58,7 @@ var _ = ginkgo.Describe("DELETE CONSUMER GROUP:", func() {
streamIdentifier, _ := iggcon.NewIdentifier(streamId)
topicIdentifier, _ := iggcon.NewIdentifier(topicId)
err := client.DeleteConsumerGroup(
context.Background(),
streamIdentifier,
topicIdentifier,
randomU32Identifier(),
Expand All @@ -70,6 +74,7 @@ var _ = ginkgo.Describe("DELETE CONSUMER GROUP:", func() {

streamIdentifier, _ := iggcon.NewIdentifier(streamId)
err := client.DeleteConsumerGroup(
context.Background(),
streamIdentifier,
randomU32Identifier(),
randomU32Identifier(),
Expand All @@ -81,6 +86,7 @@ var _ = ginkgo.Describe("DELETE CONSUMER GROUP:", func() {
ginkgo.Context("and tries to delete consumer for non-existing topic and stream", func() {
client := createAuthorizedConnection()
err := client.DeleteConsumerGroup(
context.Background(),
randomU32Identifier(),
randomU32Identifier(),
randomU32Identifier(),
Expand All @@ -94,6 +100,7 @@ var _ = ginkgo.Describe("DELETE CONSUMER GROUP:", func() {
ginkgo.Context("and tries to delete consumer group", func() {
client := createClient()
err := client.DeleteConsumerGroup(
context.Background(),
randomU32Identifier(),
randomU32Identifier(),
randomU32Identifier(),
Expand Down
6 changes: 4 additions & 2 deletions bdd/go/tests/tcp_test/consumers_feature_get_all.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package tcp_test

import (
"context"

iggcon "github.com/apache/iggy/foreign/go/contracts"
"github.com/onsi/ginkgo/v2"
)
Expand All @@ -33,7 +35,7 @@ var _ = ginkgo.Describe("GET ALL CONSUMER GROUPS:", func() {
groupId, name := successfullyCreateConsumer(streamId, topicId, client)
streamIdentifier, _ := iggcon.NewIdentifier(streamId)
topicIdentifier, _ := iggcon.NewIdentifier(topicId)
groups, err := client.GetConsumerGroups(streamIdentifier, topicIdentifier)
groups, err := client.GetConsumerGroups(context.Background(), streamIdentifier, topicIdentifier)

itShouldNotReturnError(err)
itShouldContainSpecificConsumer(groupId, name, groups)
Expand All @@ -43,7 +45,7 @@ var _ = ginkgo.Describe("GET ALL CONSUMER GROUPS:", func() {
ginkgo.When("User is not logged in", func() {
ginkgo.Context("and tries to get all consumer groups", func() {
client := createClient()
_, err := client.GetConsumerGroups(randomU32Identifier(), randomU32Identifier())
_, err := client.GetConsumerGroups(context.Background(), randomU32Identifier(), randomU32Identifier())

itShouldReturnUnauthenticatedError(err)
})
Expand Down
7 changes: 6 additions & 1 deletion bdd/go/tests/tcp_test/consumers_feature_get_by_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package tcp_test

import (
"context"

iggcon "github.com/apache/iggy/foreign/go/contracts"
ierror "github.com/apache/iggy/foreign/go/errors"
"github.com/onsi/ginkgo/v2"
Expand All @@ -35,7 +37,7 @@ var _ = ginkgo.Describe("GET CONSUMER GROUP BY ID:", func() {
streamIdentifier, _ := iggcon.NewIdentifier(streamId)
topicIdentifier, _ := iggcon.NewIdentifier(topicId)
groupIdentifier, _ := iggcon.NewIdentifier(groupId)
group, err := client.GetConsumerGroup(streamIdentifier, topicIdentifier, groupIdentifier)
group, err := client.GetConsumerGroup(context.Background(), streamIdentifier, topicIdentifier, groupIdentifier)

itShouldNotReturnError(err)
itShouldReturnSpecificConsumer(groupId, name, &group.ConsumerGroup)
Expand All @@ -45,6 +47,7 @@ var _ = ginkgo.Describe("GET CONSUMER GROUP BY ID:", func() {
client := createAuthorizedConnection()

_, err := client.GetConsumerGroup(
context.Background(),
randomU32Identifier(),
randomU32Identifier(),
randomU32Identifier(),
Expand All @@ -59,6 +62,7 @@ var _ = ginkgo.Describe("GET CONSUMER GROUP BY ID:", func() {
defer deleteStreamAfterTests(streamId, client)
streamIdentifier, _ := iggcon.NewIdentifier(streamId)
_, err := client.GetConsumerGroup(
context.Background(),
streamIdentifier,
randomU32Identifier(),
randomU32Identifier(),
Expand All @@ -75,6 +79,7 @@ var _ = ginkgo.Describe("GET CONSUMER GROUP BY ID:", func() {
streamIdentifier, _ := iggcon.NewIdentifier(streamId)
topicIdentifier, _ := iggcon.NewIdentifier(topicId)
_, err := client.GetConsumerGroup(
context.Background(),
streamIdentifier,
topicIdentifier,
randomU32Identifier(),
Expand Down
Loading
Loading