From 23b316cf874cca19ee2bf3e96c58a2ea2d0c4f5e Mon Sep 17 00:00:00 2001 From: Nick McCoy Date: Wed, 22 Nov 2017 09:50:44 -0500 Subject: [PATCH 01/26] Adding time_extracted and bookmark_properties --- singer/catalog.py | 6 +++++- singer/messages.py | 27 +++++++++++++++++++-------- 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/singer/catalog.py b/singer/catalog.py index 9cbbe52..4572dca 100644 --- a/singer/catalog.py +++ b/singer/catalog.py @@ -11,7 +11,7 @@ class CatalogEntry(object): def __init__(self, tap_stream_id=None, stream=None, key_properties=None, schema=None, replication_key=None, is_view=None, database=None, table=None, row_count=None, - stream_alias=None, metadata=None): + stream_alias=None, metadata=None, bookmark_properties=None): self.tap_stream_id = tap_stream_id self.stream = stream @@ -24,6 +24,7 @@ def __init__(self, tap_stream_id=None, stream=None, self.row_count = row_count self.stream_alias = stream_alias self.metadata = metadata + self.bookmark_properties = bookmark_properties def __str__(self): return str(self.__dict__) @@ -57,6 +58,8 @@ def to_dict(self): result['row_count'] = self.row_count if self.metadata is not None: result['metadata'] = self.metadata + if self.bookmark_properties is not None: + result['bookmark_properties'] = self.bookmark_properties return result @@ -95,6 +98,7 @@ def from_dict(cls, data): entry.schema = Schema.from_dict(stream.get('schema')) entry.is_view = stream.get('is_view') entry.stream_alias = stream.get('stream_alias') + entry.bookmark_properties = stream.get('bookmark_properties') streams.append(entry) return Catalog(streams) diff --git a/singer/messages.py b/singer/messages.py index fddd1de..7fc73cf 100644 --- a/singer/messages.py +++ b/singer/messages.py @@ -37,9 +37,10 @@ class RecordMessage(Message): ''' - def __init__(self, stream, record, version=None): + def __init__(self, stream, record, time_extracted=None, version=None): self.stream = stream self.record = record + self.time_extracted = time_extracted self.version = version def asdict(self): @@ -48,6 +49,8 @@ def asdict(self): 'stream': self.stream, 'record': self.record, } + if self.time_extracted is not None: + result['time_extracted'] = self.time_extracted if self.version is not None: result['version'] = self.version return result @@ -76,18 +79,22 @@ class SchemaMessage(Message): >>> key_properties=['id']) ''' - def __init__(self, stream, schema, key_properties): + def __init__(self, stream, schema, key_properties, bookmark_properties=None): self.stream = stream self.schema = schema self.key_properties = key_properties + self.bookmark_properties = bookmark_properties def asdict(self): - return { + result = { 'type': 'SCHEMA', 'stream': self.stream, 'schema': self.schema, 'key_properties': self.key_properties } + if self.bookmark_properties is not None: + result['bookmark_properties'] = self.bookmark_properties + return result class StateMessage(Message): @@ -159,12 +166,15 @@ def parse_message(msg): if msg_type == 'RECORD': return RecordMessage(stream=_required_key(obj, 'stream'), record=_required_key(obj, 'record'), + time_extracted= obj.get('time_extracted') version=obj.get('version')) + elif msg_type == 'SCHEMA': return SchemaMessage(stream=_required_key(obj, 'stream'), schema=_required_key(obj, 'schema'), - key_properties=_required_key(obj, 'key_properties')) + key_properties=_required_key(obj, 'key_properties') + bookmark_properties= obj.get('bookmark_properties') ) elif msg_type == 'STATE': return StateMessage(value=_required_key(obj, 'value')) @@ -183,12 +193,12 @@ def write_message(message): sys.stdout.flush() -def write_record(stream_name, record, stream_alias=None): +def write_record(stream_name, record, stream_alias=None, time_extracted=None): """Write a single record for the given stream. >>> write_record("users", {"id": 2, "email": "mike@stitchdata.com"}) """ - write_message(RecordMessage(stream=(stream_alias or stream_name), record=record)) + write_message(RecordMessage(stream=(stream_alias or stream_name), record=record, time_extracted=time_extracted)) def write_records(stream_name, records): @@ -202,7 +212,7 @@ def write_records(stream_name, records): write_record(stream_name, record) -def write_schema(stream_name, schema, key_properties, stream_alias=None): +def write_schema(stream_name, schema, key_properties, bookmark_properties=None, stream_alias=None): """Write a schema message. >>> stream = 'test' @@ -218,7 +228,8 @@ def write_schema(stream_name, schema, key_properties, stream_alias=None): SchemaMessage( stream=(stream_alias or stream_name), schema=schema, - key_properties=key_properties)) + key_properties=key_properties, + bookmark_properties=bookmark_properties)) def write_state(value): From 1d0416c7275d5fef804d4f9a4ae4464cb1ee8daa Mon Sep 17 00:00:00 2001 From: Nick McCoy Date: Wed, 22 Nov 2017 15:28:39 -0500 Subject: [PATCH 02/26] Adding commas to fix error --- singer/messages.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/singer/messages.py b/singer/messages.py index 7fc73cf..78f3577 100644 --- a/singer/messages.py +++ b/singer/messages.py @@ -166,15 +166,15 @@ def parse_message(msg): if msg_type == 'RECORD': return RecordMessage(stream=_required_key(obj, 'stream'), record=_required_key(obj, 'record'), - time_extracted= obj.get('time_extracted') - version=obj.get('version')) + version=obj.get('version'), + time_extracted= obj.get('time_extracted')) elif msg_type == 'SCHEMA': return SchemaMessage(stream=_required_key(obj, 'stream'), schema=_required_key(obj, 'schema'), - key_properties=_required_key(obj, 'key_properties') - bookmark_properties= obj.get('bookmark_properties') ) + key_properties=_required_key(obj, 'key_properties'), + bookmark_properties= obj.get('bookmark_properties')) elif msg_type == 'STATE': return StateMessage(value=_required_key(obj, 'value')) From 3524f407edca81affaf68e7190fc113d073d13ed Mon Sep 17 00:00:00 2001 From: Nick McCoy Date: Wed, 22 Nov 2017 15:40:25 -0500 Subject: [PATCH 03/26] Fixing space errors --- singer/messages.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/singer/messages.py b/singer/messages.py index 78f3577..f58ecd5 100644 --- a/singer/messages.py +++ b/singer/messages.py @@ -167,14 +167,14 @@ def parse_message(msg): return RecordMessage(stream=_required_key(obj, 'stream'), record=_required_key(obj, 'record'), version=obj.get('version'), - time_extracted= obj.get('time_extracted')) + time_extracted=obj.get('time_extracted')) elif msg_type == 'SCHEMA': return SchemaMessage(stream=_required_key(obj, 'stream'), schema=_required_key(obj, 'schema'), key_properties=_required_key(obj, 'key_properties'), - bookmark_properties= obj.get('bookmark_properties')) + bookmark_properties=obj.get('bookmark_properties')) elif msg_type == 'STATE': return StateMessage(value=_required_key(obj, 'value')) @@ -198,7 +198,8 @@ def write_record(stream_name, record, stream_alias=None, time_extracted=None): >>> write_record("users", {"id": 2, "email": "mike@stitchdata.com"}) """ - write_message(RecordMessage(stream=(stream_alias or stream_name), record=record, time_extracted=time_extracted)) + write_message(RecordMessage(stream=(stream_alias or stream_name), record=record, + time_extracted=time_extracted)) def write_records(stream_name, records): From 90cc986c3ffcf2eac9b6a052271fe0fd95d8a790 Mon Sep 17 00:00:00 2001 From: Nick McCoy Date: Wed, 22 Nov 2017 15:49:11 -0500 Subject: [PATCH 04/26] Fixing pylint warnings --- singer/messages.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/singer/messages.py b/singer/messages.py index f58ecd5..5980e3b 100644 --- a/singer/messages.py +++ b/singer/messages.py @@ -198,8 +198,9 @@ def write_record(stream_name, record, stream_alias=None, time_extracted=None): >>> write_record("users", {"id": 2, "email": "mike@stitchdata.com"}) """ - write_message(RecordMessage(stream=(stream_alias or stream_name), record=record, - time_extracted=time_extracted)) + write_message(RecordMessage(stream=(stream_alias or stream_name), + record=record, + time_extracted=time_extracted)) def write_records(stream_name, records): From 9cbdfa25bf485aad146ebd05d2aa868ed34d672d Mon Sep 17 00:00:00 2001 From: Nick McCoy Date: Wed, 22 Nov 2017 16:14:29 -0500 Subject: [PATCH 05/26] validating schema in discovery mode Added write_catalog function --- singer/catalog.py | 27 +++++++++++++++++++++++++++ tests/test_catalog.py | 19 +++++++++++++------ 2 files changed, 40 insertions(+), 6 deletions(-) diff --git a/singer/catalog.py b/singer/catalog.py index 9cbbe52..fbd9720 100644 --- a/singer/catalog.py +++ b/singer/catalog.py @@ -4,6 +4,7 @@ import sys from singer.schema import Schema +from jsonschema import ValidationError, Draft4Validator, FormatChecker # pylint: disable=too-many-instance-attributes class CatalogEntry(object): @@ -109,3 +110,29 @@ def get_stream(self, tap_stream_id): if stream.tap_stream_id == tap_stream_id: return stream return None + + +CATALOG_SCHEMA = {'type': 'object', + 'required': ['streams'], + 'properties': { + 'streams' : { + 'type': 'array', + 'items': { + 'type': 'object', + 'required': ['stream', 'tap_stream_id', 'schema'], + 'properties': { + 'stream': {'type': 'string'}, + 'tap_stream_id': {'type': 'string'}, + 'schema': {'type': 'object'} + } + } + } + } + } + +CATALOG_VALIDATOR = Draft4Validator(CATALOG_SCHEMA, + format_checker=FormatChecker()) + +def write_catalog(streams): + CATALOG_VALIDATOR.validate(streams) + json.dump(streams, sys.stdout, indent=2) diff --git a/tests/test_catalog.py b/tests/test_catalog.py index 61a3901..585a4c7 100644 --- a/tests/test_catalog.py +++ b/tests/test_catalog.py @@ -1,11 +1,10 @@ import unittest +import singer.catalog from singer.schema import Schema from singer.catalog import Catalog, CatalogEntry -class TestToDictAndFromDict(unittest.TestCase): - - dict_form = { +dict_form = { 'streams': [ { 'stream': 'users', @@ -38,7 +37,7 @@ class TestToDictAndFromDict(unittest.TestCase): ] } - obj_form = Catalog(streams=[ +obj_form = Catalog(streams=[ CatalogEntry( stream='users', tap_stream_id='prod_users', @@ -62,11 +61,15 @@ class TestToDictAndFromDict(unittest.TestCase): 'id': Schema(type='integer', selected=True), 'amount': Schema(type='number', selected=True)}))]) + + + +class TestToDictAndFromDict(unittest.TestCase): def test_from_dict(self): - self.assertEqual(self.obj_form, Catalog.from_dict(self.dict_form)) + self.assertEqual(obj_form, Catalog.from_dict(dict_form)) def test_to_dict(self): - self.assertEqual(self.dict_form, self.obj_form.to_dict()) + self.assertEqual(dict_form, obj_form.to_dict()) class TestGetStream(unittest.TestCase): @@ -77,3 +80,7 @@ def test(self): CatalogEntry(tap_stream_id='c')]) entry = catalog.get_stream('b') self.assertEquals('b', entry.tap_stream_id) + +class TestWriteCatalog(unittest.TestCase): + def test(self): + singer.catalog.write_catalog(dict_form) From 25b19d2a4e8a2c277b8485b219eb6aa44317847a Mon Sep 17 00:00:00 2001 From: Nick McCoy Date: Mon, 27 Nov 2017 14:56:51 -0500 Subject: [PATCH 06/26] Check for time_extracted being an aware datetime and ensure timezone is set to UTC for time_extracted --- singer/catalog.py | 6 +----- singer/messages.py | 15 ++++++++++----- singer/utils.py | 8 ++++++++ tests/test_singer.py | 5 +++++ 4 files changed, 24 insertions(+), 10 deletions(-) diff --git a/singer/catalog.py b/singer/catalog.py index 4572dca..9cbbe52 100644 --- a/singer/catalog.py +++ b/singer/catalog.py @@ -11,7 +11,7 @@ class CatalogEntry(object): def __init__(self, tap_stream_id=None, stream=None, key_properties=None, schema=None, replication_key=None, is_view=None, database=None, table=None, row_count=None, - stream_alias=None, metadata=None, bookmark_properties=None): + stream_alias=None, metadata=None): self.tap_stream_id = tap_stream_id self.stream = stream @@ -24,7 +24,6 @@ def __init__(self, tap_stream_id=None, stream=None, self.row_count = row_count self.stream_alias = stream_alias self.metadata = metadata - self.bookmark_properties = bookmark_properties def __str__(self): return str(self.__dict__) @@ -58,8 +57,6 @@ def to_dict(self): result['row_count'] = self.row_count if self.metadata is not None: result['metadata'] = self.metadata - if self.bookmark_properties is not None: - result['bookmark_properties'] = self.bookmark_properties return result @@ -98,7 +95,6 @@ def from_dict(cls, data): entry.schema = Schema.from_dict(stream.get('schema')) entry.is_view = stream.get('is_view') entry.stream_alias = stream.get('stream_alias') - entry.bookmark_properties = stream.get('bookmark_properties') streams.append(entry) return Catalog(streams) diff --git a/singer/messages.py b/singer/messages.py index 5980e3b..4145e56 100644 --- a/singer/messages.py +++ b/singer/messages.py @@ -1,6 +1,6 @@ import sys import simplejson as json - +import singer.utils as u class Message(object): '''Base class for messages.''' @@ -37,11 +37,16 @@ class RecordMessage(Message): ''' - def __init__(self, stream, record, time_extracted=None, version=None): + def __init__(self, stream, record, version=None, time_extracted=None): self.stream = stream self.record = record - self.time_extracted = time_extracted self.version = version + if time_extracted is not None: + if u.is_aware_datetime(time_extracted): + self.time_extracted = time_extracted + else: + raise Exception("'time_extracted' must be an aware " + "datetime (with a time zone)") def asdict(self): result = { @@ -49,10 +54,10 @@ def asdict(self): 'stream': self.stream, 'record': self.record, } - if self.time_extracted is not None: - result['time_extracted'] = self.time_extracted if self.version is not None: result['version'] = self.version + if self.time_extracted is not None: + result['time_extracted'] = u.convert_string_timezone_to_utc(self.time_extracted) return result def __str__(self): diff --git a/singer/utils.py b/singer/utils.py index 5315484..f596732 100644 --- a/singer/utils.py +++ b/singer/utils.py @@ -34,6 +34,14 @@ def strftime(dtime): raise Exception("datetime must be pegged at UTC tzoneinfo") return dtime.strftime(DATETIME_FMT) +def is_aware_datetime(dtime): + d_object = dateutil.parser.parse(dtime) + return(d_object.tzinfo is not None) + +def convert_string_timezone_to_utc(dtime): + d_object = dateutil.parser.parse(dtime) + return d_object.astimezone(pytz.utc).strftime(DATETIME_FMT) + def ratelimit(limit, every): def limitdecorator(func): times = collections.deque() diff --git a/tests/test_singer.py b/tests/test_singer.py index 252f294..38eb0e4 100644 --- a/tests/test_singer.py +++ b/tests/test_singer.py @@ -17,6 +17,11 @@ def test_parse_message_record_with_version_good(self): message, singer.RecordMessage(record={'name': 'foo'}, stream='users', version=2)) + def test_parse_message_record_naive_extraction_time(self): + with self.assertRaises(Exception): + message = singer.parse_message( + '{"type": "RECORD", "record": {"name": "foo"}, "stream": "users", "version": 2, "time_extracted": "1970-01-02T00:00:00"}') + def test_parse_message_record_missing_record(self): with self.assertRaises(Exception): singer.parse_message('{"type": "RECORD", "stream": "users"}') From d297a0eb5b4cdc0f8caa2c7b4c74433f1dcbbe7d Mon Sep 17 00:00:00 2001 From: Nick McCoy Date: Mon, 27 Nov 2017 15:16:24 -0500 Subject: [PATCH 07/26] Fixing pylint errors --- singer/messages.py | 7 +++---- singer/utils.py | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/singer/messages.py b/singer/messages.py index 4145e56..679831f 100644 --- a/singer/messages.py +++ b/singer/messages.py @@ -41,12 +41,11 @@ def __init__(self, stream, record, version=None, time_extracted=None): self.stream = stream self.record = record self.version = version + self.time_extracted = time_extracted if time_extracted is not None: - if u.is_aware_datetime(time_extracted): - self.time_extracted = time_extracted - else: + if not u.is_aware_datetime(time_extracted): raise Exception("'time_extracted' must be an aware " - "datetime (with a time zone)") + "datetime (with a time zone)") def asdict(self): result = { diff --git a/singer/utils.py b/singer/utils.py index f596732..3d077ce 100644 --- a/singer/utils.py +++ b/singer/utils.py @@ -36,7 +36,7 @@ def strftime(dtime): def is_aware_datetime(dtime): d_object = dateutil.parser.parse(dtime) - return(d_object.tzinfo is not None) + return d_object.tzinfo is not None def convert_string_timezone_to_utc(dtime): d_object = dateutil.parser.parse(dtime) From 247d4d366358b8512f14ebc96945ed8ef69965f6 Mon Sep 17 00:00:00 2001 From: Mike DeLaurentis Date: Tue, 28 Nov 2017 09:11:18 -0500 Subject: [PATCH 08/26] Wrapping up --- singer/messages.py | 20 ++++++++++++-------- singer/utils.py | 9 --------- tests/test_singer.py | 17 +++++++++++++++-- 3 files changed, 27 insertions(+), 19 deletions(-) diff --git a/singer/messages.py b/singer/messages.py index 679831f..9ceaa53 100644 --- a/singer/messages.py +++ b/singer/messages.py @@ -1,6 +1,8 @@ import sys import simplejson as json import singer.utils as u +import dateutil +import pytz class Message(object): '''Base class for messages.''' @@ -42,10 +44,9 @@ def __init__(self, stream, record, version=None, time_extracted=None): self.record = record self.version = version self.time_extracted = time_extracted - if time_extracted is not None: - if not u.is_aware_datetime(time_extracted): - raise Exception("'time_extracted' must be an aware " - "datetime (with a time zone)") + if time_extracted and not time_extracted.tzinfo: + raise ValueError("'time_extracted' must be either None " + + "or an aware datetime (with a time zone)") def asdict(self): result = { @@ -55,8 +56,8 @@ def asdict(self): } if self.version is not None: result['version'] = self.version - if self.time_extracted is not None: - result['time_extracted'] = u.convert_string_timezone_to_utc(self.time_extracted) + if self.time_extracted: + result['time_extracted'] = self.time_extracted.astimezone(pytz.utc).strftime(u.DATETIME_FMT) return result def __str__(self): @@ -96,7 +97,7 @@ def asdict(self): 'schema': self.schema, 'key_properties': self.key_properties } - if self.bookmark_properties is not None: + if self.bookmark_properties: result['bookmark_properties'] = self.bookmark_properties return result @@ -168,10 +169,13 @@ def parse_message(msg): msg_type = _required_key(obj, 'type') if msg_type == 'RECORD': + time_extracted = obj.get('time_extracted') + if time_extracted: + time_extracted = dateutil.parser.parse(time_extracted) return RecordMessage(stream=_required_key(obj, 'stream'), record=_required_key(obj, 'record'), version=obj.get('version'), - time_extracted=obj.get('time_extracted')) + time_extracted=time_extracted) elif msg_type == 'SCHEMA': diff --git a/singer/utils.py b/singer/utils.py index 3d077ce..60e34d0 100644 --- a/singer/utils.py +++ b/singer/utils.py @@ -5,7 +5,6 @@ import json import time import dateutil -import pytz import backoff as backoff_module from singer.catalog import Catalog @@ -34,14 +33,6 @@ def strftime(dtime): raise Exception("datetime must be pegged at UTC tzoneinfo") return dtime.strftime(DATETIME_FMT) -def is_aware_datetime(dtime): - d_object = dateutil.parser.parse(dtime) - return d_object.tzinfo is not None - -def convert_string_timezone_to_utc(dtime): - d_object = dateutil.parser.parse(dtime) - return d_object.astimezone(pytz.utc).strftime(DATETIME_FMT) - def ratelimit(limit, every): def limitdecorator(func): times = collections.deque() diff --git a/tests/test_singer.py b/tests/test_singer.py index 38eb0e4..f820b37 100644 --- a/tests/test_singer.py +++ b/tests/test_singer.py @@ -1,6 +1,7 @@ import singer import unittest - +import datetime +import dateutil class TestSinger(unittest.TestCase): def test_parse_message_record_good(self): @@ -18,10 +19,22 @@ def test_parse_message_record_with_version_good(self): singer.RecordMessage(record={'name': 'foo'}, stream='users', version=2)) def test_parse_message_record_naive_extraction_time(self): - with self.assertRaises(Exception): + with self.assertRaisesRegex(ValueError, "must be either None or an aware datetime"): message = singer.parse_message( '{"type": "RECORD", "record": {"name": "foo"}, "stream": "users", "version": 2, "time_extracted": "1970-01-02T00:00:00"}') + def test_parse_message_record_aware_extraction_time(self): + message = singer.parse_message( + '{"type": "RECORD", "record": {"name": "foo"}, "stream": "users", "version": 2, "time_extracted": "1970-01-02T00:00:00.000Z"}') + expected = singer.RecordMessage( + record={'name': 'foo'}, + stream='users', + version=2, + time_extracted=dateutil.parser.parse("1970-01-02T00:00:00.000Z")) + print(message) + print(expected) + self.assertEqual(message, expected) + def test_parse_message_record_missing_record(self): with self.assertRaises(Exception): singer.parse_message('{"type": "RECORD", "stream": "users"}') From e56e488d6b84cc44d6f7ac8aa358816d6180145e Mon Sep 17 00:00:00 2001 From: Mike DeLaurentis Date: Tue, 28 Nov 2017 09:12:32 -0500 Subject: [PATCH 09/26] Pylint --- singer/messages.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/singer/messages.py b/singer/messages.py index 9ceaa53..b6fb90d 100644 --- a/singer/messages.py +++ b/singer/messages.py @@ -57,7 +57,8 @@ def asdict(self): if self.version is not None: result['version'] = self.version if self.time_extracted: - result['time_extracted'] = self.time_extracted.astimezone(pytz.utc).strftime(u.DATETIME_FMT) + as_utc = self.time_extracted.astimezone(pytz.utc) + result['time_extracted'] = as_utc.strftime(u.DATETIME_FMT) return result def __str__(self): From c53912fc3a65fd5e0bc1b87b567814b72851272a Mon Sep 17 00:00:00 2001 From: Chris Capurso Date: Tue, 28 Nov 2017 11:27:25 -0500 Subject: [PATCH 10/26] import pytz --- singer/utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/singer/utils.py b/singer/utils.py index 60e34d0..5315484 100644 --- a/singer/utils.py +++ b/singer/utils.py @@ -5,6 +5,7 @@ import json import time import dateutil +import pytz import backoff as backoff_module from singer.catalog import Catalog From 7db81df18d89c5e458016395701e7c0823e4c904 Mon Sep 17 00:00:00 2001 From: Chris Capurso Date: Tue, 28 Nov 2017 13:34:55 -0500 Subject: [PATCH 11/26] handle string or array bookmark_properties --- singer/messages.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/singer/messages.py b/singer/messages.py index b6fb90d..d57797e 100644 --- a/singer/messages.py +++ b/singer/messages.py @@ -235,6 +235,12 @@ def write_schema(stream_name, schema, key_properties, bookmark_properties=None, key_properties = [key_properties] if not isinstance(key_properties, list): raise Exception("key_properties must be a string or list of strings") + + if isinstance(bookmark_properties, (str, bytes)): + bookmark_properties = [bookmark_properties] + if not isinstance(bookmark_properties, list): + raise Exception("bookmark_properties must be a string or list of strings") + write_message( SchemaMessage( stream=(stream_alias or stream_name), From 42fe69a2f53b8d80875c8b20b75a50bf34a67fa7 Mon Sep 17 00:00:00 2001 From: Chris Capurso Date: Tue, 28 Nov 2017 13:44:42 -0500 Subject: [PATCH 12/26] allow bookmark_properties to be None --- singer/messages.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/singer/messages.py b/singer/messages.py index d57797e..dbb3838 100644 --- a/singer/messages.py +++ b/singer/messages.py @@ -238,7 +238,7 @@ def write_schema(stream_name, schema, key_properties, bookmark_properties=None, if isinstance(bookmark_properties, (str, bytes)): bookmark_properties = [bookmark_properties] - if not isinstance(bookmark_properties, list): + if bookmark_properties and not isinstance(bookmark_properties, list): raise Exception("bookmark_properties must be a string or list of strings") write_message( From 1b1f32c84588e20dd19e6bccff22272c05e01a6a Mon Sep 17 00:00:00 2001 From: Chris Capurso Date: Tue, 28 Nov 2017 14:26:02 -0500 Subject: [PATCH 13/26] bump version to 5.0.0 --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 4d658d3..819f518 100755 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ import subprocess setup(name="singer-python", - version='4.1.0', + version='5.0.0', description="Singer.io utility library", author="Stitch", classifiers=['Programming Language :: Python :: 3 :: Only'], From 5a89d5d1bd77ce3082f31dc67eef7c812912e15f Mon Sep 17 00:00:00 2001 From: Nick McCoy Date: Wed, 29 Nov 2017 14:32:53 -0500 Subject: [PATCH 14/26] move bookmark property handling from write_schema into SchemaMessage __init__ --- singer/messages.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/singer/messages.py b/singer/messages.py index dbb3838..465f0c0 100644 --- a/singer/messages.py +++ b/singer/messages.py @@ -91,6 +91,11 @@ def __init__(self, stream, schema, key_properties, bookmark_properties=None): self.key_properties = key_properties self.bookmark_properties = bookmark_properties + if isinstance(bookmark_properties, (str, bytes)): + bookmark_properties = [bookmark_properties] + if bookmark_properties and not isinstance(bookmark_properties, list): + raise Exception("bookmark_properties must be a string or list of strings") + def asdict(self): result = { 'type': 'SCHEMA', @@ -236,11 +241,6 @@ def write_schema(stream_name, schema, key_properties, bookmark_properties=None, if not isinstance(key_properties, list): raise Exception("key_properties must be a string or list of strings") - if isinstance(bookmark_properties, (str, bytes)): - bookmark_properties = [bookmark_properties] - if bookmark_properties and not isinstance(bookmark_properties, list): - raise Exception("bookmark_properties must be a string or list of strings") - write_message( SchemaMessage( stream=(stream_alias or stream_name), From d3dd5ba3d21ec21eaf44fb1f8db60f7fb2482687 Mon Sep 17 00:00:00 2001 From: Nick McCoy Date: Wed, 29 Nov 2017 14:42:07 -0500 Subject: [PATCH 15/26] bump version to 5.0.1 --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 819f518..3713b12 100755 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ import subprocess setup(name="singer-python", - version='5.0.0', + version='5.0.1', description="Singer.io utility library", author="Stitch", classifiers=['Programming Language :: Python :: 3 :: Only'], From 231dd45769d858b257b035dd157a762d9583b5bf Mon Sep 17 00:00:00 2001 From: Nick McCoy Date: Thu, 30 Nov 2017 15:50:36 -0500 Subject: [PATCH 16/26] Forcing SchemaMessage so that bookmark_properties is always an array --- singer/messages.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/singer/messages.py b/singer/messages.py index 465f0c0..add3877 100644 --- a/singer/messages.py +++ b/singer/messages.py @@ -89,13 +89,14 @@ def __init__(self, stream, schema, key_properties, bookmark_properties=None): self.stream = stream self.schema = schema self.key_properties = key_properties - self.bookmark_properties = bookmark_properties if isinstance(bookmark_properties, (str, bytes)): bookmark_properties = [bookmark_properties] if bookmark_properties and not isinstance(bookmark_properties, list): raise Exception("bookmark_properties must be a string or list of strings") + self.bookmark_properties = bookmark_properties + def asdict(self): result = { 'type': 'SCHEMA', From 23c389dba58b2e3d184378f154f5b5ac24f51db3 Mon Sep 17 00:00:00 2001 From: Nick McCoy Date: Thu, 30 Nov 2017 16:00:51 -0500 Subject: [PATCH 17/26] Pylint fix --- singer/messages.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/singer/messages.py b/singer/messages.py index add3877..330605f 100644 --- a/singer/messages.py +++ b/singer/messages.py @@ -95,7 +95,7 @@ def __init__(self, stream, schema, key_properties, bookmark_properties=None): if bookmark_properties and not isinstance(bookmark_properties, list): raise Exception("bookmark_properties must be a string or list of strings") - self.bookmark_properties = bookmark_properties + self.bookmark_properties = bookmark_properties def asdict(self): result = { From bdbd7763d4e050a3916e306492d382626a21759c Mon Sep 17 00:00:00 2001 From: Nick McCoy Date: Thu, 30 Nov 2017 16:17:50 -0500 Subject: [PATCH 18/26] Bumping singer-python version to 5.0.2 and deploying --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 3713b12..63dc8ab 100755 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ import subprocess setup(name="singer-python", - version='5.0.1', + version='5.0.2', description="Singer.io utility library", author="Stitch", classifiers=['Programming Language :: Python :: 3 :: Only'], From 65bda5b4b0516df79afc06a4c512947d0c9765e3 Mon Sep 17 00:00:00 2001 From: Jeff Gordon Date: Sun, 10 Dec 2017 13:31:29 -0800 Subject: [PATCH 19/26] Add metadata assignment to Catalog.from_dict. Update test_catalog with metadata. --- .gitignore | 3 +++ singer/catalog.py | 1 + tests/test_catalog.py | 22 +++++++++++++++++++++- 3 files changed, 25 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 14cb723..9aa5b22 100644 --- a/.gitignore +++ b/.gitignore @@ -90,3 +90,6 @@ ENV/ .ropeproject .pypirc + +# PyCharm +.idea/ diff --git a/singer/catalog.py b/singer/catalog.py index 9cbbe52..cff25ea 100644 --- a/singer/catalog.py +++ b/singer/catalog.py @@ -95,6 +95,7 @@ def from_dict(cls, data): entry.schema = Schema.from_dict(stream.get('schema')) entry.is_view = stream.get('is_view') entry.stream_alias = stream.get('stream_alias') + entry.metadata = stream.get('metadata') streams.append(entry) return Catalog(streams) diff --git a/tests/test_catalog.py b/tests/test_catalog.py index 61a3901..195db9e 100644 --- a/tests/test_catalog.py +++ b/tests/test_catalog.py @@ -20,6 +20,17 @@ class TestToDictAndFromDict(unittest.TestCase): 'name': {'type': 'string', 'selected': True} } }, + 'metadata': [ + { + 'metadata': { + 'metadata-key': 'metadata-value' + }, + 'breadcrumb': [ + 'properties', + 'name', + ], + }, + ], }, { 'stream': 'orders', @@ -49,7 +60,16 @@ class TestToDictAndFromDict(unittest.TestCase): selected=True, properties={ 'id': Schema(type='integer', selected=True), - 'name': Schema(type='string', selected=True)})), + 'name': Schema(type='string', selected=True)}), + metadata=[{ + 'metadata': { + 'metadata-key': 'metadata-value' + }, + 'breadcrumb': [ + 'properties', + 'name', + ], + }]), CatalogEntry( stream='orders', tap_stream_id='prod_orders', From 219b8eb6d582139e6eb576d2cb3e3fcad253e32b Mon Sep 17 00:00:00 2001 From: Jeff Gordon Date: Sun, 10 Dec 2017 15:47:40 -0800 Subject: [PATCH 20/26] Add stream_alias to Catalog.to_dict. Add to unit test. --- singer/catalog.py | 2 ++ tests/test_catalog.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/singer/catalog.py b/singer/catalog.py index 9cbbe52..5e2b2e8 100644 --- a/singer/catalog.py +++ b/singer/catalog.py @@ -55,6 +55,8 @@ def to_dict(self): result['stream'] = self.stream if self.row_count is not None: result['row_count'] = self.row_count + if self.stream_alias is not None: + result['stream_alias'] = self.stream_alias if self.metadata is not None: result['metadata'] = self.metadata return result diff --git a/tests/test_catalog.py b/tests/test_catalog.py index 61a3901..6c1d2aa 100644 --- a/tests/test_catalog.py +++ b/tests/test_catalog.py @@ -10,6 +10,7 @@ class TestToDictAndFromDict(unittest.TestCase): { 'stream': 'users', 'tap_stream_id': 'prod_users', + 'stream_alias': 'users_alias', 'database_name': 'prod', 'table_name': 'users', 'schema': { @@ -42,6 +43,7 @@ class TestToDictAndFromDict(unittest.TestCase): CatalogEntry( stream='users', tap_stream_id='prod_users', + stream_alias='users_alias', database='prod', table='users', schema=Schema( From be8b565273d08c14716ff9e3956f7fb02b5e370f Mon Sep 17 00:00:00 2001 From: Nick McCoy Date: Thu, 14 Dec 2017 11:10:15 -0500 Subject: [PATCH 21/26] changing strftime format string --- singer/utils.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/singer/utils.py b/singer/utils.py index 5315484..048c138 100644 --- a/singer/utils.py +++ b/singer/utils.py @@ -11,7 +11,7 @@ from singer.catalog import Catalog DATETIME_PARSE = "%Y-%m-%dT%H:%M:%SZ" -DATETIME_FMT = "%04Y-%m-%dT%H:%M:%S.%fZ" +DATETIME_FMT = "%Y-%m-%dT%H:%M:%S.%fZ" def now(): return datetime.datetime.utcnow().replace(tzinfo=pytz.UTC) @@ -29,10 +29,10 @@ def strptime(dtime): except Exception: return datetime.datetime.strptime(dtime, DATETIME_PARSE) -def strftime(dtime): +def strftime(dtime, format_str=DATETIME_FMT): if dtime.utcoffset() != datetime.timedelta(0): raise Exception("datetime must be pegged at UTC tzoneinfo") - return dtime.strftime(DATETIME_FMT) + return dtime.strftime(format_str) def ratelimit(limit, every): def limitdecorator(func): From 215171b445771623053ea649d8192e4bf4ad4a24 Mon Sep 17 00:00:00 2001 From: Nick McCoy Date: Thu, 14 Dec 2017 15:42:41 -0500 Subject: [PATCH 22/26] fixing strftime format string --- tests/test_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_utils.py b/tests/test_utils.py index cd7fc42..0b89448 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -6,5 +6,5 @@ class TestFormat(unittest.TestCase): def test_small_years(self): - self.assertEqual(u.strftime(dt(90, 1, 1, tzinfo=tz.utc)), + self.assertEqual(u.strftime(dt(90, 1, 1, tzinfo=tz.utc), '%04Y-%m-%dT%H:%M:%S.%fZ'), "0090-01-01T00:00:00.000000Z") From cdbf7ed768ee943e3d0717ec57cfc0a8dcb44455 Mon Sep 17 00:00:00 2001 From: Nick McCoy Date: Fri, 15 Dec 2017 10:09:18 -0500 Subject: [PATCH 23/26] bumping version --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 63dc8ab..4a881f7 100755 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ import subprocess setup(name="singer-python", - version='5.0.2', + version='5.0.3', description="Singer.io utility library", author="Stitch", classifiers=['Programming Language :: Python :: 3 :: Only'], From 89c64668c0264fcced0d67852c609ab971791abb Mon Sep 17 00:00:00 2001 From: Nick McCoy Date: Wed, 22 Nov 2017 16:14:29 -0500 Subject: [PATCH 24/26] validating schema in discovery mode Added write_catalog function --- singer/catalog.py | 27 +++++++++++++++++++++++++++ tests/test_catalog.py | 19 +++++++++++++------ 2 files changed, 40 insertions(+), 6 deletions(-) diff --git a/singer/catalog.py b/singer/catalog.py index 192eea0..234e791 100644 --- a/singer/catalog.py +++ b/singer/catalog.py @@ -4,6 +4,7 @@ import sys from singer.schema import Schema +from jsonschema import ValidationError, Draft4Validator, FormatChecker # pylint: disable=too-many-instance-attributes class CatalogEntry(object): @@ -112,3 +113,29 @@ def get_stream(self, tap_stream_id): if stream.tap_stream_id == tap_stream_id: return stream return None + + +CATALOG_SCHEMA = {'type': 'object', + 'required': ['streams'], + 'properties': { + 'streams' : { + 'type': 'array', + 'items': { + 'type': 'object', + 'required': ['stream', 'tap_stream_id', 'schema'], + 'properties': { + 'stream': {'type': 'string'}, + 'tap_stream_id': {'type': 'string'}, + 'schema': {'type': 'object'} + } + } + } + } + } + +CATALOG_VALIDATOR = Draft4Validator(CATALOG_SCHEMA, + format_checker=FormatChecker()) + +def write_catalog(streams): + CATALOG_VALIDATOR.validate(streams) + json.dump(streams, sys.stdout, indent=2) diff --git a/tests/test_catalog.py b/tests/test_catalog.py index 4cb9390..271f993 100644 --- a/tests/test_catalog.py +++ b/tests/test_catalog.py @@ -1,11 +1,10 @@ import unittest +import singer.catalog from singer.schema import Schema from singer.catalog import Catalog, CatalogEntry -class TestToDictAndFromDict(unittest.TestCase): - - dict_form = { +dict_form = { 'streams': [ { 'stream': 'users', @@ -50,7 +49,7 @@ class TestToDictAndFromDict(unittest.TestCase): ] } - obj_form = Catalog(streams=[ +obj_form = Catalog(streams=[ CatalogEntry( stream='users', tap_stream_id='prod_users', @@ -84,11 +83,15 @@ class TestToDictAndFromDict(unittest.TestCase): 'id': Schema(type='integer', selected=True), 'amount': Schema(type='number', selected=True)}))]) + + + +class TestToDictAndFromDict(unittest.TestCase): def test_from_dict(self): - self.assertEqual(self.obj_form, Catalog.from_dict(self.dict_form)) + self.assertEqual(obj_form, Catalog.from_dict(dict_form)) def test_to_dict(self): - self.assertEqual(self.dict_form, self.obj_form.to_dict()) + self.assertEqual(dict_form, obj_form.to_dict()) class TestGetStream(unittest.TestCase): @@ -99,3 +102,7 @@ def test(self): CatalogEntry(tap_stream_id='c')]) entry = catalog.get_stream('b') self.assertEquals('b', entry.tap_stream_id) + +class TestWriteCatalog(unittest.TestCase): + def test(self): + singer.catalog.write_catalog(dict_form) From c8c507bf7c758adb44c9747ec44c95708d9a0538 Mon Sep 17 00:00:00 2001 From: Nick McCoy Date: Fri, 15 Dec 2017 13:07:27 -0500 Subject: [PATCH 25/26] pylint --- singer/catalog.py | 35 ++++++++++++++++++----------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/singer/catalog.py b/singer/catalog.py index 234e791..17e5668 100644 --- a/singer/catalog.py +++ b/singer/catalog.py @@ -115,23 +115,24 @@ def get_stream(self, tap_stream_id): return None -CATALOG_SCHEMA = {'type': 'object', - 'required': ['streams'], - 'properties': { - 'streams' : { - 'type': 'array', - 'items': { - 'type': 'object', - 'required': ['stream', 'tap_stream_id', 'schema'], - 'properties': { - 'stream': {'type': 'string'}, - 'tap_stream_id': {'type': 'string'}, - 'schema': {'type': 'object'} - } - } - } - } - } +CATALOG_SCHEMA = { + 'type': 'object', + 'required': ['streams'], + 'properties': { + 'streams' : { + 'type': 'array', + 'items': { + 'type': 'object', + 'required': ['stream', 'tap_stream_id', 'schema'], + 'properties': { + 'stream': {'type': 'string'}, + 'tap_stream_id': {'type': 'string'}, + 'schema': {'type': 'object'} + } + } + } + } +} CATALOG_VALIDATOR = Draft4Validator(CATALOG_SCHEMA, format_checker=FormatChecker()) From b82f0f116bda7c3134f60970e3c9d6fe91a3f0ab Mon Sep 17 00:00:00 2001 From: Nick McCoy Date: Mon, 18 Dec 2017 11:34:22 -0500 Subject: [PATCH 26/26] pylint fixes --- singer/catalog.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/singer/catalog.py b/singer/catalog.py index 17e5668..7807033 100644 --- a/singer/catalog.py +++ b/singer/catalog.py @@ -4,7 +4,7 @@ import sys from singer.schema import Schema -from jsonschema import ValidationError, Draft4Validator, FormatChecker +from jsonschema import Draft4Validator, FormatChecker # pylint: disable=too-many-instance-attributes class CatalogEntry(object):