Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 5 additions & 11 deletions exthttpcheck/bandwidth.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ type BandwidthCheckState struct {
ReadTimeout time.Duration
FollowRedirects bool
InsecureSkipVerify bool
RequestsPerSecond uint64
MaxConcurrent int
}

Expand Down Expand Up @@ -150,15 +149,15 @@ func (a *httpCheckActionBandwidth) Describe() action_kit_api.ActionDescription {
//------------------------
repetitionControl,
{
Name: "requestsPerSecond",
Label: "Requests per second",
Description: extutil.Ptr("The number of requests per second. Should be between 1 and 10."),
Name: "maxConcurrent",
Label: "Concurrent Requests",
Description: extutil.Ptr("Number of parallel requests to execute simultaneously without delay. More concurrent requests generate more traffic to saturate the available bandwidth."),
Type: action_kit_api.ActionParameterTypeInteger,
DefaultValue: extutil.Ptr("1"),
DefaultValue: extutil.Ptr("5"),
Required: extutil.Ptr(true),
Order: extutil.Ptr(7),
MinValue: extutil.Ptr(1),
MaxValue: extutil.Ptr(10),
MaxValue: extutil.Ptr(50),
},
duration,
separator(9),
Expand Down Expand Up @@ -194,10 +193,6 @@ func (a *httpCheckActionBandwidth) Describe() action_kit_api.ActionDescription {
//------------------------
targetSelectionParameter,
//------------------------
// Additional Settings
//------------------------
maxConcurrent,
//------------------------
// Client Settings
//------------------------
clientSettings,
Expand Down Expand Up @@ -289,7 +284,6 @@ func (a *httpCheckActionBandwidth) Prepare(_ context.Context, state *BandwidthCh
state.ReadTimeout = time.Duration(extutil.ToInt64(request.Config["readTimeout"])) * time.Millisecond
state.FollowRedirects = extutil.ToBool(request.Config["followRedirects"])
state.InsecureSkipVerify = extutil.ToBool(request.Config["insecureSkipVerify"])
state.RequestsPerSecond = extutil.ToUInt64(request.Config["requestsPerSecond"])
state.MaxConcurrent = extutil.ToInt(request.Config["maxConcurrent"])
if state.MaxConcurrent < 1 {
state.MaxConcurrent = 5
Expand Down
47 changes: 5 additions & 42 deletions exthttpcheck/bandwidthChecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,6 @@ import (
)

type bandwidthChecker struct {
// Request scheduling
work chan struct{}
ticker *time.Ticker
tickerDelay time.Duration

// Window aggregation
windowMu sync.Mutex
windowStartTime time.Time
Expand All @@ -46,14 +41,9 @@ type bandwidthChecker struct {
var bandwidthCheckers = sync.Map{}

func newBandwidthChecker(state *BandwidthCheckState) *bandwidthChecker {
delayMs := uint64(1000) / state.RequestsPerSecond
checker := &bandwidthChecker{
work: make(chan struct{}, 10),
tickerDelay: time.Duration(delayMs) * time.Millisecond,
state: state,
return &bandwidthChecker{
state: state,
}

return checker
}

func (c *bandwidthChecker) start() {
Expand All @@ -65,39 +55,16 @@ func (c *bandwidthChecker) start() {
c.windowErrorCount = 0
c.windowMu.Unlock()

// Start request ticker
c.ticker = time.NewTicker(c.tickerDelay)

// Start workers for performing requests
// Start workers that continuously perform requests without delay
for w := 1; w <= c.state.MaxConcurrent; w++ {
go c.performBandwidthRequests()
}

// Schedule first request immediately
log.Debug().Msgf("Schedule first bandwidth request at %v", time.Now())
c.work <- struct{}{}

// Request scheduler goroutine
go func() {
for range c.ticker.C {
if c.stopped.Load() {
return
}
log.Trace().Msgf("Schedule bandwidth request at %v", time.Now())
select {
case c.work <- struct{}{}:
default:
// Work channel full, skip this tick
}
}
}()
log.Debug().Msgf("Started %d bandwidth workers", c.state.MaxConcurrent)
}

func (c *bandwidthChecker) stop() {
c.stopped.Store(true)
if c.ticker != nil {
c.ticker.Stop()
}
}

func (c *bandwidthChecker) performBandwidthRequests() {
Expand All @@ -123,11 +90,7 @@ func (c *bandwidthChecker) performBandwidthRequests() {
}
}

for range c.work {
if c.stopped.Load() {
break
}

for !c.stopped.Load() {
req, err := http.NewRequest("GET", c.state.URL.String(), nil)
if err != nil {
log.Error().Err(err).Msg("Failed to create bandwidth request")
Expand Down
6 changes: 2 additions & 4 deletions exthttpcheck/bandwidth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ func TestBandwidthCheckAction_Prepare_Success(t *testing.T) {
"connectTimeout": 5000,
"readTimeout": 5000,
"followRedirects": true,
"requestsPerSecond": 1,
"headers": []interface{}{},
},
ExecutionId: execID,
Expand Down Expand Up @@ -177,7 +176,7 @@ func TestBandwidthCheckAction_FullCycle(t *testing.T) {
"connectTimeout": 5000,
"readTimeout": 5000,
"followRedirects": true,
"requestsPerSecond": 5,
"maxConcurrent": 10,
"headers": []interface{}{},
},
ExecutionId: execID,
Expand Down Expand Up @@ -252,7 +251,6 @@ func TestBandwidthCheckAction_BigFileDownload(t *testing.T) {
"connectTimeout": 10000,
"readTimeout": 10000,
"followRedirects": true,
"requestsPerSecond": 1,
"maxConcurrent": 1,
"headers": []interface{}{},
},
Expand Down Expand Up @@ -342,7 +340,7 @@ func TestBandwidthCheckAction_NonSuccessStatusCode(t *testing.T) {
"connectTimeout": 5000,
"readTimeout": 5000,
"followRedirects": true,
"requestsPerSecond": 2,
"maxConcurrent": 2,
"headers": []interface{}{},
},
ExecutionId: execID,
Expand Down
Loading