From c18f3bf21820f9e04a1ec383dcf366e15efa07dd Mon Sep 17 00:00:00 2001 From: vc1492a Date: Wed, 4 Feb 2026 12:40:02 -0800 Subject: [PATCH 1/5] feat: refactor Validation class for ease of use --- .github/workflows/release-please.yml | 24 -- PyNomaly/__init__.py | 18 ++ PyNomaly/loop.py | 425 +++++++++++++-------------- changelog.md | 17 ++ setup.py | 4 +- tests/test_loop.py | 23 +- 6 files changed, 255 insertions(+), 256 deletions(-) delete mode 100644 .github/workflows/release-please.yml diff --git a/.github/workflows/release-please.yml b/.github/workflows/release-please.yml deleted file mode 100644 index 298f0fc..0000000 --- a/.github/workflows/release-please.yml +++ /dev/null @@ -1,24 +0,0 @@ -on: - push: - branches: - - main # release branch (default) - -permissions: - contents: write - pull-requests: write - -name: release-please - -jobs: - release-please: - runs-on: ubuntu-latest - steps: - - uses: googleapis/release-please-action@v4 - with: - # this assumes that you have created a personal access token - # (PAT) and configured it as a GitHub action secret named - # `RELEASE_PLEASE_PERSONAL_ACCESS_TOKEN` (this secret name is not important). - token: ${{ secrets.RELEASE_PLEASE_PERSONAL_ACCESS_TOKEN }} - # this is a built-in strategy in release-please, see "Action Inputs" - # for more options - release-type: simple \ No newline at end of file diff --git a/PyNomaly/__init__.py b/PyNomaly/__init__.py index e69de29..0cd3ebf 100644 --- a/PyNomaly/__init__.py +++ b/PyNomaly/__init__.py @@ -0,0 +1,18 @@ +# Authors: Valentino Constantinou +# License: Apache 2.0 + +from PyNomaly.loop import ( + LocalOutlierProbability, + PyNomalyError, + ValidationError, + ClusterSizeError, + MissingValuesError, +) + +__all__ = [ + "LocalOutlierProbability", + "PyNomalyError", + "ValidationError", + "ClusterSizeError", + "MissingValuesError", +] diff --git a/PyNomaly/loop.py b/PyNomaly/loop.py index 95d0ca7..2ab26d7 100644 --- a/PyNomaly/loop.py +++ b/PyNomaly/loop.py @@ -11,10 +11,31 @@ pass __author__ = "Valentino Constantinou" -__version__ = "0.3.4" +__version__ = "0.3.5" __license__ = "Apache License, Version 2.0" +# Custom Exceptions +class PyNomalyError(Exception): + """Base exception for PyNomaly.""" + pass + + +class ValidationError(PyNomalyError): + """Raised when input validation fails.""" + pass + + +class ClusterSizeError(ValidationError): + """Raised when cluster size is smaller than n_neighbors.""" + pass + + +class MissingValuesError(ValidationError): + """Raised when data contains missing values.""" + pass + + class Utils: @staticmethod def emit_progress_bar(progress: str, index: int, total: int) -> str: @@ -77,217 +98,190 @@ class LocalOutlierProbability(object): (2016). """ - class Validate: + """ + Validation methods. + These methods validate inputs and raise exceptions or warnings as appropriate. + """ + + @staticmethod + def _convert_to_array(obj: Union["pd.DataFrame", np.ndarray]) -> np.ndarray: + """ + Converts the input data to a numpy array if it is a Pandas DataFrame + or validates it is already a numpy array. + :param obj: user-provided input data. + :return: a vector of values to be used in calculating the local + outlier probability. + """ + if obj.__class__.__name__ == "DataFrame": + points_vector = obj.values + return points_vector + elif obj.__class__.__name__ == "ndarray": + points_vector = obj + return points_vector + else: + warnings.warn( + "Provided data or distance matrix must be in ndarray " + "or DataFrame.", + UserWarning, + ) + if isinstance(obj, list): + points_vector = np.array(obj) + return points_vector + points_vector = np.array([obj]) + return points_vector + def _validate_inputs(self): """ - The Validate class aids in ensuring PyNomaly receives the right set - of user inputs for proper execution of the Local Outlier Probability - (LoOP) approach. Depending on the desired behavior, either an - exception is raised to the user or PyNomaly continues executing - albeit with some form of user warning. + Validates the inputs provided during initialization to ensure + that the needed objects are provided. + :return: a tuple of (data, distance_matrix, neighbor_matrix) or + raises a warning for invalid inputs. """ + if all(v is None for v in [self.data, self.distance_matrix]): + warnings.warn( + "Data or a distance matrix must be provided.", UserWarning + ) + return False + elif all(v is not None for v in [self.data, self.distance_matrix]): + warnings.warn( + "Only one of the following may be provided: data or a " + "distance matrix (not both).", + UserWarning, + ) + return False + if self.data is not None: + points_vector = self._convert_to_array(self.data) + return points_vector, self.distance_matrix, self.neighbor_matrix + if all( + matrix is not None + for matrix in [self.neighbor_matrix, self.distance_matrix] + ): + dist_vector = self._convert_to_array(self.distance_matrix) + neigh_vector = self._convert_to_array(self.neighbor_matrix) + else: + warnings.warn( + "A neighbor index matrix and distance matrix must both be " + "provided when not using raw input data.", + UserWarning, + ) + return False + if self.distance_matrix.shape != self.neighbor_matrix.shape: + warnings.warn( + "The shape of the distance and neighbor " + "index matrices must match.", + UserWarning, + ) + return False + elif (self.distance_matrix.shape[1] != self.n_neighbors) or ( + self.neighbor_matrix.shape[1] != self.n_neighbors + ): + warnings.warn( + "The shape of the distance or " + "neighbor index matrix does not " + "match the number of neighbors " + "specified.", + UserWarning, + ) + return False + return self.data, dist_vector, neigh_vector + + def _check_cluster_size(self) -> None: + """ + Validates the cluster labels to ensure that the smallest cluster + size (number of observations in the cluster) is larger than the + specified number of neighbors. + :raises ClusterSizeError: if any cluster is too small. + """ + c_labels = self._cluster_labels() + for cluster_id in set(c_labels): + c_size = np.where(c_labels == cluster_id)[0].shape[0] + if c_size <= self.n_neighbors: + raise ClusterSizeError( + "Number of neighbors specified larger than smallest " + "cluster. Specify a number of neighbors smaller than " + "the smallest cluster size (observations in smallest " + "cluster minus one)." + ) + def _check_n_neighbors(self) -> bool: """ - Private methods. + Validates the specified number of neighbors to ensure that it is + greater than 0 and that the specified value is less than the total + number of observations. + :return: a boolean indicating whether validation has passed without + adjustment. """ + if not self.n_neighbors > 0: + self.n_neighbors = 10 + warnings.warn( + "n_neighbors must be greater than 0." + " Fit with " + str(self.n_neighbors) + " instead.", + UserWarning, + ) + return False + elif self.n_neighbors >= self._n_observations(): + self.n_neighbors = self._n_observations() - 1 + warnings.warn( + "n_neighbors must be less than the number of observations." + " Fit with " + str(self.n_neighbors) + " instead.", + UserWarning, + ) + return True - @staticmethod - def _data(obj: Union["pd.DataFrame", np.ndarray]) -> np.ndarray: - """ - Validates the input data to ensure it is either a Pandas DataFrame - or Numpy array. - :param obj: user-provided input data. - :return: a vector of values to be used in calculating the local - outlier probability. - """ - if obj.__class__.__name__ == "DataFrame": - points_vector = obj.values - return points_vector - elif obj.__class__.__name__ == "ndarray": - points_vector = obj - return points_vector - else: - warnings.warn( - "Provided data or distance matrix must be in ndarray " - "or DataFrame.", - UserWarning, - ) - if isinstance(obj, list): - points_vector = np.array(obj) - return points_vector - points_vector = np.array([obj]) - return points_vector + def _check_extent(self) -> bool: + """ + Validates the specified extent parameter to ensure it is either 1, + 2, or 3. + :return: a boolean indicating whether validation has passed. + """ + if self.extent not in [1, 2, 3]: + warnings.warn( + "extent parameter (lambda) must be 1, 2, or 3.", UserWarning + ) + return False + return True - def _inputs(self, obj: "LocalOutlierProbability"): - """ - Validates the inputs provided during initialization to ensure - that the needed objects are provided. - :param obj: a PyNomaly object. - :return: a boolean indicating whether validation has failed or - the data, distance matrix, and neighbor matrix. - """ - if all(v is None for v in [obj.data, obj.distance_matrix]): - warnings.warn( - "Data or a distance matrix must be provided.", UserWarning - ) - return False - elif all(v is not None for v in [obj.data, obj.distance_matrix]): - warnings.warn( - "Only one of the following may be provided: data or a " - "distance matrix (not both).", - UserWarning, - ) - return False - if obj.data is not None: - points_vector = self._data(obj.data) - return points_vector, obj.distance_matrix, obj.neighbor_matrix - if all( - matrix is not None - for matrix in [obj.neighbor_matrix, obj.distance_matrix] - ): - dist_vector = self._data(obj.distance_matrix) - neigh_vector = self._data(obj.neighbor_matrix) - else: - warnings.warn( - "A neighbor index matrix and distance matrix must both be " - "provided when not using raw input data.", - UserWarning, - ) - return False - if obj.distance_matrix.shape != obj.neighbor_matrix.shape: - warnings.warn( - "The shape of the distance and neighbor " - "index matrices must match.", - UserWarning, - ) - return False - elif (obj.distance_matrix.shape[1] != obj.n_neighbors) or ( - obj.neighbor_matrix.shape[1] != obj.n_neighbors - ): - warnings.warn( - "The shape of the distance or " - "neighbor index matrix does not " - "match the number of neighbors " - "specified.", - UserWarning, - ) - return False - return obj.data, dist_vector, neigh_vector - - @staticmethod - def _cluster_size(obj) -> bool: - """ - Validates the cluster labels to ensure that the smallest cluster - size (number of observations in the cluster) is larger than the - specified number of neighbors. - :param obj: a PyNomaly object. - :return: a boolean indicating whether validation has passed. - """ - c_labels = obj._cluster_labels() - for cluster_id in set(c_labels): - c_size = np.where(c_labels == cluster_id)[0].shape[0] - if c_size <= obj.n_neighbors: - warnings.warn( - "Number of neighbors specified larger than smallest " - "cluster. Specify a number of neighbors smaller than " - "the smallest cluster size (observations in smallest " - "cluster minus one).", - UserWarning, - ) - return False - return True - - @staticmethod - def _n_neighbors(obj) -> bool: - """ - Validates the specified number of neighbors to ensure that it is - greater than 0 and that the specified value is less than the total - number of observations. - :param obj: a PyNomaly object. - :return: a boolean indicating whether validation has passed. - """ - if not obj.n_neighbors > 0: - obj.n_neighbors = 10 - warnings.warn( - "n_neighbors must be greater than 0." - " Fit with " + str(obj.n_neighbors) + " instead.", - UserWarning, - ) - return False - elif obj.n_neighbors >= obj._n_observations(): - obj.n_neighbors = obj._n_observations() - 1 - warnings.warn( - "n_neighbors must be less than the number of observations." - " Fit with " + str(obj.n_neighbors) + " instead.", - UserWarning, - ) - return True - - @staticmethod - def _extent(obj) -> bool: - """ - Validates the specified extent parameter to ensure it is either 1, - 2, or 3. - :param obj: a PyNomaly object. - :return: a boolean indicating whether validation has passed. - """ - if obj.extent not in [1, 2, 3]: - warnings.warn( - "extent parameter (lambda) must be 1, 2, or 3.", UserWarning - ) - return False - return True - - @staticmethod - def _missing_values(obj) -> bool: - """ - Validates the provided data to ensure that it contains no - missing values. - :param obj: a PyNomaly object. - :return: a boolean indicating whether validation has passed. - """ - if np.any(np.isnan(obj.data)): - warnings.warn( - "Method does not support missing values in input data.", UserWarning - ) - return False - return True - - @staticmethod - def _fit(obj) -> bool: - """ - Validates that the model was fit prior to calling the stream() - method. - :param obj: a PyNomaly object. - :return: a boolean indicating whether validation has passed. - """ - if obj.is_fit is False: - warnings.warn( - "Must fit on historical data by calling fit() prior to " - "calling stream(x).", - UserWarning, - ) - return False - return True - - @staticmethod - def _no_cluster_labels(obj) -> bool: - """ - Checks to see if cluster labels are attempting to be used in - stream() and, if so, calls fit() once again but without cluster - labels. As PyNomaly does not accept clustering algorithms as input, - the stream approach does not support clustering. - :param obj: a PyNomaly object. - :return: a boolean indicating whether validation has passed. - """ - if len(set(obj._cluster_labels())) > 1: - warnings.warn( - "Stream approach does not support clustered data. " - "Automatically refit using single cluster of points.", - UserWarning, - ) - return False - return True + def _check_missing_values(self) -> None: + """ + Validates the provided data to ensure that it contains no + missing values. + :raises MissingValuesError: if data contains NaN values. + """ + if np.any(np.isnan(self.data)): + raise MissingValuesError( + "Method does not support missing values in input data." + ) + + def _check_is_fit(self) -> bool: + """ + Checks that the model was fit prior to calling the stream() method. + :return: a boolean indicating whether the model has been fit. + """ + if self.is_fit is False: + warnings.warn( + "Must fit on historical data by calling fit() prior to " + "calling stream(x).", + UserWarning, + ) + return False + return True + + def _check_no_cluster_labels(self) -> bool: + """ + Checks to see if cluster labels are attempting to be used in + stream() and, if so, returns False. As PyNomaly does not accept + clustering algorithms as input, the stream approach does not + support clustering. + :return: a boolean indicating whether single cluster (no labels). + """ + if len(set(self._cluster_labels())) > 1: + warnings.warn( + "Stream approach does not support clustered data. " + "Automatically refit using single cluster of points.", + UserWarning, + ) + return False + return True """ Decorators. @@ -389,8 +383,8 @@ def __init__( "Numba is not available, falling back to pure python mode.", UserWarning ) - self.Validate()._inputs(self) - self.Validate._extent(self) + self._validate_inputs() + self._check_extent() """ Private methods. @@ -583,7 +577,7 @@ def _distances(self, progress_bar: bool = False) -> None: [self._n_observations(), self.n_neighbors], 9e10, dtype=float ) indexes = np.full([self._n_observations(), self.n_neighbors], 9e10, dtype=float) - self.points_vector = self.Validate._data(self.data) + self.points_vector = self._convert_to_array(self.data) compute = ( numba.jit(self._compute_distance_and_neighbor_matrix, cache=True) if self.use_numba @@ -806,13 +800,14 @@ def fit(self) -> "LocalOutlierProbability": cluster_labels. :return: self, which contains the local outlier probabilities as self.local_outlier_probabilities. + :raises ClusterSizeError: if any cluster is smaller than n_neighbors. + :raises MissingValuesError: if data contains missing values. """ - self.Validate._n_neighbors(self) - if self.Validate._cluster_size(self) is False: - sys.exit() - if self.data is not None and self.Validate._missing_values(self) is False: - sys.exit() + self._check_n_neighbors() + self._check_cluster_size() + if self.data is not None: + self._check_missing_values() store = self._store() if self.data is not None: @@ -848,14 +843,14 @@ def stream(self, x: np.ndarray) -> np.ndarray: """ orig_cluster_labels = None - if self.Validate._no_cluster_labels(self) is False: + if self._check_no_cluster_labels() is False: orig_cluster_labels = self.cluster_labels self.cluster_labels = np.array([0] * len(self.data)) - if self.Validate._fit(self) is False: + if self._check_is_fit() is False: self.fit() - point_vector = self.Validate._data(x) + point_vector = self._convert_to_array(x) distances = np.full([1, self.n_neighbors], 9e10, dtype=float) if self.data is not None: matrix = self.points_vector diff --git a/changelog.md b/changelog.md index 46520a8..2921b54 100644 --- a/changelog.md +++ b/changelog.md @@ -4,6 +4,23 @@ All notable changes to PyNomaly will be documented in this Changelog. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). +## 0.3.5 +### Changed +- Refactored the `Validate` class by dissolving it and moving validation methods +directly into `LocalOutlierProbability` as instance methods +([Issue #69](https://github.com/vc1492a/PyNomaly/issues/69)). +- Renamed validation methods for clarity: `_fit()` → `_check_is_fit()`, +`_data()` → `_convert_to_array()`, `_inputs()` → `_validate_inputs()`, +`_cluster_size()` → `_check_cluster_size()`, `_n_neighbors()` → `_check_n_neighbors()`, +`_extent()` → `_check_extent()`, `_missing_values()` → `_check_missing_values()`, +`_no_cluster_labels()` → `_check_no_cluster_labels()`. +- Replaced `sys.exit()` calls with proper exception handling. The library no longer +terminates the Python process on validation errors. +### Added +- Custom exception classes for better error handling: `PyNomalyError` (base), +`ValidationError`, `ClusterSizeError`, and `MissingValuesError`. These are now +exported from the package and can be caught by users. + ## 0.3.4 ### Changed - Changed source code as necessary to address a [user-reported issue](https://github.com/vc1492a/PyNomaly/issues/49), corrected in [this commit](https://github.com/vc1492a/PyNomaly/commit/bbdd12a318316ca9c7e0272a5b06909f3fc4f9b0) diff --git a/setup.py b/setup.py index 6a42f51..d58b87e 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ setup( name='PyNomaly', packages=['PyNomaly'], - version='0.3.4', + version='0.3.5', description='A Python 3 implementation of LoOP: Local Outlier ' 'Probabilities, a local density based outlier detection ' 'method providing an outlier score in the range of [0,1].', @@ -16,7 +16,7 @@ long_description=long_description, long_description_content_type='text/markdown', url='https://github.com/vc1492a/PyNomaly', - download_url='https://github.com/vc1492a/PyNomaly/archive/0.3.4.tar.gz', + download_url='https://github.com/vc1492a/PyNomaly/archive/0.3.5.tar.gz', keywords=['outlier', 'anomaly', 'detection', 'machine', 'learning', 'probability'], classifiers=[], diff --git a/tests/test_loop.py b/tests/test_loop.py index ad97bde..b9f4aaf 100644 --- a/tests/test_loop.py +++ b/tests/test_loop.py @@ -2,6 +2,7 @@ # License: Apache 2.0 from PyNomaly import loop +from PyNomaly.loop import ClusterSizeError, MissingValuesError import logging from typing import Tuple @@ -568,30 +569,26 @@ def test_data_format() -> None: def test_missing_values() -> None: """ - Test to ensure that the program exits of a missing value is encountered - in the input data, as this is not allowable. + Test to ensure that MissingValuesError is raised if a missing value is + encountered in the input data, as this is not allowable. :return: None """ X = np.array([1.3, 1.1, 0.9, 1.4, 1.5, np.nan, 3.2]) clf = loop.LocalOutlierProbability(X, n_neighbors=3, use_numba=NUMBA) - with pytest.raises(SystemExit) as record_a, pytest.warns(UserWarning) as record_b: + with pytest.raises(MissingValuesError) as record: clf.fit() - assert record_a.type == SystemExit - - # check that only one warning was raised - assert len(record_b) == 1 # check that the message matches assert ( - record_b[0].message.args[0] + str(record.value) == "Method does not support missing values in input data." ) def test_small_cluster_size(X_n140_outliers) -> None: """ - Test to ensure that the program exits when the specified number of + Test to ensure that ClusterSizeError is raised when the specified number of neighbors is larger than the smallest cluster size in the input data. :param X_n140_outliers: A pytest Fixture that generates 140 observations. :return: None @@ -605,16 +602,12 @@ def test_small_cluster_size(X_n140_outliers) -> None: X_n140_outliers, n_neighbors=50, cluster_labels=cluster_labels, use_numba=NUMBA ) - with pytest.raises(SystemExit) as record_a, pytest.warns(UserWarning) as record_b: + with pytest.raises(ClusterSizeError) as record: clf.fit() - assert record_a.type == SystemExit - - # check that only one warning was raised - assert len(record_b) == 1 # check that the message matches assert ( - record_b[0].message.args[0] + str(record.value) == "Number of neighbors specified larger than smallest " "cluster. Specify a number of neighbors smaller than " "the smallest cluster size (observations in smallest " From 155ba8a446d6109f48fb224bd18af7a9a47ed9b8 Mon Sep 17 00:00:00 2001 From: vc1492a Date: Wed, 4 Feb 2026 12:44:12 -0800 Subject: [PATCH 2/5] fix: scalar position assignment --- PyNomaly/loop.py | 4 ++++ changelog.md | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/PyNomaly/loop.py b/PyNomaly/loop.py index 2ab26d7..48f499b 100644 --- a/PyNomaly/loop.py +++ b/PyNomaly/loop.py @@ -856,6 +856,10 @@ def stream(self, x: np.ndarray) -> np.ndarray: matrix = self.points_vector else: matrix = self.distance_matrix + # When using distance matrix mode, x is a scalar distance value. + # Extract scalar from array to avoid NumPy assignment errors. + if point_vector.size == 1: + point_vector = float(point_vector.flat[0]) for p in range(0, matrix.shape[0]): if self.data is not None: d = self._euclidean(matrix[p, :], point_vector) diff --git a/changelog.md b/changelog.md index 2921b54..ee92cdd 100644 --- a/changelog.md +++ b/changelog.md @@ -20,6 +20,10 @@ terminates the Python process on validation errors. - Custom exception classes for better error handling: `PyNomalyError` (base), `ValidationError`, `ClusterSizeError`, and `MissingValuesError`. These are now exported from the package and can be caught by users. +### Fixed +- Fixed a compatibility issue with NumPy in Python 3.11+ where assigning an array +to a scalar position in `stream()` would raise a `ValueError` when using distance +matrix mode. ## 0.3.4 ### Changed From 969d91028495a48231fffc8e4285e8c9498d9a5e Mon Sep 17 00:00:00 2001 From: vc1492a Date: Wed, 4 Feb 2026 14:33:35 -0800 Subject: [PATCH 3/5] readme --- readme.md => README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename readme.md => README.md (99%) diff --git a/readme.md b/README.md similarity index 99% rename from readme.md rename to README.md index bbc038e..d639fd8 100644 --- a/readme.md +++ b/README.md @@ -7,7 +7,7 @@ scores in the range of [0,1] that are directly interpretable as the probability PyNomaly is a core library of [deepchecks](https://github.com/deepchecks/deepchecks), [OmniDocBench](https://github.com/opendatalab/OmniDocBench) and [pysad](https://github.com/selimfirat/pysad). [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) -[![PyPi](https://img.shields.io/badge/pypi-0.3.4-blue.svg)](https://pypi.python.org/pypi/PyNomaly/0.3.4) +[![PyPi](https://img.shields.io/badge/pypi-0.3.5-blue.svg)](https://pypi.python.org/pypi/PyNomaly/0.3.5) [![Total Downloads](https://static.pepy.tech/badge/pynomaly)](https://pepy.tech/projects/pynomaly) [![Monthly Downloads](https://static.pepy.tech/badge/pynomaly/month)](https://pepy.tech/projects/pynomaly) ![Tests](https://github.com/vc1492a/PyNomaly/actions/workflows/tests.yml/badge.svg) From 5e93be28feecfbce117b48160468fe045ffbdf09 Mon Sep 17 00:00:00 2001 From: vc1492a Date: Fri, 20 Mar 2026 11:12:01 -0700 Subject: [PATCH 4/5] feat: add parallel distance computation and vectorized pipeline Rewrite the distance computation engine from scratch on top of v0.3.5: - Vectorized kNN distances using NumPy broadcasting with chunked processing for memory efficiency and progress bar support - Add n_jobs parameter for cross-cluster multiprocessing via concurrent.futures (n_jobs=-1 uses all cores) - Restructure Numba path with non-generator kernels that support numba.prange for thread-level parallelism - Optional scipy.spatial.distance.cdist and scipy.special.erf acceleration when scipy is available - Vectorize _standard_distances, _prob_distances, and _norm_prob_outlier_factor pipeline methods - Fully backward-compatible: all existing API calls work unchanged Closes #36 Made-with: Cursor --- PyNomaly/loop.py | 291 +++++++++++++++++++++++++++++++++++++++------ README.md | 42 ++++++- tests/test_loop.py | 50 ++++++++ 3 files changed, 343 insertions(+), 40 deletions(-) diff --git a/PyNomaly/loop.py b/PyNomaly/loop.py index 48f499b..46bdf31 100644 --- a/PyNomaly/loop.py +++ b/PyNomaly/loop.py @@ -1,12 +1,79 @@ +from concurrent.futures import ProcessPoolExecutor, as_completed from math import erf, sqrt import numpy as np +import os from python_utils.terminal import get_terminal_size import sys from typing import Tuple, Union import warnings +try: + from scipy.spatial.distance import cdist as _scipy_cdist +except ImportError: + _scipy_cdist = None + +try: + from scipy.special import erf as _scipy_erf +except ImportError: + _scipy_erf = None + try: import numba + + def _numba_distance_kernel_parallel(clust_points_vector, n_neighbors): + n = clust_points_vector.shape[0] + d_features = clust_points_vector.shape[1] + local_distances = np.full((n, n_neighbors), 9e10, dtype=np.float64) + local_indexes = np.full((n, n_neighbors), 0, dtype=np.int64) + for i in numba.prange(n): + for j in range(n): + if i == j: + continue + d = 0.0 + for f in range(d_features): + diff_val = clust_points_vector[i, f] - clust_points_vector[j, f] + d += diff_val * diff_val + d = d ** 0.5 + idx_max = 0 + for k in range(1, n_neighbors): + if local_distances[i, k] > local_distances[i, idx_max]: + idx_max = k + if d < local_distances[i, idx_max]: + local_distances[i, idx_max] = d + local_indexes[i, idx_max] = j + return local_distances, local_indexes + + _numba_kernel_parallel = numba.jit( + _numba_distance_kernel_parallel, nopython=True, parallel=True, cache=True + ) + + def _numba_distance_kernel_sequential(clust_points_vector, n_neighbors): + n = clust_points_vector.shape[0] + d_features = clust_points_vector.shape[1] + local_distances = np.full((n, n_neighbors), 9e10, dtype=np.float64) + local_indexes = np.full((n, n_neighbors), 0, dtype=np.int64) + for i in range(n): + for j in range(n): + if i == j: + continue + d = 0.0 + for f in range(d_features): + diff_val = clust_points_vector[i, f] - clust_points_vector[j, f] + d += diff_val * diff_val + d = d ** 0.5 + idx_max = 0 + for k in range(1, n_neighbors): + if local_distances[i, k] > local_distances[i, idx_max]: + idx_max = k + if d < local_distances[i, idx_max]: + local_distances[i, idx_max] = d + local_indexes[i, idx_max] = j + return local_distances, local_indexes + + _numba_kernel_sequential = numba.jit( + _numba_distance_kernel_sequential, nopython=True, cache=True + ) + except ImportError: pass @@ -15,6 +82,40 @@ __license__ = "Apache License, Version 2.0" +def _cluster_distances_worker(args): + """Top-level worker for ProcessPoolExecutor (must be picklable).""" + clust_points_vector, global_indices, n_neighbors = args + n = clust_points_vector.shape[0] + + if clust_points_vector.ndim == 1: + clust_points_vector = clust_points_vector.reshape(-1, 1) + + local_distances = np.full((n, n_neighbors), 9e10, dtype=float) + local_indexes = np.full((n, n_neighbors), 9e10, dtype=float) + + chunk_size = min(256, n) + for chunk_start in range(0, n, chunk_size): + chunk_end = min(chunk_start + chunk_size, n) + chunk = clust_points_vector[chunk_start:chunk_end] + + if _scipy_cdist is not None: + dist = _scipy_cdist(chunk, clust_points_vector, metric="euclidean") + else: + diff = chunk[:, np.newaxis, :] - clust_points_vector[np.newaxis, :, :] + dist = np.sqrt((diff ** 2).sum(axis=2)) + + row_idx = np.arange(chunk_end - chunk_start) + dist[row_idx, row_idx + chunk_start] = np.inf + + knn_idx = np.argpartition(dist, n_neighbors, axis=1)[:, :n_neighbors] + knn_dists = np.take_along_axis(dist, knn_idx, axis=1) + + local_distances[chunk_start:chunk_end] = knn_dists + local_indexes[chunk_start:chunk_end] = global_indices[knn_idx] + + return local_distances, local_indexes, global_indices + + # Custom Exceptions class PyNomalyError(Exception): """Base exception for PyNomaly.""" @@ -74,6 +175,9 @@ class LocalOutlierProbability(object): sample (optional, default 10) :param cluster_labels: a numpy array of cluster assignments w.r.t. each sample (optional, default None) + :param n_jobs: the number of parallel workers for distance computation. + Use -1 to use all available CPU cores, or 1 for sequential processing + (optional, default 1) :return: """ """ @@ -315,7 +419,8 @@ def new_f(*args, **kwds): "n_neighbors": {"type": types[5]}, "cluster_labels": {"type": types[6]}, "use_numba": {"type": types[7]}, - "progress_bar": {"type": types[8]}, + "n_jobs": {"type": types[8]}, + "progress_bar": {"type": types[9]}, } for x in kwds: opt_types[x]["value"] = kwds[x] @@ -348,6 +453,7 @@ def new_f(*args, **kwds): (int, np.integer), list, bool, + (int, np.integer), bool, ) def __init__( @@ -359,6 +465,7 @@ def __init__( n_neighbors=10, cluster_labels=None, use_numba=False, + n_jobs=1, progress_bar=False, ) -> None: self.data = data @@ -368,6 +475,7 @@ def __init__( self.n_neighbors = n_neighbors self.cluster_labels = cluster_labels self.use_numba = use_numba + self.n_jobs = n_jobs self.points_vector = None self.prob_distances = None self.prob_distances_ev = None @@ -383,6 +491,13 @@ def __init__( "Numba is not available, falling back to pure python mode.", UserWarning ) + if self.n_jobs < -1 or self.n_jobs == 0: + warnings.warn( + "n_jobs must be -1 or a positive integer. Defaulting to 1.", + UserWarning, + ) + self.n_jobs = 1 + self._validate_inputs() self._check_extent() @@ -444,10 +559,8 @@ def _norm_prob_outlier_factor( outlier factor of the input observation. :return: the normalized probabilistic outlier factor. """ - npofs = [] - for i in ev_probabilistic_outlier_factor: - npofs.append(extent * sqrt(i)) - return npofs + ev_arr = np.array(ev_probabilistic_outlier_factor, dtype=float) + return (extent * np.sqrt(ev_arr)).tolist() @staticmethod def _local_outlier_probability( @@ -461,11 +574,14 @@ def _local_outlier_probability( input observation. :return: the local outlier probability. """ - erf_vec = np.vectorize(erf) if np.all(plof_val == nplof_val): return np.zeros(plof_val.shape) - else: - return np.maximum(0, erf_vec(plof_val / (nplof_val * np.sqrt(2.0)))) + plof_f = np.asarray(plof_val, dtype=float) + nplof_f = np.asarray(nplof_val, dtype=float) + if _scipy_erf is not None: + return np.maximum(0, _scipy_erf(plof_f / (nplof_f * np.sqrt(2.0)))) + erf_vec = np.vectorize(erf) + return np.maximum(0, erf_vec(plof_f / (nplof_f * np.sqrt(2.0)))) def _n_observations(self) -> int: """ @@ -564,6 +680,102 @@ def _compute_distance_and_neighbor_matrix( yield distances, indexes, i + def _distances_vectorized( + self, clusters, distances, indexes, progress_bar + ) -> None: + """Vectorized kNN distance computation with chunked progress.""" + progress = "=" + total_points = sum(cv.shape[0] for cv, _ in clusters) + completed = 0 + + for clust_points_vector, global_indices in clusters: + n = clust_points_vector.shape[0] + + if clust_points_vector.ndim == 1: + clust_points_vector = clust_points_vector.reshape(-1, 1) + + chunk_size = min(256, n) + for chunk_start in range(0, n, chunk_size): + chunk_end = min(chunk_start + chunk_size, n) + chunk = clust_points_vector[chunk_start:chunk_end] + + if _scipy_cdist is not None: + dist = _scipy_cdist( + chunk, clust_points_vector, metric="euclidean" + ) + else: + diff = ( + chunk[:, np.newaxis, :] + - clust_points_vector[np.newaxis, :, :] + ) + dist = np.sqrt((diff ** 2).sum(axis=2)) + + row_idx = np.arange(chunk_end - chunk_start) + dist[row_idx, row_idx + chunk_start] = np.inf + + knn_idx = np.argpartition(dist, self.n_neighbors, axis=1)[ + :, : self.n_neighbors + ] + knn_dists = np.take_along_axis(dist, knn_idx, axis=1) + + chunk_global = global_indices[chunk_start:chunk_end] + distances[chunk_global] = knn_dists + indexes[chunk_global] = global_indices[knn_idx] + + completed += chunk_end - chunk_start + if progress_bar: + progress = Utils.emit_progress_bar( + progress, completed, total_points + ) + + def _distances_numba( + self, clusters, distances, indexes, progress_bar, parallel=False + ) -> None: + """Numba-accelerated distance computation.""" + progress = "=" + kernel = _numba_kernel_parallel if parallel else _numba_kernel_sequential + + for idx, (clust_points_vector, global_indices) in enumerate(clusters): + if clust_points_vector.ndim == 1: + clust_points_vector = clust_points_vector.reshape(-1, 1) + + local_dists, local_idxs = kernel( + clust_points_vector.astype(np.float64), self.n_neighbors + ) + + distances[global_indices] = local_dists + indexes[global_indices] = global_indices[local_idxs] + + if progress_bar: + progress = Utils.emit_progress_bar( + progress, idx + 1, len(clusters) + ) + + def _distances_parallel( + self, clusters, distances, indexes, n_jobs, progress_bar + ) -> None: + """Parallel distance computation across clusters via multiprocessing.""" + progress = "=" + worker_args = [ + (cv, gi, self.n_neighbors) for cv, gi in clusters + ] + + with ProcessPoolExecutor(max_workers=n_jobs) as executor: + futures = { + executor.submit(_cluster_distances_worker, args): idx + for idx, args in enumerate(worker_args) + } + completed_clusters = 0 + for future in as_completed(futures): + local_dists, local_idxs, global_indices = future.result() + distances[global_indices] = local_dists + indexes[global_indices] = local_idxs + completed_clusters += 1 + if progress_bar: + progress = Utils.emit_progress_bar( + progress, completed_clusters, len(clusters) + ) + def _distances(self, progress_bar: bool = False) -> None: """ Provides the distances between each observation and it's closest @@ -576,27 +788,40 @@ def _distances(self, progress_bar: bool = False) -> None: distances = np.full( [self._n_observations(), self.n_neighbors], 9e10, dtype=float ) - indexes = np.full([self._n_observations(), self.n_neighbors], 9e10, dtype=float) - self.points_vector = self._convert_to_array(self.data) - compute = ( - numba.jit(self._compute_distance_and_neighbor_matrix, cache=True) - if self.use_numba - else self._compute_distance_and_neighbor_matrix + indexes = np.full( + [self._n_observations(), self.n_neighbors], 9e10, dtype=float ) - progress = "=" - for cluster_id in set(self._cluster_labels()): - indices = np.where(self._cluster_labels() == cluster_id) + self.points_vector = self._convert_to_array(self.data) + + cluster_labels = self._cluster_labels() + cluster_ids = sorted(set(cluster_labels)) + + clusters = [] + for cluster_id in cluster_ids: + indices = np.where(cluster_labels == cluster_id) clust_points_vector = np.array( self.points_vector.take(indices, axis=0)[0], dtype=np.float64 ) - # a generator that yields an updated distance matrix on each loop - for c in compute(clust_points_vector, indices, distances, indexes): - distances, indexes, i = c - # update the progress bar - if progress_bar is True: - progress = Utils.emit_progress_bar( - progress, i + 1, clust_points_vector.shape[0] - ) + clusters.append((clust_points_vector, indices[0])) + + n_jobs = self.n_jobs + if n_jobs == -1: + n_jobs = os.cpu_count() or 1 + n_jobs = min(n_jobs, len(clusters)) + + if self.use_numba: + self._distances_numba( + clusters, distances, indexes, progress_bar, + parallel=(n_jobs > 1) + ) + elif n_jobs > 1 and len(clusters) > 1: + self._distances_parallel( + clusters, distances, indexes, n_jobs, progress_bar + ) + else: + self._distances_vectorized( + clusters, distances, indexes, progress_bar + ) self.distance_matrix = distances self.neighbor_matrix = indexes @@ -626,18 +851,14 @@ def _standard_distances(self, data_store: np.ndarray) -> np.ndarray: Calculated the standard distance for each observation in the input data. First calculates the cardinality and then calculates the standard distance with respect to each observation. - :param data_store: :param data_store: the storage matrix that collects information on each observation. :return: the updated storage matrix that collects information on each observation. """ - cardinality = [self.n_neighbors] * self._n_observations() - vals = data_store[:, 3].tolist() - std_distances = [] - for c, v in zip(cardinality, vals): - std_distances.append(self._standard_distance(c, v)) - return np.hstack((data_store, np.array([std_distances]).T)) + ssd_vals = data_store[:, 3].astype(float) + std_distances = np.sqrt(ssd_vals / self.n_neighbors) + return np.hstack((data_store, std_distances.reshape(-1, 1))) def _prob_distances(self, data_store: np.ndarray) -> np.ndarray: """ @@ -648,10 +869,8 @@ def _prob_distances(self, data_store: np.ndarray) -> np.ndarray: :return: the updated storage matrix that collects information on each observation. """ - prob_distances = [] - for i in range(data_store[:, 4].shape[0]): - prob_distances.append(self._prob_distance(self.extent, data_store[:, 4][i])) - return np.hstack((data_store, np.array([prob_distances]).T)) + prob_distances = self.extent * data_store[:, 4].astype(float) + return np.hstack((data_store, prob_distances.reshape(-1, 1))) def _prob_distances_ev(self, data_store) -> np.ndarray: """ diff --git a/README.md b/README.md index d639fd8..4faab28 100644 --- a/README.md +++ b/README.md @@ -43,12 +43,14 @@ to calculate the Local Outlier Probability of each sample. - numpy >= 1.16.3 - python-utils >= 2.3.0 - (optional) numba >= 0.45.1 +- (optional) scipy >= 1.3.0 -Numba just-in-time (JIT) compiles the function with calculates the Euclidean +Numba just-in-time (JIT) compiles the function which calculates the Euclidean distance between observations, providing a reduction in computation time (significantly when a large number of observations are scored). Numba is not a requirement and PyNomaly may still be used solely with numpy if desired -(details below). +(details below). When scipy is available, PyNomaly uses its optimized distance +computation and error function implementations for additional performance gains. ## Quick Start @@ -101,6 +103,36 @@ normalization scheme prior to using LoOP, especially when working with multiple Users must also appropriately handle missing values prior to using LoOP, as LoOP does not support Pandas DataFrames or Numpy arrays with missing values. +### Parallel Processing + +PyNomaly supports parallel distance computation via the `n_jobs` parameter. +When data includes multiple clusters (via `cluster_labels`), each cluster's +distance computation is independent and can be processed across multiple CPU cores: + +```python +from PyNomaly import loop +from sklearn.cluster import DBSCAN +db = DBSCAN(eps=0.6, min_samples=50).fit(data) +m = loop.LocalOutlierProbability( + data, extent=2, n_neighbors=20, + cluster_labels=list(db.labels_), n_jobs=-1 +).fit() +scores = m.local_outlier_probabilities +print(scores) +``` + +Set `n_jobs=-1` to use all available CPU cores, or specify a positive integer +for a fixed number of workers. The default value of `n_jobs=1` runs sequentially. +Parallelism is most beneficial when the data contains multiple clusters. For +single-cluster data, `n_jobs` has no effect as there is only one unit of work. + +When using Numba (`use_numba=True`) with `n_jobs > 1`, PyNomaly uses Numba's +thread-level parallelism (`prange`) instead of multiprocessing. + +Note that parallel processing is only applicable when raw input data is provided. +If a pre-existing distance matrix is provided, the distance computation step is +skipped entirely and `n_jobs` has no effect. + ### Utilizing Numba and Progress Bars It may be helpful to use just-in-time (JIT) compilation in the cases where a lot of @@ -117,10 +149,12 @@ print(scores) Numba must be installed if the above to use JIT compilation and improve the speed of multiple calls to `LocalOutlierProbability()`, and PyNomaly has been tested with Numba version 0.45.1. An example of the speed difference that can -be realized with using Numba is avaialble in `examples/numba_speed_diff.py`. +be realized with using Numba is available in `examples/numba_speed_diff.py`. -You may also choose to print progress bars _with our without_ the use of numba +You may also choose to print progress bars _with or without_ the use of numba by passing `progress_bar=True` to the `LocalOutlierProbability()` method as above. +Progress bars are supported across all execution modes (sequential, parallel, +and Numba). ### Choosing Parameters diff --git a/tests/test_loop.py b/tests/test_loop.py index b9f4aaf..4458e91 100644 --- a/tests/test_loop.py +++ b/tests/test_loop.py @@ -827,3 +827,53 @@ def test_distance_matrix_consistency(X_n120) -> None: # Compare scores allowing for minor floating-point differences assert_array_almost_equal(scores_data, scores_dist, decimal=10, err_msg="Inconsistent LoOP scores due to self-distances") + + +def test_n_jobs_equivalence(X_n140_outliers) -> None: + """ + Tests that n_jobs > 1 produces equivalent results to n_jobs=1 when + using cluster labels (multiple clusters processed in parallel). + """ + a = [0] * 120 + b = [1] * 20 + cluster_labels = a + b + + clf_seq = loop.LocalOutlierProbability( + X_n140_outliers, n_neighbors=10, cluster_labels=cluster_labels, n_jobs=1 + ) + scores_seq = clf_seq.fit().local_outlier_probabilities + + clf_par = loop.LocalOutlierProbability( + X_n140_outliers, n_neighbors=10, cluster_labels=cluster_labels, n_jobs=2 + ) + scores_par = clf_par.fit().local_outlier_probabilities + + assert_array_almost_equal(scores_seq, scores_par, decimal=10) + + +def test_n_jobs_single_cluster(X_n120) -> None: + """ + Tests that n_jobs=-1 works correctly with a single cluster (falls back + to sequential since there is only one cluster to process). + """ + clf1 = loop.LocalOutlierProbability(X_n120, n_neighbors=10, n_jobs=1) + scores1 = clf1.fit().local_outlier_probabilities + + clf2 = loop.LocalOutlierProbability(X_n120, n_neighbors=10, n_jobs=-1) + scores2 = clf2.fit().local_outlier_probabilities + + assert_array_almost_equal(scores1, scores2, decimal=10) + + +def test_n_jobs_invalid() -> None: + """ + Tests that invalid n_jobs values produce a warning and default to 1. + """ + X = np.array([[1, 2], [3, 4], [5, 6], [7, 8]]) + + with pytest.warns(UserWarning) as record: + clf = loop.LocalOutlierProbability(X, n_neighbors=2, n_jobs=0) + + messages = [r.message.args[0] for r in record] + assert any("n_jobs must be -1 or a positive integer" in m for m in messages) + assert clf.n_jobs == 1 From 8dd865a0a9c6bc01785a47ba177778b8f08cc86c Mon Sep 17 00:00:00 2001 From: vc1492a Date: Fri, 20 Mar 2026 11:16:37 -0700 Subject: [PATCH 5/5] chore: bump version to 0.4.0 and update changelog Update version across loop.py, setup.py, and README badge. Add changelog entry documenting all new features and improvements. Made-with: Cursor --- PyNomaly/loop.py | 2 +- README.md | 2 +- changelog.md | 21 +++++++++++++++++++++ setup.py | 4 ++-- 4 files changed, 25 insertions(+), 4 deletions(-) diff --git a/PyNomaly/loop.py b/PyNomaly/loop.py index 46bdf31..d3dcc84 100644 --- a/PyNomaly/loop.py +++ b/PyNomaly/loop.py @@ -78,7 +78,7 @@ def _numba_distance_kernel_sequential(clust_points_vector, n_neighbors): pass __author__ = "Valentino Constantinou" -__version__ = "0.3.5" +__version__ = "0.4.0" __license__ = "Apache License, Version 2.0" diff --git a/README.md b/README.md index 4faab28..261a6bd 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ scores in the range of [0,1] that are directly interpretable as the probability PyNomaly is a core library of [deepchecks](https://github.com/deepchecks/deepchecks), [OmniDocBench](https://github.com/opendatalab/OmniDocBench) and [pysad](https://github.com/selimfirat/pysad). [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) -[![PyPi](https://img.shields.io/badge/pypi-0.3.5-blue.svg)](https://pypi.python.org/pypi/PyNomaly/0.3.5) +[![PyPi](https://img.shields.io/badge/pypi-0.4.0-blue.svg)](https://pypi.python.org/pypi/PyNomaly/0.4.0) [![Total Downloads](https://static.pepy.tech/badge/pynomaly)](https://pepy.tech/projects/pynomaly) [![Monthly Downloads](https://static.pepy.tech/badge/pynomaly/month)](https://pepy.tech/projects/pynomaly) ![Tests](https://github.com/vc1492a/PyNomaly/actions/workflows/tests.yml/badge.svg) diff --git a/changelog.md b/changelog.md index ee92cdd..d845315 100644 --- a/changelog.md +++ b/changelog.md @@ -4,6 +4,27 @@ All notable changes to PyNomaly will be documented in this Changelog. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). +## 0.4.0 +### Added +- Parallel distance computation via the new `n_jobs` parameter. Set `n_jobs=-1` +to use all available CPU cores for cross-cluster multiprocessing, or specify a +positive integer for a fixed number of workers +([Issue #36](https://github.com/vc1492a/PyNomaly/issues/36)). +- Numba `prange`-based parallel kernels for thread-level parallelism when +`use_numba=True` and `n_jobs > 1`. +- Optional `scipy` acceleration: uses `scipy.spatial.distance.cdist` for +distance computation and `scipy.special.erf` for the error function when +scipy is available, with graceful fallback to pure NumPy. +### Changed +- Replaced the O(n^2) Python nested loop for distance computation with a +vectorized NumPy implementation using chunked broadcasting. Progress bar +support is preserved via chunk-level updates. +- Restructured the Numba JIT path from a generator-based approach (incompatible +with Numba's parallel mode) to non-generator kernels that support `numba.prange`. +- Vectorized `_standard_distances`, `_prob_distances`, and +`_norm_prob_outlier_factor` pipeline methods, replacing Python `for` loops +with NumPy array operations. + ## 0.3.5 ### Changed - Refactored the `Validate` class by dissolving it and moving validation methods diff --git a/setup.py b/setup.py index d58b87e..a73829b 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ setup( name='PyNomaly', packages=['PyNomaly'], - version='0.3.5', + version='0.4.0', description='A Python 3 implementation of LoOP: Local Outlier ' 'Probabilities, a local density based outlier detection ' 'method providing an outlier score in the range of [0,1].', @@ -16,7 +16,7 @@ long_description=long_description, long_description_content_type='text/markdown', url='https://github.com/vc1492a/PyNomaly', - download_url='https://github.com/vc1492a/PyNomaly/archive/0.3.5.tar.gz', + download_url='https://github.com/vc1492a/PyNomaly/archive/0.4.0.tar.gz', keywords=['outlier', 'anomaly', 'detection', 'machine', 'learning', 'probability'], classifiers=[],