diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 550815c..7c67073 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -14,7 +14,7 @@ jobs: runs-on: ubuntu-22.04 strategy: matrix: - python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"] + python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"] steps: - uses: actions/checkout@v3 diff --git a/CHANGES b/CHANGES index 9ea5650..fd35d6c 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,10 @@ +1.25.0 +------- +- Add support for Python 3.14 +- Drop support for Python 3.9 +- Modernize type annotations to use PEP 604 union syntax (X | Y) +- Use structural pattern matching in status code handling + 1.24.1 ------- - Rate limit exception improvements diff --git a/README.md b/README.md index 72a0177..989e5c9 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ ![Build](https://github.com/intezer/analyze-python-sdk/actions/workflows/test.yml/badge.svg) # Intezer SDK -The SDK wraps Intezer Analyze API 2.0 ([View full API documentation](https://analyze.intezer.com/api-docs.html)) +The SDK wraps Intezer Platform API 2.0 ([View full API documentation](https://analyze.intezer.com/api-docs.html)) Currently, the following options are available in the SDK: diff --git a/examples/sentinel_one_integration.py b/examples/sentinel_one_integration.py deleted file mode 100755 index 3b88aff..0000000 --- a/examples/sentinel_one_integration.py +++ /dev/null @@ -1,257 +0,0 @@ -#!/usr/bin/env python3 - -import argparse -import datetime -import io -import logging -import logging.handlers -import secrets -import sys -import time -import urllib.parse -from http import HTTPStatus -from typing import Optional -from typing import Tuple - -import requests -import requests.adapters - -from intezer_sdk import api -from intezer_sdk import errors -from intezer_sdk import util -from intezer_sdk.analysis import FileAnalysis - -_s1_session: Optional[requests.Session] = None -_logger = logging.getLogger('intezer') - - -class BaseUrlSession(requests.Session): - """Taken from https://github.com/requests/toolbelt/blob/master/requests_toolbelt/sessions.py""" - base_url = None - - def __init__(self, base_url=None): - if base_url: - self.base_url = base_url - super(BaseUrlSession, self).__init__() - - def request(self, method, url, *args, **kwargs): - """Send the request after generating the complete URL.""" - url = self.create_url(url) - return super(BaseUrlSession, self).request( - method, url, *args, **kwargs - ) - - def prepare_request(self, request): - """Prepare the request after generating the complete URL.""" - request.url = self.create_url(request.url) - return super(BaseUrlSession, self).prepare_request(request) - - def create_url(self, url): - """Create the URL based off this partial path.""" - return urllib.parse.urljoin(self.base_url, url) - - -def init_s1_requests_session(api_token: str, base_url: str, skip_ssl_verification: bool): - headers = {'Authorization': 'ApiToken ' + api_token} - global _s1_session - _s1_session = BaseUrlSession(base_url) - _s1_session.headers = headers - _s1_session.verify = not skip_ssl_verification - _s1_session.mount('https://', requests.adapters.HTTPAdapter(max_retries=3)) - _s1_session.mount('http://', requests.adapters.HTTPAdapter(max_retries=3)) - - -def analyze_by_file(threat_id: str): - download_url, zipp_password = fetch_file(threat_id) - file = download_file(download_url) - _logger.debug('starting to analyze file') - analysis = FileAnalysis(file_stream=file, file_name=f'{threat_id}.zip', zip_password=zipp_password) - return analysis - - -def fetch_file(threat_id: str) -> Tuple[str, Optional[str]]: - zip_password = secrets.token_urlsafe(32) - fetch_file_time = datetime.datetime.utcnow() - datetime.timedelta(seconds=5) - - _logger.debug('sending fetch command to the endpoint') - response = _s1_session.post('/web/api/v2.1/threats/fetch-file', - json={'data': {'password': zip_password}, 'filter': {'ids': [threat_id]}}) - assert_s1_response(response) - - for count in range(20): - _logger.debug(f'waiting for s1 to fetch the file from the endpoint ({count})') - time.sleep(10) - response = _s1_session.get('/web/api/v2.1/activities', - params={'threatIds': threat_id, - 'activityTypes': 86, - 'createdAt__gte': fetch_file_time.isoformat()}) - assert_s1_response(response) - data = response.json() - - for activity in data['data']: - download_url = activity['data'].get('downloadUrl') - if download_url: - return download_url, zip_password - else: - err_msg = ('Time out fetching the file, this is most likely when the endpoint is powered off' - 'or the agent is shut down') - - _logger.debug(err_msg) - raise Exception(err_msg) - - -def download_file(download_url: str): - _logger.debug(f'downloading file from s1 (download url of {download_url})') - response = _s1_session.get('/web/api/v2.1' + download_url) - assert_s1_response(response) - _logger.debug(f'download finished') - - file = io.BytesIO(response.content) - return file - - -def format_s1_error(error: dict) -> str: - error_text = '' - if 'title' in error: - error_text = f'{error["title"]}' - if 'details' in error: - error_text = f'{error_text}: {error["details"]}' - if 'code' in error: - error_text = f'{error_text} (code:{error["code"]})' - return error_text - - -def assert_s1_response(response: requests.Response): - if response.status_code != HTTPStatus.OK: - try: - response_data = response.json() - error_text = '\n'.join(format_s1_error(error) for error in response_data['errors']) - except Exception: - error_text = f'Server returned {response.status_code} status code' - - _logger.error(error_text) - raise Exception(error_text) - - -def get_threat(threat_id: str): - response = _s1_session.get('/web/api/v2.1/threats', params={'Ids': threat_id}) - assert_s1_response(response) - return response.json()['data'][0] - - -def filter_threat(threat_info: dict) -> bool: - return threat_info['agentDetectionInfo']['agentOsName'].lower().startswith(('linux', 'windows')) - - -def send_note(threat_id: str, analysis: FileAnalysis): - note = util.get_analysis_summary(analysis) - - response = _s1_session.post('/web/api/v2.1/threats/notes', - json={'data': {'text': note}, 'filter': {'ids': [threat_id]}}) - assert_s1_response(response) - _logger.info('note sent') - - -def send_failure_note(note: str, threat_id: str): - note = f'Intezer Analyze File Scan failed: {note}' - response = _s1_session.post('/web/api/v2.1/threats/notes', - json={'data': {'text': note}, 'filter': {'ids': [threat_id]}}) - assert_s1_response(response) - - -def analyze_threat(threat_id: str, threat: dict = None): - _logger.info(f'incoming threat: {threat_id}') - try: - if not threat: - threat = get_threat(threat_id) - if not filter_threat(threat): - _logger.info(f'threat {threat_id} is been filtered') - return - - threat_info = threat['threatInfo'] - file_hash = threat_info.get('sha256') or threat_info.get('sha1') or threat_info.get('md5') - analysis = None - if file_hash: - _logger.debug(f'trying to analyze by hash {file_hash}') - try: - analysis = FileAnalysis(file_hash=file_hash) - analysis.send() - except errors.HashDoesNotExistError: - _logger.debug(f'hash {file_hash} not found on server, fetching the file from endpoint') - analysis = None - - if not analysis: - analysis = analyze_by_file(threat_id) - analysis.send(requester='s1') - - _logger.debug('waiting for analysis completion') - analysis.wait_for_completion() - _logger.debug('analysis completed') - - send_note(threat_id, analysis) - except Exception as ex: - _logger.exception(f'failed to process threat {threat_id}') - send_failure_note(str(ex), threat_id) - - -def parse_argparse_args(): - parser = argparse.ArgumentParser(description='This script takes the threat file from SentinelOne threat ' - 'and analyze it in Intezer Analyze, the results will be ' - 'pushed to SentinelOne as a threat note.') - - parser.add_argument('-i', '--intezer-api-key', help='Intezer API key', required=True) - parser.add_argument('-s', '--s1-api-key', help='S1 API Key', required=True) - parser.add_argument('-a', '--s1-base-address', help='S1 base address', required=True) - parser.add_argument('-sv', '--skip-ssl-verification', action='store_true', - help='Skipping SSL verification on S1 request') - subparser_options = {} - if sys.version_info >= (3, 7): - subparser_options['required'] = True - - subparsers = parser.add_subparsers(title='valid subcommands', dest='subcommand', **subparser_options) - threat_parser = subparsers.add_parser('threat', help='Get a threat ID and analyze it') - threat_parser.add_argument('threat_id', help='SentinelOne threat id') - query_parser = subparsers.add_parser('query', help='Analyze new incoming threat') - query_parser.add_argument('--since', - help='query threats from certain date in the format YYYY-MM-DD', - type=lambda s: datetime.datetime.strptime(s, '%Y-%m-%d'),) - - return parser.parse_args() - - -def _init_logger(): - _logger.setLevel(logging.DEBUG) - handler = logging.StreamHandler() - handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s: %(message)s')) - _logger.addHandler(handler) - - -def query_threats(next_time_query: Optional[datetime.datetime]): - next_time_query = next_time_query or datetime.datetime.utcnow() - while True: - _logger.info('checking for new threats...') - response = _s1_session.get('/web/api/v2.1/threats', params={'createdAt__gte': next_time_query.isoformat()}) - next_time_query = datetime.datetime.utcnow() - assert_s1_response(response) - threats = response.json()['data'] - for threat in threats: - analyze_threat(threat['id'], threat) - - if not threats: - _logger.info('no new threats found') - time.sleep(10) - - -if __name__ == '__main__': - _args = parse_argparse_args() - api.set_global_api(_args.intezer_api_key) - init_s1_requests_session(_args.s1_api_key, _args.s1_base_address, _args.skip_ssl_verification) - _init_logger() - if _args.subcommand == 'threat': - analyze_threat(_args.threat_id) - elif _args.subcommand == 'query': - query_threats(_args.since) - else: - print('error: the following arguments are required: subcommand') - sys.exit(1) - diff --git a/intezer_sdk/__init__.py b/intezer_sdk/__init__.py index 785c3b8..8de33c0 100644 --- a/intezer_sdk/__init__.py +++ b/intezer_sdk/__init__.py @@ -1 +1 @@ -__version__ = '1.24.1' +__version__ = '1.25.0' diff --git a/intezer_sdk/_account_api.py b/intezer_sdk/_account_api.py index 5ddb251..67b8abd 100644 --- a/intezer_sdk/_account_api.py +++ b/intezer_sdk/_account_api.py @@ -1,6 +1,4 @@ from http import HTTPStatus -from typing import List -from typing import Optional from intezer_sdk import errors from intezer_sdk.api import IntezerApiClient @@ -9,7 +7,7 @@ class AccountApi: - def __init__(self, api: Optional[IntezerApiClient]): + def __init__(self, api: IntezerApiClient | None): self.api = api or get_global_api() def get_my_quota(self, raise_on_no_file_quota=False, raise_on_no_endpoint_quota=False) -> dict: @@ -27,7 +25,7 @@ def get_my_account(self) -> dict: raise_for_status(response) return response.json()['result'] - def get_account(self, account_id: str) -> Optional[dict]: + def get_account(self, account_id: str) -> dict | None: response = self.api.request_with_refresh_expired_access_token('GET', f'/accounts/{account_id}') if response.status_code == HTTPStatus.NOT_FOUND: return None @@ -35,7 +33,7 @@ def get_account(self, account_id: str) -> Optional[dict]: raise_for_status(response) return response.json()['result'] - def get_organization_accounts(self) -> List[dict]: + def get_organization_accounts(self) -> list[dict]: response = self.api.request_with_refresh_expired_access_token('GET', f'/accounts') raise_for_status(response) return response.json()['result'] diff --git a/intezer_sdk/_api.py b/intezer_sdk/_api.py index 4d3492c..799efcf 100644 --- a/intezer_sdk/_api.py +++ b/intezer_sdk/_api.py @@ -1,13 +1,9 @@ import io import os from http import HTTPStatus -from typing import IO from typing import Any from typing import BinaryIO -from typing import Dict -from typing import List -from typing import Optional -from typing import Tuple +from typing import IO from requests import Response @@ -25,16 +21,16 @@ def __init__(self, api: IntezerApiClient): self.api = api @property - def on_premise_version(self) -> Optional[OnPremiseVersion]: + def on_premise_version(self) -> OnPremiseVersion | None: return self.api.on_premise_version - def _get_tenant_id(self) -> Optional[str]: + def _get_tenant_id(self) -> str | None: return os.environ.get('INTEZER_TENANT_ID') def analyze_by_hash(self, file_hash: str, - disable_dynamic_unpacking: Optional[bool], - disable_static_unpacking: Optional[bool], + disable_dynamic_unpacking: bool | None, + disable_static_unpacking: bool | None, sandbox_command_line_arguments: str = None, sandbox_machine_type: str = None, file_name: str = None, @@ -126,7 +122,7 @@ def analyze_by_file(self, zip_password: str = None, sandbox_command_line_arguments: str = None, sandbox_machine_type: str = None, - **additional_parameters) -> Optional[str]: + **additional_parameters) -> str | None: """ Analyze a file by its path or stream. @@ -160,7 +156,7 @@ def get_latest_analysis(self, file_hash: str, private_only: bool = False, composed_only: bool = None, - **additional_parameters) -> Optional[dict]: + **additional_parameters) -> dict | None: """ Get the latest analysis of a file. @@ -231,7 +227,7 @@ def get_endpoint_analysis_response(self, analyses_id: str, ignore_not_found: boo return response - def get_endpoint_sub_analyses(self, analyses_id: str, verdicts: Optional[List[str]]) -> List[dict]: + def get_endpoint_sub_analyses(self, analyses_id: str, verdicts: list[str] | None) -> list[dict]: """ Get the sub analyses of an endpoint. @@ -249,7 +245,7 @@ def get_endpoint_sub_analyses(self, analyses_id: str, verdicts: Optional[List[st return response.json()['sub_analyses'] - def create_endpoint_scan(self, scanner_info: dict) -> Dict[str, str]: + def create_endpoint_scan(self, scanner_info: dict) -> dict[str, str]: """ Create an endpoint scan. @@ -291,7 +287,7 @@ def send_binary_alert(self, self._assert_alert_response_status_code(response) return response.json()['alert_id'] - def get_iocs(self, analyses_id: str) -> Optional[dict]: + def get_iocs(self, analyses_id: str) -> dict | None: """ Get the IOCs of an analysis. @@ -303,7 +299,7 @@ def get_iocs(self, analyses_id: str) -> Optional[dict]: return response.json()['result'] - def get_detection_result_url(self, analyses_id: str) -> Optional[str]: + def get_detection_result_url(self, analyses_id: str) -> str | None: """ Get the detection result url of an analysis. @@ -317,7 +313,7 @@ def get_detection_result_url(self, analyses_id: str) -> Optional[str]: return response.json()['result_url'] - def get_dynamic_ttps(self, analyses_id: str) -> Optional[dict]: + def get_dynamic_ttps(self, analyses_id: str) -> dict | None: """ Get the dynamic TTPs of an analysis. @@ -331,7 +327,7 @@ def get_dynamic_ttps(self, analyses_id: str) -> Optional[dict]: return response.json()['result'] - def get_family_info(self, family_id: str) -> Optional[dict]: + def get_family_info(self, family_id: str) -> dict | None: """ Get the family info of a family. @@ -345,7 +341,7 @@ def get_family_info(self, family_id: str) -> Optional[dict]: raise_for_status(response, allowed_statuses=[HTTPStatus.OK]) return response.json()['result'] - def get_family_by_name(self, family_name: str) -> Optional[Dict[str, Any]]: + def get_family_by_name(self, family_name: str) -> dict[str, Any] | None: """ Get the family by name. @@ -361,7 +357,7 @@ def get_family_by_name(self, family_name: str) -> Optional[Dict[str, Any]]: raise_for_status(response, allowed_statuses=[HTTPStatus.OK]) return response.json()['result'] - def get_sub_analyses_by_id(self, analysis_id: str) -> Optional[List[dict]]: + def get_sub_analyses_by_id(self, analysis_id: str) -> list[dict] | None: """ Get the sub analyses of an analysis. @@ -377,7 +373,7 @@ def get_sub_analyses_by_id(self, analysis_id: str) -> Optional[List[dict]]: def get_sub_analysis_code_reuse_by_id(self, composed_analysis_id: str, - sub_analysis_id: str) -> Optional[dict]: + sub_analysis_id: str) -> dict | None: """ Get the code reuse of a sub analysis. @@ -680,7 +676,7 @@ def index_by_file(self, file_path: str, index_as: IndexType, family_name: str = return self._get_index_id_from_response(response) - def get_alerts_by_alert_ids(self, alert_ids: List[str], environments: List[str] = None) -> Dict: + def get_alerts_by_alert_ids(self, alert_ids: list[str], environments: list[str] = None) -> dict: """ Get alerts by alert ids. @@ -692,10 +688,10 @@ def get_alerts_by_alert_ids(self, alert_ids: List[str], environments: List[str] data = dict(alert_ids=alert_ids) if environments: data['environments'] = environments - + if tenant_id := self._get_tenant_id(): data['tenant_id'] = tenant_id - + response = self.api.request_with_refresh_expired_access_token(method='GET', path='/alerts/search', data=data) @@ -704,7 +700,7 @@ def get_alerts_by_alert_ids(self, alert_ids: List[str], environments: List[str] return data_response['result'] - def get_alert_by_alert_id(self, alert_id: str, environment: Optional[str] = None) -> Tuple[Dict, str]: + def get_alert_by_alert_id(self, alert_id: str, environment: str | None = None) -> tuple[dict, str]: """ Get alert by alert id. @@ -715,10 +711,10 @@ def get_alert_by_alert_id(self, alert_id: str, environment: Optional[str] = None data = dict(alert_id=alert_id) if environment: data['environment'] = environment - + if tenant_id := self._get_tenant_id(): data['tenant_id'] = tenant_id - + response = self.api.request_with_refresh_expired_access_token(method='GET', path='/alerts/get-by-id', data=data) @@ -727,7 +723,7 @@ def get_alert_by_alert_id(self, alert_id: str, environment: Optional[str] = None return data_response['result'], data_response['status'] - def get_incident_by_id(self, incident_id: str, environment: Optional[str] = None) -> dict: + def get_incident_by_id(self, incident_id: str, environment: str | None = None) -> dict: data = dict(incident_id=incident_id) if environment: data['environment'] = environment @@ -749,7 +745,7 @@ def get_device_by_id(self, device_id: str) -> dict: return data_response['result'] - def notify_alert(self, alert_id: str, environment: Optional[str] = None) -> dict: + def notify_alert(self, alert_id: str, environment: str | None = None) -> dict: """ Send a notification for an alert. @@ -759,14 +755,14 @@ def notify_alert(self, alert_id: str, environment: Optional[str] = None) -> dict :return: The notification response containing notified_channels. """ self.assert_any_on_premise('notify-alert') - + data = {} if environment: data['environment'] = environment - + if tenant_id := self._get_tenant_id(): data['tenant_id'] = tenant_id - + response = self.api.request_with_refresh_expired_access_token( method='POST', path=f'/alerts/{alert_id}/notify', @@ -787,7 +783,7 @@ def get_index_response(self, index_id: str) -> Response: return response - def analyze_url(self, url: str, **additional_parameters) -> Optional[str]: + def analyze_url(self, url: str, **additional_parameters) -> str | None: """ Analyze a url. @@ -819,7 +815,7 @@ def get_raw_alert_data( """ data = {'environment': environment, 'raw_data_type': raw_data_type} - + if tenant_id := self._get_tenant_id(): data['tenant_id'] = tenant_id @@ -849,6 +845,14 @@ def get_raw_incident_data( raise_for_status(response) return response.json() + def delete_alert(self, alert_id: str, environment:str) -> dict: + data = {'environment': environment} + response = self.api.request_with_refresh_expired_access_token( + method='DELETE', path=f'/alerts/{alert_id}', data=data + ) + raise_for_status(response) + return response.json() + @staticmethod def _assert_result_response(ignore_not_found: bool, response: Response): statuses_to_ignore = [HTTPStatus.NOT_FOUND] if ignore_not_found else None @@ -883,35 +887,38 @@ def _param_initialize(disable_dynamic_unpacking: bool, @staticmethod def _assert_analysis_response_status_code(response: Response): - if response.status_code == HTTPStatus.NOT_FOUND: - raise errors.HashDoesNotExistError(response) - elif response.status_code == HTTPStatus.REQUEST_ENTITY_TOO_LARGE: - raise errors.FileTooLargeError(response) - elif response.status_code == HTTPStatus.CONFLICT: - is_skipped_by_rule = response.json().get('result', {}).get('is_skipped_by_rule') - if is_skipped_by_rule: - raise errors.AnalysisSkippedByRuleError(response) - - running_analysis_id = response.json().get('result', {}).get('analysis_id') - raise errors.AnalysisIsAlreadyRunningError(response, running_analysis_id) - elif response.status_code == HTTPStatus.FORBIDDEN: - raise errors.InsufficientQuotaError(response) - elif response.status_code == HTTPStatus.TOO_MANY_REQUESTS: - raise errors.AnalysisRateLimitError(response) - elif response.status_code == HTTPStatus.BAD_REQUEST: - data = response.json() - error = data.get('error', '') - details = data.get('details', '') - result = data.get('result', {}) - if result.get('is_url_offline'): - raise errors.UrlOfflineError(response) - - if error == 'Invalid url': - raise errors.InvalidUrlError(response) - - raise errors.ServerError(f'Server returned bad request error: {error}, details: {details}', response) - elif response.status_code != HTTPStatus.CREATED: - raise errors.ServerError(f'Error in response status code:{response.status_code}', response) + match response.status_code: + case HTTPStatus.NOT_FOUND: + raise errors.HashDoesNotExistError(response) + case HTTPStatus.REQUEST_ENTITY_TOO_LARGE: + raise errors.FileTooLargeError(response) + case HTTPStatus.CONFLICT: + is_skipped_by_rule = response.json().get('result', {}).get('is_skipped_by_rule') + if is_skipped_by_rule: + raise errors.AnalysisSkippedByRuleError(response) + + running_analysis_id = response.json().get('result', {}).get('analysis_id') + raise errors.AnalysisIsAlreadyRunningError(response, running_analysis_id) + case HTTPStatus.FORBIDDEN: + raise errors.InsufficientQuotaError(response) + case HTTPStatus.TOO_MANY_REQUESTS: + raise errors.AnalysisRateLimitError(response) + case HTTPStatus.BAD_REQUEST: + data = response.json() + error = data.get('error', '') + details = data.get('details', '') + result = data.get('result', {}) + if result.get('is_url_offline'): + raise errors.UrlOfflineError(response) + + if error == 'Invalid url': + raise errors.InvalidUrlError(response) + + raise errors.ServerError(f'Server returned bad request error: {error}, details: {details}', response) + case HTTPStatus.CREATED: + pass + case _: + raise errors.ServerError(f'Error in response status code:{response.status_code}', response) @staticmethod def _assert_index_response_status_code(response: Response): diff --git a/intezer_sdk/_endpoint_analysis_api.py b/intezer_sdk/_endpoint_analysis_api.py index 5d83e68..ee60fb3 100644 --- a/intezer_sdk/_endpoint_analysis_api.py +++ b/intezer_sdk/_endpoint_analysis_api.py @@ -1,6 +1,5 @@ import gzip import logging -from typing import List import requests @@ -74,7 +73,7 @@ def send_file_module_differences(self, file_module_differences: dict): method='POST') raise_for_status(response) - def send_files_info(self, files_info: dict) -> List[str]: + def send_files_info(self, files_info: dict) -> list[str]: """ :param files_info: endpoint scan files info :return: list of file hashes to upload @@ -93,7 +92,7 @@ def send_files_info(self, files_info: dict) -> List[str]: return files_to_upload - def send_memory_module_dump_info(self, memory_modules_info: dict) -> List[str]: + def send_memory_module_dump_info(self, memory_modules_info: dict) -> list[str]: """ :param memory_modules_info: endpoint scan memory modules info :return: list of file hashes to upload diff --git a/intezer_sdk/_operation.py b/intezer_sdk/_operation.py index 5454113..f2f9ffd 100644 --- a/intezer_sdk/_operation.py +++ b/intezer_sdk/_operation.py @@ -1,18 +1,15 @@ import datetime -from typing import Dict -from typing import Optional -from typing import Union from intezer_sdk._api import IntezerApi from intezer_sdk.operation import Operation -def handle_operation(operations: Dict[str, Operation], +def handle_operation(operations: dict[str, Operation], api: IntezerApi, operation: str, result_url: str, - wait: Union[bool, int], - wait_timeout: Optional[datetime.timedelta]) -> Operation: + wait: bool | int, + wait_timeout: datetime.timedelta | None) -> Operation: if operation not in operations: operations[operation] = Operation(result_url, operation, api=api.api) @@ -25,4 +22,4 @@ def handle_operation(operations: Dict[str, Operation], sleep_before_first_check=True, wait_timeout=wait_timeout) - return operations[operation] \ No newline at end of file + return operations[operation] diff --git a/intezer_sdk/account.py b/intezer_sdk/account.py index 2cfca66..9a6245c 100644 --- a/intezer_sdk/account.py +++ b/intezer_sdk/account.py @@ -1,6 +1,4 @@ import datetime -from typing import List -from typing import Optional from intezer_sdk import consts from intezer_sdk._account_api import AccountApi @@ -21,23 +19,23 @@ def name(self) -> str: return self.details['account_name'] @property - def email(self) -> Optional[str]: + def email(self) -> str | None: return self.details['account_email'] if 'account_email' else None @property - def created_time(self) -> Optional[datetime.datetime]: + def created_time(self) -> datetime.datetime | None: if 'created_time' in self.details: return datetime.datetime.strptime(self.details['created_time'], consts.DEFAULT_DATE_FORMAT) return None @property - def last_sign_in_time(self) -> Optional[datetime.datetime]: + def last_sign_in_time(self) -> datetime.datetime | None: if 'last_sign_in_time' in self.details: return datetime.datetime.strptime(self.details['last_sign_in_time'], consts.DEFAULT_DATE_FORMAT) return None @classmethod - def from_account_id(cls, account_id: str, api: IntezerApiClient = None) -> Optional['Account']: + def from_account_id(cls, account_id: str, api: IntezerApiClient = None) -> 'Account | None': """ Get details about an account. @@ -62,7 +60,7 @@ def from_myself(cls, api: IntezerApiClient = None) -> 'Account': return cls(account_details['account_id'], account_details, api=api) @classmethod - def get_organization_account(cls, api: IntezerApiClient = None) -> List['Account']: + def get_organization_account(cls, api: IntezerApiClient = None) -> list['Account']: """ Get all accounts in the organization. diff --git a/intezer_sdk/alerts.py b/intezer_sdk/alerts.py index 171d507..0e36b6a 100644 --- a/intezer_sdk/alerts.py +++ b/intezer_sdk/alerts.py @@ -5,12 +5,6 @@ from io import BytesIO from typing import Any from typing import BinaryIO -from typing import Dict -from typing import List -from typing import Optional -from typing import Tuple -from typing import Type -from typing import Union import requests @@ -24,6 +18,7 @@ from intezer_sdk.api import get_global_api from intezer_sdk.consts import AlertStatusCode from intezer_sdk.endpoint_analysis import EndpointAnalysis +from intezer_sdk.operation import Operation from intezer_sdk.util import add_filter DEFAULT_LIMIT = 100 @@ -31,9 +26,9 @@ ALERTS_SEARCH_REQUEST = '/alerts/search' -def get_alerts_by_alert_ids(alert_ids: List[str], - environments: List[str] = None, - api: IntezerApi = None) -> Tuple[int, List[dict]]: +def get_alerts_by_alert_ids(alert_ids: list[str], + environments: list[str] = None, + api: IntezerApi = None) -> tuple[int, list[dict]]: """ Get alerts by alert ids. @@ -50,23 +45,23 @@ def get_alerts_by_alert_ids(alert_ids: List[str], def generate_alerts_history_search_filters(*, start_time: datetime.datetime = None, end_time: datetime.datetime = None, - environments: List[str] = None, + environments: list[str] = None, offset: int = None, limit: int = None, - sources: List[str] = None, - risk_categories: List[str] = None, - alert_verdicts: List[str] = None, - family_names: List[str] = None, - response_statuses: List[str] = None, - hostnames: List[str] = None, + sources: list[str] = None, + risk_categories: list[str] = None, + alert_verdicts: list[str] = None, + family_names: list[str] = None, + response_statuses: list[str] = None, + hostnames: list[str] = None, free_text: str = None, site_name: str = None, account_name: str = None, - exclude_alert_ids: List[str] = None, - usernames: List[str] = None, - file_hashes: List[str] = None, - process_commandlines: List[str] = None, - sort_by: List[str] = None, + exclude_alert_ids: list[str] = None, + usernames: list[str] = None, + file_hashes: list[str] = None, + process_commandlines: list[str] = None, + sort_by: list[str] = None, is_mitigated: bool = None, email_sender: str = None, email_recipient: str = None, @@ -75,14 +70,14 @@ def generate_alerts_history_search_filters(*, email_bcc: str = None, email_message_id: str = None, email_reported_by: str = None, - device_private_ips: List[str] = None, - device_external_ips: List[str] = None, - device_ids: List[str] = None, - time_filter_type: List[str] = None, + device_private_ips: list[str] = None, + device_external_ips: list[str] = None, + device_ids: list[str] = None, + time_filter_type: list[str] = None, sort_order: str = None, - ips: List[str] = None, - domains: List[str] = None, - incident_ids: List[str] = None) -> Dict[str, Any]: + ips: list[str] = None, + domains: list[str] = None, + incident_ids: list[str] = None) -> dict[str, Any]: filters = {} if start_time: filters['start_time'] = start_time.timestamp() @@ -129,23 +124,23 @@ def query_alerts_history(*, start_time: datetime.datetime = None, end_time: datetime.datetime = None, api: IntezerApiClient = None, - environments: List[str] = None, + environments: list[str] = None, offset: int = DEFAULT_OFFSET, limit: int = DEFAULT_LIMIT, - sources: List[str] = None, - risk_categories: List[str] = None, - alert_verdicts: List[str] = None, - family_names: List[str] = None, - response_statuses: List[str] = None, - hostnames: List[str] = None, + sources: list[str] = None, + risk_categories: list[str] = None, + alert_verdicts: list[str] = None, + family_names: list[str] = None, + response_statuses: list[str] = None, + hostnames: list[str] = None, free_text: str = None, site_name: str = None, account_name: str = None, - exclude_alert_ids: List[str] = None, - usernames: List[str] = None, - file_hashes: List[str] = None, - process_commandlines: List[str] = None, - sort_by: List[str] = None, + exclude_alert_ids: list[str] = None, + usernames: list[str] = None, + file_hashes: list[str] = None, + process_commandlines: list[str] = None, + sort_by: list[str] = None, is_mitigated: bool = None, email_sender: str = None, email_recipient: str = None, @@ -154,14 +149,14 @@ def query_alerts_history(*, email_bcc: str = None, email_message_id: str = None, email_reported_by: str = None, - device_private_ips: List[str] = None, - device_external_ips: List[str] = None, - device_ids: List[str] = None, - time_filter_type: List[str] = None, + device_private_ips: list[str] = None, + device_external_ips: list[str] = None, + device_ids: list[str] = None, + time_filter_type: list[str] = None, sort_order: str = None, - ips: List[str] = None, - domains: List[str] = None, - incident_ids: List[str] = None) -> AlertsHistoryResult: + ips: list[str] = None, + domains: list[str] = None, + incident_ids: list[str] = None) -> AlertsHistoryResult: """ Query for alerts history with query param. @@ -249,7 +244,7 @@ def query_alerts_history(*, class Alert: """ - The Alert class is used to represent an alert from the Intezer Analyze API. + The Alert class is used to represent an alert from the Intezer Platform API. :ivar alert_id: The alert id. :vartype alert_id: str @@ -268,13 +263,13 @@ class Alert: """ def __init__(self, - alert_id: Optional[str] = None, - environment: Optional[str] = None, - alert_stream: Optional[BinaryIO] = None, + alert_id: str | None = None, + environment: str | None = None, + alert_stream: BinaryIO | None = None, api: IntezerApiClient = None): """ Create a new Alert instance with the given alert id. - Please note that this does not query the Intezer Analyze API for the alert data, but rather creates an Alert + Please note that this does not query the Intezer Platform API for the alert data, but rather creates an Alert instance with the given alert id. :param alert_id: The alert id. @@ -298,14 +293,14 @@ def __init__(self, self.environment = environment self._intezer_api_client = api self._api = IntezerApi(api or get_global_api()) - self._report: Optional[Dict] = None - self.source: Optional[str] = None - self.verdict: Optional[str] = None - self.family_name: Optional[str] = None - self.sender: Optional[str] = None - self.intezer_alert_url: Optional[str] = None - self.status: Optional[AlertStatusCode] = None - self.scans: List[Union[UrlAnalysis, FileAnalysis, EndpointAnalysis]] = [] + self._report: dict | None = None + self.source: str | None = None + self.verdict: str | None = None + self.family_name: str | None = None + self.sender: str | None = None + self.intezer_alert_url: str | None = None + self.status: AlertStatusCode | None = None + self.scans: list[UrlAnalysis | FileAnalysis | EndpointAnalysis] = [] @classmethod def _parse_alert_id_from_alert_stream(cls, alert_stream: BinaryIO) -> str: @@ -316,7 +311,7 @@ def _parse_alert_id_from_alert_stream(cls, alert_stream: BinaryIO) -> str: def check_status(self) -> AlertStatusCode: """ - Refresh the alert data from the Intezer Analyze API - overrides current data (if exists) with the new data. + Refresh the alert data from the Intezer Platform API - overrides current data (if exists) with the new data. :return: The updated status of the alert. @@ -348,7 +343,7 @@ def is_running(self) -> bool: def result(self) -> dict: """ - Get the raw alert result, as received from Intezer Analyze API. + Get the raw alert result, as received from Intezer Platform API. :raises intezer_sdk.errors.AlertNotFound: If the alert was not found. :raises intezer_sdk.errors.AlertInProgressError: If the alert is in progress @@ -363,13 +358,13 @@ def result(self) -> dict: @classmethod def from_id(cls, alert_id: str, - environment: Optional[str] = None, + environment: str | None = None, api: IntezerApiClient = None, fetch_scans: bool = False, wait: bool = False, - timeout: Optional[int] = None): + timeout: int | None = None): """ - Create a new Alert instance, and fetch the alert data from the Intezer Analyze API. + Create a new Alert instance, and fetch the alert data from the Intezer Platform API. :param alert_id: The alert id. :param environment: The environment of the alert. @@ -397,15 +392,15 @@ def send(cls, alert_mapping: dict, source: str, api: IntezerApiClient = None, - environment: Optional[str] = None, - display_fields: Optional[List[str]] = None, - default_verdict: Optional[str] = None, - alert_sender: Optional[str] = None, + environment: str | None = None, + display_fields: list[str] | None = None, + default_verdict: str | None = None, + alert_sender: str | None = None, wait: bool = False, - timeout: Optional[int] = None, + timeout: int | None = None, ): """ - Send an alert for further investigation using the Intezer Analyze API. + Send an alert for further investigation using the Intezer Platform API. :param raw_alert: The raw alert data. :param alert_mapping: The alert mapping - defines how to map the raw alert to get relevant information. @@ -442,18 +437,18 @@ def send(cls, @classmethod def send_phishing_email(cls, - raw_email: Optional[BinaryIO] = None, - api: Optional[IntezerApiClient] = None, - environment: Optional[str] = None, - default_verdict: Optional[str] = None, - alert_sender: Optional[str] = None, + raw_email: BinaryIO | None = None, + api: IntezerApiClient | None = None, + environment: str | None = None, + default_verdict: str | None = None, + alert_sender: str | None = None, wait: bool = False, - timeout: Optional[int] = None, - email_path: Optional[str] = None, - additional_info: Optional[dict] = None, - zip_password: Optional[str] = None): + timeout: int | None = None, + email_path: str | None = None, + additional_info: dict | None = None, + zip_password: str | None = None): """ - Send an alert for further investigation using the Intezer Analyze API. + Send an alert for further investigation using the Intezer Platform API. Should pass either raw_email or email_path. :param raw_email: The raw alert data. @@ -502,7 +497,7 @@ def send_phishing_email(cls, def wait_for_completion(self, interval: int = None, sleep_before_first_check=False, - timeout: Optional[datetime.timedelta] = None): + timeout: datetime.timedelta | None = None): """ Blocks until the alert is finished processing, or until the timeout is reached. @@ -539,7 +534,7 @@ def fetch_scans(self): def _fetch_scan(scan_: dict, scan_key: str, - scan_object: Union[Type[FileAnalysis], Type[EndpointAnalysis], Type[UrlAnalysis]]): + scan_object: type[FileAnalysis] | type[EndpointAnalysis] | type[UrlAnalysis]): current_analysis_id = scan_.get(scan_key, {}).get('analysis_id') if current_analysis_id: self.scans.append(scan_object.from_analysis_id(analysis_id=current_analysis_id, @@ -556,7 +551,7 @@ def _fetch_scan(scan_: dict, _fetch_scan(scan, 'url_analysis', UrlAnalysis) def get_raw_data(self, - environment: Optional[str] = None, + environment: str | None = None, raw_data_type: str = 'raw_alert') -> dict: """ Get raw alert data. @@ -574,7 +569,7 @@ def get_raw_data(self, raw_data_type=raw_data_type ) - def notify(self) -> List[str]: + def notify(self) -> list[str]: """ Send a notification for this alert. @@ -590,3 +585,11 @@ def notify(self) -> List[str]: response = self._api.notify_alert(self.alert_id, self.environment) return response.get('notified_channels', []) + + def delete(self, wait_for_completion: bool = False) -> Operation: + result = self._api.delete_alert(self.alert_id, self.environment) + operation = Operation(result['result_url'], 'Delete Alert', self._intezer_api_client) + if wait_for_completion: + operation.wait_for_completion() + + return operation diff --git a/intezer_sdk/alerts_results.py b/intezer_sdk/alerts_results.py index 046ab21..4673be7 100644 --- a/intezer_sdk/alerts_results.py +++ b/intezer_sdk/alerts_results.py @@ -1,6 +1,3 @@ -from typing import List -from typing import Dict -from typing import Tuple from intezer_sdk.api import IntezerApiClient from intezer_sdk.api import raise_for_status @@ -9,13 +6,13 @@ class AlertsHistoryResult(HistoryResult): - def __init__(self, request_url_path: str, api: IntezerApiClient, filters: Dict): + def __init__(self, request_url_path: str, api: IntezerApiClient, filters: dict): """ Fetch all alerts history results from server. """ super().__init__(request_url_path, api, filters) - def _fetch_history(self, url_path: str, data: Dict) -> Tuple[int, List]: + def _fetch_history(self, url_path: str, data: dict) -> tuple[int, list]: """ Request from server filtered alerts history. :param url_path: Url to request new data from. diff --git a/intezer_sdk/analyses_history.py b/intezer_sdk/analyses_history.py index 0d288b2..b6e4c71 100644 --- a/intezer_sdk/analyses_history.py +++ b/intezer_sdk/analyses_history.py @@ -1,7 +1,5 @@ import datetime from typing import Any -from typing import Dict -from typing import List from intezer_sdk.analyses_results import AnalysesHistoryResult from intezer_sdk.api import IntezerApiClient @@ -19,10 +17,10 @@ def query_file_analyses_history(*, end_date: datetime.datetime, api: IntezerApiClient = None, aggregated_view: bool = None, - sources: List[str] = None, - verdicts: List[str] = None, + sources: list[str] = None, + verdicts: list[str] = None, file_hash: str = None, - family_names: List[str] = None, + family_names: list[str] = None, file_name: str = None, limit: int = DEFAULT_LIMIT, offset: int = DEFAULT_OFFSET @@ -68,9 +66,9 @@ def query_endpoint_analyses_history(*, end_date: datetime.datetime, api: IntezerApiClient = None, aggregated_view: bool = None, - sources: List[str] = None, - verdicts: List[str] = None, - computer_names: List[str] = None, + sources: list[str] = None, + verdicts: list[str] = None, + computer_names: list[str] = None, limit: int = DEFAULT_LIMIT, offset: int = DEFAULT_OFFSET ) -> AnalysesHistoryResult: @@ -107,9 +105,9 @@ def query_url_analyses_history(*, start_date: datetime.datetime, end_date: datetime.datetime, api: IntezerApiClient = None, - sources: List[str] = None, - verdicts: List[str] = None, - sub_verdicts: List[str] = None, + sources: list[str] = None, + verdicts: list[str] = None, + sub_verdicts: list[str] = None, did_download_file: bool = None, submitted_url: str = None, scanned_url: str = None, @@ -170,12 +168,12 @@ def generate_analyses_history_filter(*, start_date: datetime.datetime, end_date: datetime.datetime, aggregated_view: bool = None, - sources: List[str] = None, - verdicts: List[str] = None, - computer_names: List[str] = None, + sources: list[str] = None, + verdicts: list[str] = None, + computer_names: list[str] = None, limit: int = DEFAULT_LIMIT, offset: int = DEFAULT_OFFSET - ) -> Dict[str, Any]: + ) -> dict[str, Any]: base_filter = { 'start_date': int(start_date.timestamp()), 'end_date': int(end_date.timestamp()), diff --git a/intezer_sdk/analyses_results.py b/intezer_sdk/analyses_results.py index 6e0ae53..bbf78aa 100644 --- a/intezer_sdk/analyses_results.py +++ b/intezer_sdk/analyses_results.py @@ -1,6 +1,3 @@ -from typing import List -from typing import Dict -from typing import Tuple from intezer_sdk.api import IntezerApiClient from intezer_sdk.api import raise_for_status @@ -8,13 +5,13 @@ class AnalysesHistoryResult(HistoryResult): - def __init__(self, request_url_path: str, api: IntezerApiClient, filters: Dict): + def __init__(self, request_url_path: str, api: IntezerApiClient, filters: dict): """ Fetch all analyses history results from server. """ super().__init__(request_url_path, api, filters) - def _fetch_history(self, url_path: str, data: Dict) -> Tuple[int, List]: + def _fetch_history(self, url_path: str, data: dict) -> tuple[int, list]: """ Request from server filtered analyses history. :param url_path: Url to request new data from. diff --git a/intezer_sdk/analysis.py b/intezer_sdk/analysis.py index 7dea4dd..4e91f7c 100644 --- a/intezer_sdk/analysis.py +++ b/intezer_sdk/analysis.py @@ -5,9 +5,6 @@ from http import HTTPStatus from typing import BinaryIO from typing import IO -from typing import Optional -from typing import Union -from typing import List import requests from requests import Response @@ -91,7 +88,7 @@ def __init__(self, self._zip_password = zip_password self._sandbox_command_line_arguments = sandbox_command_line_arguments self._sandbox_machine_type = sandbox_machine_type - self._sub_analyses: List[SubAnalysis] = None + self._sub_analyses: list[SubAnalysis] = None self._root_analysis = None self._iocs_report = None self._dynamic_ttps_report = None @@ -108,7 +105,7 @@ def __init__(self, self._file_name = 'file.zip' @classmethod - def from_analysis_id(cls, analysis_id: str, api: IntezerApiClient = None) -> Optional['FileAnalysis']: + def from_analysis_id(cls, analysis_id: str, api: IntezerApiClient = None) -> 'FileAnalysis | None': """ Returns a FileAnalysis instance with the given analysis ID. Returns None when analysis doesn't exist. @@ -127,7 +124,7 @@ def from_latest_hash_analysis(cls, private_only: bool = False, composed_only: bool = None, days_threshold_for_latest_analysis: int = None, - **additional_parameters) -> Optional['FileAnalysis']: + **additional_parameters) -> 'FileAnalysis | None': """ Returns the latest FileAnalysis instance for the given file hash, with the option to filter by private analyses only. Returns None when analysis doesn't exist. @@ -200,7 +197,7 @@ def _send_analyze_to_api(self, **additional_parameters) -> str: self._file_stream = None return analysis_id - def get_sub_analyses(self) -> List[SubAnalysis]: + def get_sub_analyses(self) -> list[SubAnalysis]: """ Get a list of sub analysis. @@ -266,8 +263,8 @@ def iocs(self) -> dict: return self._iocs_report def get_detections(self, - wait: Union[bool, int] = False, - wait_timeout: Optional[datetime.timedelta] = None) -> Optional[operation.Operation]: + wait: bool | int = False, + wait_timeout: datetime.timedelta | None = None) -> operation.Operation | None: """ Gets the detection report :data:`intezer_sdk.operation.Operation` related to specific analysis. @@ -324,17 +321,17 @@ def sub_verdict(self) -> str: def get_latest_analysis(file_hash: str, api: IntezerApi = None, private_only: bool = False, - **additional_parameters) -> Optional[FileAnalysis]: + **additional_parameters) -> FileAnalysis | None: return FileAnalysis.from_latest_hash_analysis(file_hash, api, private_only, **additional_parameters) @deprecated('This method is deprecated, use FileAnalysis.from_analysis_by_id instead to be explict') -def get_file_analysis_by_id(analysis_id: str, api: IntezerApi = None) -> Optional[FileAnalysis]: +def get_file_analysis_by_id(analysis_id: str, api: IntezerApi = None) -> FileAnalysis | None: return FileAnalysis.from_analysis_id(analysis_id, api) @deprecated('This method is deprecated, use FileAnalysis.from_analysis_by_id instead to be explict') -def get_analysis_by_id(analysis_id: str, api: IntezerApi = None) -> Optional[FileAnalysis]: +def get_analysis_by_id(analysis_id: str, api: IntezerApi = None) -> FileAnalysis | None: return get_file_analysis_by_id(analysis_id, api) def _clean_url(url: str) -> str: @@ -361,7 +358,7 @@ class UrlAnalysis(Analysis): :vartype url: str """ - def __init__(self, url: Optional[str] = None, api: IntezerApiClient = None): + def __init__(self, url: str | None = None, api: IntezerApiClient = None): """ UrlAnalysis is a class for analyzing URLs. @@ -371,10 +368,10 @@ def __init__(self, url: Optional[str] = None, api: IntezerApiClient = None): super().__init__(api) self._api.assert_any_on_premise('query-url-history') self.url = url - self._file_analysis: Optional[FileAnalysis] = None + self._file_analysis: FileAnalysis | None = None @classmethod - def from_analysis_id(cls, analysis_id: str, api: IntezerApiClient = None) -> Optional['UrlAnalysis']: + def from_analysis_id(cls, analysis_id: str, api: IntezerApiClient = None) -> 'UrlAnalysis | None': """ Returns a UrlAnalysis instance with the given analysis ID. Returns None when analysis doesn't exist. @@ -391,7 +388,7 @@ def from_latest_analysis(cls, url: str, days_threshold_for_latest_analysis: int = 1, api: IntezerApiClient = None, - exact_match: bool = False) -> Optional['UrlAnalysis']: + exact_match: bool = False) -> 'UrlAnalysis | None': """ Returns a UrlAnalysis instance with the latest analysis of the given URL. Note: For more control over the query (beyond the submitted URL), use the 'query_url_analyses_history' method. @@ -448,7 +445,7 @@ def _send_analyze_to_api(self, **additional_parameters) -> str: return self._api.analyze_url(self.url, **additional_parameters) @property - def downloaded_file_analysis(self) -> Optional[FileAnalysis]: + def downloaded_file_analysis(self) -> FileAnalysis | None: """ In case the url downloaded a file, returns the downloaded file analysis, otherwise, None. """ @@ -469,5 +466,5 @@ def downloaded_file_analysis(self) -> Optional[FileAnalysis]: @deprecated('This method is deprecated, use UrlAnalysis.from_analysis_by_id instead to be explict') -def get_url_analysis_by_id(analysis_id: str, api: IntezerApi = None) -> Optional[UrlAnalysis]: +def get_url_analysis_by_id(analysis_id: str, api: IntezerApi = None) -> UrlAnalysis | None: return UrlAnalysis.from_analysis_id(analysis_id, api) diff --git a/intezer_sdk/api.py b/intezer_sdk/api.py index 48876d7..681f99b 100644 --- a/intezer_sdk/api.py +++ b/intezer_sdk/api.py @@ -4,11 +4,8 @@ from http import HTTPStatus from typing import Any from typing import BinaryIO -from typing import Dict from typing import IO -from typing import List -from typing import Optional -from typing import Union +from typing import TypeAlias import requests import requests.adapters @@ -20,13 +17,13 @@ from intezer_sdk.consts import IndexType from intezer_sdk.consts import OnPremiseVersion -_global_api: Optional['IntezerApi'] = None +_global_api: 'IntezerApi | None' = None logger = logging.getLogger(__name__) def raise_for_status(response: requests.Response, - statuses_to_ignore: List[Union[HTTPStatus, int]] = None, - allowed_statuses: List[Union[HTTPStatus, int]] = None): + statuses_to_ignore: list[HTTPStatus | int] = None, + allowed_statuses: list[HTTPStatus | int] = None): """Raises stored :class:`HTTPError`, if one occurred.""" try: response_json = response.json() @@ -74,19 +71,19 @@ def __init__(self, api_key: str = None, base_url: str = None, verify_ssl: bool = True, - proxies: Dict[str, str] = None, + proxies: dict[str, str] = None, on_premise_version: OnPremiseVersion = None, user_agent: str = None, renew_token_window=20, max_retry=3, - timeout_in_seconds: Optional[int] = None): + timeout_in_seconds: int | None = None): self.full_url = base_url + api_version self.base_url = base_url self._proxies = proxies self.api_key = api_key self._access_token = None self._renew_token_window = renew_token_window - self._token_expiration: Optional[int] = None + self._token_expiration: int | None = None self._session = None self.verify_ssl = verify_ssl self.on_premise_version = on_premise_version @@ -106,7 +103,7 @@ def _request(self, files: dict = None, stream: bool = None, base_url: str = None, - timeout_in_seconds: Optional[int] = None) -> Response: + timeout_in_seconds: int | None = None) -> Response: if not self._session: self._set_session() @@ -122,7 +119,7 @@ def _request(self, stream=stream, timeout=timeout_in_seconds or self.timeout_in_seconds ) - elif isinstance(data, (bytes, str)): + elif isinstance(data, bytes | str): response = self._session.request( method, url, @@ -158,7 +155,7 @@ def request_with_refresh_expired_access_token(self, files: dict = None, stream: bool = None, base_url: str = None, - timeout_in_seconds: Optional[int] = None) -> Response: + timeout_in_seconds: int | None = None) -> Response: for retry_count in range(self.max_retry): try: self._refresh_token_if_needed() @@ -234,8 +231,8 @@ def __init__(self, verify_ssl: bool = True, on_premise_version: OnPremiseVersion = None, user_agent: str = None, - proxies: Dict[str, str] = None, - timeout_in_seconds: Optional[int] = None, + proxies: dict[str, str] = None, + timeout_in_seconds: int | None = None, max_retry:int = 3): super().__init__(api_key=api_key, base_url=base_url, @@ -250,8 +247,8 @@ def __init__(self, @deprecated('IntezerApi is deprecated and will be removed in the future') def analyze_by_hash(self, file_hash: str, - disable_dynamic_unpacking: Optional[bool], - disable_static_unpacking: Optional[bool], + disable_dynamic_unpacking: bool | None, + disable_static_unpacking: bool | None, sandbox_command_line_arguments: str = None, sandbox_machine_type: str = None, **additional_parameters) -> str: @@ -314,7 +311,7 @@ def analyze_by_file(self, zip_password: str = None, sandbox_command_line_arguments: str = None, sandbox_machine_type: str = None, - **additional_parameters) -> Optional[str]: + **additional_parameters) -> str | None: options = self._param_initialize(disable_dynamic_unpacking=disable_dynamic_unpacking, disable_static_unpacking=disable_static_unpacking, code_item_type=code_item_type, @@ -333,7 +330,7 @@ def analyze_by_file(self, def get_latest_analysis(self, file_hash: str, private_only: bool = False, - **additional_parameters) -> Optional[dict]: + **additional_parameters) -> dict | None: if not self.on_premise_version or self.on_premise_version > OnPremiseVersion.V21_11: options = {'should_get_only_private_analysis': private_only, **additional_parameters} @@ -380,7 +377,7 @@ def get_endpoint_analysis_response(self, analyses_id: str, ignore_not_found: boo return response @deprecated('IntezerApi is deprecated and will be removed in the future') - def get_endpoint_sub_analyses(self, analyses_id: str, verdicts: Optional[List[str]]) -> List[dict]: + def get_endpoint_sub_analyses(self, analyses_id: str, verdicts: list[str] | None) -> list[dict]: data = dict(verdicts=verdicts) if verdicts is not None else None response = self.request_with_refresh_expired_access_token(path=f'/endpoint-analyses/{analyses_id}/sub-analyses', method='GET', @@ -390,7 +387,7 @@ def get_endpoint_sub_analyses(self, analyses_id: str, verdicts: Optional[List[st return response.json()['sub_analyses'] @deprecated('IntezerApi is deprecated and will be removed in the future') - def create_endpoint_scan(self, scanner_info: dict) -> Dict[str, str]: + def create_endpoint_scan(self, scanner_info: dict) -> dict[str, str]: if not self.on_premise_version or self.on_premise_version > OnPremiseVersion.V22_10: scanner_info['scan_type'] = consts.SCAN_TYPE_OFFLINE_ENDPOINT_SCAN response = self.request_with_refresh_expired_access_token(path='scans', @@ -402,7 +399,7 @@ def create_endpoint_scan(self, scanner_info: dict) -> Dict[str, str]: return response.json()['result'] @deprecated('IntezerApi is deprecated and will be removed in the future') - def get_iocs(self, analyses_id: str) -> Optional[dict]: + def get_iocs(self, analyses_id: str) -> dict | None: response = self.request_with_refresh_expired_access_token(path='/analyses/{}/iocs'.format(analyses_id), method='GET') raise_for_status(response) @@ -410,7 +407,7 @@ def get_iocs(self, analyses_id: str) -> Optional[dict]: return response.json()['result'] @deprecated('IntezerApi is deprecated and will be removed in the future') - def get_detection_result_url(self, analyses_id: str) -> Optional[str]: + def get_detection_result_url(self, analyses_id: str) -> str | None: response = self.request_with_refresh_expired_access_token(path=f'/analyses/{analyses_id}/detect', method='GET') if response.status_code == HTTPStatus.CONFLICT: @@ -420,7 +417,7 @@ def get_detection_result_url(self, analyses_id: str) -> Optional[str]: return response.json()['result_url'] @deprecated('IntezerApi is deprecated and will be removed in the future') - def get_dynamic_ttps(self, analyses_id: str) -> Optional[dict]: + def get_dynamic_ttps(self, analyses_id: str) -> dict | None: self.assert_on_premise_above_v21_11() response = self.request_with_refresh_expired_access_token(path='/analyses/{}/dynamic-ttps'.format(analyses_id), method='GET') @@ -429,7 +426,7 @@ def get_dynamic_ttps(self, analyses_id: str) -> Optional[dict]: return response.json()['result'] @deprecated('IntezerApi is deprecated and will be removed in the future') - def get_family_info(self, family_id: str) -> Optional[dict]: + def get_family_info(self, family_id: str) -> dict | None: response = self.request_with_refresh_expired_access_token('GET', '/families/{}/info'.format(family_id)) if response.status_code == HTTPStatus.NOT_FOUND: return None @@ -438,7 +435,7 @@ def get_family_info(self, family_id: str) -> Optional[dict]: return response.json()['result'] @deprecated('IntezerApi is deprecated and will be removed in the future') - def get_family_by_name(self, family_name: str) -> Optional[Dict[str, Any]]: + def get_family_by_name(self, family_name: str) -> dict[str, Any] | None: response = self.request_with_refresh_expired_access_token('GET', '/families', {'family_name': family_name}) if response.status_code == HTTPStatus.NOT_FOUND: return None @@ -447,7 +444,7 @@ def get_family_by_name(self, family_name: str) -> Optional[Dict[str, Any]]: return response.json()['result'] @deprecated('IntezerApi is deprecated and will be removed in the future') - def get_sub_analyses_by_id(self, analysis_id: str) -> Optional[List[dict]]: + def get_sub_analyses_by_id(self, analysis_id: str) -> list[dict] | None: response = self.request_with_refresh_expired_access_token(path='/analyses/{}/sub-analyses'.format(analysis_id), method='GET') raise_for_status(response) @@ -457,7 +454,7 @@ def get_sub_analyses_by_id(self, analysis_id: str) -> Optional[List[dict]]: @deprecated('IntezerApi is deprecated and will be removed in the future') def get_sub_analysis_code_reuse_by_id(self, composed_analysis_id: str, - sub_analysis_id: str) -> Optional[dict]: + sub_analysis_id: str) -> dict | None: response = self.request_with_refresh_expired_access_token(path='/analyses/{}/sub-analyses/{}/code-reuse' .format(composed_analysis_id, sub_analysis_id), method='GET') @@ -631,7 +628,7 @@ def get_index_response(self, index_id: str) -> Response: return response @deprecated('IntezerApi is deprecated and will be removed in the future') - def analyze_url(self, url: str, **additional_parameters) -> Optional[str]: + def analyze_url(self, url: str, **additional_parameters) -> str | None: self.assert_any_on_premise() response = self.request_with_refresh_expired_access_token(method='POST', path='/url', @@ -728,7 +725,7 @@ def set_global_api(api_key: str = None, base_url: str = None, verify_ssl: bool = True, on_premise_version: OnPremiseVersion = None, - proxies: Dict[str, str] = None) -> IntezerApiClient: + proxies: dict[str, str] = None) -> IntezerApiClient: """ Configure the global api @@ -763,4 +760,4 @@ def set_global_api_custom_instance(api: IntezerApiClient) -> IntezerApiClient: return _global_api -IntezerProxy = IntezerApiClient +IntezerProxy: TypeAlias = IntezerApiClient diff --git a/intezer_sdk/base_analysis.py b/intezer_sdk/base_analysis.py index 42e4171..81f956b 100644 --- a/intezer_sdk/base_analysis.py +++ b/intezer_sdk/base_analysis.py @@ -3,9 +3,6 @@ import time from http import HTTPStatus from typing import Any -from typing import Dict -from typing import Optional -from typing import Union from requests import Response @@ -26,12 +23,12 @@ def __init__(self, api: IntezerApiClient = None): """ :param api: The API connection to Intezer. """ - self.status: Optional[consts.AnalysisStatusCode] = None - self.analysis_id: Optional[str] = None - self.analysis_time: Optional[datetime.datetime] = None + self.status: consts.AnalysisStatusCode | None = None + self.analysis_id: str | None = None + self.analysis_time: datetime.datetime | None = None self._api = IntezerApi(api or get_global_api()) - self._report: Optional[Dict[str, Any]] = None - self._send_time: Optional[datetime.datetime] = None + self._report: dict[str, Any] | None = None + self._send_time: datetime.datetime | None = None @abc.abstractmethod def _query_status_from_api(self) -> Response: @@ -53,7 +50,7 @@ def verdict(self) -> str: def wait_for_completion(self, interval: int = None, sleep_before_first_check=False, - timeout: Optional[datetime.timedelta] = None): + timeout: datetime.timedelta | None = None): """ Blocks until the analysis is completed. @@ -88,7 +85,7 @@ def is_analysis_running(self) -> bool: consts.AnalysisStatusCode.QUEUED) @property - def running_analysis_duration(self) -> Optional[datetime.timedelta]: + def running_analysis_duration(self) -> datetime.timedelta | None: """ Returns the time elapsed from the analysis sending and now. Returns None when the analysis finished. @@ -171,8 +168,8 @@ def _send_analyze_to_api(self, **additional_parameters) -> str: raise NotImplementedError() def send(self, - wait: Union[bool, int] = False, - wait_timeout: Optional[datetime.timedelta] = None, + wait: bool | int = False, + wait_timeout: datetime.timedelta | None = None, **additional_parameters) -> None: if self.analysis_id: raise errors.AnalysisHasAlreadyBeenSentError() diff --git a/intezer_sdk/consts.py b/intezer_sdk/consts.py index 01d1f4d..70ee0b7 100644 --- a/intezer_sdk/consts.py +++ b/intezer_sdk/consts.py @@ -79,12 +79,13 @@ class IndexType(AutoName): @staticmethod def from_str(label): - if label in ('TRUSTED', 'trusted'): - return IndexType.TRUSTED - elif label in ('MALICIOUS', 'malicious'): - return IndexType.MALICIOUS - else: - raise NotImplementedError + match label: + case 'TRUSTED' | 'trusted': + return IndexType.TRUSTED + case 'MALICIOUS' | 'malicious': + return IndexType.MALICIOUS + case _: + raise NotImplementedError class CodeItemType(AutoName): @@ -105,4 +106,4 @@ class OnPremiseVersion(enum.IntEnum): CHECK_STATUS_INTERVAL = 1 SCAN_TYPE_OFFLINE_ENDPOINT_SCAN = 'offline_endpoint_scan' SCAN_DEFAULT_MAX_WORKERS = 1 -SCAN_MAX_UPLOAD_RETRIES = 3 \ No newline at end of file +SCAN_MAX_UPLOAD_RETRIES = 3 diff --git a/intezer_sdk/devices.py b/intezer_sdk/devices.py index 7f5ead1..3484857 100644 --- a/intezer_sdk/devices.py +++ b/intezer_sdk/devices.py @@ -1,9 +1,6 @@ import datetime from typing import Any -from typing import Dict -from typing import List from typing import Literal -from typing import Optional from requests import HTTPError @@ -20,25 +17,25 @@ def generate_devices_history_search_filters(*, - device_ids: List[str] = None, - environments: List[str] = None, + device_ids: list[str] = None, + environments: list[str] = None, offset: int = None, limit: int = None, time_range_start: datetime.datetime = None, time_range_end: datetime.datetime = None, - external_ips: List[str] = None, - host_groups: List[str] = None, - host_tags: List[str] = None, - hostnames: List[str] = None, - managed_by: List[str] = None, - os_names: List[str] = None, - os_versions: List[str] = None, - private_ips: List[str] = None, - cloud_providers: List[str] = None, - host_types: List[str] = None, - last_login_users: List[str] = None, - site_names: List[str] = None, - include_raw_device: bool = None) -> Dict[str, Any]: + external_ips: list[str] = None, + host_groups: list[str] = None, + host_tags: list[str] = None, + hostnames: list[str] = None, + managed_by: list[str] = None, + os_names: list[str] = None, + os_versions: list[str] = None, + private_ips: list[str] = None, + cloud_providers: list[str] = None, + host_types: list[str] = None, + last_login_users: list[str] = None, + site_names: list[str] = None, + include_raw_device: bool = None) -> dict[str, Any]: filters = {} time_range_start_timestamp = int(time_range_start.timestamp()) if time_range_start else None time_range_end_timestamp = int(time_range_end.timestamp()) if time_range_end else None @@ -69,24 +66,24 @@ def generate_devices_history_search_filters(*, def query_devices_history(*, api: IntezerApiClient = None, search_mode: Literal['and', 'or'] = 'and', - device_ids: List[str] = None, - environments: List[str] = None, + device_ids: list[str] = None, + environments: list[str] = None, offset: int = None, limit: int = None, time_range_start: datetime.datetime = None, time_range_end: datetime.datetime = None, - external_ips: List[str] = None, - host_groups: List[str] = None, - host_tags: List[str] = None, - hostnames: List[str] = None, - managed_by: List[str] = None, - os_names: List[str] = None, - os_versions: List[str] = None, - private_ips: List[str] = None, - cloud_providers: List[str] = None, - host_types: List[str] = None, - last_login_users: List[str] = None, - site_names: List[str] = None, + external_ips: list[str] = None, + host_groups: list[str] = None, + host_tags: list[str] = None, + hostnames: list[str] = None, + managed_by: list[str] = None, + os_names: list[str] = None, + os_versions: list[str] = None, + private_ips: list[str] = None, + cloud_providers: list[str] = None, + host_types: list[str] = None, + last_login_users: list[str] = None, + site_names: list[str] = None, include_raw_device: bool = None) -> DevicesHistoryResult: """ Query devices history with query param. @@ -158,13 +155,13 @@ class Device: :vartype os_version: str """ - def __init__(self, device_id: Optional[str] = None, api: IntezerApiClient = None): + def __init__(self, device_id: str | None = None, api: IntezerApiClient = None): """ Create a new Device instance with the given device_id. - Please note that this does not query the Intezer Analyze API for the device data, but rather creates a Device + Please note that this does not query the Intezer Platform API for the device data, but rather creates a Device instance with the given device id. - If you wish to fetch the device data from the Intezer Analyze API, use the `from_id` class method. + If you wish to fetch the device data from the Intezer Platform API, use the `from_id` class method. :param device_id: The device id. :param api: The API connection to Intezer. @@ -173,15 +170,15 @@ def __init__(self, device_id: Optional[str] = None, api: IntezerApiClient = None self._intezer_api_client = api self._api = IntezerApi(api or get_global_api()) - self._result: Optional[Dict] = None - self.hostname: Optional[str] = None - self.host_type: Optional[str] = None - self.os_type: Optional[str] = None - self.os_version: Optional[str] = None + self._result: dict | None = None + self.hostname: str | None = None + self.host_type: str | None = None + self.os_type: str | None = None + self.os_version: str | None = None def fetch_info(self): """ - Fetch the device data from the Intezer Analyze API. + Fetch the device data from the Intezer Platform API. :raises intezer_sdk.errors.DeviceNotFound: If the device was not found. """ @@ -200,9 +197,9 @@ def fetch_info(self): self.os_type = self._result.get('os_type') self.os_version = self._result.get('os_version') - def result(self) -> Optional[dict]: + def result(self) -> dict | None: """ - Get the raw device result, as received from Intezer Analyze API. + Get the raw device result, as received from Intezer Platform API. :raises intezer_sdk.errors.IncidentNotFound: If the device was not found. :return: The raw device dictionary. @@ -212,7 +209,7 @@ def result(self) -> Optional[dict]: @classmethod def from_id(cls, device_id: str, api: IntezerApiClient = None) -> 'Device': """ - Create a new Device instance, and fetch the device data from the Intezer Analyze API. + Create a new Device instance, and fetch the device data from the Intezer Platform API. :param device_id: The device id. :param api: The API connection to Intezer. diff --git a/intezer_sdk/devices_results.py b/intezer_sdk/devices_results.py index 2715edf..2cc6d77 100644 --- a/intezer_sdk/devices_results.py +++ b/intezer_sdk/devices_results.py @@ -1,6 +1,4 @@ -from typing import List from typing import Literal -from typing import Tuple from intezer_sdk.api import IntezerApiClient from intezer_sdk.api import raise_for_status @@ -20,7 +18,7 @@ def __init__(self, super().__init__(request_url_path, api, filters) self._search_mode = search_mode - def _fetch_history(self, url_path: str, data: dict) -> Tuple[int, List]: + def _fetch_history(self, url_path: str, data: dict) -> tuple[int, list]: """ Request devices from server according to filters. :param url_path: Url to request new data from. diff --git a/intezer_sdk/endpoint_analysis.py b/intezer_sdk/endpoint_analysis.py index 8b874bd..31a67a2 100644 --- a/intezer_sdk/endpoint_analysis.py +++ b/intezer_sdk/endpoint_analysis.py @@ -6,8 +6,6 @@ import os import pathlib from typing import IO -from typing import List -from typing import Optional from intezer_sdk import consts from intezer_sdk._api import IntezerApi @@ -58,10 +56,10 @@ def __init__(self, self._fileless_dir = pathlib.Path(fileless_dir) self._memory_modules_dir = pathlib.Path(memory_modules_dir) - self._sub_analyses: List[SubAnalysis] = [] + self._sub_analyses: list[SubAnalysis] = [] self._scan_id = None - self.scan_start_time: Optional[datetime.datetime] = None - self.scan_end_time: Optional[datetime.datetime] = None + self.scan_start_time: datetime.datetime | None = None + self.scan_end_time: datetime.datetime | None = None @classmethod def from_analysis_id(cls, analysis_id: str, api: IntezerApiClient = None): @@ -94,7 +92,7 @@ def _set_report(self, report: dict): def _query_status_from_api(self): return self._api.get_endpoint_analysis_response(self.analysis_id, False) - def get_sub_analyses(self, verdicts: List[str] = None) -> List[SubAnalysis]: + def get_sub_analyses(self, verdicts: list[str] = None) -> list[SubAnalysis]: """ Get the sub_analyses of the current analysis. :param verdicts: A list of the verdicts to filter by. diff --git a/intezer_sdk/errors.py b/intezer_sdk/errors.py index 62583d2..44ebe9c 100644 --- a/intezer_sdk/errors.py +++ b/intezer_sdk/errors.py @@ -1,5 +1,5 @@ -from typing import Optional import datetime + import requests @@ -68,7 +68,7 @@ def __init__(self): class AnalysisIsAlreadyRunningError(ServerError): - def __init__(self, response: requests.Response, running_analysis_id: Optional[str]): + def __init__(self, response: requests.Response, running_analysis_id: str | None): super().__init__('Analysis already running', response) self.analysis_id = running_analysis_id diff --git a/intezer_sdk/family.py b/intezer_sdk/family.py index f941cbf..3b9de0b 100644 --- a/intezer_sdk/family.py +++ b/intezer_sdk/family.py @@ -1,8 +1,3 @@ -import typing - -from typing import Optional -from typing import List - from intezer_sdk import errors from intezer_sdk._api import IntezerApi from intezer_sdk.api import IntezerApiClient @@ -26,7 +21,7 @@ def __eq__(self, other): return self is other or isinstance(other, Family) and self.family_id and other.family_id == self.family_id @classmethod - def from_family_id(cls, family_id: str, api: IntezerApiClient = None) -> typing.Optional['Family']: + def from_family_id(cls, family_id: str, api: IntezerApiClient = None) -> 'Family | None': try: family = cls(family_id, api=api) family.fetch_info() @@ -58,14 +53,14 @@ def type(self) -> str: return self._type @property - def tags(self) -> List[str]: + def tags(self) -> list[str]: if self._tags is None: self.fetch_info() return self._tags -def get_family_by_name(family_name: str, api: IntezerApiClient = None) -> typing.Optional[Family]: +def get_family_by_name(family_name: str, api: IntezerApiClient = None) -> Family | None: family = IntezerApi(api or get_global_api()).get_family_by_name(family_name) if family: return Family(family['family_id'], family['family_name']) diff --git a/intezer_sdk/file.py b/intezer_sdk/file.py index 4b8dd63..314fcd2 100644 --- a/intezer_sdk/file.py +++ b/intezer_sdk/file.py @@ -1,9 +1,6 @@ import datetime from dataclasses import dataclass from typing import IO -from typing import List -from typing import Optional -from typing import Union from intezer_sdk import _operation from intezer_sdk import consts @@ -18,7 +15,7 @@ class Block: address: int software_type: str - families: List[str] + families: list[str] @property def is_common(self): @@ -48,7 +45,7 @@ def __init__(self, self._file_path = file_path self._sha256 = sha256 self._api = IntezerApi(api or get_global_api()) - self._index: Optional[Index] = None + self._index: Index | None = None self._operations = {} @property @@ -64,7 +61,7 @@ def file_path(self) -> str: def index(self, index_as: consts.IndexType, family_name: str = None, - wait: Union[bool, int] = False): + wait: bool | int = False): """ Index the file. @@ -85,7 +82,7 @@ def index(self, self._index.send(wait=wait) - def unset_indexing(self, wait: Union[bool, int] = False): + def unset_indexing(self, wait: bool | int = False): """ Unset the indexing request (only works for sha256-based files). @@ -156,8 +153,8 @@ def download(self, self._api.download_file_by_sha256(self._sha256, path, output_stream, password_protection) def get_code_blocks(self, - wait: Union[bool, int] = False, - wait_timeout: Optional[datetime.timedelta] = None) -> operation.Operation: + wait: bool | int = False, + wait_timeout: datetime.timedelta | None = None) -> operation.Operation: """ Retrieves a report containing information about reused code blocks for the given SHA-256 hash. diff --git a/intezer_sdk/history_results.py b/intezer_sdk/history_results.py index 829c304..6d99a0d 100644 --- a/intezer_sdk/history_results.py +++ b/intezer_sdk/history_results.py @@ -1,14 +1,11 @@ import abc -from typing import List from typing import Any -from typing import Dict -from typing import Tuple from intezer_sdk.api import IntezerApiClient class HistoryResult: - def __init__(self, request_url_path: str, api: IntezerApiClient, filters: Dict): + def __init__(self, request_url_path: str, api: IntezerApiClient, filters: dict): """ Fetch all history results from server. @@ -17,9 +14,9 @@ def __init__(self, request_url_path: str, api: IntezerApiClient, filters: Dict): :param filters: Filters requested from server. """ self._api = api - self.filters: Dict = filters - self._pages: List[Any] = [] - self._current_page: List[Any] = [] + self.filters: dict = filters + self._pages: list[Any] = [] + self._current_page: list[Any] = [] self._request_url_path: str = request_url_path self._total_count: int = 0 self._current_offset: int = 0 @@ -68,7 +65,7 @@ def _update_current_page_metadata(self): @abc.abstractmethod - def _fetch_history(self, url_path: str, data: Dict) -> Tuple[int, List]: + def _fetch_history(self, url_path: str, data: dict) -> tuple[int, list]: """ Request from server filtered history results. :param url_path: Url to request new data from. diff --git a/intezer_sdk/incidents.py b/intezer_sdk/incidents.py index 94f1c59..1d35fdc 100644 --- a/intezer_sdk/incidents.py +++ b/intezer_sdk/incidents.py @@ -1,8 +1,5 @@ import datetime from typing import Any -from typing import Dict -from typing import List -from typing import Optional from requests import HTTPError @@ -20,25 +17,25 @@ def generate_incidents_history_search_filters(*, - incident_ids: List[str] = None, - environments: List[str] = None, + incident_ids: list[str] = None, + environments: list[str] = None, offset: int = None, limit: int = None, - time_filter_type: List[str] = None, + time_filter_type: list[str] = None, start_time: datetime.datetime = None, end_time: datetime.datetime = None, - sources: List[str] = None, - senders: List[str] = None, - severities: List[str] = None, - statuses: List[str] = None, - names: List[str] = None, - related_alert_ids: List[str] = None, - risk_categories: List[str] = None, - response_statuses: List[str] = None, + sources: list[str] = None, + senders: list[str] = None, + severities: list[str] = None, + statuses: list[str] = None, + names: list[str] = None, + related_alert_ids: list[str] = None, + risk_categories: list[str] = None, + response_statuses: list[str] = None, free_text: str = None, - sort_by: List[str] = None, + sort_by: list[str] = None, sort_order: str = None, - include_raw_incident: bool = None) -> Dict[str, Any]: + include_raw_incident: bool = None) -> dict[str, Any]: filters = {} start_timestamp = int(start_time.timestamp()) if start_time else None end_timestamp = int(end_time.timestamp()) if end_time else None @@ -68,23 +65,23 @@ def generate_incidents_history_search_filters(*, def query_incidents_history(*, api: IntezerApiClient = None, - incident_ids: List[str] = None, - environments: List[str] = None, + incident_ids: list[str] = None, + environments: list[str] = None, offset: int = DEFAULT_OFFSET, limit: int = DEFAULT_LIMIT, - time_filter_type: List[str] = None, + time_filter_type: list[str] = None, start_time: datetime.datetime = None, end_time: datetime.datetime = None, - sources: List[str] = None, - senders: List[str] = None, - severities: List[str] = None, - statuses: List[str] = None, - names: List[str] = None, - related_alert_ids: List[str] = None, - risk_categories: List[str] = None, - response_statuses: List[str] = None, + sources: list[str] = None, + senders: list[str] = None, + severities: list[str] = None, + statuses: list[str] = None, + names: list[str] = None, + related_alert_ids: list[str] = None, + risk_categories: list[str] = None, + response_statuses: list[str] = None, free_text: str = None, - sort_by: List[str] = None, + sort_by: list[str] = None, sort_order: str = None, include_raw_incident: bool = None) -> IncidentsHistoryResult: """ @@ -161,16 +158,16 @@ class Incident: def __init__( self, - incident_id: Optional[str] = None, - environment: Optional[str] = None, + incident_id: str | None = None, + environment: str | None = None, api: IntezerApiClient = None, ): """ Create a new Incident instance with the given incident id. - Please note that this does not query the Intezer Analyze API for the incident data, but rather creates an Incident + Please note that this does not query the Intezer Platform API for the incident data, but rather creates an Incident instance with the given incident id. - If you wish to fetch the incident data from the Intezer Analyze API, use the `from_id` class method. + If you wish to fetch the incident data from the Intezer Platform API, use the `from_id` class method. :param incident_id: The incident id. :param environment: The environment of the incident. @@ -180,17 +177,17 @@ def __init__( self.environment = environment self._intezer_api_client = api self._api = IntezerApi(api or get_global_api()) - self._result: Optional[Dict] = None - self.name: Optional[str] = None - self.source: Optional[str] = None - self.sender: Optional[str] = None - self.risk_category: Optional[str] = None - self.risk_level: Optional[str] = None - self.intezer_incident_url: Optional[str] = None + self._result: dict | None = None + self.name: str | None = None + self.source: str | None = None + self.sender: str | None = None + self.risk_category: str | None = None + self.risk_level: str | None = None + self.intezer_incident_url: str | None = None def fetch_info(self): """ - Fetch the incident data from the Intezer Analyze API. + Fetch the incident data from the Intezer Platform API. :raises intezer_sdk.errors.IncidentNotFound: If the incident was not found. """ @@ -205,10 +202,10 @@ def fetch_info(self): if e.response.status_code == 404: raise errors.IncidentNotFoundError(self.incident_id) raise - + if not self.environment: self.environment = self._result['environment'] - + self.source = self._result.get('source') self.sender = self._result.get('sender') self.name = self._result.get('incident', {}).get('name') @@ -216,9 +213,9 @@ def fetch_info(self): self.risk_level = self._result.get('triage_summary', {}).get('risk_level') self.intezer_incident_url = self._result.get('intezer_incident_url') - def result(self) -> Optional[dict]: + def result(self) -> dict | None: """ - Get the raw incident result, as received from Intezer Analyze API. + Get the raw incident result, as received from Intezer Platform API. :raises intezer_sdk.errors.IncidentNotFound: If the incident was not found. :return: The raw incident dictionary. @@ -226,9 +223,9 @@ def result(self) -> Optional[dict]: return self._result @classmethod - def from_id(cls, incident_id: str, environment: Optional[str] = None, api: IntezerApiClient = None) -> 'Incident': + def from_id(cls, incident_id: str, environment: str | None = None, api: IntezerApiClient = None) -> 'Incident': """ - Create a new Incident instance, and fetch the incident data from the Intezer Analyze API. + Create a new Incident instance, and fetch the incident data from the Intezer Platform API. :param incident_id: The incident id. :param environment: The environment of the incident. @@ -241,7 +238,7 @@ def from_id(cls, incident_id: str, environment: Optional[str] = None, api: Intez return new_incident def get_raw_data( - self, environment: Optional[str] = None, raw_data_type: str = 'raw_incident' + self, environment: str | None = None, raw_data_type: str = 'raw_incident' ) -> dict: """ Get raw incident data. diff --git a/intezer_sdk/incidents_results.py b/intezer_sdk/incidents_results.py index 1caf621..b29c84f 100644 --- a/intezer_sdk/incidents_results.py +++ b/intezer_sdk/incidents_results.py @@ -1,7 +1,3 @@ -from typing import Dict -from typing import List -from typing import Tuple - from intezer_sdk.api import IntezerApiClient from intezer_sdk.api import raise_for_status from intezer_sdk.history_results import HistoryResult @@ -15,7 +11,7 @@ def __init__(self, request_url_path: str, api: IntezerApiClient, filters: dict): """ super().__init__(request_url_path, api, filters) - def _fetch_history(self, url_path: str, data: dict) -> Tuple[int, list]: + def _fetch_history(self, url_path: str, data: dict) -> tuple[int, list]: """ Request incidents from server according to filters. :param url_path: Url to request new data from. diff --git a/intezer_sdk/index.py b/intezer_sdk/index.py index 7f9ba79..1769ae7 100644 --- a/intezer_sdk/index.py +++ b/intezer_sdk/index.py @@ -1,5 +1,4 @@ import time -import typing from http import HTTPStatus from intezer_sdk import consts @@ -39,7 +38,7 @@ def __init__(self, self._index_as = index_as self._family_name = family_name - def send(self, wait: typing.Union[bool, int] = False): + def send(self, wait: bool | int = False): """ Send the index request. @@ -61,7 +60,7 @@ def send(self, wait: typing.Union[bool, int] = False): else: self.wait_for_completion(wait, sleep_before_first_check=True) - def unset_indexing(self, wait: typing.Union[bool, int] = False): + def unset_indexing(self, wait: bool | int = False): """ Unset the indexing request. diff --git a/intezer_sdk/operation.py b/intezer_sdk/operation.py index 5f686d6..e8aef9d 100644 --- a/intezer_sdk/operation.py +++ b/intezer_sdk/operation.py @@ -1,6 +1,5 @@ import datetime import time -from typing import Optional from intezer_sdk import errors from intezer_sdk._api import IntezerApi @@ -57,7 +56,7 @@ def check_status(self) -> bool: def wait_for_completion(self, interval: int = None, sleep_before_first_check=False, - wait_timeout: Optional[datetime.timedelta] = None) -> None: + wait_timeout: datetime.timedelta | None = None) -> None: """ Blocks until the operation is completed. :param interval: The interval to wait between checks in seconds. diff --git a/intezer_sdk/sub_analysis.py b/intezer_sdk/sub_analysis.py index e6d1b43..8a82574 100644 --- a/intezer_sdk/sub_analysis.py +++ b/intezer_sdk/sub_analysis.py @@ -1,8 +1,5 @@ import datetime from typing import IO -from typing import List -from typing import Optional -from typing import Union from intezer_sdk import _operation from intezer_sdk import errors @@ -21,7 +18,7 @@ def __init__(self, composed_analysis_id: str, sha256: str, source: str, - extraction_info: Optional[dict], + extraction_info: dict | None, api: IntezerApiClient = None, verdict=None): self.composed_analysis_id = composed_analysis_id @@ -41,7 +38,7 @@ def from_analysis_id(cls, analysis_id: str, composed_analysis_id: str, lazy_load=True, - api: IntezerApiClient = None) -> Optional['SubAnalysis']: + api: IntezerApiClient = None) -> 'SubAnalysis | None': """ class method that creates a new instance of the class by fetching the details of the sub-analysis from the Intezer API. If lazy_load is set to True, the details of the sub-analysis are not fetched immediately. @@ -76,7 +73,7 @@ def source(self) -> str: return self._source @property - def extraction_info(self) -> Optional[dict]: + def extraction_info(self) -> dict | None: # Since extraction_info could be none, we check if the sha256 was provided, signaling we already fetch it if not self._sha256: self._init_sub_analysis_from_parent() @@ -102,7 +99,7 @@ def verdict(self) -> str: return self._verdict @property - def indicators(self) -> List[dict]: + def indicators(self) -> list[dict]: if self._indicators is None: self._indicators = self.metadata.get('indicators', []) @@ -122,8 +119,8 @@ def _init_sub_analysis_from_parent(self): def find_related_files(self, family_id: str, - wait: Union[bool, int] = False, - wait_timeout: Optional[datetime.timedelta] = None) -> operation.Operation: + wait: bool | int = False, + wait_timeout: datetime.timedelta | None = None) -> operation.Operation: result_url = self._api.get_sub_analysis_related_files_by_family_id(self.composed_analysis_id, self.analysis_id, family_id) @@ -135,8 +132,8 @@ def find_related_files(self, wait_timeout) def get_account_related_samples(self, - wait: Union[bool, int] = False, - wait_timeout: Optional[datetime.timedelta] = None) -> Optional[operation.Operation]: + wait: bool | int = False, + wait_timeout: datetime.timedelta | None = None) -> operation.Operation | None: try: result_url = self._api.get_sub_analysis_account_related_samples_by_id(self.composed_analysis_id, self.analysis_id) @@ -151,20 +148,20 @@ def get_account_related_samples(self, wait_timeout) def generate_vaccine(self, - wait: Union[bool, int] = False, - wait_timeout: Optional[datetime.timedelta] = None) -> operation.Operation: + wait: bool | int = False, + wait_timeout: datetime.timedelta | None = None) -> operation.Operation: result_url = self._api.generate_sub_analysis_vaccine_by_id(self.composed_analysis_id, self.analysis_id) return _operation.handle_operation(self._operations, self._api, 'Vaccine', result_url, wait, wait_timeout) def get_capabilities(self, - wait: Union[bool, int] = False, - wait_timeout: Optional[datetime.timedelta] = None) -> operation.Operation: + wait: bool | int = False, + wait_timeout: datetime.timedelta | None = None) -> operation.Operation: result_url = self._api.get_sub_analysis_capabilities_by_id(self.composed_analysis_id, self.analysis_id) return _operation.handle_operation(self._operations, self._api, 'Capabilities', result_url, wait, wait_timeout) def get_strings(self, - wait: Union[bool, int] = False, - wait_timeout: Optional[datetime.timedelta] = None) -> operation.Operation: + wait: bool | int = False, + wait_timeout: datetime.timedelta | None = None) -> operation.Operation: result = self._api.get_strings_by_id(self.composed_analysis_id, self.analysis_id) return _operation.handle_operation(self._operations, self._api, @@ -174,8 +171,8 @@ def get_strings(self, def get_string_related_samples(self, string_value: str, - wait: Union[bool, int] = False, - wait_timeout: Optional[datetime.timedelta] = None) -> operation.Operation: + wait: bool | int = False, + wait_timeout: datetime.timedelta | None = None) -> operation.Operation: result_url = self._api.get_string_related_samples_by_id(self.composed_analysis_id, self.analysis_id, string_value) diff --git a/intezer_sdk/util.py b/intezer_sdk/util.py index 98b0b1c..6c355bc 100644 --- a/intezer_sdk/util.py +++ b/intezer_sdk/util.py @@ -3,12 +3,8 @@ import email.parser import itertools import os -from typing import BinaryIO -from typing import Dict -from typing import List -from typing import Optional -from typing import Tuple from typing import Any +from typing import BinaryIO from intezer_sdk.analysis import FileAnalysis from intezer_sdk.consts import ANALYZE_URL @@ -36,7 +32,7 @@ def _get_title(short: bool) -> str: def get_analysis_summary_metadata(analysis: FileAnalysis, use_hash_link: bool = False, should_use_largest_families: bool = True, - should_include_related_samples: bool = True) -> Dict[str, any]: + should_include_related_samples: bool = True) -> dict[str, any]: result = analysis.result() verdict = result['verdict'].lower() sub_verdict = result['sub_verdict'].lower() @@ -166,8 +162,8 @@ def get_analysis_summary(analysis: FileAnalysis, def get_analysis_family(analysis: FileAnalysis, - software_type_priorities: List[str], - should_use_largest_families: bool = True) -> Tuple[Optional[str], Optional[int]]: + software_type_priorities: list[str], + should_use_largest_families: bool = True) -> tuple[str | None, int | None]: result = analysis.result() family_name = result.get('family_name') if family_name: diff --git a/setup.py b/setup.py index 7269852..e2b5484 100644 --- a/setup.py +++ b/setup.py @@ -28,7 +28,7 @@ def rel(*xs): license='Apache 2.0', author='Intezer Labs ltd.', author_email='info@intezer.com', - description='Intezer Analyze SDK', + description='Intezer Platform SDK', long_description=long_description, long_description_content_type='text/markdown', install_requires=[ @@ -39,13 +39,13 @@ def rel(*xs): 'responses == 0.25.8', 'pytest == 8.1.1' ], - python_requires='!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,!=3.7.*,!=3.8.*', + python_requires='!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,!=3.7.*,!=3.8.*,!=3.9.*', classifiers=[ 'License :: OSI Approved :: Apache Software License', - 'Programming Language :: Python :: 3.9', 'Programming Language :: Python :: 3.10', 'Programming Language :: Python :: 3.11', 'Programming Language :: Python :: 3.12', 'Programming Language :: Python :: 3.13', + 'Programming Language :: Python :: 3.14', ] ) diff --git a/tests/unit/test_results.py b/tests/unit/test_results.py index c8aefcd..9763658 100644 --- a/tests/unit/test_results.py +++ b/tests/unit/test_results.py @@ -3,7 +3,6 @@ import datetime import uuid from http import HTTPStatus -from typing import List import responses @@ -57,8 +56,8 @@ def add_mock_response(self, header, request_url_path=FILE_ANALYSES_REQUEST): self.expected_result = copy.deepcopy(self.normal_result['analyses']) yield mock - def assert_deep_lists_equal(self, lst1: List, lst2: List): - [self.assertDictEqual(x, y) for x, y in zip(lst1, lst2)] + def assert_deep_lists_equal(self, lst1: list, lst2: list): + [self.assertDictEqual(x, y) for x, y in zip(lst1, lst2, strict=True)] def test_current_page_never_none(self): """Current page will always hold a page even thought didn't ask to fetch analyse yet."""