Skip to content

Commit 469a510

Browse files
Create approx_nearest_neighbours.py
1 parent 7530a41 commit 469a510

File tree

1 file changed

+107
-0
lines changed

1 file changed

+107
-0
lines changed
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
"""
2+
Approximate Nearest Neighbor (ANN) Search
3+
https://en.wikipedia.org/wiki/Nearest_neighbor_search#Approximate_nearest_neighbor
4+
5+
ANN search finds "close enough" vectors instead of the exact nearest neighbor,
6+
which makes it much faster for large datasets.
7+
This implementation uses a simple **random projection hashing** method.
8+
9+
Steps:
10+
1. Generate random hyperplanes to hash vectors into buckets.
11+
2. Place dataset vectors into buckets.
12+
3. For a query vector, look into its bucket; if that bucket is empty, fall back to searching the whole dataset.
13+
4. Return the approximate nearest neighbor from those candidates.
14+
15+
Each result contains:
16+
1. The nearest (approximate) vector.
17+
2. Its distance from the query vector.
18+
"""
19+
20+
from __future__ import annotations
21+
import math
22+
import numpy as np
23+
from numpy.linalg import norm
24+
from collections import defaultdict
25+
26+
27+
def euclidean(input_a: np.ndarray, input_b: np.ndarray) -> float:
    """
    Calculates Euclidean distance between two vectors.

    :param input_a: first vector
    :param input_b: second vector, same shape as ``input_a``
    :return: the distance as a built-in ``float``
    :raises ValueError: if the two inputs do not have the same shape

    >>> euclidean(np.array([0]), np.array([1]))
    1.0
    >>> euclidean(np.array([1, 2]), np.array([1, 5]))
    3.0
    """
    # Cast to float before subtracting so unsigned or narrow integer inputs
    # cannot wrap around (e.g. uint8 0 - 1 would otherwise become 255).
    vec_a = np.asarray(input_a, dtype=float)
    vec_b = np.asarray(input_b, dtype=float)
    # Reject mismatched inputs explicitly instead of silently comparing only
    # a common prefix or broadcasting one input against the other.
    if vec_a.shape != vec_b.shape:
        raise ValueError(
            f"shape mismatch: {vec_a.shape} vs {vec_b.shape}"
        )
    # float() keeps the return type a plain Python float, so the doctests
    # above print "1.0" rather than a NumPy scalar repr.
    return float(np.linalg.norm(vec_a - vec_b))
37+
38+
39+
class ANN:
40+
"""
41+
Approximate Nearest Neighbor using random projection hashing.
42+
"""
43+
44+
def __init__(self, dataset: np.ndarray, n_planes: int = 5, seed: int = 42):
45+
"""
46+
:param dataset: ndarray of shape (n_samples, n_features)
47+
:param n_planes: number of random hyperplanes for hashing
48+
:param seed: random seed for reproducibility
49+
"""
50+
self.dataset = dataset
51+
np.random.seed(seed)
52+
self.n_planes = n_planes
53+
self.planes = np.random.randn(n_planes, dataset.shape[1]) # random hyperplanes
54+
self.buckets = defaultdict(list)
55+
self._build_index()
56+
57+
def _hash_vector(self, vec: np.ndarray) -> str:
58+
"""
59+
Hash a vector based on which side of each hyperplane it falls on.
60+
Returns a bit string.
61+
"""
62+
signs = (vec @ self.planes.T) >= 0
63+
return "".join(["1" if s else "0" for s in signs])
64+
65+
def _build_index(self):
66+
"""
67+
Build hash buckets for all dataset vectors.
68+
"""
69+
for vec in self.dataset:
70+
h = self._hash_vector(vec)
71+
self.buckets[h].append(vec)
72+
73+
def query(self, q: np.ndarray) -> list[list[list[float] | float]]:
74+
"""
75+
Find approximate nearest neighbor for query vector(s).
76+
77+
:param q: ndarray of shape (m, n_features)
78+
:return: list of [nearest_vector, distance]
79+
80+
>>> dataset = np.array([[0,0], [1,1], [2,2], [10,10]])
81+
>>> ann = ANN(dataset, n_planes=4, seed=0)
82+
>>> ann.query(np.array([[0,1]])) # doctest: +NORMALIZE_WHITESPACE
83+
[[[0, 0], 1.0]]
84+
"""
85+
results = []
86+
for vec in q:
87+
h = self._hash_vector(vec)
88+
candidates = self.buckets[h]
89+
90+
if not candidates: # fallback: search entire dataset
91+
candidates = self.dataset
92+
93+
# Approximate NN search among candidates
94+
best_vec = candidates[0]
95+
best_dist = euclidean(vec, best_vec)
96+
for cand in candidates[1:]:
97+
d = euclidean(vec, cand)
98+
if d < best_dist:
99+
best_vec, best_dist = cand, d
100+
results.append([best_vec.tolist(), best_dist])
101+
return results
102+
103+
104+
# Run the module's doctests when executed as a script.
if __name__ == "__main__":
    import doctest

    doctest.testmod()

0 commit comments

Comments
 (0)