Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions cli/running/base_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"os"
"strings"
"sync"
"syscall"
"time"

Expand All @@ -18,6 +19,8 @@ import (
type baseInstance struct {
// processController is a child process controller.
*processController
// mu is a mutex used to protect concurrent access to the processController.
mu sync.Mutex
// logger represents an active logging object.
logger ttlog.Logger
// tarantoolPath describes the path to the tarantool binary
Expand Down Expand Up @@ -56,7 +59,7 @@ type baseInstance struct {

func newBaseInstance(tarantoolPath string, instanceCtx InstanceCtx,
opts ...InstanceOption,
) baseInstance {
) *baseInstance {
baseInst := baseInstance{
tarantoolPath: tarantoolPath,
appPath: instanceCtx.InstanceScript,
Expand All @@ -75,7 +78,7 @@ func newBaseInstance(tarantoolPath string, instanceCtx InstanceCtx,
for _, opt := range opts {
opt(&baseInst)
}
return baseInst
return &baseInst
}

// InstanceOption is a functional option to configure tarantool instance.
Expand Down Expand Up @@ -126,6 +129,9 @@ func (inst *baseInstance) Wait() error {

// SendSignal sends a signal to tarantool instance.
func (inst *baseInstance) SendSignal(sig os.Signal) error {
inst.mu.Lock()
defer inst.mu.Unlock()

if inst.processController == nil {
return fmt.Errorf("instance is not started")
}
Expand All @@ -134,6 +140,9 @@ func (inst *baseInstance) SendSignal(sig os.Signal) error {

// IsAlive verifies that the instance is alive by sending a "0" signal.
func (inst *baseInstance) IsAlive() bool {
inst.mu.Lock()
defer inst.mu.Unlock()

if inst.processController == nil {
return false
}
Expand Down
2 changes: 1 addition & 1 deletion cli/running/cluster_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

// clusterInstance describes tarantool 3 instance running using cluster config.
type clusterInstance struct {
baseInstance
*baseInstance
// clusterConfigPath is a path of the cluster config.
clusterConfigPath string
runDir string
Expand Down
11 changes: 6 additions & 5 deletions cli/running/process_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"
"os/exec"
"sync"
"sync/atomic"
"syscall"
"time"
)
Expand All @@ -27,7 +28,7 @@ type processController struct {
// https://github.com/golang/go/issues/28461
waitMutex sync.Mutex
// done represents whether the process was stopped.
done bool
done atomic.Bool
}

// start starts the process.
Expand All @@ -36,13 +37,13 @@ func (pc *processController) start() error {
if err := pc.Start(); err != nil {
return err
}
pc.done = false
pc.done.Store(false)
return nil
}

// Wait waits for the process to complete.
func (pc *processController) Wait() error {
if pc.done {
if pc.done.Load() {
return nil
}
// waitMutex is used to prevent several invokes of the "Wait"
Expand All @@ -52,7 +53,7 @@ func (pc *processController) Wait() error {
defer pc.waitMutex.Unlock()
err := pc.Cmd.Wait()
if err == nil {
pc.done = true
pc.done.Store(true)
}
return err
}
Expand All @@ -67,7 +68,7 @@ func (pc *processController) SendSignal(sig os.Signal) error {

// IsAlive verifies that the Instance is alive by sending a "0" signal.
func (pc *processController) IsAlive() bool {
if pc.done {
if pc.done.Load() {
return false
}

Expand Down
2 changes: 1 addition & 1 deletion cli/running/running.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,7 @@ func Quit(run InstanceCtx) error {
}

func Run(runInfo *RunInfo) error {
inst := scriptInstance{baseInstance: baseInstance{
inst := scriptInstance{baseInstance: &baseInstance{
tarantoolPath: runInfo.CmdCtx.Cli.TarantoolCli.Executable,
integrityCtx: runInfo.CmdCtx.Integrity,
}}
Expand Down
5 changes: 4 additions & 1 deletion cli/running/script_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const (

// scriptInstance represents a tarantool invoked with an instance script provided.
type scriptInstance struct {
baseInstance
*baseInstance
}

//go:embed lua/launcher.lua
Expand Down Expand Up @@ -170,9 +170,12 @@ func (inst *scriptInstance) Start(ctx context.Context) error {
inst.setTarantoolLog(cmd)

// Start an Instance.
inst.mu.Lock()
Copy link
Copy Markdown
Contributor

@psergee psergee Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current approach raises questions. We are protecting only a single member of inst, even though the cluster instance employs the exact same process controller initialization code without any locking. Is this a test-specific fix for race conditions, rather than a comprehensive thread‑safety mechanism for the instances API?

defer inst.mu.Unlock()
if inst.processController, err = newProcessController(cmd); err != nil {
return err
}

StdinPipe.Write([]byte(instanceLauncher))
StdinPipe.Close()

Expand Down
16 changes: 8 additions & 8 deletions lib/watchdog/watchdog.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type Watchdog struct {
processGroupPID atomic.Int32
// startupComplete signals successful child process start.
startupComplete chan struct{}
// startupEnded indicates whether startup has completed.
startupEnded atomic.Bool
}

// NewWatchdog initializes a new Watchdog instance with the specified
Expand All @@ -54,6 +56,7 @@ func NewWatchdog(pidFile, wdPidFile string, restartTimeout time.Duration) *Watch
restartTimeout: restartTimeout,
signalChan: make(chan os.Signal, 1),
startupComplete: make(chan struct{}),
startupEnded: atomic.Bool{},
}
}

Expand Down Expand Up @@ -119,6 +122,7 @@ func (wd *Watchdog) Start(bin string, args ...string) error {
}

log.Infof("Process started successfully")
wd.startupEnded.Store(true)
close(wd.startupComplete) // Signal that startup is complete.

// Wait for process completion in separate goroutine.
Expand Down Expand Up @@ -165,7 +169,8 @@ func (wd *Watchdog) Start(bin string, args ...string) error {
return nil
}

// Reset startup complete channel for next iteration.
// Reset startup ended flag for next iteration.
wd.startupEnded.Store(false)
wd.startupComplete = make(chan struct{})
}
}
Expand All @@ -179,13 +184,8 @@ func (wd *Watchdog) Stop() {
return // Already stopping or stopped.
}

// Ensure process startup is complete before attempting to stop.
// This prevents races during process initialization.
select {
case <-wd.startupComplete:
// Normal case - startup already completed.
default:
// Startup still in progress - wait for completion.
if !wd.startupEnded.Load() {
// Wait for startup to complete before stopping.
log.Infof("Waiting for process startup...")
<-wd.startupComplete
}
Expand Down
66 changes: 63 additions & 3 deletions magefile.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
//go:build mage
// +build mage

package main

Expand Down Expand Up @@ -31,6 +30,8 @@ const (
defaultDarwinConfigPath = "/usr/local/etc/tarantool"

cartridgePath = "cli/cartridge/third_party/cartridge-cli"

raceTestsTimeout = "10m"
)

var (
Expand Down Expand Up @@ -60,6 +61,11 @@ var (
"lib/integrity",
"lib/cluster",
}

validRaceTests = []raceTest{
{path: "./cli/running", runTag: "TestWatchdog"},
{path: "./lib/watchdog", runTag: "TestWatchdog"},
}
)

type BuildType string
Expand Down Expand Up @@ -330,6 +336,37 @@ func (Lint) Python() error {

type Unit mg.Namespace

// raceTest describes one "go test -race" invocation: the package directory
// to test and the pattern selecting which tests to run there.
type raceTest struct {
	// path is the folder containing the tests.
	path string
	// runTag is the value passed to the -run flag of go test.
	// It may be a regexp pattern that matches several test names; for
	// example, "TestWatchdogBase|TestWatchdogNotRestartable" selects
	// both tests.
	runTag string
}

// runUnitRaceTests runs every entry of validRaceTests with the race detector
// enabled, one "go test" invocation per entry.
//
// Each invocation has the form:
//
//	go test -timeout <raceTestsTimeout> -race [-v] <path> -run <runTag> <flags...>
//
// "-v" is added when mage runs in verbose mode, and flags are appended
// verbatim after the -run pattern. The first failing invocation stops the
// loop and its error is returned unchanged.
func runUnitRaceTests(flags []string) error {
	baseArgs := []string{"test", "-timeout", raceTestsTimeout, "-race"}
	if mg.Verbose() {
		baseArgs = append(baseArgs, "-v")
	}

	for _, test := range validRaceTests {
		// Build a fresh slice for every invocation. Appending directly to
		// baseArgs may write into its spare capacity (present once "-v" has
		// been appended), which would make all iterations share and
		// overwrite a single backing array.
		testArgs := make([]string, 0, len(baseArgs)+3+len(flags))
		testArgs = append(testArgs, baseArgs...)
		testArgs = append(testArgs, test.path, "-run", test.runTag)
		testArgs = append(testArgs, flags...)

		if err := sh.RunV(goExecutableName, testArgs...); err != nil {
			return err
		}
	}

	return nil
}

func runUnitTests(flags []string) error {
mg.Deps(GenerateGoCode)

Expand All @@ -354,15 +391,38 @@ func runUnitTests(flags []string) error {
// Run unit tests.
//
// The regular unit test suite runs first, followed by the race detector
// tests. Each suite runs exactly once; the first failure stops the target
// and is returned.
func (Unit) Default() error {
	fmt.Println("Running unit tests...")
	if err := runUnitTests([]string{}); err != nil {
		return err
	}

	fmt.Println("\nRunning unit tests with race...")
	if err := runUnitRaceTests([]string{}); err != nil {
		return err
	}

	return nil
}

// Run unit tests with the race detector enabled.
//
// Only the race test entries are executed; no additional go test flags
// are passed.
func (Unit) Race() error {
	fmt.Println("Running unit tests with the race detector...")

	noExtraFlags := []string{}
	err := runUnitRaceTests(noExtraFlags)
	return err
}

// Run unit tests with a Tarantool instance integration.
//
// The regular suite runs first, then the race detector tests; both are
// invoked with the "integration" and "integration_docker" build tags.
// The first failure stops the target and is returned.
func (Unit) Full() error {
	// Build tags shared by both runs. Each call gets its own slice so
	// neither callee can observe changes made by the other.
	const integrationTags = "integration,integration_docker"

	fmt.Println("Running full unit tests...")
	if err := runUnitTests([]string{"-tags", integrationTags}); err != nil {
		return err
	}

	fmt.Println("\nRunning full unit tests with race...")
	if err := runUnitRaceTests([]string{"-tags", integrationTags}); err != nil {
		return err
	}

	return nil
}

// Run unit tests with a Tarantool instance integration, excluding docker tests.
Expand Down
Loading