From dffe95406a41c8b814697fea0035b0166290a810 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20Kosch=C3=BCtzki?=
Date: Fri, 2 May 2025 15:07:58 +0200
Subject: [PATCH 1/4] feat: run instance creation in parallel

---
 provider.go | 151 ++++++++++++++++++++++++++++++----------------------
 1 file changed, 87 insertions(+), 64 deletions(-)

diff --git a/provider.go b/provider.go
index 2ee76f6..8644f54 100644
--- a/provider.go
+++ b/provider.go
@@ -14,6 +14,7 @@ import (
 	"path"
 	"regexp"
 	"strings"
+	"sync"
 
 	cloudscale "github.com/cloudscale-ch/cloudscale-go-sdk/v5"
 	hclog "github.com/hashicorp/go-hclog"
@@ -287,8 +288,10 @@ func (g *InstanceGroup) Increase(
 	ctx context.Context,
 	delta int,
 ) (succeeded int, err error) {
-	servers := make([]*cloudscale.Server, 0, delta)
-	errs := make([]error, 0)
+	// Pre-create the slices with the right length, so that we can
+	// concurrently write to them later.
+	errs := make([]error, delta)
+	servers := make([]*cloudscale.Server, delta)
 
 	publicKey, err := g.publicKey()
 	if err != nil {
@@ -297,44 +300,52 @@ func (g *InstanceGroup) Increase(
 
 	tagMap := g.tagMap()
 
-	for i := 0; i < delta; i++ {
-		serverName := g.serverName()
-
-		g.log.Info("creating server", "name", serverName, "flavor", g.Flavor)
-		server, err := g.client.Servers.Create(ctx, &cloudscale.ServerRequest{
-			Name:         serverName,
-			Zone:         g.Zone,
-			Flavor:       g.Flavor,
-			Image:        g.Image,
-			SSHKeys:      []string{string(publicKey)},
-			VolumeSizeGB: g.VolumeSizeGB,
-			UserData:     g.UserData,
-			TaggedResourceRequest: cloudscale.TaggedResourceRequest{
-				Tags: &tagMap,
-			},
-		})
-
-		if err != nil {
-			errs = append(errs, fmt.Errorf(
-				"failed to create %s: %w", serverName, err))
-			continue
-		}
-
-		server, err = g.client.Servers.WaitFor(
-			ctx, server.UUID, cloudscale.ServerIsRunning)
-
-		if err != nil {
-			errs = append(errs, fmt.Errorf(
-				"failed to wait for %s: %w", serverName, err))
-			continue
-		}
-
-		g.log.Info("created server", "name", serverName, "uuid", server.UUID)
-		servers = append(servers, server)
+	var wg sync.WaitGroup
+
+	for i := 0; i < delta; i++ {
+		// We run all iterations of this loop in parallel using goroutines;
+		// the WaitGroup (wg) lets us wait for all of them to complete.
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			serverName := g.serverName()
+
+			g.log.Info("creating server", "name", serverName, "flavor", g.Flavor)
+			server, err := g.client.Servers.Create(ctx, &cloudscale.ServerRequest{
+				Name:         serverName,
+				Zone:         g.Zone,
+				Flavor:       g.Flavor,
+				Image:        g.Image,
+				SSHKeys:      []string{string(publicKey)},
+				VolumeSizeGB: g.VolumeSizeGB,
+				UserData:     g.UserData,
+				TaggedResourceRequest: cloudscale.TaggedResourceRequest{
+					Tags: &tagMap,
+				},
+			})
+
+			if err != nil {
+				errs[i] = fmt.Errorf("failed to create %s: %w", serverName, err)
+				return
+			}
+
+			server, err = g.client.Servers.WaitFor(
+				ctx, server.UUID, cloudscale.ServerIsRunning)
+
+			if err != nil {
+				errs[i] = fmt.Errorf(
+					"failed to wait for %s: %w", serverName, err)
+				return
+			}
+
+			g.log.Info("created server", "name", serverName, "uuid", server.UUID)
+			servers[i] = server
+		}()
 	}
 
+	// We wait for all previously started goroutines to complete before we exit.
+	wg.Wait()
 	return len(servers), errors.Join(errs...)
-
 }
 
 // Decrease removes the specified instances from the instance group. It
@@ -344,36 +355,48 @@ func (g *InstanceGroup) Decrease(
 	ctx context.Context,
 	ids []string,
 ) (succeeded []string, err error) {
-	errs := make([]error, 0)
-
-	for _, id := range ids {
-
-		// Before we delete a server, we assert that we can do so. We do this
-		// by double-checking our assumptions. If there is any bug elsewhere,
-		// we want to be sure to not delete servers the user cares about.
-		server, err := g.client.Servers.Get(ctx, id)
-		if err != nil {
-			errs = append(errs, fmt.Errorf(
-				"failed to fetch server before deleting %s: %w", id, err))
-			continue
-		}
-
-		if err := g.ensureSafeToDelete(server); err != nil {
-			errs = append(errs, fmt.Errorf(
-				"prevented from deleting server %s: %w", id, err))
-			continue
-		}
-
-		g.log.Info("deleting server", "uuid", id)
-		err = g.client.Servers.Delete(ctx, id)
-		if err != nil {
-			errs = append(errs, fmt.Errorf(
-				"failed to delete server %s: %w", id, err))
-			continue
-		}
-		succeeded = append(succeeded, id)
+	// Pre-create the slices with the right length, so that we can
+	// concurrently write to them later.
+	errs := make([]error, len(ids))
+	succeeded = make([]string, len(ids))
+	var wg sync.WaitGroup
+
+	for idx, id := range ids {
+		// We run all iterations of this loop in parallel using goroutines;
+		// the WaitGroup (wg) lets us wait for all of them to complete.
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+
+			// Before we delete a server, we assert that we can do so. We do this
+			// by double-checking our assumptions. If there is any bug elsewhere,
+			// we want to be sure to not delete servers the user cares about.
+			server, err := g.client.Servers.Get(ctx, id)
+			if err != nil {
+				errs[idx] = fmt.Errorf("failed to fetch server before deleting %s: %w", id, err)
+				return
+			}
+
+			if err := g.ensureSafeToDelete(server); err != nil {
+				errs[idx] = fmt.Errorf(
+					"prevented from deleting server %s: %w", id, err)
+				return
+			}
+
+			g.log.Info("deleting server", "uuid", id)
+			err = g.client.Servers.Delete(ctx, id)
+			if err != nil {
+				errs[idx] = fmt.Errorf(
+					"failed to delete server %s: %w", id, err)
+				return
+			}
+			succeeded[idx] = id
+		}()
 	}
 
+	// We wait for all previously started goroutines to complete before we exit.
+	wg.Wait()
+
 	return succeeded, errors.Join(errs...)
 }
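The patch above hands every loop iteration its own goroutine and its own slot in a pre-sized slice, so the goroutines never contend for a lock. A minimal, self-contained sketch of that index-slot pattern follows; createOne is a hypothetical stand-in for the Create/WaitFor calls, and the closure capturing i assumes Go 1.22+ per-iteration loop variables (on older Go, pass i to the goroutine as an argument):

package main

import (
	"errors"
	"fmt"
	"sync"
)

// createOne is a hypothetical stand-in for the Create + WaitFor calls in
// the patch; it fails for one index so the error path is exercised.
func createOne(i int) (string, error) {
	if i == 2 {
		return "", fmt.Errorf("simulated failure on %d", i)
	}
	return fmt.Sprintf("server-%d", i), nil
}

func main() {
	delta := 4

	// Pre-sized slices: every goroutine writes only to its own index,
	// so no mutex is needed for these writes.
	servers := make([]string, delta)
	errs := make([]error, delta)

	var wg sync.WaitGroup
	for i := 0; i < delta; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			servers[i], errs[i] = createOne(i)
		}()
	}
	wg.Wait()

	// Count only the filled slots: len(servers) would also count the
	// zero-value holes left behind by failed iterations.
	succeeded := 0
	for _, s := range servers {
		if s != "" {
			succeeded++
		}
	}
	fmt.Println(succeeded, errors.Join(errs...))
}

Counting the filled slots matters: with a pre-sized slice, len(servers) reports failed iterations as successes, which is the overcount the later patches in this series work around.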
From 525df388230071418211a13cb9313c9b4e702a55 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20Kosch=C3=BCtzki?=
Date: Tue, 6 May 2025 12:05:16 +0200
Subject: [PATCH 2/4] revert: don't use concurrency for instance deletion

---
 provider.go | 60 +++++++++++++++++++++--------------------------------
 1 file changed, 24 insertions(+), 36 deletions(-)

diff --git a/provider.go b/provider.go
index 8644f54..3e5c53b 100644
--- a/provider.go
+++ b/provider.go
@@ -355,48 +355,36 @@ func (g *InstanceGroup) Decrease(
 	ctx context.Context,
 	ids []string,
 ) (succeeded []string, err error) {
-	// Pre-create the slices with the right length, so that we can
-	// concurrently write to them later.
-	errs := make([]error, len(ids))
-	succeeded = make([]string, len(ids))
-	var wg sync.WaitGroup
+
+	errs := make([]error, 0)
 
-	for idx, id := range ids {
-		// We run all iterations of this loop in parallel using goroutines;
-		// the WaitGroup (wg) lets us wait for all of them to complete.
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-
-			// Before we delete a server, we assert that we can do so. We do this
-			// by double-checking our assumptions. If there is any bug elsewhere,
-			// we want to be sure to not delete servers the user cares about.
-			server, err := g.client.Servers.Get(ctx, id)
-			if err != nil {
-				errs[idx] = fmt.Errorf("failed to fetch server before deleting %s: %w", id, err)
-				return
-			}
-
-			if err := g.ensureSafeToDelete(server); err != nil {
-				errs[idx] = fmt.Errorf(
-					"prevented from deleting server %s: %w", id, err)
-				return
-			}
-
-			g.log.Info("deleting server", "uuid", id)
-			err = g.client.Servers.Delete(ctx, id)
-			if err != nil {
-				errs[idx] = fmt.Errorf(
-					"failed to delete server %s: %w", id, err)
-				return
-			}
-			succeeded[idx] = id
-		}()
+	for _, id := range ids {
+
+		// Before we delete a server, we assert that we can do so. We do this
+		// by double-checking our assumptions. If there is any bug elsewhere,
+		// we want to be sure to not delete servers the user cares about.
+		server, err := g.client.Servers.Get(ctx, id)
+		if err != nil {
+			errs = append(errs, fmt.Errorf(
+				"failed to fetch server before deleting %s: %w", id, err))
+			continue
+		}
+
+		if err := g.ensureSafeToDelete(server); err != nil {
+			errs = append(errs, fmt.Errorf(
+				"prevented from deleting server %s: %w", id, err))
+			continue
+		}
+
+		g.log.Info("deleting server", "uuid", id)
+		err = g.client.Servers.Delete(ctx, id)
+		if err != nil {
+			errs = append(errs, fmt.Errorf(
+				"failed to delete server %s: %w", id, err))
+			continue
+		}
+		succeeded = append(succeeded, id)
 	}
 
-	// We wait for all previously started goroutines to complete before we exit.
-	wg.Wait()
-
 	return succeeded, errors.Join(errs...)
 }
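The serial loop restored above keeps going after each failure and only reports at the end, through errors.Join. That collect-and-continue style works because Join drops nil entries, returns nil when every entry is nil, and preserves %w wrapping. A standalone sketch, with ErrLocked as a hypothetical sentinel error:

package main

import (
	"errors"
	"fmt"
)

// ErrLocked is a hypothetical sentinel error, used only for illustration.
var ErrLocked = errors.New("server is locked")

func main() {
	// Collect-and-continue: nil entries mark iterations that succeeded.
	errs := []error{
		nil,
		fmt.Errorf("failed to delete server %s: %w", "uuid-1", ErrLocked),
		nil,
	}

	// errors.Join skips nils and returns nil if everything succeeded, so
	// a single call reports either nothing or all failures at once.
	err := errors.Join(errs...)
	fmt.Println(err)

	// %w wrapping survives the join, so callers can still match sentinels.
	fmt.Println(errors.Is(err, ErrLocked)) // true
}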
From 93d3efd9c97e55d438d1111e8e19d795e8c335de Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20Kosch=C3=BCtzki?=
Date: Fri, 9 May 2025 08:48:11 +0200
Subject: [PATCH 3/4] fix: create instances in serial, wait in parallel

---
 provider.go | 106 +++++++++++++++++++++++++++++-----------------------
 1 file changed, 60 insertions(+), 46 deletions(-)

diff --git a/provider.go b/provider.go
index 3e5c53b..8717bb0 100644
--- a/provider.go
+++ b/provider.go
@@ -288,11 +288,6 @@ func (g *InstanceGroup) Increase(
 	ctx context.Context,
 	delta int,
 ) (succeeded int, err error) {
-	// Pre-create the slices with the right length, so that we can
-	// concurrently write to them later.
-	errs := make([]error, delta)
-	servers := make([]*cloudscale.Server, delta)
-
 	publicKey, err := g.publicKey()
 	if err != nil {
 		return 0, err
@@ -300,51 +295,71 @@ func (g *InstanceGroup) Increase(
 
 	tagMap := g.tagMap()
 
+	// Channel for server UUIDs.
+	servers := make(chan string)
+
+	var errs_mutext sync.Mutex
+	errs := make([]error, 0, delta)
+
+	// Await all servers being ready.
 	var wg sync.WaitGroup
+	wg.Add(1)
+	go func(servers <-chan string) {
+		for uuid := range servers {
+			wg.Add(1)
+			go func(uuid string) {
+				defer wg.Done()
+				server, err := g.client.Servers.WaitFor(
+					ctx,
+					uuid,
+					cloudscale.ServerIsRunning,
+				)
+				if err != nil {
+					errs_mutext.Lock()
+					defer errs_mutext.Unlock()
+					errs = append(errs,
+						fmt.Errorf("failed to wait for %s: %w", uuid, err))
+					return
+				}
+				g.log.Info("created server", "name", server.Name, "uuid", server.UUID)
+			}(uuid)
+		}
+		wg.Done()
+	}(servers)
 
+	// Create the servers.
 	for i := 0; i < delta; i++ {
-		// We run all iterations of this loop in parallel using goroutines;
-		// the WaitGroup (wg) lets us wait for all of them to complete.
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-			serverName := g.serverName()
-
-			g.log.Info("creating server", "name", serverName, "flavor", g.Flavor)
-			server, err := g.client.Servers.Create(ctx, &cloudscale.ServerRequest{
-				Name:         serverName,
-				Zone:         g.Zone,
-				Flavor:       g.Flavor,
-				Image:        g.Image,
-				SSHKeys:      []string{string(publicKey)},
-				VolumeSizeGB: g.VolumeSizeGB,
-				UserData:     g.UserData,
-				TaggedResourceRequest: cloudscale.TaggedResourceRequest{
-					Tags: &tagMap,
-				},
-			})
-
-			if err != nil {
-				errs[i] = fmt.Errorf("failed to create %s: %w", serverName, err)
-				return
-			}
-
-			server, err = g.client.Servers.WaitFor(
-				ctx, server.UUID, cloudscale.ServerIsRunning)
-
-			if err != nil {
-				errs[i] = fmt.Errorf(
-					"failed to wait for %s: %w", serverName, err)
-				return
-			}
-
-			g.log.Info("created server", "name", serverName, "uuid", server.UUID)
-			servers[i] = server
-		}()
+		serverName := g.serverName()
+
+		g.log.Info("creating server", "name", serverName, "flavor", g.Flavor)
+		server, err := g.client.Servers.Create(ctx, &cloudscale.ServerRequest{
+			Name:         serverName,
+			Zone:         g.Zone,
+			Flavor:       g.Flavor,
+			Image:        g.Image,
+			SSHKeys:      []string{string(publicKey)},
+			VolumeSizeGB: g.VolumeSizeGB,
+			UserData:     g.UserData,
+			TaggedResourceRequest: cloudscale.TaggedResourceRequest{
+				Tags: &tagMap,
+			},
+		})
+
+		if err != nil {
+			errs_mutext.Lock()
+			defer errs_mutext.Unlock()
+			errs = append(errs,
+				fmt.Errorf("failed to create %s: %w", serverName, err))
+			return
+		}
+
+		// Send the UUID to the "wait" goroutine.
+		servers <- server.UUID
 	}
+	close(servers)
 
-	// We wait for all previously started goroutines to complete before we exit.
+	// We wait for all previously started goroutines to complete before we return.
 	wg.Wait()
 	return len(servers), errors.Join(errs...)
 }
@@ -354,7 +369,6 @@ func (g *InstanceGroup) Decrease(
 	ctx context.Context,
 	ids []string,
 ) (succeeded []string, err error) {
-
 	errs := make([]error, 0)
 
 	for _, id := range ids {
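This patch splits the work into a serial producer and parallel waiters: creation requests go out one at a time, and each returned UUID is handed over a channel to a goroutine that waits for that server to come up. A self-contained sketch of the pipeline, with waitFor as a hypothetical stand-in for Servers.WaitFor; the consumer goroutine holds its own WaitGroup slot, so the counter cannot reach zero while the consumer may still Add new waiters:

package main

import (
	"fmt"
	"sync"
	"time"
)

// waitFor is a hypothetical stand-in for cloudscale's Servers.WaitFor.
func waitFor(uuid string) {
	time.Sleep(10 * time.Millisecond)
	fmt.Println("running:", uuid)
}

func main() {
	uuids := make(chan string)

	var wg sync.WaitGroup

	// One goroutine consumes the channel and fans out a waiter per UUID.
	// Its own WaitGroup slot keeps the counter above zero until it has
	// seen the channel close, so the wg.Add calls below are safe.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for uuid := range uuids {
			wg.Add(1)
			go func(uuid string) {
				defer wg.Done()
				waitFor(uuid)
			}(uuid)
		}
	}()

	// The producer stays serial: requests are issued one at a time, and
	// only the waiting happens in parallel.
	for i := 0; i < 3; i++ {
		uuids <- fmt.Sprintf("uuid-%d", i)
	}
	close(uuids) // ends the consumer's range loop

	wg.Wait()
}

close(uuids) is what lets the consumer's range loop finish; forgetting it, or returning early before it runs as the bare return in this patch's error path does, leaves the consumer blocked and wg.Wait() hanging forever.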
From 6d34e38fde88dcedb9a8dbd6518a21ee8e9f4b69 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20Kosch=C3=BCtzki?=
Date: Fri, 9 May 2025 08:45:09 +0200
Subject: [PATCH 4/4] fix: Use mutex instead of channel magic

---
 provider.go | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)

diff --git a/provider.go b/provider.go
index 8717bb0..63d5391 100644
--- a/provider.go
+++ b/provider.go
@@ -297,8 +297,9 @@ func (g *InstanceGroup) Increase(
 
 	// Channel for server UUIDs.
 	servers := make(chan string)
+	var serverCounter int // needed to return the number of successful instances
 
-	var errs_mutext sync.Mutex
+	var mutex sync.Mutex
 	errs := make([]error, 0, delta)
 
 	// Await all servers being ready.
@@ -315,12 +316,17 @@ func (g *InstanceGroup) Increase(
 					cloudscale.ServerIsRunning,
 				)
 				if err != nil {
-					errs_mutext.Lock()
-					defer errs_mutext.Unlock()
+					mutex.Lock()
+					defer mutex.Unlock()
 					errs = append(errs,
 						fmt.Errorf("failed to wait for %s: %w", uuid, err))
 					return
 				}
+
+				mutex.Lock()
+				defer mutex.Unlock()
+				serverCounter++
+
 				g.log.Info("created server", "name", server.Name, "uuid", server.UUID)
 			}(uuid)
 		}
@@ -346,11 +352,11 @@ func (g *InstanceGroup) Increase(
 		})
 
 		if err != nil {
-			errs_mutext.Lock()
-			defer errs_mutext.Unlock()
-			errs = append(errs,
-				fmt.Errorf("failed to create %s: %w", serverName, err))
-			return
+			mutex.Lock()
+			errs = append(errs,
+				fmt.Errorf("failed to create %s: %w", serverName, err))
+			// Unlock explicitly: a deferred unlock would only run when
+			// Increase returns, deadlocking any later Lock in this loop
+			// and in the wait goroutines.
+			mutex.Unlock()
+			continue
 		}
 
 		// Send the UUID to the "wait" goroutine.
@@ -360,7 +366,7 @@ func (g *InstanceGroup) Increase(
 
 	// We wait for all previously started goroutines to complete before we return.
 	wg.Wait()
-	return len(servers), errors.Join(errs...)
+	return serverCounter, errors.Join(errs...)
 }
 
 // Decrease removes the specified instances from the instance group. It
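With a mutex guarding the shared counter and error slice, the remaining pitfall is deferring Unlock inside a loop body, where it only runs when the surrounding function returns. One way around that, sketched below under hypothetical names, is to wrap the shared state in small methods so each critical section is a whole function and defer is safe again:

package main

import (
	"errors"
	"fmt"
	"sync"
)

// results bundles the shared state behind one mutex, mirroring the
// serverCounter/errs pair in the final version of Increase.
type results struct {
	mu    sync.Mutex
	count int
	errs  []error
}

// Each critical section is a whole method, so defer-based unlocking is
// safe: it runs when the method returns, not at the end of a caller's
// loop iteration.
func (r *results) ok() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.count++
}

func (r *results) fail(err error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.errs = append(r.errs, err)
}

func main() {
	var r results
	var wg sync.WaitGroup

	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if i%2 == 0 {
				r.ok()
			} else {
				r.fail(fmt.Errorf("instance %d failed", i))
			}
		}(i)
	}
	wg.Wait()

	fmt.Println(r.count, errors.Join(r.errs...))
}

Here results.ok and results.fail mirror the serverCounter increment and the errs append in the final version of Increase, and either would be safe to call from a loop that continues on failure.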