diff --git a/CHANGELOG.md b/CHANGELOG.md index 7789926..83fb8fc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,14 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.35.1] - 2026-02-23 +### Added +- `as_list` parameter to `search_udm()` for returning events as a list instead of dictionary +- CLI `--as-list` flag for `secops search` command + +### Updated +- Migrated `search_udm()` to use `chronicle_request` helper for improved error handling and consistency + ## [0.35.0] - 2026-02-18 ### Added - CLI commands for rule retrohunt management diff --git a/CLI.md b/CLI.md index 9b8f7d4..eb9ec6f 100644 --- a/CLI.md +++ b/CLI.md @@ -124,6 +124,9 @@ Search for events using UDM query syntax: ```bash secops search --query "metadata.event_type = \"NETWORK_CONNECTION\"" --max-events 10 + +# Get result as list +secops search --query "metadata.event_type = \"NETWORK_CONNECTION\"" --max-events 10 --as-list ``` Search using natural language: diff --git a/pyproject.toml b/pyproject.toml index 55a4ca6..11d9005 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "secops" -version = "0.35.0" +version = "0.35.1" description = "Python SDK for wrapping the Google SecOps API for common use cases" readme = "README.md" requires-python = ">=3.10" diff --git a/src/secops/chronicle/client.py b/src/secops/chronicle/client.py index cdf63b2..2795f8b 100644 --- a/src/secops/chronicle/client.py +++ b/src/secops/chronicle/client.py @@ -873,7 +873,8 @@ def search_udm( max_attempts: int = 30, timeout: int = 30, debug: bool = False, - ) -> dict[str, Any]: + as_list: bool = False, + ) -> dict[str, Any] | list[dict[str, Any]]: """Search UDM events in Chronicle. Args: @@ -885,13 +886,13 @@ def search_udm( max_attempts: Maximum number of polling attempts (default: 30) timeout: Timeout in seconds for each API request (default: 30) debug: Print debug information during execution + as_list: If True, return a list of events instead of a dict + with events list and nextPageToken. Returns: - Dictionary with search results containing: - - events: List of UDM events with 'name' and 'udm' fields - - total_events: Number of events returned - - more_data_available: Boolean indicating - if more results are available + If as_list is True: List of Events. + If as_list is False: Dict with event list, total number of event and + flag to check if more data is available. Raises: APIError: If the API request fails @@ -906,6 +907,7 @@ def search_udm( max_attempts, timeout, debug, + as_list, ) def find_udm_field_values( diff --git a/src/secops/chronicle/search.py b/src/secops/chronicle/search.py index 037a6e5..32f0c7a 100644 --- a/src/secops/chronicle/search.py +++ b/src/secops/chronicle/search.py @@ -15,15 +15,19 @@ """UDM search functionality for Chronicle.""" from datetime import datetime -from typing import Any +from typing import Any, TYPE_CHECKING -import requests +from secops.chronicle.models import APIVersion +from secops.chronicle.utils.request_utils import ( + chronicle_request, +) -from secops.exceptions import APIError +if TYPE_CHECKING: + from secops.chronicle.client import ChronicleClient def search_udm( - client, + client: "ChronicleClient", query: str, start_time: datetime, end_time: datetime, @@ -32,7 +36,8 @@ def search_udm( max_attempts: int = 30, timeout: int = 30, debug: bool = False, -) -> dict[str, Any]: + as_list: bool = False, +) -> dict[str, Any] | list[dict[str, Any]]: """Perform a UDM search query using the Chronicle V1alpha API. Args: @@ -46,23 +51,19 @@ def search_udm( for backwards compatibility) timeout: Timeout in seconds for each API request (default: 30) debug: Print debug information during execution + as_list: Whether to return results as a list or dictionary Returns: - Dict containing the search results with events + If as_list is True: List of Events. + If as_list is False: Dict with event list, total number of event and + flag to check if more data is available. Raises: APIError: If the API request fails """ - # Unused parameters, kept for backward compatibility _ = (case_insensitive, max_attempts) - # Format the instance ID for the API call - instance = client.instance_id - - # Endpoint for UDM search - url = f"{client.base_url}/{instance}:udmSearch" - # Format times for the API start_time_str = start_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ") end_time_str = end_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ") @@ -79,40 +80,21 @@ def search_udm( print(f"Executing UDM search: {query}") print(f"Time range: {start_time_str} to {end_time_str}") - try: - response = client.session.get(url, params=params, timeout=timeout) - - if response.status_code != 200: - error_msg = ( - f"Error executing search: Status {response.status_code}, " - f"Response: {response.text}" - ) - if debug: - print(f"Error: {error_msg}") - raise APIError(error_msg) - - # Parse the response - response_data = response.json() - - # Extract events and metadata - events = response_data.get("events", []) - more_data_available = response_data.get("moreDataAvailable", False) - - if debug: - print(f"Found {len(events)} events") - print(f"More data available: {more_data_available}") - - # Build the result structure to match the expected format - result = { - "events": events, - "total_events": len(events), - "more_data_available": more_data_available, - } - - return result - - except requests.exceptions.RequestException as e: - error_msg = f"Request failed: {str(e)}" - if debug: - print(f"Error: {error_msg}") - raise APIError(error_msg) from e + result = chronicle_request( + client, + method="GET", + endpoint_path=":udmSearch", + api_version=APIVersion.V1ALPHA, + params=params, + timeout=timeout, + ) + + if as_list: + return result.get("events", []) + + events = result.get("events", []) + return { + "events": events, + "total_events": len(events), + "more_data_available": result.get("moreDataAvailable", False), + } diff --git a/src/secops/cli/commands/search.py b/src/secops/cli/commands/search.py index cd58514..fd6232f 100644 --- a/src/secops/cli/commands/search.py +++ b/src/secops/cli/commands/search.py @@ -19,6 +19,7 @@ from secops.cli.utils.common_args import ( add_pagination_args, add_time_range_args, + add_as_list_arg, ) from secops.cli.utils.formatters import output_formatter from secops.cli.utils.time_utils import get_time_range @@ -58,6 +59,7 @@ def setup_search_command(subparsers): "--csv", action="store_true", help="Output in CSV format" ) add_time_range_args(search_parser) + add_as_list_arg(search_parser) search_parser.set_defaults(func=handle_search_command) search_subparser = search_parser.add_subparsers( @@ -115,6 +117,7 @@ def handle_search_command(args, chronicle): start_time=start_time, end_time=end_time, max_events=args.max_events, + as_list=args.as_list or False, ) output_formatter(result, args.output) except Exception as e: # pylint: disable=broad-exception-caught diff --git a/tests/chronicle/test_client.py b/tests/chronicle/test_client.py index e9cb648..f826590 100644 --- a/tests/chronicle/test_client.py +++ b/tests/chronicle/test_client.py @@ -90,41 +90,6 @@ def test_chronicle_client_custom_session_user_agent(): assert client.session.headers.get("User-Agent") == "secops-wrapper-sdk" -def test_search_udm(chronicle_client): - """Test UDM search functionality.""" - # Mock the search request - mock_response = Mock() - mock_response.status_code = 200 - mock_response.json.return_value = { - "events": [ - { - "name": "projects/test-project/locations/us/instances/test-instance/events/event1", - "udm": { - "metadata": { - "eventTimestamp": "2024-01-01T00:00:00Z", - "eventType": "NETWORK_CONNECTION", - }, - "target": {"ip": "192.168.1.1", "hostname": "test-host"}, - }, - } - ], - "moreDataAvailable": False, - } - - with patch.object(chronicle_client.session, "get", return_value=mock_response): - result = chronicle_client.search_udm( - query='target.ip != ""', - start_time=datetime(2024, 1, 1, tzinfo=timezone.utc), - end_time=datetime(2024, 1, 2, tzinfo=timezone.utc), - max_events=10, - ) - - assert "events" in result - assert "total_events" in result - assert result["total_events"] == 1 - assert result["events"][0]["udm"]["target"]["ip"] == "192.168.1.1" - - @patch("secops.chronicle.entity._detect_value_type_for_query") @patch("secops.chronicle.entity._summarize_entity_by_id") def test_summarize_entity_ip(mock_summarize_by_id, mock_detect, chronicle_client): diff --git a/tests/chronicle/test_search.py b/tests/chronicle/test_search.py new file mode 100644 index 0000000..ef094b3 --- /dev/null +++ b/tests/chronicle/test_search.py @@ -0,0 +1,200 @@ +"""Tests for Chronicle UDM search functionality (search_udm).""" + +from __future__ import annotations + +import unittest +from datetime import datetime, timedelta, timezone +from unittest import mock + +from secops.chronicle.models import APIVersion +from secops.chronicle.search import search_udm + + +class TestChronicleUdmSearch(unittest.TestCase): + """Tests for Chronicle search functionality.""" + + def setUp(self) -> None: + self.client = mock.MagicMock() + self.start_time = datetime.now(tz=timezone.utc) - timedelta(days=1) + self.end_time = datetime.now(tz=timezone.utc) + + @mock.patch("secops.chronicle.search.chronicle_request") + def test_search_udm_returns_expected_shape( + self, mock_chronicle_request: mock.MagicMock + ) -> None: + mock_chronicle_request.return_value = { + "events": [{"id": 1}, {"id": 2}], + "moreDataAvailable": True, + } + + result = search_udm( + client=self.client, + query='metadata.event_type = "NETWORK_CONNECTION"', + start_time=self.start_time, + end_time=self.end_time, + max_events=500, + ) + + self.assertEqual(result["events"], [{"id": 1}, {"id": 2}]) + self.assertEqual(result["total_events"], 2) + self.assertTrue(result["more_data_available"]) + + mock_chronicle_request.assert_called_once() + _, kwargs = mock_chronicle_request.call_args + + self.assertEqual(kwargs["method"], "GET") + self.assertEqual(kwargs["endpoint_path"], ":udmSearch") + self.assertEqual(kwargs["api_version"], APIVersion.V1ALPHA) + + params = kwargs["params"] + self.assertEqual( + params["query"], 'metadata.event_type = "NETWORK_CONNECTION"' + ) + self.assertEqual(params["limit"], 500) + self.assertEqual( + params["timeRange.start_time"], + self.start_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + ) + self.assertEqual( + params["timeRange.end_time"], + self.end_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + ) + + @mock.patch("secops.chronicle.search.chronicle_request") + def test_search_udm_defaults_when_keys_missing( + self, mock_chronicle_request: mock.MagicMock + ) -> None: + # Missing "events" and "moreDataAvailable" should default safely + mock_chronicle_request.return_value = {} + + result = search_udm( + client=self.client, + query="q", + start_time=self.start_time, + end_time=self.end_time, + ) + + self.assertEqual(result["events"], []) + self.assertEqual(result["total_events"], 0) + self.assertFalse(result["more_data_available"]) + + @mock.patch("secops.chronicle.search.chronicle_request") + @mock.patch("builtins.print") + def test_search_udm_debug_prints( + self, mock_print: mock.MagicMock, mock_chronicle_request: mock.MagicMock + ) -> None: + mock_chronicle_request.return_value = {"events": []} + + search_udm( + client=self.client, + query="q", + start_time=self.start_time, + end_time=self.end_time, + debug=True, + ) + + # Two prints: query + time range + self.assertGreaterEqual(mock_print.call_count, 2) + + @mock.patch("secops.chronicle.search.chronicle_request") + def test_search_udm_propagates_api_error( + self, mock_chronicle_request: mock.MagicMock + ) -> None: + mock_chronicle_request.side_effect = Exception("boom") + + with self.assertRaises(Exception) as ctx: + search_udm( + client=self.client, + query="q", + start_time=self.start_time, + end_time=self.end_time, + ) + + self.assertIn("boom", str(ctx.exception)) + + @mock.patch("secops.chronicle.search.chronicle_request") + def test_search_udm_as_list_returns_list( + self, mock_chronicle_request: mock.MagicMock + ) -> None: + """Test that as_list=True returns a list of events.""" + mock_chronicle_request.return_value = { + "events": [{"id": 1}, {"id": 2}, {"id": 3}], + "moreDataAvailable": True, + } + + result = search_udm( + client=self.client, + query="test", + start_time=self.start_time, + end_time=self.end_time, + as_list=True, + ) + + self.assertIsInstance(result, list) + self.assertEqual(len(result), 3) + self.assertEqual(result, [{"id": 1}, {"id": 2}, {"id": 3}]) + + @mock.patch("secops.chronicle.search.chronicle_request") + def test_search_udm_as_list_with_missing_events( + self, mock_chronicle_request: mock.MagicMock + ) -> None: + """Test that as_list=True returns empty list when events missing.""" + mock_chronicle_request.return_value = { + "moreDataAvailable": False, + } + + result = search_udm( + client=self.client, + query="test", + start_time=self.start_time, + end_time=self.end_time, + as_list=True, + ) + + self.assertIsInstance(result, list) + self.assertEqual(len(result), 0) + self.assertEqual(result, []) + + @mock.patch("secops.chronicle.search.chronicle_request") + def test_search_udm_timeout_parameter_passed( + self, mock_chronicle_request: mock.MagicMock + ) -> None: + """Test that timeout parameter is correctly passed.""" + mock_chronicle_request.return_value = {"events": []} + + search_udm( + client=self.client, + query="test", + start_time=self.start_time, + end_time=self.end_time, + timeout=60, + ) + + mock_chronicle_request.assert_called_once() + _, kwargs = mock_chronicle_request.call_args + self.assertEqual(kwargs["timeout"], 60) + + @mock.patch("secops.chronicle.search.chronicle_request") + def test_search_udm_empty_events_list( + self, mock_chronicle_request: mock.MagicMock + ) -> None: + """Test handling of empty events list in response.""" + mock_chronicle_request.return_value = { + "events": [], + "moreDataAvailable": False, + } + + result = search_udm( + client=self.client, + query="test", + start_time=self.start_time, + end_time=self.end_time, + ) + + self.assertEqual(result["events"], []) + self.assertEqual(result["total_events"], 0) + self.assertFalse(result["more_data_available"]) + + +if __name__ == "__main__": + unittest.main()