Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.35.1] - 2026-02-23
### Added
- `as_list` parameter to `search_udm()` for returning events as a list instead of dictionary
- CLI `--as-list` flag for `secops search` command

### Updated
- Migrated `search_udm()` to use `chronicle_request` helper for improved error handling and consistency

## [0.35.0] - 2026-02-18
### Added
- CLI commands for rule retrohunt management
Expand Down
3 changes: 3 additions & 0 deletions CLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ Search for events using UDM query syntax:

```bash
secops search --query "metadata.event_type = \"NETWORK_CONNECTION\"" --max-events 10

# Get result as list
secops search --query "metadata.event_type = \"NETWORK_CONNECTION\"" --max-events 10 --as-list
```

Search using natural language:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "secops"
version = "0.35.0"
version = "0.35.1"
description = "Python SDK for wrapping the Google SecOps API for common use cases"
readme = "README.md"
requires-python = ">=3.10"
Expand Down
14 changes: 8 additions & 6 deletions src/secops/chronicle/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -873,7 +873,8 @@ def search_udm(
max_attempts: int = 30,
timeout: int = 30,
debug: bool = False,
) -> dict[str, Any]:
as_list: bool = False,
) -> dict[str, Any] | list[dict[str, Any]]:
"""Search UDM events in Chronicle.

Args:
Expand All @@ -885,13 +886,13 @@ def search_udm(
max_attempts: Maximum number of polling attempts (default: 30)
timeout: Timeout in seconds for each API request (default: 30)
debug: Print debug information during execution
as_list: If True, return a list of events instead of a dict
with events list and nextPageToken.

Returns:
Dictionary with search results containing:
- events: List of UDM events with 'name' and 'udm' fields
- total_events: Number of events returned
- more_data_available: Boolean indicating
if more results are available
If as_list is True: List of Events.
If as_list is False: Dict with event list, total number of event and
flag to check if more data is available.

Raises:
APIError: If the API request fails
Expand All @@ -906,6 +907,7 @@ def search_udm(
max_attempts,
timeout,
debug,
as_list,
)

def find_udm_field_values(
Expand Down
82 changes: 32 additions & 50 deletions src/secops/chronicle/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,19 @@
"""UDM search functionality for Chronicle."""

from datetime import datetime
from typing import Any
from typing import Any, TYPE_CHECKING

import requests
from secops.chronicle.models import APIVersion
from secops.chronicle.utils.request_utils import (
chronicle_request,
)

from secops.exceptions import APIError
if TYPE_CHECKING:
from secops.chronicle.client import ChronicleClient


def search_udm(
client,
client: "ChronicleClient",
query: str,
start_time: datetime,
end_time: datetime,
Expand All @@ -32,7 +36,8 @@ def search_udm(
max_attempts: int = 30,
timeout: int = 30,
debug: bool = False,
) -> dict[str, Any]:
as_list: bool = False,
) -> dict[str, Any] | list[dict[str, Any]]:
"""Perform a UDM search query using the Chronicle V1alpha API.

Args:
Expand All @@ -46,23 +51,19 @@ def search_udm(
for backwards compatibility)
timeout: Timeout in seconds for each API request (default: 30)
debug: Print debug information during execution
as_list: Whether to return results as a list or dictionary

Returns:
Dict containing the search results with events
If as_list is True: List of Events.
If as_list is False: Dict with event list, total number of event and
flag to check if more data is available.

Raises:
APIError: If the API request fails
"""

# Unused parameters, kept for backward compatibility
_ = (case_insensitive, max_attempts)

# Format the instance ID for the API call
instance = client.instance_id

# Endpoint for UDM search
url = f"{client.base_url}/{instance}:udmSearch"

# Format times for the API
start_time_str = start_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
end_time_str = end_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
Expand All @@ -79,40 +80,21 @@ def search_udm(
print(f"Executing UDM search: {query}")
print(f"Time range: {start_time_str} to {end_time_str}")

try:
response = client.session.get(url, params=params, timeout=timeout)

if response.status_code != 200:
error_msg = (
f"Error executing search: Status {response.status_code}, "
f"Response: {response.text}"
)
if debug:
print(f"Error: {error_msg}")
raise APIError(error_msg)

# Parse the response
response_data = response.json()

# Extract events and metadata
events = response_data.get("events", [])
more_data_available = response_data.get("moreDataAvailable", False)

if debug:
print(f"Found {len(events)} events")
print(f"More data available: {more_data_available}")

# Build the result structure to match the expected format
result = {
"events": events,
"total_events": len(events),
"more_data_available": more_data_available,
}

return result

except requests.exceptions.RequestException as e:
error_msg = f"Request failed: {str(e)}"
if debug:
print(f"Error: {error_msg}")
raise APIError(error_msg) from e
result = chronicle_request(
client,
method="GET",
endpoint_path=":udmSearch",
api_version=APIVersion.V1ALPHA,
params=params,
timeout=timeout,
)

if as_list:
return result.get("events", [])

events = result.get("events", [])
return {
"events": events,
"total_events": len(events),
"more_data_available": result.get("moreDataAvailable", False),
}
3 changes: 3 additions & 0 deletions src/secops/cli/commands/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from secops.cli.utils.common_args import (
add_pagination_args,
add_time_range_args,
add_as_list_arg,
)
from secops.cli.utils.formatters import output_formatter
from secops.cli.utils.time_utils import get_time_range
Expand Down Expand Up @@ -58,6 +59,7 @@ def setup_search_command(subparsers):
"--csv", action="store_true", help="Output in CSV format"
)
add_time_range_args(search_parser)
add_as_list_arg(search_parser)
search_parser.set_defaults(func=handle_search_command)

search_subparser = search_parser.add_subparsers(
Expand Down Expand Up @@ -115,6 +117,7 @@ def handle_search_command(args, chronicle):
start_time=start_time,
end_time=end_time,
max_events=args.max_events,
as_list=args.as_list or False,
)
output_formatter(result, args.output)
except Exception as e: # pylint: disable=broad-exception-caught
Expand Down
35 changes: 0 additions & 35 deletions tests/chronicle/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,41 +90,6 @@ def test_chronicle_client_custom_session_user_agent():
assert client.session.headers.get("User-Agent") == "secops-wrapper-sdk"


def test_search_udm(chronicle_client):
"""Test UDM search functionality."""
# Mock the search request
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {
"events": [
{
"name": "projects/test-project/locations/us/instances/test-instance/events/event1",
"udm": {
"metadata": {
"eventTimestamp": "2024-01-01T00:00:00Z",
"eventType": "NETWORK_CONNECTION",
},
"target": {"ip": "192.168.1.1", "hostname": "test-host"},
},
}
],
"moreDataAvailable": False,
}

with patch.object(chronicle_client.session, "get", return_value=mock_response):
result = chronicle_client.search_udm(
query='target.ip != ""',
start_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
end_time=datetime(2024, 1, 2, tzinfo=timezone.utc),
max_events=10,
)

assert "events" in result
assert "total_events" in result
assert result["total_events"] == 1
assert result["events"][0]["udm"]["target"]["ip"] == "192.168.1.1"


@patch("secops.chronicle.entity._detect_value_type_for_query")
@patch("secops.chronicle.entity._summarize_entity_by_id")
def test_summarize_entity_ip(mock_summarize_by_id, mock_detect, chronicle_client):
Expand Down
Loading
Loading