Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions kruxiaflow/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,107 @@ async def get_workflow(self, workflow_id: str) -> dict[str, Any]:
except httpx.RequestError as e:
raise KruxiaFlowError(f"Request failed: {e}") from e

async def start_workflow(
self,
workflow_name: str,
*,
inputs: dict[str, Any] | None = None,
version: str | None = None,
) -> dict[str, Any]:
"""Start a workflow execution by name.

Args:
workflow_name: Name of the workflow to start
inputs: Input parameters for the workflow
version: Optional version (default: latest)

Returns:
Response containing workflow instance ID and status
"""
body: dict[str, Any] = {"name": workflow_name}
if inputs:
body["inputs"] = inputs
if version:
body["version"] = version

try:
response = await self._client.post("/api/v1/workflows/start", json=body)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
if e.response.status_code == 401:
raise AuthenticationError("Authentication failed") from e
if e.response.status_code == 404:
raise WorkflowNotFoundError(
f"Workflow '{workflow_name}' not found"
) from e
raise KruxiaFlowError(
f"Failed to start workflow: {e.response.status_code} - {e.response.text}"
) from e
except httpx.RequestError as e:
raise KruxiaFlowError(f"Request failed: {e}") from e

async def get_workflow_output(self, workflow_id: str) -> dict[str, Any]:
"""Get workflow output.

Returns all activity outputs for a completed workflow.

Args:
workflow_id: Unique workflow execution ID

Returns:
Dictionary of activity outputs
"""
try:
response = await self._client.get(
f"/api/v1/workflows/{workflow_id}/output"
)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
if e.response.status_code == 401:
raise AuthenticationError("Authentication failed") from e
if e.response.status_code == 404:
raise WorkflowNotFoundError(
f"Workflow '{workflow_id}' not found"
) from e
raise KruxiaFlowError(
f"Failed to get workflow output: {e.response.status_code} - {e.response.text}"
) from e
except httpx.RequestError as e:
raise KruxiaFlowError(f"Request failed: {e}") from e

async def get_activity_output(
self, workflow_id: str, activity_key: str
) -> dict[str, Any]:
"""Get output for a specific activity.

Args:
workflow_id: Unique workflow execution ID
activity_key: Activity key within the workflow

Returns:
Activity output data
"""
try:
response = await self._client.get(
f"/api/v1/workflows/{workflow_id}/activities/{activity_key}/output"
)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
if e.response.status_code == 401:
raise AuthenticationError("Authentication failed") from e
if e.response.status_code == 404:
raise WorkflowNotFoundError(
f"Workflow '{workflow_id}' or activity '{activity_key}' not found"
) from e
raise KruxiaFlowError(
f"Failed to get activity output: {e.response.status_code} - {e.response.text}"
) from e
except httpx.RequestError as e:
raise KruxiaFlowError(f"Request failed: {e}") from e

async def cancel_workflow(self, workflow_id: str) -> None:
"""Cancel a running workflow.

Expand Down
274 changes: 274 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -992,3 +992,277 @@ async def test_cancel_workflow_network_error(self, httpx_mock: HTTPXMock):
) as client:
with pytest.raises(KruxiaFlowError, match="Request failed"):
await client.cancel_workflow("wf-123")


@pytest.mark.usefixtures("clean_env")
class TestAsyncKruxiaFlowStartWorkflow:
"""Test AsyncKruxiaFlow start_workflow method."""

@pytest.mark.asyncio
async def test_start_workflow_success(self, httpx_mock: HTTPXMock):
httpx_mock.add_response(
method="POST",
url="http://localhost:8080/api/v1/workflows/start",
json={"instance_id": "inst-123", "status": "running"},
)

async with AsyncKruxiaFlow(
api_url="http://localhost:8080",
api_token="token",
) as client:
result = await client.start_workflow("my_workflow")

assert result["instance_id"] == "inst-123"

@pytest.mark.asyncio
async def test_start_workflow_with_inputs(self, httpx_mock: HTTPXMock):
import json

httpx_mock.add_response(
method="POST",
url="http://localhost:8080/api/v1/workflows/start",
json={"instance_id": "inst-456"},
)

async with AsyncKruxiaFlow(
api_url="http://localhost:8080",
api_token="token",
) as client:
result = await client.start_workflow(
"my_workflow",
inputs={"param1": "value1"},
version="2.0.0",
)

assert result["instance_id"] == "inst-456"

request = httpx_mock.get_request()
body = json.loads(request.content)
assert body["name"] == "my_workflow"
assert body["inputs"] == {"param1": "value1"}
assert body["version"] == "2.0.0"

@pytest.mark.asyncio
async def test_start_workflow_not_found(self, httpx_mock: HTTPXMock):
httpx_mock.add_response(
method="POST",
url="http://localhost:8080/api/v1/workflows/start",
status_code=404,
text="Not found",
)

async with AsyncKruxiaFlow(
api_url="http://localhost:8080",
api_token="token",
) as client:
with pytest.raises(WorkflowNotFoundError, match="not found"):
await client.start_workflow("nonexistent")

@pytest.mark.asyncio
async def test_start_workflow_auth_error(self, httpx_mock: HTTPXMock):
httpx_mock.add_response(
method="POST",
url="http://localhost:8080/api/v1/workflows/start",
status_code=401,
)

async with AsyncKruxiaFlow(
api_url="http://localhost:8080",
api_token="token",
) as client:
with pytest.raises(AuthenticationError):
await client.start_workflow("my_workflow")

@pytest.mark.asyncio
async def test_start_workflow_server_error(self, httpx_mock: HTTPXMock):
httpx_mock.add_response(
method="POST",
url="http://localhost:8080/api/v1/workflows/start",
status_code=500,
text="Internal server error",
)

async with AsyncKruxiaFlow(
api_url="http://localhost:8080",
api_token="token",
) as client:
with pytest.raises(KruxiaFlowError, match="Failed to start workflow"):
await client.start_workflow("my_workflow")

@pytest.mark.asyncio
async def test_start_workflow_network_error(self, httpx_mock: HTTPXMock):
httpx_mock.add_exception(
httpx.ConnectError("Connection refused"),
url="http://localhost:8080/api/v1/workflows/start",
)

async with AsyncKruxiaFlow(
api_url="http://localhost:8080",
api_token="token",
) as client:
with pytest.raises(KruxiaFlowError, match="Request failed"):
await client.start_workflow("my_workflow")


@pytest.mark.usefixtures("clean_env")
class TestAsyncKruxiaFlowGetWorkflowOutput:
"""Test AsyncKruxiaFlow get_workflow_output method."""

@pytest.mark.asyncio
async def test_get_workflow_output_success(self, httpx_mock: HTTPXMock):
httpx_mock.add_response(
method="GET",
url="http://localhost:8080/api/v1/workflows/wf-123/output",
json={"step1": {"result": "done"}, "step2": {"count": 42}},
)

async with AsyncKruxiaFlow(
api_url="http://localhost:8080",
api_token="token",
) as client:
result = await client.get_workflow_output("wf-123")

assert result["step1"]["result"] == "done"
assert result["step2"]["count"] == 42

@pytest.mark.asyncio
async def test_get_workflow_output_not_found(self, httpx_mock: HTTPXMock):
httpx_mock.add_response(
method="GET",
url="http://localhost:8080/api/v1/workflows/nonexistent/output",
status_code=404,
)

async with AsyncKruxiaFlow(
api_url="http://localhost:8080",
api_token="token",
) as client:
with pytest.raises(WorkflowNotFoundError):
await client.get_workflow_output("nonexistent")

@pytest.mark.asyncio
async def test_get_workflow_output_auth_error(self, httpx_mock: HTTPXMock):
httpx_mock.add_response(
method="GET",
url="http://localhost:8080/api/v1/workflows/wf-123/output",
status_code=401,
)

async with AsyncKruxiaFlow(
api_url="http://localhost:8080",
api_token="token",
) as client:
with pytest.raises(AuthenticationError):
await client.get_workflow_output("wf-123")

@pytest.mark.asyncio
async def test_get_workflow_output_server_error(self, httpx_mock: HTTPXMock):
httpx_mock.add_response(
method="GET",
url="http://localhost:8080/api/v1/workflows/wf-123/output",
status_code=500,
text="Internal server error",
)

async with AsyncKruxiaFlow(
api_url="http://localhost:8080",
api_token="token",
) as client:
with pytest.raises(KruxiaFlowError, match="Failed to get workflow output"):
await client.get_workflow_output("wf-123")

@pytest.mark.asyncio
async def test_get_workflow_output_network_error(self, httpx_mock: HTTPXMock):
httpx_mock.add_exception(
httpx.ConnectError("Connection refused"),
url="http://localhost:8080/api/v1/workflows/wf-123/output",
)

async with AsyncKruxiaFlow(
api_url="http://localhost:8080",
api_token="token",
) as client:
with pytest.raises(KruxiaFlowError, match="Request failed"):
await client.get_workflow_output("wf-123")


@pytest.mark.usefixtures("clean_env")
class TestAsyncKruxiaFlowGetActivityOutput:
"""Test AsyncKruxiaFlow get_activity_output method."""

@pytest.mark.asyncio
async def test_get_activity_output_success(self, httpx_mock: HTTPXMock):
httpx_mock.add_response(
method="GET",
url="http://localhost:8080/api/v1/workflows/wf-123/activities/step1/output",
json={"result": "success", "data": [1, 2, 3]},
)

async with AsyncKruxiaFlow(
api_url="http://localhost:8080",
api_token="token",
) as client:
result = await client.get_activity_output("wf-123", "step1")

assert result["result"] == "success"
assert result["data"] == [1, 2, 3]

@pytest.mark.asyncio
async def test_get_activity_output_not_found(self, httpx_mock: HTTPXMock):
httpx_mock.add_response(
method="GET",
url="http://localhost:8080/api/v1/workflows/wf-123/activities/missing/output",
status_code=404,
)

async with AsyncKruxiaFlow(
api_url="http://localhost:8080",
api_token="token",
) as client:
with pytest.raises(WorkflowNotFoundError, match="activity 'missing' not found"):
await client.get_activity_output("wf-123", "missing")

@pytest.mark.asyncio
async def test_get_activity_output_auth_error(self, httpx_mock: HTTPXMock):
httpx_mock.add_response(
method="GET",
url="http://localhost:8080/api/v1/workflows/wf-123/activities/step1/output",
status_code=401,
)

async with AsyncKruxiaFlow(
api_url="http://localhost:8080",
api_token="token",
) as client:
with pytest.raises(AuthenticationError):
await client.get_activity_output("wf-123", "step1")

@pytest.mark.asyncio
async def test_get_activity_output_server_error(self, httpx_mock: HTTPXMock):
httpx_mock.add_response(
method="GET",
url="http://localhost:8080/api/v1/workflows/wf-123/activities/step1/output",
status_code=500,
text="Internal server error",
)

async with AsyncKruxiaFlow(
api_url="http://localhost:8080",
api_token="token",
) as client:
with pytest.raises(KruxiaFlowError, match="Failed to get activity output"):
await client.get_activity_output("wf-123", "step1")

@pytest.mark.asyncio
async def test_get_activity_output_network_error(self, httpx_mock: HTTPXMock):
httpx_mock.add_exception(
httpx.ConnectError("Connection refused"),
url="http://localhost:8080/api/v1/workflows/wf-123/activities/step1/output",
)

async with AsyncKruxiaFlow(
api_url="http://localhost:8080",
api_token="token",
) as client:
with pytest.raises(KruxiaFlowError, match="Request failed"):
await client.get_activity_output("wf-123", "step1")
Loading
Loading