Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions apps/cli/cmd/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@ var yamlflowRunCmd = &cobra.Command{
&services.NodeAiProvider,
&services.NodeMemory,
&services.NodeGraphQL,
&services.NodeWsConnection,
&services.NodeWsSend,
&services.NodeWait,
&services.WebSocket,
&services.WebSocketHeader,
&services.GraphQL,
&services.GraphQLHeader,
&services.Workspace,
Expand Down
37 changes: 26 additions & 11 deletions apps/cli/internal/common/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/the-dev-tools/dev-tools/packages/server/pkg/service/sflow"
"github.com/the-dev-tools/dev-tools/packages/server/pkg/service/sgraphql"
"github.com/the-dev-tools/dev-tools/packages/server/pkg/service/shttp"
"github.com/the-dev-tools/dev-tools/packages/server/pkg/service/swebsocket"
"github.com/the-dev-tools/dev-tools/packages/server/pkg/service/sworkspace"
)

Expand All @@ -32,16 +33,23 @@ type Services struct {
FlowVariable sflow.FlowVariableService

// Flow Nodes
Node sflow.NodeService
NodeRequest sflow.NodeRequestService
NodeFor sflow.NodeForService
NodeForEach sflow.NodeForEachService
NodeIf sflow.NodeIfService
NodeJS sflow.NodeJsService
NodeAI sflow.NodeAIService
NodeAiProvider sflow.NodeAiProviderService
NodeMemory sflow.NodeMemoryService
NodeGraphQL sflow.NodeGraphQLService
Node sflow.NodeService
NodeRequest sflow.NodeRequestService
NodeFor sflow.NodeForService
NodeForEach sflow.NodeForEachService
NodeIf sflow.NodeIfService
NodeJS sflow.NodeJsService
NodeAI sflow.NodeAIService
NodeAiProvider sflow.NodeAiProviderService
NodeMemory sflow.NodeMemoryService
NodeGraphQL sflow.NodeGraphQLService
NodeWsConnection sflow.NodeWsConnectionService
NodeWsSend sflow.NodeWsSendService
NodeWait sflow.NodeWaitService

// WebSocket
WebSocket swebsocket.WebSocketService
WebSocketHeader swebsocket.WebSocketHeaderService

// GraphQL
GraphQL sgraphql.GraphQLService
Expand Down Expand Up @@ -94,7 +102,14 @@ func CreateServices(ctx context.Context, db *sql.DB, logger *slog.Logger) (*Serv
NodeAI: sflow.NewNodeAIService(queries),
NodeAiProvider: sflow.NewNodeAiProviderService(queries),
NodeMemory: sflow.NewNodeMemoryService(queries),
NodeGraphQL: sflow.NewNodeGraphQLService(queries),
NodeGraphQL: sflow.NewNodeGraphQLService(queries),
NodeWsConnection: sflow.NewNodeWsConnectionService(queries),
NodeWsSend: sflow.NewNodeWsSendService(queries),
NodeWait: sflow.NewNodeWaitService(queries),

// WebSocket
WebSocket: swebsocket.New(queries, logger),
WebSocketHeader: swebsocket.NewWebSocketHeaderService(queries),

// GraphQL
GraphQL: sgraphql.New(queries, logger),
Expand Down
4 changes: 2 additions & 2 deletions apps/cli/internal/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func RunFlow(ctx context.Context, flowPtr *mflow.Flow, services RunnerServices,
defer close(gqlRespChan)

// Build flow node map using flowbuilder
flowNodeMap, startNodeID, err := services.Builder.BuildNodes(
flowNodeMap, startNodeIDs, err := services.Builder.BuildNodes(
ctx,
*flowPtr,
nodes,
Expand All @@ -279,7 +279,7 @@ func RunFlow(ctx context.Context, flowPtr *mflow.Flow, services RunnerServices,
}

// Use the same timeout for the flow runner
runnerInst := flowlocalrunner.CreateFlowRunner(idwrap.NewNow(), latestFlowID, startNodeID, flowNodeMap, edgeMap, nodeTimeout, nil)
runnerInst := flowlocalrunner.CreateFlowRunner(idwrap.NewNow(), latestFlowID, startNodeIDs, flowNodeMap, edgeMap, nodeTimeout, nil)

// Use a large buffer for CLI to avoid blocking
flowNodeStatusChan := make(chan runner.FlowNodeStatus, 10000)
Expand Down
119 changes: 119 additions & 0 deletions apps/cli/internal/runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ import (
"github.com/the-dev-tools/dev-tools/packages/server/pkg/service/senv"
"github.com/the-dev-tools/dev-tools/packages/server/pkg/service/sflow"
"github.com/the-dev-tools/dev-tools/packages/server/pkg/service/shttp"
"github.com/the-dev-tools/dev-tools/packages/server/pkg/service/swebsocket"
"github.com/the-dev-tools/dev-tools/packages/server/pkg/service/sworkspace"
yamlflowsimplev2 "github.com/the-dev-tools/dev-tools/packages/server/pkg/translate/yamlflowsimplev2"
"github.com/the-dev-tools/dev-tools/packages/spec/dist/buf/go/api/private/node_js_executor/v1/node_js_executorv1connect"
"github.com/coder/websocket"
)

// flowTestFixture provides a common test environment for flow execution tests
Expand Down Expand Up @@ -88,6 +90,13 @@ func newFlowTestFixture(t *testing.T) *flowTestFixture {
httpBodyRawService := shttp.NewHttpBodyRawService(queries)
httpAssertService := shttp.NewHttpAssertService(queries)

// WebSocket services
nodeWsConnectionService := sflow.NewNodeWsConnectionService(queries)
nodeWsSendService := sflow.NewNodeWsSendService(queries)
nodeWaitService := sflow.NewNodeWaitService(queries)
webSocketService := swebsocket.New(queries, logger)
webSocketHeaderService := swebsocket.NewWebSocketHeaderService(queries)

// Additional services for builder
varService := senv.NewVariableService(queries, logger)

Expand All @@ -114,6 +123,11 @@ func newFlowTestFixture(t *testing.T) *flowTestFixture {
nil, // NodeAiProviderService - not needed for CLI tests
nil, // NodeMemoryService - not needed for CLI tests
nil, // NodeGraphQLService - not needed for CLI tests
&nodeWsConnectionService,
&nodeWsSendService,
&nodeWaitService,
&webSocketService,
&webSocketHeaderService,
nil, // GraphQLService - not needed for CLI tests
nil, // GraphQLHeaderService - not needed for CLI tests
&workspaceService,
Expand Down Expand Up @@ -149,6 +163,11 @@ func newFlowTestFixture(t *testing.T) *flowTestFixture {
HTTPBodyUrlEncoded: httpBodyUrlEncodedService,
HTTPBodyRaw: httpBodyRawService,
HTTPAssert: httpAssertService,
NodeWsConnection: nodeWsConnectionService,
NodeWsSend: nodeWsSendService,
NodeWait: nodeWaitService,
WebSocket: webSocketService,
WebSocketHeader: webSocketHeaderService,
Logger: logger,
}

Expand Down Expand Up @@ -763,3 +782,103 @@ flows:
t.Error("OrphanRequest should NOT have been executed (it's an orphan node)")
}
}

// echoWSServer creates a test WebSocket server that echoes messages back.
func echoWSServer(t *testing.T) *httptest.Server {
t.Helper()
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
conn, err := websocket.Accept(w, r, nil)
if err != nil {
return
}
defer conn.Close(websocket.StatusNormalClosure, "") //nolint:errcheck // best-effort cleanup
for {
typ, msg, err := conn.Read(r.Context())
if err != nil {
return
}
if err := conn.Write(r.Context(), typ, msg); err != nil {
return
}
}
}))
}

func wsURL(s *httptest.Server) string {
return "ws" + strings.TrimPrefix(s.URL, "http")
}

// TestFlowRun_WebSocket tests a flow with WebSocket connection and send nodes.
func TestFlowRun_WebSocket(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}

fixture := newFlowTestFixture(t)

wsSrv := echoWSServer(t)
defer wsSrv.Close()

yamlContent := fmt.Sprintf(`workspace_name: WS Test
flows:
- name: WSFlow
steps:
- manual_start:
name: Start
- ws_connection:
name: MyWS
depends_on: Start
url: %s
- ws_send:
name: SendHello
depends_on: MyWS
ws_connection_node_name: MyWS
message: '{"hello":"world"}'
`, wsURL(wsSrv))

resolved, err := yamlflowsimplev2.ConvertSimplifiedYAML([]byte(yamlContent), yamlflowsimplev2.ConvertOptionsV2{
WorkspaceID: fixture.workspaceID,
})
if err != nil {
t.Fatalf("failed to convert YAML: %v", err)
}

fixture.importWorkspaceBundle(resolved)

flow := fixture.getFlowByName("WSFlow")
if flow == nil {
t.Fatal("WSFlow not found")
}

ctx, cancel := context.WithTimeout(fixture.ctx, 10*time.Second)
defer cancel()

result, err := runner.RunFlow(ctx, flow, fixture.getRunnerServices(nil), nil)

if err != nil {
t.Errorf("flow execution failed: %v", err)
}

if result.Status != "success" {
t.Errorf("expected status 'success', got '%s'. Error: %s", result.Status, result.Error)
}

// Verify both WS nodes were executed
foundConn := false
foundSend := false
for _, node := range result.Nodes {
switch node.Name {
case "MyWS":
foundConn = true
case "SendHello":
foundSend = true
}
}

if !foundConn {
t.Error("WS connection node 'MyWS' was not executed")
}
if !foundSend {
t.Error("WS send node 'SendHello' was not executed")
}
}
1 change: 1 addition & 0 deletions apps/cli/test/yamlflow/integration_yamlflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,4 @@ func TestYAMLFlow_TestRunField(t *testing.T) {
func TestYAMLFlow_GraphQLRun(t *testing.T) {
runCLI(t, "graphql_run_example.yaml")
}

57 changes: 57 additions & 0 deletions apps/cli/test/yamlflow/ws_run_example.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
workspace_name: New Workspace
run:
- flow: ws-integration
flows:
- name: ws-integration
steps:
- manual_start:
name: Start
position_x: -182.05
position_y: -3.7
- wait:
name: wait_4
depends_on: Start
position_x: 201
position_y: -139.77
duration_ms: '8000'
- ws_connection:
name: ws_connection_5
depends_on: Start
position_x: 218.43
position_y: -3.21
url: http://localhost:8080/
- for:
name: for_6
depends_on: ws_connection_5
position_x: 462.56
position_y: -146.17
iter_count: '10'
- wait:
name: wait_7
depends_on: ws_connection_5
position_x: 470.57
position_y: 88.8
duration_ms: '1000'
- ws_send:
name: ws_send_6
depends_on: for_6.loop
position_x: 592.26
position_y: -52.7
ws_connection_node_name: ws_connection_5
message: '{"a":"{{ for_6.index }}"}'
- ws_send:
name: ws_send_6_1
depends_on: wait_7
position_x: 631.96
position_y: 99.13
ws_connection_node_name: ws_connection_5
message: '{"a":"2"}'
- wait:
name: wait_7
depends_on: ws_send_6
position_x: 803.84
position_y: -54.89
duration_ms: '1000'
environments:
- name: default
variables: {}
2 changes: 1 addition & 1 deletion apps/desktop/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
"electron": "catalog:",
"electron-builder": "catalog:",
"electron-devtools-installer": "catalog:",
"electron-updater": "6.7.3",
"electron-updater": "catalog:",
"electron-vite": "catalog:",
"eslint": "catalog:",
"react": "catalog:",
Expand Down
2 changes: 1 addition & 1 deletion apps/desktop/src/renderer/main.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ const UpdateAvailable = ({ children }: UpdateAvailableProps) => {
<div className={tw`mt-2 text-2xl`}>Update available!</div>
</div>

{/* eslint-disable-next-line better-tailwindcss/no-unregistered-classes */}
{/* eslint-disable-next-line better-tailwindcss/no-unknown-classes */}
<div className={tw`prose dark:prose-invert flex-1 overflow-auto`}>
<Markdown>{children}</Markdown>
</div>
Expand Down
Loading
Loading