Skip to content

Commit 3cd993c

Browse files
committed
Add unit tests to increase coverage from 65% to 68%
Add 103 new unit tests across 6 test files targeting modules with the lowest coverage: - test_gpx_serializer.py: GPX serialization (32% -> 99%) - test_http.py: HTTP utilities, truncation, sanitization (23% -> 55%) - test_api_v4.py: API client, auth error handling (33% -> 70%) - test_ipc.py: IPC write/send (0% -> 100%) - test_history.py: Upload history read/write (59% -> 81%) - test_gpmf_gps_filter.py: GPS filtering and outlier removal (22% -> 97%)
1 parent 43cbcaf commit 3cd993c

File tree

6 files changed

+902
-0
lines changed

6 files changed

+902
-0
lines changed

tests/unit/test_api_v4.py

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
# Copyright (c) Meta Platforms, Inc. and affiliates.
2+
#
3+
# This source code is licensed under the BSD license found in the
4+
# LICENSE file in the root directory of this source tree.
5+
6+
from unittest.mock import MagicMock
7+
8+
import pytest
9+
import requests
10+
11+
from mapillary_tools import api_v4
12+
13+
14+
class TestCreateSessions:
15+
def test_create_user_session_sets_oauth_header(self):
16+
session = api_v4.create_user_session("test_token_123")
17+
assert session.headers["Authorization"] == "OAuth test_token_123"
18+
19+
def test_create_client_session_sets_oauth_header(self):
20+
session = api_v4.create_client_session()
21+
assert session.headers["Authorization"].startswith("OAuth ")
22+
23+
24+
class TestIsAuthError:
25+
def _make_response(self, status_code: int, json_data=None):
26+
resp = MagicMock(spec=requests.Response)
27+
resp.status_code = status_code
28+
if json_data is not None:
29+
resp.json.return_value = json_data
30+
else:
31+
resp.json.side_effect = Exception("no json")
32+
return resp
33+
34+
def test_401_is_auth_error(self):
35+
resp = self._make_response(401)
36+
assert api_v4.is_auth_error(resp) is True
37+
38+
def test_403_is_auth_error(self):
39+
resp = self._make_response(403)
40+
assert api_v4.is_auth_error(resp) is True
41+
42+
def test_400_with_not_authorized_type(self):
43+
resp = self._make_response(
44+
400,
45+
json_data={"debug_info": {"type": "NotAuthorizedError"}},
46+
)
47+
assert api_v4.is_auth_error(resp) is True
48+
49+
def test_400_without_auth_type(self):
50+
resp = self._make_response(
51+
400,
52+
json_data={"debug_info": {"type": "SomeOtherError"}},
53+
)
54+
assert api_v4.is_auth_error(resp) is False
55+
56+
def test_400_no_json(self):
57+
resp = self._make_response(400)
58+
assert api_v4.is_auth_error(resp) is False
59+
60+
def test_200_is_not_auth_error(self):
61+
resp = self._make_response(200)
62+
assert api_v4.is_auth_error(resp) is False
63+
64+
def test_500_is_not_auth_error(self):
65+
resp = self._make_response(500)
66+
assert api_v4.is_auth_error(resp) is False
67+
68+
69+
class TestExtractAuthErrorMessage:
70+
def _make_auth_response(self, status_code: int, json_data=None, text: str = ""):
71+
resp = MagicMock(spec=requests.Response)
72+
resp.status_code = status_code
73+
resp.text = text
74+
if json_data is not None:
75+
resp.json.return_value = json_data
76+
else:
77+
resp.json.side_effect = Exception("no json")
78+
return resp
79+
80+
def test_graph_api_error_message(self):
81+
resp = self._make_auth_response(
82+
401,
83+
json_data={"error": {"message": "Invalid token"}},
84+
)
85+
assert api_v4.extract_auth_error_message(resp) == "Invalid token"
86+
87+
def test_upload_service_error_message(self):
88+
resp = self._make_auth_response(
89+
403,
90+
json_data={"debug_info": {"message": "Forbidden access"}},
91+
)
92+
assert api_v4.extract_auth_error_message(resp) == "Forbidden access"
93+
94+
def test_fallback_to_text(self):
95+
resp = self._make_auth_response(
96+
401,
97+
json_data={},
98+
text="Unauthorized",
99+
)
100+
assert api_v4.extract_auth_error_message(resp) == "Unauthorized"
101+
102+
def test_no_json_fallback(self):
103+
resp = self._make_auth_response(
104+
401,
105+
text="Auth failed",
106+
)
107+
assert api_v4.extract_auth_error_message(resp) == "Auth failed"
108+
109+
110+
class TestJsonifyResponse:
111+
def test_invalid_json_raises_http_content_error(self):
112+
resp = MagicMock(spec=requests.Response)
113+
resp.json.side_effect = requests.JSONDecodeError("err", "", 0)
114+
with pytest.raises(api_v4.HTTPContentError) as exc_info:
115+
api_v4.jsonify_response(resp)
116+
assert exc_info.value.response is resp

tests/unit/test_gpmf_gps_filter.py

Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
# Copyright (c) Meta Platforms, Inc. and affiliates.
2+
#
3+
# This source code is licensed under the BSD license found in the
4+
# LICENSE file in the root directory of this source tree.
5+
6+
from __future__ import annotations
7+
8+
import statistics
9+
10+
import pytest
11+
12+
from mapillary_tools.geo import Point
13+
from mapillary_tools.gpmf import gps_filter
14+
from mapillary_tools.gpmf.gpmf_gps_filter import remove_noisy_points, remove_outliers
15+
from mapillary_tools.telemetry import GPSFix, GPSPoint
16+
17+
18+
def _make_point(time: float, lat: float, lon: float) -> Point:
19+
return Point(time=time, lat=lat, lon=lon, alt=None, angle=None)
20+
21+
22+
def _make_gps_point(
23+
time: float,
24+
lat: float,
25+
lon: float,
26+
fix: GPSFix | None = GPSFix.FIX_3D,
27+
precision: float | None = 100,
28+
ground_speed: float | None = 5.0,
29+
) -> GPSPoint:
30+
return GPSPoint(
31+
time=time,
32+
lat=lat,
33+
lon=lon,
34+
alt=None,
35+
angle=None,
36+
epoch_time=None,
37+
fix=fix,
38+
precision=precision,
39+
ground_speed=ground_speed,
40+
)
41+
42+
43+
# --- Tests for gps_filter module ---
44+
45+
46+
class TestCalculatePointSpeed:
47+
def test_same_point_zero_time(self):
48+
p = _make_point(0.0, 48.0, 11.0)
49+
speed = gps_filter.calculate_point_speed(p, p)
50+
assert speed == float("inf")
51+
52+
def test_same_point_different_time(self):
53+
p1 = _make_point(0.0, 48.0, 11.0)
54+
p2 = _make_point(10.0, 48.0, 11.0)
55+
speed = gps_filter.calculate_point_speed(p1, p2)
56+
assert speed == 0.0
57+
58+
def test_speed_calculation(self):
59+
p1 = _make_point(0.0, 0.0, 0.0)
60+
p2 = _make_point(10.0, 0.001, 0.0) # ~111 meters
61+
speed = gps_filter.calculate_point_speed(p1, p2)
62+
assert 10 < speed < 12 # ~11.1 m/s
63+
64+
65+
class TestSplitIf:
66+
def test_empty_list(self):
67+
assert gps_filter.split_if([], lambda a, b: True) == []
68+
69+
def test_single_point(self):
70+
p = _make_point(0.0, 0.0, 0.0)
71+
result = gps_filter.split_if([p], lambda a, b: True)
72+
assert len(result) == 1
73+
assert result[0] == [p]
74+
75+
def test_no_splits(self):
76+
points = [_make_point(float(i), 0.0, 0.0) for i in range(5)]
77+
result = gps_filter.split_if(points, lambda a, b: False)
78+
assert len(result) == 1
79+
assert len(result[0]) == 5
80+
81+
def test_split_every_point(self):
82+
points = [_make_point(float(i), 0.0, 0.0) for i in range(5)]
83+
result = gps_filter.split_if(points, lambda a, b: True)
84+
assert len(result) == 5
85+
for seq in result:
86+
assert len(seq) == 1
87+
88+
89+
class TestDistanceGt:
90+
def test_close_points_not_split(self):
91+
decider = gps_filter.distance_gt(100000)
92+
p1 = _make_point(0.0, 48.0, 11.0)
93+
p2 = _make_point(1.0, 48.001, 11.001)
94+
assert decider(p1, p2) is False
95+
96+
def test_far_points_split(self):
97+
decider = gps_filter.distance_gt(100)
98+
p1 = _make_point(0.0, 0.0, 0.0)
99+
p2 = _make_point(1.0, 1.0, 1.0)
100+
assert decider(p1, p2) is True
101+
102+
103+
class TestSpeedLe:
104+
def test_slow_speed_true(self):
105+
decider = gps_filter.speed_le(1000)
106+
p1 = _make_point(0.0, 48.0, 11.0)
107+
p2 = _make_point(10.0, 48.001, 11.001)
108+
assert decider(p1, p2) is True
109+
110+
def test_fast_speed_false(self):
111+
decider = gps_filter.speed_le(0.001)
112+
p1 = _make_point(0.0, 0.0, 0.0)
113+
p2 = _make_point(1.0, 1.0, 1.0)
114+
assert decider(p1, p2) is False
115+
116+
117+
class TestUpperWhiskerEdge:
118+
def test_raises_on_single_value(self):
119+
with pytest.raises(statistics.StatisticsError):
120+
gps_filter.upper_whisker([1])
121+
122+
def test_even_length(self):
123+
# [1, 2, 3, 4] -> q1=1.5, q3=3.5, irq=2, upper=3.5+3=6.5
124+
assert gps_filter.upper_whisker([1, 2, 3, 4]) == 6.5
125+
126+
def test_odd_length(self):
127+
# [1, 2, 3, 4, 5] -> q1=median([1,2])=1.5, q3=median([4,5])=4.5, irq=3, upper=4.5+4.5=9.0
128+
assert gps_filter.upper_whisker([1, 2, 3, 4, 5]) == 9.0
129+
130+
131+
# --- Tests for gpmf_gps_filter module ---
132+
133+
134+
class TestRemoveNoisyPoints:
135+
def test_empty_sequence(self):
136+
result = remove_noisy_points([])
137+
assert list(result) == []
138+
139+
def test_all_good_points(self):
140+
points = [
141+
_make_gps_point(
142+
float(i), 48.0 + i * 0.0001, 11.0, fix=GPSFix.FIX_3D, precision=100
143+
)
144+
for i in range(10)
145+
]
146+
result = remove_noisy_points(points)
147+
assert len(result) == len(points)
148+
149+
def test_filters_bad_fix(self):
150+
good_0 = _make_gps_point(0.0, 48.0, 11.0, fix=GPSFix.FIX_3D)
151+
bad_1 = _make_gps_point(1.0, 48.001, 11.001, fix=GPSFix.NO_FIX)
152+
good_2 = _make_gps_point(2.0, 48.002, 11.002, fix=GPSFix.FIX_3D)
153+
result = list(remove_noisy_points([good_0, bad_1, good_2]))
154+
# NO_FIX point should be removed; FIX_3D points kept
155+
assert bad_1 not in result
156+
assert good_0 in result
157+
assert good_2 in result
158+
159+
def test_filters_high_precision(self):
160+
good_0 = _make_gps_point(0.0, 48.0, 11.0, precision=100)
161+
bad_1 = _make_gps_point(1.0, 48.001, 11.001, precision=9999) # Very high DOP
162+
good_2 = _make_gps_point(2.0, 48.002, 11.002, precision=100)
163+
result = list(remove_noisy_points([good_0, bad_1, good_2]))
164+
# High DOP point should be removed; low DOP points kept
165+
assert bad_1 not in result
166+
assert good_0 in result
167+
assert good_2 in result
168+
169+
def test_none_fix_kept(self):
170+
"""Points without GPS fix info should be kept."""
171+
points = [
172+
_make_gps_point(0.0, 48.0, 11.0, fix=None),
173+
_make_gps_point(1.0, 48.001, 11.001, fix=None),
174+
]
175+
result = remove_noisy_points(points)
176+
assert len(result) == 2
177+
178+
def test_none_precision_kept(self):
179+
"""Points without precision info should be kept."""
180+
points = [
181+
_make_gps_point(0.0, 48.0, 11.0, precision=None),
182+
_make_gps_point(1.0, 48.001, 11.001, precision=None),
183+
]
184+
result = remove_noisy_points(points)
185+
assert len(result) == 2
186+
187+
188+
class TestRemoveOutliers:
189+
def test_short_sequence_unchanged(self):
190+
points = [
191+
_make_gps_point(0.0, 48.0, 11.0),
192+
]
193+
result = remove_outliers(points)
194+
assert len(result) == 1
195+
196+
def test_no_ground_speed_returns_original(self):
197+
points = [
198+
_make_gps_point(0.0, 48.0, 11.0, ground_speed=None),
199+
_make_gps_point(1.0, 48.001, 11.001, ground_speed=None),
200+
_make_gps_point(2.0, 48.002, 11.002, ground_speed=None),
201+
]
202+
result = remove_outliers(points)
203+
assert len(result) == len(points)
204+
205+
def test_consistent_sequence_kept(self):
206+
points = [
207+
_make_gps_point(
208+
float(i), 48.0 + i * 0.0001, 11.0 + i * 0.0001, ground_speed=5.0
209+
)
210+
for i in range(10)
211+
]
212+
result = remove_outliers(points)
213+
assert len(result) == len(points)
214+
215+
def test_outlier_removed(self):
216+
"""A point far away from a consistent cluster should be dropped."""
217+
# 9 points in a tight cluster, then 1 point far away
218+
cluster = [
219+
_make_gps_point(
220+
float(i), 48.0 + i * 0.00001, 11.0 + i * 0.00001, ground_speed=1.0
221+
)
222+
for i in range(9)
223+
]
224+
outlier = _make_gps_point(9.0, 10.0, 10.0, ground_speed=1.0)
225+
result = remove_outliers(cluster + [outlier])
226+
# The outlier is far from the cluster and should be removed
227+
assert len(result) < len(cluster) + 1
228+
# The cluster points should survive
229+
assert len(result) >= len(cluster)

0 commit comments

Comments
 (0)