Skip to content

Commit 99237ae

Browse files
committed
Add unit tests to increase coverage from 65% to 68%
Add 103 new unit tests across 6 test files targeting modules with the lowest coverage: - test_gpx_serializer.py: GPX serialization (32% -> 99%) - test_http.py: HTTP utilities, truncation, sanitization (23% -> 55%) - test_api_v4.py: API client, auth error handling (33% -> 70%) - test_ipc.py: IPC write/send (0% -> 100%) - test_history.py: Upload history read/write (59% -> 81%) - test_gpmf_gps_filter.py: GPS filtering and outlier removal (22% -> 97%)
1 parent 43cbcaf commit 99237ae

6 files changed

Lines changed: 982 additions & 0 deletions

File tree

tests/unit/test_api_v4.py

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
# Copyright (c) Meta Platforms, Inc. and affiliates.
2+
#
3+
# This source code is licensed under the BSD license found in the
4+
# LICENSE file in the root directory of this source tree.
5+
6+
from unittest.mock import MagicMock
7+
8+
import pytest
9+
import requests
10+
11+
from mapillary_tools import api_v4
12+
13+
14+
class TestClusterFileType:
15+
def test_values(self):
16+
assert api_v4.ClusterFileType.ZIP.value == "zip"
17+
assert api_v4.ClusterFileType.BLACKVUE.value == "mly_blackvue_video"
18+
assert api_v4.ClusterFileType.CAMM.value == "mly_camm_video"
19+
assert api_v4.ClusterFileType.MLY_BUNDLE_MANIFEST.value == "mly_bundle_manifest"
20+
21+
22+
class TestCreateSessions:
23+
def test_create_user_session(self):
24+
session = api_v4.create_user_session("test_token_123")
25+
assert "Authorization" in session.headers
26+
assert session.headers["Authorization"] == "OAuth test_token_123"
27+
28+
def test_create_client_session(self):
29+
session = api_v4.create_client_session()
30+
assert "Authorization" in session.headers
31+
assert "OAuth" in session.headers["Authorization"]
32+
33+
def test_create_client_session_disable_logging(self):
34+
session = api_v4.create_client_session(disable_logging=True)
35+
assert session.disable_logging_request is True
36+
assert session.disable_logging_response is True
37+
38+
def test_create_client_session_enable_logging(self):
39+
session = api_v4.create_client_session(disable_logging=False)
40+
assert session.disable_logging_request is False
41+
assert session.disable_logging_response is False
42+
43+
44+
class TestIsAuthError:
45+
def _make_response(self, status_code: int, json_data=None):
46+
resp = MagicMock(spec=requests.Response)
47+
resp.status_code = status_code
48+
if json_data is not None:
49+
resp.json.return_value = json_data
50+
else:
51+
resp.json.side_effect = Exception("no json")
52+
return resp
53+
54+
def test_401_is_auth_error(self):
55+
resp = self._make_response(401)
56+
assert api_v4.is_auth_error(resp) is True
57+
58+
def test_403_is_auth_error(self):
59+
resp = self._make_response(403)
60+
assert api_v4.is_auth_error(resp) is True
61+
62+
def test_400_with_not_authorized_type(self):
63+
resp = self._make_response(
64+
400,
65+
json_data={"debug_info": {"type": "NotAuthorizedError"}},
66+
)
67+
assert api_v4.is_auth_error(resp) is True
68+
69+
def test_400_without_auth_type(self):
70+
resp = self._make_response(
71+
400,
72+
json_data={"debug_info": {"type": "SomeOtherError"}},
73+
)
74+
assert api_v4.is_auth_error(resp) is False
75+
76+
def test_400_no_json(self):
77+
resp = self._make_response(400)
78+
assert api_v4.is_auth_error(resp) is False
79+
80+
def test_200_is_not_auth_error(self):
81+
resp = self._make_response(200)
82+
assert api_v4.is_auth_error(resp) is False
83+
84+
def test_500_is_not_auth_error(self):
85+
resp = self._make_response(500)
86+
assert api_v4.is_auth_error(resp) is False
87+
88+
89+
class TestExtractAuthErrorMessage:
90+
def _make_auth_response(self, status_code: int, json_data=None, text: str = ""):
91+
resp = MagicMock(spec=requests.Response)
92+
resp.status_code = status_code
93+
resp.text = text
94+
if json_data is not None:
95+
resp.json.return_value = json_data
96+
else:
97+
resp.json.side_effect = Exception("no json")
98+
return resp
99+
100+
def test_graph_api_error_message(self):
101+
resp = self._make_auth_response(
102+
401,
103+
json_data={"error": {"message": "Invalid token"}},
104+
)
105+
assert api_v4.extract_auth_error_message(resp) == "Invalid token"
106+
107+
def test_upload_service_error_message(self):
108+
resp = self._make_auth_response(
109+
403,
110+
json_data={"debug_info": {"message": "Forbidden access"}},
111+
)
112+
assert api_v4.extract_auth_error_message(resp) == "Forbidden access"
113+
114+
def test_fallback_to_text(self):
115+
resp = self._make_auth_response(
116+
401,
117+
json_data={},
118+
text="Unauthorized",
119+
)
120+
assert api_v4.extract_auth_error_message(resp) == "Unauthorized"
121+
122+
def test_no_json_fallback(self):
123+
resp = self._make_auth_response(
124+
401,
125+
text="Auth failed",
126+
)
127+
assert api_v4.extract_auth_error_message(resp) == "Auth failed"
128+
129+
130+
class TestHTTPContentError:
131+
def test_exception_has_response(self):
132+
resp = MagicMock(spec=requests.Response)
133+
exc = api_v4.HTTPContentError("bad content", resp)
134+
assert exc.response is resp
135+
assert str(exc) == "bad content"
136+
137+
138+
class TestJsonifyResponse:
139+
def test_valid_json(self):
140+
resp = MagicMock(spec=requests.Response)
141+
resp.json.return_value = {"key": "value"}
142+
result = api_v4.jsonify_response(resp)
143+
assert result == {"key": "value"}
144+
145+
def test_invalid_json_raises(self):
146+
resp = MagicMock(spec=requests.Response)
147+
resp.json.side_effect = requests.JSONDecodeError("err", "", 0)
148+
with pytest.raises(api_v4.HTTPContentError):
149+
api_v4.jsonify_response(resp)

tests/unit/test_gpmf_gps_filter.py

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
# Copyright (c) Meta Platforms, Inc. and affiliates.
2+
#
3+
# This source code is licensed under the BSD license found in the
4+
# LICENSE file in the root directory of this source tree.
5+
6+
from __future__ import annotations
7+
8+
import statistics
9+
10+
import pytest
11+
12+
from mapillary_tools.geo import Point
13+
from mapillary_tools.gpmf import gps_filter
14+
from mapillary_tools.gpmf.gpmf_gps_filter import remove_noisy_points, remove_outliers
15+
from mapillary_tools.telemetry import GPSFix, GPSPoint
16+
17+
18+
def _make_point(time: float, lat: float, lon: float) -> Point:
19+
return Point(time=time, lat=lat, lon=lon, alt=None, angle=None)
20+
21+
22+
def _make_gps_point(
23+
time: float,
24+
lat: float,
25+
lon: float,
26+
fix: GPSFix | None = GPSFix.FIX_3D,
27+
precision: float | None = 100,
28+
ground_speed: float | None = 5.0,
29+
) -> GPSPoint:
30+
return GPSPoint(
31+
time=time,
32+
lat=lat,
33+
lon=lon,
34+
alt=None,
35+
angle=None,
36+
epoch_time=None,
37+
fix=fix,
38+
precision=precision,
39+
ground_speed=ground_speed,
40+
)
41+
42+
43+
# --- Tests for gps_filter module ---
44+
45+
46+
class TestCalculatePointSpeed:
47+
def test_same_point_zero_time(self):
48+
p = _make_point(0.0, 48.0, 11.0)
49+
speed = gps_filter.calculate_point_speed(p, p)
50+
assert speed == float("inf")
51+
52+
def test_same_point_different_time(self):
53+
p1 = _make_point(0.0, 48.0, 11.0)
54+
p2 = _make_point(10.0, 48.0, 11.0)
55+
speed = gps_filter.calculate_point_speed(p1, p2)
56+
assert speed == 0.0
57+
58+
def test_speed_calculation(self):
59+
p1 = _make_point(0.0, 0.0, 0.0)
60+
p2 = _make_point(10.0, 0.001, 0.0) # ~111 meters
61+
speed = gps_filter.calculate_point_speed(p1, p2)
62+
assert 10 < speed < 12 # ~11.1 m/s
63+
64+
65+
class TestSplitIf:
66+
def test_empty_list(self):
67+
assert gps_filter.split_if([], lambda a, b: True) == []
68+
69+
def test_single_point(self):
70+
p = _make_point(0.0, 0.0, 0.0)
71+
result = gps_filter.split_if([p], lambda a, b: True)
72+
assert len(result) == 1
73+
assert result[0] == [p]
74+
75+
def test_no_splits(self):
76+
points = [_make_point(float(i), 0.0, 0.0) for i in range(5)]
77+
result = gps_filter.split_if(points, lambda a, b: False)
78+
assert len(result) == 1
79+
assert len(result[0]) == 5
80+
81+
def test_split_every_point(self):
82+
points = [_make_point(float(i), 0.0, 0.0) for i in range(5)]
83+
result = gps_filter.split_if(points, lambda a, b: True)
84+
assert len(result) == 5
85+
for seq in result:
86+
assert len(seq) == 1
87+
88+
89+
class TestDistanceGt:
90+
def test_close_points_not_split(self):
91+
decider = gps_filter.distance_gt(100000)
92+
p1 = _make_point(0.0, 48.0, 11.0)
93+
p2 = _make_point(1.0, 48.001, 11.001)
94+
assert decider(p1, p2) is False
95+
96+
def test_far_points_split(self):
97+
decider = gps_filter.distance_gt(100)
98+
p1 = _make_point(0.0, 0.0, 0.0)
99+
p2 = _make_point(1.0, 1.0, 1.0)
100+
assert decider(p1, p2) is True
101+
102+
103+
class TestSpeedLe:
104+
def test_slow_speed_true(self):
105+
decider = gps_filter.speed_le(1000)
106+
p1 = _make_point(0.0, 48.0, 11.0)
107+
p2 = _make_point(10.0, 48.001, 11.001)
108+
assert decider(p1, p2) is True
109+
110+
def test_fast_speed_false(self):
111+
decider = gps_filter.speed_le(0.001)
112+
p1 = _make_point(0.0, 0.0, 0.0)
113+
p2 = _make_point(1.0, 1.0, 1.0)
114+
assert decider(p1, p2) is False
115+
116+
117+
class TestBoth:
118+
def test_both_true(self):
119+
f = gps_filter.both(lambda a, b: True, lambda a, b: True)
120+
assert f(None, None) is True
121+
122+
def test_one_false(self):
123+
f = gps_filter.both(lambda a, b: True, lambda a, b: False)
124+
assert f(None, None) is False
125+
126+
127+
class TestFindMajority:
128+
def test_single_sequence(self):
129+
seq = [_make_point(0.0, 0.0, 0.0)]
130+
assert gps_filter.find_majority([seq]) == seq
131+
132+
def test_multiple_sequences(self):
133+
short = [_make_point(0.0, 0.0, 0.0)]
134+
long = [_make_point(float(i), 0.0, 0.0) for i in range(10)]
135+
assert gps_filter.find_majority([short, long]) == long
136+
137+
138+
class TestUpperWhiskerEdge:
139+
def test_raises_on_single_value(self):
140+
with pytest.raises(statistics.StatisticsError):
141+
gps_filter.upper_whisker([1])
142+
143+
def test_even_length(self):
144+
# [1, 2, 3, 4] -> q1=1.5, q3=3.5, irq=2, upper=3.5+3=6.5
145+
assert gps_filter.upper_whisker([1, 2, 3, 4]) == 6.5
146+
147+
148+
# --- Tests for gpmf_gps_filter module ---
149+
150+
151+
class TestRemoveNoisyPoints:
152+
def test_empty_sequence(self):
153+
result = remove_noisy_points([])
154+
assert list(result) == []
155+
156+
def test_all_good_points(self):
157+
points = [
158+
_make_gps_point(
159+
float(i), 48.0 + i * 0.0001, 11.0, fix=GPSFix.FIX_3D, precision=100
160+
)
161+
for i in range(10)
162+
]
163+
result = remove_noisy_points(points)
164+
assert len(result) == len(points)
165+
166+
def test_filters_bad_fix(self):
167+
points = [
168+
_make_gps_point(0.0, 48.0, 11.0, fix=GPSFix.FIX_3D),
169+
_make_gps_point(1.0, 48.001, 11.001, fix=GPSFix.NO_FIX),
170+
_make_gps_point(2.0, 48.002, 11.002, fix=GPSFix.FIX_3D),
171+
]
172+
result = remove_noisy_points(points)
173+
assert len(result) < len(points)
174+
175+
def test_filters_high_precision(self):
176+
points = [
177+
_make_gps_point(0.0, 48.0, 11.0, precision=100),
178+
_make_gps_point(1.0, 48.001, 11.001, precision=9999), # Very high DOP
179+
_make_gps_point(2.0, 48.002, 11.002, precision=100),
180+
]
181+
result = remove_noisy_points(points)
182+
assert len(result) < len(points)
183+
184+
def test_none_fix_kept(self):
185+
"""Points without GPS fix info should be kept."""
186+
points = [
187+
_make_gps_point(0.0, 48.0, 11.0, fix=None),
188+
_make_gps_point(1.0, 48.001, 11.001, fix=None),
189+
]
190+
result = remove_noisy_points(points)
191+
assert len(result) == 2
192+
193+
def test_none_precision_kept(self):
194+
"""Points without precision info should be kept."""
195+
points = [
196+
_make_gps_point(0.0, 48.0, 11.0, precision=None),
197+
_make_gps_point(1.0, 48.001, 11.001, precision=None),
198+
]
199+
result = remove_noisy_points(points)
200+
assert len(result) == 2
201+
202+
203+
class TestRemoveOutliers:
204+
def test_short_sequence_unchanged(self):
205+
points = [
206+
_make_gps_point(0.0, 48.0, 11.0),
207+
]
208+
result = remove_outliers(points)
209+
assert len(result) == 1
210+
211+
def test_no_ground_speed_returns_original(self):
212+
points = [
213+
_make_gps_point(0.0, 48.0, 11.0, ground_speed=None),
214+
_make_gps_point(1.0, 48.001, 11.001, ground_speed=None),
215+
_make_gps_point(2.0, 48.002, 11.002, ground_speed=None),
216+
]
217+
result = remove_outliers(points)
218+
assert len(result) == len(points)
219+
220+
def test_consistent_sequence_kept(self):
221+
points = [
222+
_make_gps_point(
223+
float(i), 48.0 + i * 0.0001, 11.0 + i * 0.0001, ground_speed=5.0
224+
)
225+
for i in range(10)
226+
]
227+
result = remove_outliers(points)
228+
assert len(result) == len(points)

0 commit comments

Comments
 (0)