Skip to content

Commit e6727b0

Browse files
committed
fix(lake): prefix should be underscore
1 parent b54d421 commit e6727b0

13 files changed

Lines changed: 692 additions & 8 deletions

File tree

.github/workflows/test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,5 +58,5 @@ jobs:
5858
BUS_URL: amqp://guest:guest@${{ github.server_url != 'https://github.com' && 'rabbitmq' || 'localhost' }}
5959
run: |
6060
pip install coverage
61-
coverage run -m unittest tests/**/*.py
61+
coverage run -m unittest tests/*.py
6262
coverage report -m --fail-under=60

servc/svc/com/storage/lake.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ def _get_table_name(self) -> str:
4545
name_w_medallion = self._table
4646
else:
4747
name_w_medallion = "".join(
48-
[self._table["medallion"].value, "-", self._table["name"]]
48+
[self._table["medallion"].value, "_", self._table["name"]]
4949
)
5050

5151
return ".".join([schema, name_w_medallion])

servc/svc/com/worker/hooks/parallelize.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,7 @@ def evaluate_part_pre_hook(
5757

5858
jobs = resolvers[part_method](message["id"], artifact, context)
5959
if not isinstance(jobs, list):
60-
print(f"Resolver {part_method} did not return a list")
61-
return True
60+
raise Exception(f"Resolver {part_method} did not return a list")
6261

6362
# formulate on complete hook
6463
complete_hook: List[OnCompleteHook] = []
@@ -84,8 +83,9 @@ def evaluate_part_pre_hook(
8483
complete_hook.append(newHook)
8584

8685
# create task queue
87-
task_queue = f"part.{route}-{method}-{message['id']}"
88-
bus.create_queue(task_queue, False)
86+
if len(jobs):
87+
task_queue = f"part.{route}-{method}-{message['id']}"
88+
bus.create_queue(task_queue, False)
8989

9090
# publish messages to part queue
9191
payload_template: InputPayload = {

tests/__init__.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import json
2+
3+
4+
def get_route_message(channel, cache, route, deleteRoute=False):
    """Fetch at most one message from ``route`` and decode it for assertions.

    The queue is declared with ``passive=True`` to obtain its current
    message count. When the count is non-zero a single message is pulled
    with ``basic_get``; its body is decoded as UTF-8 JSON and, when it
    carries an ``argumentId`` key, the referenced value is read from
    ``cache`` and stored under ``argument``.

    Args:
        channel: Object exposing ``queue_declare``, ``basic_get`` and
            ``queue_delete`` (the tests pass a pika blocking channel).
        cache: Object exposing ``getKey``.
        route: Name of the queue to inspect.
        deleteRoute: When true, delete the queue after the optional fetch.

    Returns:
        A ``(body, count)`` tuple. ``body`` is the decoded message dict, or
        the falsy raw value (``None`` when nothing was fetched); ``count``
        is the message count reported by the declare call.
    """
    declared = channel.queue_declare(
        queue=route,
        passive=True,
        durable=True,
        exclusive=False,
        auto_delete=False,
    )
    count = declared.method.message_count

    # Only hit the queue when the declare reported something to read.
    raw = None
    if count:
        _method, _properties, raw = channel.basic_get(route)

    if deleteRoute:
        channel.queue_delete(queue=route)

    # Nothing fetched (or an empty body): hand the falsy value back untouched.
    if not raw:
        return raw, count

    payload = json.loads(raw.decode("utf-8"))
    if "argumentId" in payload:
        # Resolve the out-of-band argument so callers can assert on it directly.
        payload["argument"] = cache.getKey(payload["argumentId"])
    return payload, count

tests/lake/test_iceberg.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ def test_connect(self):
4848
self.assertTrue(self.iceberg.isOpen)
4949

5050
def test_name(self):
51-
self.assertEqual(self.iceberg.tablename, "default.bronze-test")
51+
self.assertEqual(self.iceberg.tablename, "default.bronze_test")
5252

5353
def test_insert(self):
5454
self.iceberg.overwrite([])
@@ -114,7 +114,7 @@ def test_load_from_catalog(self):
114114
self.iceberg.insert([{"date": "2021-01-01", "some_int": 1}])
115115
orig_data = self.iceberg.read(["date"]).to_pylist()
116116

117-
iceberg = IceBerg(config, "default.bronze-test")
117+
iceberg = IceBerg(config, "default.bronze_test")
118118
iceberg._connect()
119119
self.assertTrue(iceberg.isOpen)
120120

tests/test_complete.py

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
import unittest
2+
3+
import pika
4+
5+
from servc.svc.com.bus.rabbitmq import BusRabbitMQ
6+
from servc.svc.com.cache.redis import CacheRedis
7+
from servc.svc.com.worker.hooks.oncomplete import process_complete_hook
8+
from servc.svc.config import Config
9+
from servc.svc.io.hooks import CompleteHookType
10+
from servc.svc.io.input import ArgumentArtifact, InputPayload, InputType
11+
from tests import get_route_message
12+
13+
message: InputPayload = {
14+
"id": "123",
15+
"type": InputType.INPUT.value,
16+
"route": "test",
17+
"argumentId": "",
18+
}
19+
art: ArgumentArtifact = {
20+
"method": "test",
21+
"inputs": {"id": "123"},
22+
"hooks": {
23+
"on_complete": [
24+
{
25+
"type": CompleteHookType.SENDMESSAGE,
26+
"method": "test",
27+
"route": "random",
28+
}
29+
]
30+
},
31+
}
32+
33+
34+
class TestCompleteHook(unittest.TestCase):
    """Integration tests for ``process_complete_hook``.

    The suite talks to a real message bus and cache built from ``Config``
    (``BusRabbitMQ`` / ``CacheRedis``) and opens a separate pika connection
    so the published message can be read back from the ``random`` queue.
    """

    @classmethod
    def setUpClass(cls) -> None:
        """Create the shared bus, cache and raw pika channel once per class."""
        config = Config()
        cls.bus = BusRabbitMQ(config.get("conf.bus"))
        cls.cache = CacheRedis(config.get("conf.cache"))

        # Independent connection used only to inspect what the hook published.
        params = pika.URLParameters(config.get("conf.bus.url"))
        cls.conn = pika.BlockingConnection(params)
        cls.channel = cls.conn.channel()

    @classmethod
    def tearDownClass(cls) -> None:
        """Remove the test queue and release every connection."""
        cls.bus.delete_queue("random")
        cls.cache.close()
        cls.bus.close()
        cls.channel.close()
        cls.conn.close()

    def setUp(self):
        """Start every test with a freshly created ``random`` queue."""
        self.bus.create_queue("random", False)

    def tearDown(self):
        """Drop the queue so messages never leak between tests."""
        self.bus.delete_queue("random")

    def test_complete_hook_simple(self):
        """The hook forwards the artifact inputs to the configured route."""
        res = process_complete_hook(
            self.bus, self.cache, message, art, art["hooks"]["on_complete"][0]
        )
        body, _ = get_route_message(self.channel, self.cache, "random")

        # assertEqual, not assertTrue: the second argument of assertTrue is
        # only the failure message, so the payload was never compared.
        self.assertEqual(body["argument"]["inputs"]["inputs"], art["inputs"])
        self.assertTrue(res)

    def test_w_hook_override(self):
        """An ``inputs`` key on the hook replaces the forwarded inputs."""
        hook = {
            "type": CompleteHookType.SENDMESSAGE,
            "method": "test",
            "route": "random",
            "inputs": True,
        }
        res = process_complete_hook(self.bus, self.cache, message, art, hook)
        body, _ = get_route_message(self.channel, self.cache, "random")

        # Compare against the override value instead of passing it as a message.
        self.assertEqual(body["argument"]["inputs"], True)
        self.assertTrue(res)
80+
81+
82+
if __name__ == "__main__":
83+
unittest.main()

tests/test_config.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import unittest
2+
3+
from servc.svc.config import Config
4+
5+
6+
class TestConfig(unittest.TestCase):
    """Unit tests for ``Config`` defaults, overrides and file lookup.

    NOTE: a single ``Config`` instance is shared by the whole class and
    ``test_value`` mutates it. ``test_get_defaults`` only sees pristine
    defaults because unittest runs test methods in alphabetical order.
    """

    @classmethod
    def setUpClass(cls) -> None:
        """Build the shared configuration object once."""
        cls.config = Config()

    def test_get_defaults(self):
        """Built-in defaults are returned when nothing has been overridden."""
        self.assertEqual(self.config.get("conf.file"), "/config/config.yaml")
        self.assertEqual(self.config.get("conf.bus.routemap"), {})
        self.assertEqual(self.config.get("conf.bus.prefix"), "")

    def test_value(self):
        """Values written with ``setValue`` are readable through ``get``."""
        self.config.setValue("conf.bus.prefix", "test")
        self.assertEqual(self.config.get("conf.bus.prefix"), "test")

        self.config.setValue("conf.bus.routemap.api", "test_route")
        self.assertEqual(self.config.get("conf.bus.routemap.api"), "test_route")

        # A key segment written with ``_DOT_`` shows up in the parent mapping
        # under the dotted name "routemap.api".
        self.config.setValue("conf.bus.routemap_DOT_api", "test_route")
        self.assertEqual(self.config.get("conf.bus.routemap_DOT_api"), "test_route")
        self.assertIn("routemap.api", self.config.get("conf.bus"))

    def test_wrong_location(self):
        """Pointing ``Config`` at a missing file raises ``FileNotFoundError``."""
        # assertRaises replaces the manual try/except + assertTrue(False) and
        # reports which exception was expected when the check fails.
        with self.assertRaises(FileNotFoundError):
            Config("config.test.yaml")
33+
34+
35+
if __name__ == "__main__":
36+
unittest.main()

tests/test_iceberg.py

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
import unittest
2+
3+
import pyarrow as pa
4+
import pyiceberg.types as types
5+
from pyiceberg.expressions import EqualTo
6+
from pyiceberg.schema import Schema
7+
8+
from servc.svc.com.storage.iceberg import IceBerg
9+
from servc.svc.com.storage.lake import LakeTable, Medallion
10+
11+
schema = pa.schema(
12+
[
13+
("date", pa.string()),
14+
("some_int", pa.int64()),
15+
]
16+
)
17+
18+
mytable: LakeTable = {
19+
"name": "test",
20+
"partitions": ["date"],
21+
"medallion": Medallion.BRONZE,
22+
"schema": Schema(
23+
types.NestedField(field_id=1, name="date", type=types.StringType()),
24+
types.NestedField(field_id=2, name="some_int", type=types.IntegerType()),
25+
),
26+
}
27+
28+
config = {
29+
"database": "default",
30+
"catalog_name": "default",
31+
"catalog_properties": {
32+
"type": "sql",
33+
"uri": "sqlite:////tmp/pyiceberg.db",
34+
"echo": "true",
35+
"init_catalog_tables": "true",
36+
"warehouse": "file:///tmp/warehouse",
37+
},
38+
}
39+
40+
41+
class TestLakeIceberg(unittest.TestCase):
    """Exercises the ``IceBerg`` lake wrapper against the module-level
    ``config`` catalog settings and the ``mytable`` definition."""

    @classmethod
    def setUpClass(cls) -> None:
        """Create the table wrapper shared by every test in the class."""
        cls.iceberg = IceBerg(config, mytable)

    def _reset_with_three_rows(self):
        """Empty the table, then insert the three fixture rows one by one."""
        self.iceberg.overwrite([])
        fixture = (("2021-01-01", 1), ("2021-01-02", 1), ("2021-01-02", 3))
        for day, value in fixture:
            self.iceberg.insert([{"date": day, "some_int": value}])

    def _read_dates(self, **read_kwargs):
        """Read only the ``date`` column and return it as a list of dicts."""
        return self.iceberg.read(["date"], **read_kwargs).to_pylist()

    def test_connect(self):
        """Connecting marks the wrapper as open."""
        self.iceberg._connect()
        self.assertTrue(self.iceberg.isOpen)

    def test_name(self):
        """The table name joins the medallion and name with an underscore."""
        self.assertEqual(self.iceberg.tablename, "default.bronze_test")

    def test_insert(self):
        """A single inserted row is returned by a column-restricted read."""
        self.iceberg.overwrite([])
        self.iceberg.insert([{"date": "2021-01-01", "some_int": 1}])
        rows = self._read_dates()
        self.assertEqual(len(rows), 1)
        self.assertEqual(rows, [{"date": "2021-01-01"}])

    def test_overwrite(self):
        """Overwriting one partition with no rows removes only that partition."""
        self._reset_with_three_rows()
        self.assertEqual(len(self._read_dates()), 3)

        self.iceberg.overwrite([], {"date": ["2021-01-02"]})
        self.assertEqual(len(self._read_dates()), 1)

    def test_reading_partitions(self):
        """Reads honour partition filters, limits and row filters."""
        self._reset_with_three_rows()

        # (keyword arguments for read, expected number of rows)
        cases = [
            ({"partitions": {"date": ["2021-01-01"]}}, 1),
            ({"partitions": {"date": ["2021-01-02"]}}, 2),
            ({"partitions": {"date": ["2021-01-02", "2021-01-01"]}}, 3),
            ({"options": {"limit": 1}}, 1),
            (
                {
                    "partitions": {"date": ["2021-01-02"]},
                    "options": {"row_filter": EqualTo("some_int", 3)},
                },
                1,
            ),
            ({"partitions": {"date": ["2021-01-02"], "some_int": [3]}}, 1),
        ]
        for read_kwargs, expected in cases:
            self.assertEqual(len(self._read_dates(**read_kwargs)), expected)

    def test_load_from_catalog(self):
        """A wrapper built from the dotted table name sees the same rows."""
        self.iceberg.insert([{"date": "2021-01-01", "some_int": 1}])
        expected_rows = self._read_dates()

        reopened = IceBerg(config, "default.bronze_test")
        reopened._connect()
        self.assertTrue(reopened.isOpen)

        rows = reopened.read(["date"]).to_pylist()
        self.assertEqual(expected_rows, rows)
        self.assertGreater(len(rows), 0)

    def test_version_travel(self):
        """Reading at an earlier version returns the rows from that version."""
        self.iceberg.insert([{"date": "2021-01-01", "some_int": 1}])
        rows_before = self._read_dates()
        version_before = self.iceberg.getCurrentVersion()

        known_versions = self.iceberg.getVersions()
        self.assertGreater(len(known_versions), 0)
        self.assertIn(version_before, known_versions)

        # A further insert must move the table to a new version.
        self.iceberg.insert([{"date": "2021-01-02", "some_int": 1}])
        version_after = self.iceberg.getCurrentVersion()
        self.assertNotEqual(version_before, version_after)

        rows_at_old_version = self._read_dates(version=version_before)
        self.assertEqual(len(rows_at_old_version), len(rows_before))
        self.assertEqual(rows_at_old_version, rows_before)

    def test_partitions(self):
        """``getPartitions`` lists the distinct values of the partition column."""
        self._reset_with_three_rows()

        found = self.iceberg.getPartitions()
        self.assertEqual(list(found.keys()), ["date"])
        self.assertEqual(len(found["date"]), 2)
        for day in ("2021-01-01", "2021-01-02"):
            self.assertIn(day, found["date"])

    def test_schema(self):
        """``getSchema`` returns a pyarrow schema with both columns in order."""
        table_schema = self.iceberg.getSchema()
        self.assertIsInstance(table_schema, pa.Schema)
        self.assertEqual(len(table_schema.names), 2)
        self.assertEqual(table_schema.names, ["date", "some_int"])

    def test_close(self):
        """Closing, reconnecting and closing again must not raise."""
        self.iceberg.close()
        self.iceberg.connect()
        self.iceberg.close()
164+
165+
166+
if __name__ == "__main__":
167+
unittest.main()

tests/test_idgen.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import unittest
2+
3+
from servc.svc.idgen.simple import simple
4+
5+
6+
class TestIDGen(unittest.TestCase):
    """Smoke test for the ``simple`` id generator."""

    def test_simple_idgen(self):
        """``simple`` returns a string for a route, empty list and message."""
        generated = simple("/test", [], {"test": "test"})
        self.assertIsInstance(generated, str)
11+
12+
13+
if __name__ == "__main__":
14+
unittest.main()

0 commit comments

Comments
 (0)