From 68ba4d5c2d1ca4877de0f343afcb768b9aa615a2 Mon Sep 17 00:00:00 2001 From: Judah Rand Date: Sun, 19 Jan 2020 17:55:55 +0000 Subject: [PATCH 1/6] Add to .gitignore --- .gitignore | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.gitignore b/.gitignore index 9aa5b22..c005a0b 100644 --- a/.gitignore +++ b/.gitignore @@ -93,3 +93,9 @@ ENV/ # PyCharm .idea/ + +# VSCode +.vscode/ + +# mypy +.mypy_cache/ From cd610deca3cbd35c421e827bd80f6d73a79d885b Mon Sep 17 00:00:00 2001 From: Judah Rand Date: Sun, 19 Jan 2020 17:59:41 +0000 Subject: [PATCH 2/6] Make State a first class object --- setup.py | 3 +- singer/__init__.py | 12 +- singer/bookmarks.py | 142 +++++++++----- singer/catalog.py | 3 +- tests/test_bookmarks.py | 399 +++++++++++++++++++++++++++++++++++----- tests/test_catalog.py | 5 +- 6 files changed, 453 insertions(+), 111 deletions(-) diff --git a/setup.py b/setup.py index 8df52bb..37c2004 100755 --- a/setup.py +++ b/setup.py @@ -15,7 +15,8 @@ 'simplejson==3.11.1', 'python-dateutil>=2.6.0', 'backoff==1.8.0', - 'ciso8601', + 'ciso8601', + 'typing-extensions' ], extras_require={ 'dev': [ diff --git a/singer/__init__.py b/singer/__init__.py index d3b2c87..76d456a 100644 --- a/singer/__init__.py +++ b/singer/__init__.py @@ -61,17 +61,7 @@ ) from singer.schema import Schema -from singer.bookmarks import ( - write_bookmark, - get_bookmark, - clear_bookmark, - reset_stream, - set_offset, - clear_offset, - get_offset, - set_currently_syncing, - get_currently_syncing, -) +from singer.bookmarks import State if __name__ == "__main__": import doctest diff --git a/singer/bookmarks.py b/singer/bookmarks.py index fc6d7ca..2645c80 100644 --- a/singer/bookmarks.py +++ b/singer/bookmarks.py @@ -1,46 +1,96 @@ -def ensure_bookmark_path(state, path): - submap = state - for path_component in path: - if submap.get(path_component) is None: - submap[path_component] = {} - - submap = submap[path_component] - return state - -def write_bookmark(state, tap_stream_id, key, val): - state = ensure_bookmark_path(state, ['bookmarks', tap_stream_id]) - state['bookmarks'][tap_stream_id][key] = val - return state - -def clear_bookmark(state, tap_stream_id, key): - state = ensure_bookmark_path(state, ['bookmarks', tap_stream_id]) - state['bookmarks'][tap_stream_id].pop(key, None) - return state - -def reset_stream(state, tap_stream_id): - state = ensure_bookmark_path(state, ['bookmarks', tap_stream_id]) - state['bookmarks'][tap_stream_id] = {} - return state - -def get_bookmark(state, tap_stream_id, key, default=None): - return state.get('bookmarks', {}).get(tap_stream_id, {}).get(key, default) - -def set_offset(state, tap_stream_id, offset_key, offset_value): - state = ensure_bookmark_path(state, ['bookmarks', tap_stream_id, "offset", offset_key]) - state['bookmarks'][tap_stream_id]["offset"][offset_key] = offset_value - return state - -def clear_offset(state, tap_stream_id): - state = ensure_bookmark_path(state, ['bookmarks', tap_stream_id, "offset"]) - state['bookmarks'][tap_stream_id]["offset"] = {} - return state - -def get_offset(state, tap_stream_id, default=None): - return state.get('bookmarks', {}).get(tap_stream_id, {}).get("offset", default) - -def set_currently_syncing(state, tap_stream_id): - state['currently_syncing'] = tap_stream_id - return state - -def get_currently_syncing(state, default=None): - return state.get('currently_syncing', default) +import json +import sys +from typing import Any, Dict, Optional, Sequence, Union +from .logger import get_logger + + +LOGGER = get_logger() + +def write_state(state): + json.dump(state.to_dict(), sys.stdout, indent=2) + +class State: + def __init__( + self, bookmarks: Optional[Dict] = None, currently_syncing: Optional[str] = None + ) -> None: + self._bookmarks = bookmarks or {} + self._currently_syncing = currently_syncing + + def __str__(self) -> str: + return str(self.__dict__) + + def __eq__(self, other: Any) -> bool: + return self.__dict__ == other.__dict__ + + @property + def bookmarks(self) -> Dict: + return self._bookmarks + + @classmethod + def load(cls, filename: str) -> "State": + with open(filename) as fp: # pylint: disable=invalid-name + return State.from_dict(json.load(fp)) + + @classmethod + def from_dict(cls, data: Dict) -> "State": + return State( + bookmarks=data.get("bookmarks"), + currently_syncing=data.get("currently_syncing"), + ) + + def to_dict(self) -> Dict: + state: Dict[str, Any] = {"bookmarks": self.bookmarks} + if self.get_currently_syncing(): + state["currently_syncing"] = self.get_currently_syncing() + return state + + def dump(self) -> None: + json.dump(self.to_dict(), sys.stdout, indent=2) + + def _ensure_bookmark_path(self, path: Sequence) -> None: + submap = self.bookmarks + for path_component in path: + if submap.get(path_component) is None: + submap[path_component] = {} + + submap = submap[path_component] + + def write_bookmark(self, tap_stream_id: str, key: str, val: Any) -> None: + self._ensure_bookmark_path((tap_stream_id,)) + self.bookmarks[tap_stream_id][key] = val + + def clear_bookmark(self, tap_stream_id: str, key: str) -> None: + self._ensure_bookmark_path((tap_stream_id,)) + self.bookmarks[tap_stream_id].pop(key, None) + + def reset_stream(self, tap_stream_id: str) -> None: + self._ensure_bookmark_path((tap_stream_id,)) + self.bookmarks[tap_stream_id] = {} + + def get_bookmark(self, tap_stream_id: str, key: str, default: Any = None) -> Any: + return self.bookmarks.get(tap_stream_id, {}).get(key, default) + + def set_offset( + self, tap_stream_id: str, offset_key: str, offset_value: Any + ) -> None: + self._ensure_bookmark_path((tap_stream_id, "offset", offset_key)) + self.bookmarks[tap_stream_id]["offset"][offset_key] = offset_value + + def clear_offset(self, tap_stream_id: str) -> None: + self._ensure_bookmark_path((tap_stream_id, "offset")) + self.bookmarks[tap_stream_id]["offset"] = {} + + def get_offset( + self, tap_stream_id: str, offset_key: str, default: Any = None + ) -> Any: + return ( + self.bookmarks.get(tap_stream_id, {}) + .get("offset", {}) + .get(offset_key, default) + ) + + def get_currently_syncing(self, default: Optional[str] = None) -> Optional[str]: + return self._currently_syncing or default + + def set_currently_syncing(self, value: Union[str, None]) -> None: + self._currently_syncing = value diff --git a/singer/catalog.py b/singer/catalog.py index 1767ff1..47afa0b 100644 --- a/singer/catalog.py +++ b/singer/catalog.py @@ -3,7 +3,6 @@ import sys from . import metadata as metadata_module -from .bookmarks import get_currently_syncing from .logger import get_logger from .schema import Schema @@ -132,7 +131,7 @@ def get_stream(self, tap_stream_id): return None def _shuffle_streams(self, state): - currently_syncing = get_currently_syncing(state) + currently_syncing = state.get_currently_syncing() if currently_syncing is None: return self.streams diff --git a/tests/test_bookmarks.py b/tests/test_bookmarks.py index 4902105..4e691f4 100644 --- a/tests/test_bookmarks.py +++ b/tests/test_bookmarks.py @@ -1,52 +1,78 @@ +from copy import copy import unittest -from singer import bookmarks +from singer.bookmarks import State, write_state + + +class TestWriteState(unittest.TestCase): + def test_write_empty_state(self): + state = State() + write_state(state) + + def test_write_state_with_bookmarks(self): + stream_id_1 = 'customers' + bookmark_key_1 = 'datetime' + bookmark_val_1 = 123456789 + offset_key = 'key' + offset_val = 'fizzy water' + + bookmarks = { + stream_id_1 : { + bookmark_key_1 : bookmark_val_1, + 'offset' : { + offset_key: offset_val + } + } + } + + state = State(bookmarks=bookmarks, currently_syncing='customers') + write_state(state) + class TestGetBookmark(unittest.TestCase): def test_empty_state(self): - empty_state = {} + empty_state = State() # Case with no value to fall back on - self.assertIsNone(bookmarks.get_bookmark(empty_state, 'some_stream', 'my_key')) + self.assertIsNone(empty_state.get_bookmark('some_stream', 'my_key')) # Case with a given default - self.assertEqual(bookmarks.get_bookmark(empty_state, 'some_stream', 'my_key', 'default_value'), + self.assertEqual(empty_state.get_bookmark('some_stream', 'my_key', 'default_value'), 'default_value') def test_empty_bookmark(self): - empty_bookmark = {'bookmarks':{}} + empty_bookmark = State(bookmarks={}) # Case with no value to fall back on - self.assertIsNone(bookmarks.get_bookmark(empty_bookmark, 'some_stream', 'my_key')) + self.assertIsNone(empty_bookmark.get_bookmark('some_stream', 'my_key')) # Case with a given default - self.assertEqual(bookmarks.get_bookmark(empty_bookmark, 'some_stream', 'my_key', 'default_value'), + self.assertEqual(empty_bookmark.get_bookmark('some_stream', 'my_key', 'default_value'), 'default_value') def test_non_empty_state(self): stream_id_1 = 'customers' bookmark_key_1 = 'datetime' bookmark_val_1 = 123456789 - - non_empty_state = { - 'bookmarks' : { - stream_id_1 : { - bookmark_key_1 : bookmark_val_1 - } + bookmarks = { + stream_id_1 : { + bookmark_key_1 : bookmark_val_1 } } + non_empty_state = State(bookmarks=bookmarks) + # # Cases with no value to fall back on # # Bad stream, bad key - self.assertIsNone(bookmarks.get_bookmark(non_empty_state, 'some_stream', 'my_key')) + self.assertIsNone(non_empty_state.get_bookmark('some_stream', 'my_key')) # Good stream, bad key - self.assertIsNone(bookmarks.get_bookmark(non_empty_state, stream_id_1, 'my_key')) + self.assertIsNone(non_empty_state.get_bookmark(stream_id_1, 'my_key')) # Good stream, good key - self.assertEqual(bookmarks.get_bookmark(non_empty_state, stream_id_1, bookmark_key_1), + self.assertEqual(non_empty_state.get_bookmark(stream_id_1, bookmark_key_1), bookmark_val_1) # @@ -54,67 +80,179 @@ def test_non_empty_state(self): # # Bad stream, bad key - self.assertEqual(bookmarks.get_bookmark(non_empty_state, 'some_stream', 'my_key', 'default_value'), + self.assertEqual(non_empty_state.get_bookmark('some_stream', 'my_key', 'default_value'), 'default_value') # Bad stream, good key - self.assertEqual(bookmarks.get_bookmark(non_empty_state, 'some_stream', bookmark_key_1, 'default_value'), + self.assertEqual(non_empty_state.get_bookmark('some_stream', bookmark_key_1, 'default_value'), 'default_value') # Good stream, bad key - self.assertEqual(bookmarks.get_bookmark(non_empty_state, stream_id_1, 'my_key', 'default_value'), + self.assertEqual(non_empty_state.get_bookmark(stream_id_1, 'my_key', 'default_value'), 'default_value') # Good stream, good key - self.assertEqual(bookmarks.get_bookmark(non_empty_state, stream_id_1, bookmark_key_1, 'default_value'), + self.assertEqual(non_empty_state.get_bookmark(stream_id_1, bookmark_key_1, 'default_value'), bookmark_val_1) +class TestWriteBookmark(unittest.TestCase): + def test_empty_state(self): + empty_state = State() + self.assertIsNone(empty_state.get_bookmark('some_stream', 'my_key')) + empty_state.write_bookmark('some_stream', 'my_key', 'val') + self.assertEqual(empty_state.get_bookmark('some_stream', 'my_key'), 'val') + + def test_empty_bookmark(self): + empty_bookmark = State(bookmarks={}) + self.assertIsNone(empty_bookmark.get_bookmark('some_stream', 'my_key')) + empty_bookmark.write_bookmark('some_stream', 'my_key', 'val') + self.assertEqual(empty_bookmark.get_bookmark('some_stream', 'my_key'), 'val') + + def test_non_empty_state(self): + stream_id_1 = 'customers' + bookmark_key_1 = 'datetime' + bookmark_val_1 = 123456789 + bookmark_val_2 = 0 + bookmarks = { + stream_id_1 : { + bookmark_key_1 : bookmark_val_1 + } + } + + non_empty_state = State(bookmarks=bookmarks) + + self.assertEqual(non_empty_state.get_bookmark(stream_id_1, bookmark_key_1), bookmark_val_1) + non_empty_state.write_bookmark(stream_id_1, bookmark_key_1, bookmark_val_2) + self.assertEqual(non_empty_state.get_bookmark(stream_id_1, bookmark_key_1), bookmark_val_2) + + +class TestClearBookmark(unittest.TestCase): + def test_empty_state(self): + empty_state = State() + empty_state.clear_bookmark('some_stream', 'key') + self.assertIsNone(empty_state.get_bookmark('some_stream', 'my_key')) + + def test_empty_bookmark(self): + empty_bookmark = State(bookmarks={}) + empty_bookmark.clear_bookmark('some_stream', 'key') + self.assertIsNone(empty_bookmark.get_bookmark('some_stream', 'my_key')) + + def test_non_empty_state(self): + stream_id_1 = 'customers' + bookmark_key_1 = 'datetime' + bookmark_val_1 = 123456789 + bookmarks = { + stream_id_1 : { + bookmark_key_1 : bookmark_val_1 + } + } + + # + # Cases with no value to fall back on + # + + # Bad stream, bad key + non_empty_state = State(bookmarks=bookmarks) + self.assertEqual(non_empty_state.get_bookmark(stream_id_1, bookmark_key_1), bookmark_val_1) + non_empty_state.clear_bookmark('some_stream', 'some_key') + self.assertEqual(non_empty_state.get_bookmark(stream_id_1, bookmark_key_1), bookmark_val_1) + + + # Good stream, bad key + non_empty_state = State(bookmarks=bookmarks) + self.assertEqual(non_empty_state.get_bookmark(stream_id_1, bookmark_key_1), bookmark_val_1) + non_empty_state.clear_bookmark(stream_id_1, 'some_key') + self.assertEqual(non_empty_state.get_bookmark(stream_id_1, bookmark_key_1), bookmark_val_1) + + # Good stream, good key + non_empty_state = State(bookmarks=bookmarks) + self.assertEqual(non_empty_state.get_bookmark(stream_id_1, bookmark_key_1), bookmark_val_1) + non_empty_state.clear_bookmark(stream_id_1, bookmark_key_1) + self.assertIsNone(non_empty_state.get_bookmark(stream_id_1, bookmark_key_1)) + + +class TestClearStream(unittest.TestCase): + def test_empty_state(self): + empty_state = State() + self.assertFalse('some_stream' in empty_state.bookmarks) + empty_state.reset_stream('some_stream') + self.assertTrue('some_stream' in empty_state.bookmarks) + self.assertIsNone(empty_state.bookmarks['some_stream'] or None) + + def test_empty_bookmark(self): + empty_bookmark = State(bookmarks={}) + self.assertFalse('some_stream' in empty_bookmark.bookmarks) + empty_bookmark.reset_stream('some_stream') + self.assertTrue('some_stream' in empty_bookmark.bookmarks) + self.assertIsNone(empty_bookmark.bookmarks['some_stream'] or None) + + def test_non_empty_state(self): + stream_id_1 = 'customers' + bookmark_key_1 = 'datetime' + bookmark_val_1 = 123456789 + bookmarks = { + stream_id_1 : { + bookmark_key_1 : bookmark_val_1 + } + } + + non_empty_state = State(bookmarks=bookmarks) + + self.assertEqual(non_empty_state.get_bookmark(stream_id_1, bookmark_key_1), bookmark_val_1) + non_empty_state.reset_stream(stream_id_1) + self.assertTrue(stream_id_1 in non_empty_state.bookmarks) + self.assertIsNone(non_empty_state.get_bookmark(stream_id_1, bookmark_key_1)) + + class TestGetOffset(unittest.TestCase): def test_empty_state(self): - empty_state = {} + empty_state = State() # Case with no value to fall back on - self.assertIsNone(bookmarks.get_offset(empty_state, 'some_stream')) + self.assertIsNone(empty_state.get_offset('some_stream', 'offset_key')) # Case with a given default - self.assertEqual(bookmarks.get_offset(empty_state, 'some_stream', 'default_value'), + self.assertEqual(empty_state.get_offset('some_stream','offset_key', 'default_value'), 'default_value') def test_empty_bookmark(self): - empty_bookmark = {'bookmarks':{}} + empty_bookmark = State(bookmarks={}) # Case with no value to fall back on - self.assertIsNone(bookmarks.get_offset(empty_bookmark, 'some_stream')) + self.assertIsNone(empty_bookmark.get_offset('some_stream', 'offset_key')) # Case with a given default - self.assertEqual(bookmarks.get_offset(empty_bookmark, 'some_stream', 'default_value'), + self.assertEqual(empty_bookmark.get_offset('some_stream', 'offset_key', 'default_value'), 'default_value') def test_non_empty_state(self): stream_id_1 = 'customers' bookmark_key_1 = 'datetime' bookmark_val_1 = 123456789 + offset_key = 'key' offset_val = 'fizzy water' - non_empty_state = { - 'bookmarks' : { - stream_id_1 : { - bookmark_key_1 : bookmark_val_1, - 'offset' : offset_val + bookmarks = { + stream_id_1 : { + bookmark_key_1 : bookmark_val_1, + 'offset' : { + offset_key: offset_val } } } + non_empty_state = State(bookmarks=bookmarks) + # # Cases with no value to fall back on # # Bad stream - self.assertIsNone(bookmarks.get_offset(non_empty_state, 'some_stream')) + self.assertIsNone(non_empty_state.get_offset('some_stream', offset_key)) # Good stream - self.assertEqual(bookmarks.get_offset(non_empty_state, stream_id_1), + self.assertEqual(non_empty_state.get_offset(stream_id_1, offset_key), offset_val) # @@ -122,23 +260,106 @@ def test_non_empty_state(self): # # Bad stream - self.assertEqual(bookmarks.get_offset(non_empty_state, 'some_stream', 'default_value'), + self.assertEqual(non_empty_state.get_offset('some_stream', offset_key, 'default_value'), 'default_value') # Good stream - self.assertEqual(bookmarks.get_offset(non_empty_state, stream_id_1, 'default_value'), + self.assertEqual(non_empty_state.get_offset(stream_id_1, offset_key, 'default_value'), offset_val) +class TestClearOffset(unittest.TestCase): + def test_empty_state(self): + empty_state = State() + empty_state.clear_offset('some_stream') + self.assertIsNone(empty_state.get_offset('some_stream', 'offset_key')) + + def test_empty_bookmark(self): + empty_bookmark = State(bookmarks={}) + empty_bookmark.clear_offset('some_stream') + self.assertIsNone(empty_bookmark.get_offset('some_stream', 'offset_key')) + + def test_non_empty_state(self): + stream_id_1 = 'customers' + bookmark_key_1 = 'datetime' + bookmark_val_1 = 123456789 + offset_key = 'key' + offset_val = 'fizzy water' + + bookmarks = { + stream_id_1 : { + bookmark_key_1 : bookmark_val_1, + 'offset' : { + offset_key: offset_val + } + } + } + + non_empty_state = State(bookmarks=bookmarks) + + # Bad stream + self.assertEqual(non_empty_state.get_offset(stream_id_1, offset_key), offset_val) + non_empty_state.clear_offset('some_stream') + self.assertEqual(non_empty_state.get_offset(stream_id_1, offset_key), offset_val) + + # Good stream + self.assertEqual(non_empty_state.get_offset(stream_id_1, offset_key), offset_val) + non_empty_state.clear_offset(stream_id_1) + self.assertIsNone(non_empty_state.get_offset(stream_id_1, offset_key)) + + +class TestSetOffset(unittest.TestCase): + def test_empty_state(self): + empty_state = State() + empty_state.set_offset('some_stream', 'offset_key', 'offset_value') + self.assertEqual(empty_state.get_offset('some_stream', 'offset_key'), 'offset_value') + + def test_empty_bookmark(self): + empty_bookmark = State(bookmarks={}) + empty_bookmark.set_offset('some_stream', 'offset_key', 'offset_value') + self.assertEqual(empty_bookmark.get_offset('some_stream', 'offset_key'), 'offset_value') + + def test_non_empty_state(self): + stream_id_1 = 'customers' + bookmark_key_1 = 'datetime' + bookmark_val_1 = 123456789 + offset_key_1 = 'offset_key_1' + offset_key_2 = 'offset_key_2' + offset_val_1 = 'fizzy water' + offset_val_2 = 'still water' + + bookmarks = { + stream_id_1 : { + bookmark_key_1 : bookmark_val_1, + 'offset' : { + 'offset_key_1': offset_val_1, + } + } + } + + + non_empty_state = State(bookmarks=bookmarks) + + # Test setting new key + non_empty_state.set_offset(stream_id_1, offset_key_2, offset_val_2) + self.assertEqual(non_empty_state.get_offset(stream_id_1, offset_key_1), offset_val_1) + self.assertEqual(non_empty_state.get_offset(stream_id_1, offset_key_2), offset_val_2) + + # Test overwriting key + non_empty_state.set_offset(stream_id_1, offset_key_1, offset_val_2) + self.assertEqual(non_empty_state.get_offset(stream_id_1, offset_key_1), offset_val_2) + self.assertEqual(non_empty_state.get_offset(stream_id_1, offset_key_2), offset_val_2) + + class TestGetCurrentlySyncing(unittest.TestCase): def test_empty_state(self): - empty_state = {} + empty_state = State() # Case with no value to fall back on - self.assertIsNone(bookmarks.get_currently_syncing(empty_state)) + self.assertIsNone(empty_state.get_currently_syncing()) # Case with a given default - self.assertEqual(bookmarks.get_currently_syncing(empty_state, 'default_value'), + self.assertEqual(empty_state.get_currently_syncing('default_value'), 'default_value') def test_non_empty_state(self): @@ -147,20 +368,100 @@ def test_non_empty_state(self): bookmark_val_1 = 123456789 offset_val = 'fizzy water' - non_empty_state = { - 'bookmarks' : { - stream_id_1 : { - bookmark_key_1 : bookmark_val_1, - 'offset' : offset_val - } - }, - 'currently_syncing' : stream_id_1 + bookmarks = { + stream_id_1 : { + bookmark_key_1 : bookmark_val_1, + 'offset' : offset_val + } } + non_empty_state = State(bookmarks=bookmarks, currently_syncing=stream_id_1) + # Case with no value to fall back on - self.assertEqual(bookmarks.get_currently_syncing(non_empty_state), + self.assertEqual(non_empty_state.get_currently_syncing(), stream_id_1) - # Case with a given default - self.assertEqual(bookmarks.get_currently_syncing(non_empty_state, 'default_value'), + # Case with no value to fall back on + self.assertEqual(non_empty_state.get_currently_syncing('default_value'), stream_id_1) + + +class TestSetCurrentlySyncing(unittest.TestCase): + def test_empty_state(self): + empty_state = State() + + self.assertIsNone(empty_state.get_currently_syncing()) + empty_state.set_currently_syncing('some_stream') + self.assertEqual(empty_state.get_currently_syncing(), 'some_stream') + + def test_non_empty_state(self): + stream_id_1 = 'customers' + bookmark_key_1 = 'datetime' + bookmark_val_1 = 123456789 + offset_key = 'key' + offset_val = 'fizzy water' + + bookmarks = { + stream_id_1 : { + bookmark_key_1 : bookmark_val_1, + 'offset' : { + offset_key: offset_val + } + } + } + + non_empty_state = State(bookmarks=bookmarks, currently_syncing=stream_id_1) + + self.assertEqual(non_empty_state.get_currently_syncing(), + stream_id_1) + + non_empty_state.set_currently_syncing('some_stream') + self.assertEqual(non_empty_state.get_currently_syncing(), 'some_stream') + + +class TestToDictAndFromDict(unittest.TestCase): + + bookmarks = { + 'stream_1': { + 'stream_1_key_1': 1, + 'stream_1_key_2': "2019-02-01T00:00:00Z" + }, + 'stream_2': { + 'stream_2_key_1': 2, + 'stream_2_key_2': "2019-03-01T00:00:00Z", + 'offset': { + 'offset_1': 1 + } + } + } + + dict_form = { + 'bookmarks': bookmarks, + 'currently_syncing': 'stream_1' + } + + obj_form = State(bookmarks=bookmarks, currently_syncing='stream_1') + + def test_from_dict(self): + dict_form = self.dict_form.copy() + obj_form = copy(self.obj_form) + + # With currently_syncing + self.assertEqual(obj_form, State.from_dict(dict_form)) + + # # Without currently_syncing + del dict_form['currently_syncing'] + obj_form.set_currently_syncing(None) + self.assertEqual(obj_form, State.from_dict(dict_form)) + + def test_to_dict(self): + dict_form = self.dict_form.copy() + obj_form = copy(self.obj_form) + + # With currently_syncing + self.assertEqual(self.dict_form, self.obj_form.to_dict()) + + # # Without currently_syncing + del dict_form['currently_syncing'] + obj_form.set_currently_syncing(None) + self.assertEqual(self.dict_form, self.obj_form.to_dict()) diff --git a/tests/test_catalog.py b/tests/test_catalog.py index cd6dc50..48746f3 100644 --- a/tests/test_catalog.py +++ b/tests/test_catalog.py @@ -1,5 +1,6 @@ import unittest +from singer.bookmarks import State from singer.schema import Schema from singer.catalog import Catalog, CatalogEntry, write_catalog @@ -23,7 +24,7 @@ def test_one_selected_stream(self): [selected_entry, CatalogEntry(tap_stream_id='b',schema=Schema(),metadata=[]), CatalogEntry(tap_stream_id='c',schema=Schema(),metadata=[])]) - state = {} + state = State() selected_streams = catalog.get_selected_streams(state) self.assertEquals([e for e in selected_streams],[selected_entry]) @@ -42,7 +43,7 @@ def test_resumes_currently_syncing_stream(self): [selected_entry_a, CatalogEntry(tap_stream_id='b',schema=Schema(),metadata=[]), selected_entry_c]) - state = {'currently_syncing': 'c'} + state = State(currently_syncing='c') selected_streams = catalog.get_selected_streams(state) self.assertEquals([e for e in selected_streams][0],selected_entry_c) From b450e4e02e4b86cbbe90e605751f110884474437 Mon Sep 17 00:00:00 2001 From: Judah Rand Date: Sun, 19 Jan 2020 18:15:12 +0000 Subject: [PATCH 3/6] Make Python 3.5 compatible --- singer/bookmarks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/singer/bookmarks.py b/singer/bookmarks.py index 2645c80..c3a632f 100644 --- a/singer/bookmarks.py +++ b/singer/bookmarks.py @@ -39,7 +39,7 @@ def from_dict(cls, data: Dict) -> "State": ) def to_dict(self) -> Dict: - state: Dict[str, Any] = {"bookmarks": self.bookmarks} + state = {"bookmarks": self.bookmarks} # type: Dict[str, Any] if self.get_currently_syncing(): state["currently_syncing"] = self.get_currently_syncing() return state From 73f70b2fe6af148dd5b56d3537d39cb6edb72d48 Mon Sep 17 00:00:00 2001 From: Judah Rand Date: Sun, 19 Jan 2020 18:21:07 +0000 Subject: [PATCH 4/6] Fix formatting --- singer/bookmarks.py | 8 +- tests/test_bookmarks.py | 397 +++++++++++++++++++++------------------- 2 files changed, 213 insertions(+), 192 deletions(-) diff --git a/singer/bookmarks.py b/singer/bookmarks.py index c3a632f..8746b43 100644 --- a/singer/bookmarks.py +++ b/singer/bookmarks.py @@ -6,12 +6,14 @@ LOGGER = get_logger() + def write_state(state): json.dump(state.to_dict(), sys.stdout, indent=2) + class State: def __init__( - self, bookmarks: Optional[Dict] = None, currently_syncing: Optional[str] = None + self, bookmarks: Optional[Dict] = None, currently_syncing: Optional[str] = None # pylint: disable=bad-continuation ) -> None: self._bookmarks = bookmarks or {} self._currently_syncing = currently_syncing @@ -71,7 +73,7 @@ def get_bookmark(self, tap_stream_id: str, key: str, default: Any = None) -> Any return self.bookmarks.get(tap_stream_id, {}).get(key, default) def set_offset( - self, tap_stream_id: str, offset_key: str, offset_value: Any + self, tap_stream_id: str, offset_key: str, offset_value: Any # pylint: disable=bad-continuation ) -> None: self._ensure_bookmark_path((tap_stream_id, "offset", offset_key)) self.bookmarks[tap_stream_id]["offset"][offset_key] = offset_value @@ -81,7 +83,7 @@ def clear_offset(self, tap_stream_id: str) -> None: self.bookmarks[tap_stream_id]["offset"] = {} def get_offset( - self, tap_stream_id: str, offset_key: str, default: Any = None + self, tap_stream_id: str, offset_key: str, default: Any = None # pylint: disable=bad-continuation ) -> Any: return ( self.bookmarks.get(tap_stream_id, {}) diff --git a/tests/test_bookmarks.py b/tests/test_bookmarks.py index 4e691f4..24626b7 100644 --- a/tests/test_bookmarks.py +++ b/tests/test_bookmarks.py @@ -9,22 +9,20 @@ def test_write_empty_state(self): write_state(state) def test_write_state_with_bookmarks(self): - stream_id_1 = 'customers' - bookmark_key_1 = 'datetime' + stream_id_1 = "customers" + bookmark_key_1 = "datetime" bookmark_val_1 = 123456789 - offset_key = 'key' - offset_val = 'fizzy water' + offset_key = "key" + offset_val = "fizzy water" bookmarks = { - stream_id_1 : { - bookmark_key_1 : bookmark_val_1, - 'offset' : { - offset_key: offset_val - } + stream_id_1: { + bookmark_key_1: bookmark_val_1, + "offset": {offset_key: offset_val}, } } - state = State(bookmarks=bookmarks, currently_syncing='customers') + state = State(bookmarks=bookmarks, currently_syncing="customers") write_state(state) @@ -33,31 +31,31 @@ def test_empty_state(self): empty_state = State() # Case with no value to fall back on - self.assertIsNone(empty_state.get_bookmark('some_stream', 'my_key')) + self.assertIsNone(empty_state.get_bookmark("some_stream", "my_key")) # Case with a given default - self.assertEqual(empty_state.get_bookmark('some_stream', 'my_key', 'default_value'), - 'default_value') + self.assertEqual( + empty_state.get_bookmark("some_stream", "my_key", "default_value"), + "default_value", + ) def test_empty_bookmark(self): empty_bookmark = State(bookmarks={}) # Case with no value to fall back on - self.assertIsNone(empty_bookmark.get_bookmark('some_stream', 'my_key')) + self.assertIsNone(empty_bookmark.get_bookmark("some_stream", "my_key")) # Case with a given default - self.assertEqual(empty_bookmark.get_bookmark('some_stream', 'my_key', 'default_value'), - 'default_value') + self.assertEqual( + empty_bookmark.get_bookmark("some_stream", "my_key", "default_value"), + "default_value", + ) def test_non_empty_state(self): - stream_id_1 = 'customers' - bookmark_key_1 = 'datetime' + stream_id_1 = "customers" + bookmark_key_1 = "datetime" bookmark_val_1 = 123456789 - bookmarks = { - stream_id_1 : { - bookmark_key_1 : bookmark_val_1 - } - } + bookmarks = {stream_id_1: {bookmark_key_1: bookmark_val_1}} non_empty_state = State(bookmarks=bookmarks) @@ -66,87 +64,94 @@ def test_non_empty_state(self): # # Bad stream, bad key - self.assertIsNone(non_empty_state.get_bookmark('some_stream', 'my_key')) + self.assertIsNone(non_empty_state.get_bookmark("some_stream", "my_key")) # Good stream, bad key - self.assertIsNone(non_empty_state.get_bookmark(stream_id_1, 'my_key')) + self.assertIsNone(non_empty_state.get_bookmark(stream_id_1, "my_key")) # Good stream, good key - self.assertEqual(non_empty_state.get_bookmark(stream_id_1, bookmark_key_1), - bookmark_val_1) + self.assertEqual( + non_empty_state.get_bookmark(stream_id_1, bookmark_key_1), bookmark_val_1 + ) # # Cases with a given default # # Bad stream, bad key - self.assertEqual(non_empty_state.get_bookmark('some_stream', 'my_key', 'default_value'), - 'default_value') + self.assertEqual( + non_empty_state.get_bookmark("some_stream", "my_key", "default_value"), + "default_value", + ) # Bad stream, good key - self.assertEqual(non_empty_state.get_bookmark('some_stream', bookmark_key_1, 'default_value'), - 'default_value') + self.assertEqual( + non_empty_state.get_bookmark( + "some_stream", bookmark_key_1, "default_value" + ), + "default_value", + ) # Good stream, bad key - self.assertEqual(non_empty_state.get_bookmark(stream_id_1, 'my_key', 'default_value'), - 'default_value') + self.assertEqual( + non_empty_state.get_bookmark(stream_id_1, "my_key", "default_value"), + "default_value", + ) # Good stream, good key - self.assertEqual(non_empty_state.get_bookmark(stream_id_1, bookmark_key_1, 'default_value'), - bookmark_val_1) + self.assertEqual( + non_empty_state.get_bookmark(stream_id_1, bookmark_key_1, "default_value"), + bookmark_val_1, + ) class TestWriteBookmark(unittest.TestCase): def test_empty_state(self): empty_state = State() - self.assertIsNone(empty_state.get_bookmark('some_stream', 'my_key')) - empty_state.write_bookmark('some_stream', 'my_key', 'val') - self.assertEqual(empty_state.get_bookmark('some_stream', 'my_key'), 'val') + self.assertIsNone(empty_state.get_bookmark("some_stream", "my_key")) + empty_state.write_bookmark("some_stream", "my_key", "val") + self.assertEqual(empty_state.get_bookmark("some_stream", "my_key"), "val") def test_empty_bookmark(self): empty_bookmark = State(bookmarks={}) - self.assertIsNone(empty_bookmark.get_bookmark('some_stream', 'my_key')) - empty_bookmark.write_bookmark('some_stream', 'my_key', 'val') - self.assertEqual(empty_bookmark.get_bookmark('some_stream', 'my_key'), 'val') + self.assertIsNone(empty_bookmark.get_bookmark("some_stream", "my_key")) + empty_bookmark.write_bookmark("some_stream", "my_key", "val") + self.assertEqual(empty_bookmark.get_bookmark("some_stream", "my_key"), "val") def test_non_empty_state(self): - stream_id_1 = 'customers' - bookmark_key_1 = 'datetime' + stream_id_1 = "customers" + bookmark_key_1 = "datetime" bookmark_val_1 = 123456789 bookmark_val_2 = 0 - bookmarks = { - stream_id_1 : { - bookmark_key_1 : bookmark_val_1 - } - } + bookmarks = {stream_id_1: {bookmark_key_1: bookmark_val_1}} non_empty_state = State(bookmarks=bookmarks) - self.assertEqual(non_empty_state.get_bookmark(stream_id_1, bookmark_key_1), bookmark_val_1) + self.assertEqual( + non_empty_state.get_bookmark(stream_id_1, bookmark_key_1), bookmark_val_1 + ) non_empty_state.write_bookmark(stream_id_1, bookmark_key_1, bookmark_val_2) - self.assertEqual(non_empty_state.get_bookmark(stream_id_1, bookmark_key_1), bookmark_val_2) + self.assertEqual( + non_empty_state.get_bookmark(stream_id_1, bookmark_key_1), bookmark_val_2 + ) class TestClearBookmark(unittest.TestCase): def test_empty_state(self): empty_state = State() - empty_state.clear_bookmark('some_stream', 'key') - self.assertIsNone(empty_state.get_bookmark('some_stream', 'my_key')) + empty_state.clear_bookmark("some_stream", "key") + self.assertIsNone(empty_state.get_bookmark("some_stream", "my_key")) def test_empty_bookmark(self): empty_bookmark = State(bookmarks={}) - empty_bookmark.clear_bookmark('some_stream', 'key') - self.assertIsNone(empty_bookmark.get_bookmark('some_stream', 'my_key')) + empty_bookmark.clear_bookmark("some_stream", "key") + self.assertIsNone(empty_bookmark.get_bookmark("some_stream", "my_key")) def test_non_empty_state(self): - stream_id_1 = 'customers' - bookmark_key_1 = 'datetime' + stream_id_1 = "customers" + bookmark_key_1 = "datetime" bookmark_val_1 = 123456789 - bookmarks = { - stream_id_1 : { - bookmark_key_1 : bookmark_val_1 - } - } + bookmarks = {stream_id_1: {bookmark_key_1: bookmark_val_1}} # # Cases with no value to fall back on @@ -154,20 +159,29 @@ def test_non_empty_state(self): # Bad stream, bad key non_empty_state = State(bookmarks=bookmarks) - self.assertEqual(non_empty_state.get_bookmark(stream_id_1, bookmark_key_1), bookmark_val_1) - non_empty_state.clear_bookmark('some_stream', 'some_key') - self.assertEqual(non_empty_state.get_bookmark(stream_id_1, bookmark_key_1), bookmark_val_1) - + self.assertEqual( + non_empty_state.get_bookmark(stream_id_1, bookmark_key_1), bookmark_val_1 + ) + non_empty_state.clear_bookmark("some_stream", "some_key") + self.assertEqual( + non_empty_state.get_bookmark(stream_id_1, bookmark_key_1), bookmark_val_1 + ) # Good stream, bad key non_empty_state = State(bookmarks=bookmarks) - self.assertEqual(non_empty_state.get_bookmark(stream_id_1, bookmark_key_1), bookmark_val_1) - non_empty_state.clear_bookmark(stream_id_1, 'some_key') - self.assertEqual(non_empty_state.get_bookmark(stream_id_1, bookmark_key_1), bookmark_val_1) + self.assertEqual( + non_empty_state.get_bookmark(stream_id_1, bookmark_key_1), bookmark_val_1 + ) + non_empty_state.clear_bookmark(stream_id_1, "some_key") + self.assertEqual( + non_empty_state.get_bookmark(stream_id_1, bookmark_key_1), bookmark_val_1 + ) # Good stream, good key non_empty_state = State(bookmarks=bookmarks) - self.assertEqual(non_empty_state.get_bookmark(stream_id_1, bookmark_key_1), bookmark_val_1) + self.assertEqual( + non_empty_state.get_bookmark(stream_id_1, bookmark_key_1), bookmark_val_1 + ) non_empty_state.clear_bookmark(stream_id_1, bookmark_key_1) self.assertIsNone(non_empty_state.get_bookmark(stream_id_1, bookmark_key_1)) @@ -175,31 +189,29 @@ def test_non_empty_state(self): class TestClearStream(unittest.TestCase): def test_empty_state(self): empty_state = State() - self.assertFalse('some_stream' in empty_state.bookmarks) - empty_state.reset_stream('some_stream') - self.assertTrue('some_stream' in empty_state.bookmarks) - self.assertIsNone(empty_state.bookmarks['some_stream'] or None) + self.assertFalse("some_stream" in empty_state.bookmarks) + empty_state.reset_stream("some_stream") + self.assertTrue("some_stream" in empty_state.bookmarks) + self.assertIsNone(empty_state.bookmarks["some_stream"] or None) def test_empty_bookmark(self): empty_bookmark = State(bookmarks={}) - self.assertFalse('some_stream' in empty_bookmark.bookmarks) - empty_bookmark.reset_stream('some_stream') - self.assertTrue('some_stream' in empty_bookmark.bookmarks) - self.assertIsNone(empty_bookmark.bookmarks['some_stream'] or None) + self.assertFalse("some_stream" in empty_bookmark.bookmarks) + empty_bookmark.reset_stream("some_stream") + self.assertTrue("some_stream" in empty_bookmark.bookmarks) + self.assertIsNone(empty_bookmark.bookmarks["some_stream"] or None) def test_non_empty_state(self): - stream_id_1 = 'customers' - bookmark_key_1 = 'datetime' + stream_id_1 = "customers" + bookmark_key_1 = "datetime" bookmark_val_1 = 123456789 - bookmarks = { - stream_id_1 : { - bookmark_key_1 : bookmark_val_1 - } - } + bookmarks = {stream_id_1: {bookmark_key_1: bookmark_val_1}} non_empty_state = State(bookmarks=bookmarks) - self.assertEqual(non_empty_state.get_bookmark(stream_id_1, bookmark_key_1), bookmark_val_1) + self.assertEqual( + non_empty_state.get_bookmark(stream_id_1, bookmark_key_1), bookmark_val_1 + ) non_empty_state.reset_stream(stream_id_1) self.assertTrue(stream_id_1 in non_empty_state.bookmarks) self.assertIsNone(non_empty_state.get_bookmark(stream_id_1, bookmark_key_1)) @@ -210,35 +222,37 @@ def test_empty_state(self): empty_state = State() # Case with no value to fall back on - self.assertIsNone(empty_state.get_offset('some_stream', 'offset_key')) + self.assertIsNone(empty_state.get_offset("some_stream", "offset_key")) # Case with a given default - self.assertEqual(empty_state.get_offset('some_stream','offset_key', 'default_value'), - 'default_value') + self.assertEqual( + empty_state.get_offset("some_stream", "offset_key", "default_value"), + "default_value", + ) def test_empty_bookmark(self): empty_bookmark = State(bookmarks={}) # Case with no value to fall back on - self.assertIsNone(empty_bookmark.get_offset('some_stream', 'offset_key')) + self.assertIsNone(empty_bookmark.get_offset("some_stream", "offset_key")) # Case with a given default - self.assertEqual(empty_bookmark.get_offset('some_stream', 'offset_key', 'default_value'), - 'default_value') + self.assertEqual( + empty_bookmark.get_offset("some_stream", "offset_key", "default_value"), + "default_value", + ) def test_non_empty_state(self): - stream_id_1 = 'customers' - bookmark_key_1 = 'datetime' + stream_id_1 = "customers" + bookmark_key_1 = "datetime" bookmark_val_1 = 123456789 - offset_key = 'key' - offset_val = 'fizzy water' + offset_key = "key" + offset_val = "fizzy water" bookmarks = { - stream_id_1 : { - bookmark_key_1 : bookmark_val_1, - 'offset' : { - offset_key: offset_val - } + stream_id_1: { + bookmark_key_1: bookmark_val_1, + "offset": {offset_key: offset_val}, } } @@ -249,61 +263,70 @@ def test_non_empty_state(self): # # Bad stream - self.assertIsNone(non_empty_state.get_offset('some_stream', offset_key)) + self.assertIsNone(non_empty_state.get_offset("some_stream", offset_key)) # Good stream - self.assertEqual(non_empty_state.get_offset(stream_id_1, offset_key), - offset_val) + self.assertEqual( + non_empty_state.get_offset(stream_id_1, offset_key), offset_val + ) # # Case with a given default # # Bad stream - self.assertEqual(non_empty_state.get_offset('some_stream', offset_key, 'default_value'), - 'default_value') + self.assertEqual( + non_empty_state.get_offset("some_stream", offset_key, "default_value"), + "default_value", + ) # Good stream - self.assertEqual(non_empty_state.get_offset(stream_id_1, offset_key, 'default_value'), - offset_val) + self.assertEqual( + non_empty_state.get_offset(stream_id_1, offset_key, "default_value"), + offset_val, + ) class TestClearOffset(unittest.TestCase): def test_empty_state(self): empty_state = State() - empty_state.clear_offset('some_stream') - self.assertIsNone(empty_state.get_offset('some_stream', 'offset_key')) + empty_state.clear_offset("some_stream") + self.assertIsNone(empty_state.get_offset("some_stream", "offset_key")) def test_empty_bookmark(self): empty_bookmark = State(bookmarks={}) - empty_bookmark.clear_offset('some_stream') - self.assertIsNone(empty_bookmark.get_offset('some_stream', 'offset_key')) + empty_bookmark.clear_offset("some_stream") + self.assertIsNone(empty_bookmark.get_offset("some_stream", "offset_key")) def test_non_empty_state(self): - stream_id_1 = 'customers' - bookmark_key_1 = 'datetime' + stream_id_1 = "customers" + bookmark_key_1 = "datetime" bookmark_val_1 = 123456789 - offset_key = 'key' - offset_val = 'fizzy water' + offset_key = "key" + offset_val = "fizzy water" bookmarks = { - stream_id_1 : { - bookmark_key_1 : bookmark_val_1, - 'offset' : { - offset_key: offset_val - } + stream_id_1: { + bookmark_key_1: bookmark_val_1, + "offset": {offset_key: offset_val}, } } non_empty_state = State(bookmarks=bookmarks) # Bad stream - self.assertEqual(non_empty_state.get_offset(stream_id_1, offset_key), offset_val) - non_empty_state.clear_offset('some_stream') - self.assertEqual(non_empty_state.get_offset(stream_id_1, offset_key), offset_val) + self.assertEqual( + non_empty_state.get_offset(stream_id_1, offset_key), offset_val + ) + non_empty_state.clear_offset("some_stream") + self.assertEqual( + non_empty_state.get_offset(stream_id_1, offset_key), offset_val + ) # Good stream - self.assertEqual(non_empty_state.get_offset(stream_id_1, offset_key), offset_val) + self.assertEqual( + non_empty_state.get_offset(stream_id_1, offset_key), offset_val + ) non_empty_state.clear_offset(stream_id_1) self.assertIsNone(non_empty_state.get_offset(stream_id_1, offset_key)) @@ -311,44 +334,53 @@ def test_non_empty_state(self): class TestSetOffset(unittest.TestCase): def test_empty_state(self): empty_state = State() - empty_state.set_offset('some_stream', 'offset_key', 'offset_value') - self.assertEqual(empty_state.get_offset('some_stream', 'offset_key'), 'offset_value') + empty_state.set_offset("some_stream", "offset_key", "offset_value") + self.assertEqual( + empty_state.get_offset("some_stream", "offset_key"), "offset_value" + ) def test_empty_bookmark(self): empty_bookmark = State(bookmarks={}) - empty_bookmark.set_offset('some_stream', 'offset_key', 'offset_value') - self.assertEqual(empty_bookmark.get_offset('some_stream', 'offset_key'), 'offset_value') + empty_bookmark.set_offset("some_stream", "offset_key", "offset_value") + self.assertEqual( + empty_bookmark.get_offset("some_stream", "offset_key"), "offset_value" + ) def test_non_empty_state(self): - stream_id_1 = 'customers' - bookmark_key_1 = 'datetime' + stream_id_1 = "customers" + bookmark_key_1 = "datetime" bookmark_val_1 = 123456789 - offset_key_1 = 'offset_key_1' - offset_key_2 = 'offset_key_2' - offset_val_1 = 'fizzy water' - offset_val_2 = 'still water' + offset_key_1 = "offset_key_1" + offset_key_2 = "offset_key_2" + offset_val_1 = "fizzy water" + offset_val_2 = "still water" bookmarks = { - stream_id_1 : { - bookmark_key_1 : bookmark_val_1, - 'offset' : { - 'offset_key_1': offset_val_1, - } + stream_id_1: { + bookmark_key_1: bookmark_val_1, + "offset": {"offset_key_1": offset_val_1,}, } } - non_empty_state = State(bookmarks=bookmarks) # Test setting new key non_empty_state.set_offset(stream_id_1, offset_key_2, offset_val_2) - self.assertEqual(non_empty_state.get_offset(stream_id_1, offset_key_1), offset_val_1) - self.assertEqual(non_empty_state.get_offset(stream_id_1, offset_key_2), offset_val_2) + self.assertEqual( + non_empty_state.get_offset(stream_id_1, offset_key_1), offset_val_1 + ) + self.assertEqual( + non_empty_state.get_offset(stream_id_1, offset_key_2), offset_val_2 + ) # Test overwriting key non_empty_state.set_offset(stream_id_1, offset_key_1, offset_val_2) - self.assertEqual(non_empty_state.get_offset(stream_id_1, offset_key_1), offset_val_2) - self.assertEqual(non_empty_state.get_offset(stream_id_1, offset_key_2), offset_val_2) + self.assertEqual( + non_empty_state.get_offset(stream_id_1, offset_key_1), offset_val_2 + ) + self.assertEqual( + non_empty_state.get_offset(stream_id_1, offset_key_2), offset_val_2 + ) class TestGetCurrentlySyncing(unittest.TestCase): @@ -359,31 +391,29 @@ def test_empty_state(self): self.assertIsNone(empty_state.get_currently_syncing()) # Case with a given default - self.assertEqual(empty_state.get_currently_syncing('default_value'), - 'default_value') + self.assertEqual( + empty_state.get_currently_syncing("default_value"), "default_value" + ) def test_non_empty_state(self): - stream_id_1 = 'customers' - bookmark_key_1 = 'datetime' + stream_id_1 = "customers" + bookmark_key_1 = "datetime" bookmark_val_1 = 123456789 - offset_val = 'fizzy water' + offset_val = "fizzy water" bookmarks = { - stream_id_1 : { - bookmark_key_1 : bookmark_val_1, - 'offset' : offset_val - } + stream_id_1: {bookmark_key_1: bookmark_val_1, "offset": offset_val} } non_empty_state = State(bookmarks=bookmarks, currently_syncing=stream_id_1) # Case with no value to fall back on - self.assertEqual(non_empty_state.get_currently_syncing(), - stream_id_1) + self.assertEqual(non_empty_state.get_currently_syncing(), stream_id_1) # Case with no value to fall back on - self.assertEqual(non_empty_state.get_currently_syncing('default_value'), - stream_id_1) + self.assertEqual( + non_empty_state.get_currently_syncing("default_value"), stream_id_1 + ) class TestSetCurrentlySyncing(unittest.TestCase): @@ -391,56 +421,45 @@ def test_empty_state(self): empty_state = State() self.assertIsNone(empty_state.get_currently_syncing()) - empty_state.set_currently_syncing('some_stream') - self.assertEqual(empty_state.get_currently_syncing(), 'some_stream') + empty_state.set_currently_syncing("some_stream") + self.assertEqual(empty_state.get_currently_syncing(), "some_stream") def test_non_empty_state(self): - stream_id_1 = 'customers' - bookmark_key_1 = 'datetime' + stream_id_1 = "customers" + bookmark_key_1 = "datetime" bookmark_val_1 = 123456789 - offset_key = 'key' - offset_val = 'fizzy water' + offset_key = "key" + offset_val = "fizzy water" bookmarks = { - stream_id_1 : { - bookmark_key_1 : bookmark_val_1, - 'offset' : { - offset_key: offset_val - } + stream_id_1: { + bookmark_key_1: bookmark_val_1, + "offset": {offset_key: offset_val}, } } non_empty_state = State(bookmarks=bookmarks, currently_syncing=stream_id_1) - self.assertEqual(non_empty_state.get_currently_syncing(), - stream_id_1) + self.assertEqual(non_empty_state.get_currently_syncing(), stream_id_1) - non_empty_state.set_currently_syncing('some_stream') - self.assertEqual(non_empty_state.get_currently_syncing(), 'some_stream') + non_empty_state.set_currently_syncing("some_stream") + self.assertEqual(non_empty_state.get_currently_syncing(), "some_stream") class TestToDictAndFromDict(unittest.TestCase): bookmarks = { - 'stream_1': { - 'stream_1_key_1': 1, - 'stream_1_key_2': "2019-02-01T00:00:00Z" + "stream_1": {"stream_1_key_1": 1, "stream_1_key_2": "2019-02-01T00:00:00Z"}, + "stream_2": { + "stream_2_key_1": 2, + "stream_2_key_2": "2019-03-01T00:00:00Z", + "offset": {"offset_1": 1}, }, - 'stream_2': { - 'stream_2_key_1': 2, - 'stream_2_key_2': "2019-03-01T00:00:00Z", - 'offset': { - 'offset_1': 1 - } - } } - dict_form = { - 'bookmarks': bookmarks, - 'currently_syncing': 'stream_1' - } + dict_form = {"bookmarks": bookmarks, "currently_syncing": "stream_1"} - obj_form = State(bookmarks=bookmarks, currently_syncing='stream_1') + obj_form = State(bookmarks=bookmarks, currently_syncing="stream_1") def test_from_dict(self): dict_form = self.dict_form.copy() @@ -450,7 +469,7 @@ def test_from_dict(self): self.assertEqual(obj_form, State.from_dict(dict_form)) # # Without currently_syncing - del dict_form['currently_syncing'] + del dict_form["currently_syncing"] obj_form.set_currently_syncing(None) self.assertEqual(obj_form, State.from_dict(dict_form)) @@ -462,6 +481,6 @@ def test_to_dict(self): self.assertEqual(self.dict_form, self.obj_form.to_dict()) # # Without currently_syncing - del dict_form['currently_syncing'] + del dict_form["currently_syncing"] obj_form.set_currently_syncing(None) self.assertEqual(self.dict_form, self.obj_form.to_dict()) From 6a5a659c1ce14fafee2de3094bf17d935ba1d9ec Mon Sep 17 00:00:00 2001 From: Judah Rand Date: Sun, 19 Jan 2020 19:51:35 +0000 Subject: [PATCH 5/6] Remove more unnecessary methods --- singer/bookmarks.py | 14 -------------- tests/test_bookmarks.py | 25 +------------------------ 2 files changed, 1 insertion(+), 38 deletions(-) diff --git a/singer/bookmarks.py b/singer/bookmarks.py index 8746b43..28ac502 100644 --- a/singer/bookmarks.py +++ b/singer/bookmarks.py @@ -1,5 +1,3 @@ -import json -import sys from typing import Any, Dict, Optional, Sequence, Union from .logger import get_logger @@ -7,10 +5,6 @@ LOGGER = get_logger() -def write_state(state): - json.dump(state.to_dict(), sys.stdout, indent=2) - - class State: def __init__( self, bookmarks: Optional[Dict] = None, currently_syncing: Optional[str] = None # pylint: disable=bad-continuation @@ -28,11 +22,6 @@ def __eq__(self, other: Any) -> bool: def bookmarks(self) -> Dict: return self._bookmarks - @classmethod - def load(cls, filename: str) -> "State": - with open(filename) as fp: # pylint: disable=invalid-name - return State.from_dict(json.load(fp)) - @classmethod def from_dict(cls, data: Dict) -> "State": return State( @@ -46,9 +35,6 @@ def to_dict(self) -> Dict: state["currently_syncing"] = self.get_currently_syncing() return state - def dump(self) -> None: - json.dump(self.to_dict(), sys.stdout, indent=2) - def _ensure_bookmark_path(self, path: Sequence) -> None: submap = self.bookmarks for path_component in path: diff --git a/tests/test_bookmarks.py b/tests/test_bookmarks.py index 24626b7..59846f5 100644 --- a/tests/test_bookmarks.py +++ b/tests/test_bookmarks.py @@ -1,29 +1,6 @@ from copy import copy import unittest -from singer.bookmarks import State, write_state - - -class TestWriteState(unittest.TestCase): - def test_write_empty_state(self): - state = State() - write_state(state) - - def test_write_state_with_bookmarks(self): - stream_id_1 = "customers" - bookmark_key_1 = "datetime" - bookmark_val_1 = 123456789 - offset_key = "key" - offset_val = "fizzy water" - - bookmarks = { - stream_id_1: { - bookmark_key_1: bookmark_val_1, - "offset": {offset_key: offset_val}, - } - } - - state = State(bookmarks=bookmarks, currently_syncing="customers") - write_state(state) +from singer.bookmarks import State class TestGetBookmark(unittest.TestCase): From fc672f3f41998dc01ed2897480cb3a4dde5c4309 Mon Sep 17 00:00:00 2001 From: Judah Rand Date: Sun, 19 Jan 2020 21:53:25 +0000 Subject: [PATCH 6/6] Parse args.state into Object --- singer/utils.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/singer/utils.py b/singer/utils.py index 85f3d39..61b150c 100644 --- a/singer/utils.py +++ b/singer/utils.py @@ -10,6 +10,7 @@ import pytz import backoff as backoff_module +from singer.bookmarks import State from singer.catalog import Catalog DATETIME_PARSE = "%Y-%m-%dT%H:%M:%SZ" @@ -169,9 +170,9 @@ def parse_args(required_config_keys): args.config = load_json(args.config) if args.state: setattr(args, 'state_path', args.state) - args.state = load_json(args.state) + args.state = State.from_dict(load_json(args.state)) else: - args.state = {} + args.state = State() if args.properties: setattr(args, 'properties_path', args.properties) args.properties = load_json(args.properties)