diff --git a/tap_purecloud/__init__.py b/tap_purecloud/__init__.py index 988a6c9..e47fe13 100755 --- a/tap_purecloud/__init__.py +++ b/tap_purecloud/__init__.py @@ -182,7 +182,8 @@ def stream_results(generator, entity_names: tuple, transform_record, record_name records = [transform_record(k, v) for (k,v) in entity_page.items()] else: records = [transform_record(record) for record in entity_page] - valid_records = [r for r in records if r is not None] + with singer.Transformer() as transformer: + valid_records = [ transformer.transform(r, schema, {}) for r in records if r is not None] singer.write_records(record_name, valid_records) all_records.extend(valid_records) return all_records @@ -202,16 +203,16 @@ def stream_results_list(generator, entity_names: tuple, transform_record, record singer.write_records(record_name, records) -def sync_users(api_client: ApiClient): +def sync_users(api_client: ApiClient, schema): logger.info("Fetching users") api_instance = UsersApi(api_client) body = FakeBody() entity_names = ('entities', ) gen_users = fetch_all_records(api_instance.get_users, entity_names, body, {'expand': ['locations']}) - stream_results(gen_users, entity_names, handle_object, 'users', schemas.user, ['id'], True) + stream_results(gen_users, entity_names, handle_object, 'users', schema, ['id'], True) -def sync_groups(api_client: ApiClient): +def sync_groups(api_client: ApiClient, schema): logger.info("Fetching groups") api_instance = GroupsApi(api_client) body = FakeBody() @@ -220,7 +221,7 @@ def sync_groups(api_client: ApiClient): stream_results(gen_groups, entity_names, handle_object, 'groups', schemas.group, ['id'], True) -def sync_locations(api_client: ApiClient): +def sync_locations(api_client: ApiClient, schema): logger.info("Fetching locations") api_instance = LocationsApi(api_client) body = LocationSearchRequest() @@ -229,7 +230,7 @@ def sync_locations(api_client: ApiClient): stream_results(gen_locations, entity_names, handle_object, 'location', schemas.location, ['id'], True) -def sync_presence_definitions(api_client: ApiClient): +def sync_presence_definitions(api_client: ApiClient, schema): logger.info("Fetching presence definitions") api_instance = PresenceApi(api_client) body = FakeBody() @@ -238,7 +239,7 @@ def sync_presence_definitions(api_client: ApiClient): stream_results(gen_presences, entity_names, handle_object, 'presence', schemas.presence, ['id'], True) -def sync_queues(api_client: ApiClient): +def sync_queues(api_client: ApiClient, schema): logger.info("Fetching queues") api_instance = RoutingApi(api_client) body = FakeBody() @@ -487,7 +488,7 @@ def sync_historical_adherence(api_instance: WorkforceManagementApi, config, unit sync_date = next_date first_page = False -def sync_management_units(api_client: ApiClient, config): +def sync_management_units(api_client: ApiClient, config, catalog): logger.info("Fetching management units") api_instance = WorkforceManagementApi(api_client) body = FakeBody() @@ -525,7 +526,7 @@ def sync_management_units(api_client: ApiClient, config): sync_historical_adherence(api_instance, config, unit_id, unit_users[unit_id], first_page) -def sync_conversations(api_client: ApiClient, config): +def sync_conversations(api_client: ApiClient, config, catalog): logger.info("Fetching conversations") api_instance = ConversationsApi(api_client) @@ -677,7 +678,7 @@ def handle_user_details(user_details_record): return presences + statuses -def sync_user_state(api_client: ApiClient, config): +def sync_user_state(api_client: ApiClient, config, catalog): logger.info("Fetching user state") api_instance = UsersApi(api_client) @@ -728,6 +729,7 @@ def parse_to_date(date_string: str) -> 'datetime.date': def do_sync(args): config: dict = args.config state: dict = args.state + catalog: dict = args.catalog # grab start date from state file. If not found # default to value in config file @@ -752,16 +754,30 @@ def do_sync(args): logger.info(f"Successfully got access token. Starting sync from {start_date}") - # https://developer.genesys.cloud/devapps/sdk/docexplorer/purecloudpython/ - sync_users(api_client) - sync_groups(api_client) - sync_locations(api_client) - sync_presence_definitions(api_client) - sync_queues(api_client) - - sync_management_units(api_client, config) - sync_conversations(api_client, config) - sync_user_state(api_client, config) + for stream in catalog.streams: + stream_name = stream.tap_stream_id + mdata = singer.metadata.to_map(stream.metadata) + if not mdata.get((), {}).get('selected', False): + logger.info("%s: Skipping - not selected", stream_name) + continue + + # https://developer.genesys.cloud/devapps/sdk/docexplorer/purecloudpython/ + if stream_name == 'users': + sync_users(api_client, stream.schema.to_dict()) + elif stream_name == 'groups': + sync_groups(api_client, stream.schema.to_dict()) + elif stream_name == 'location': + sync_locations(api_client, stream.schema.to_dict()) + elif stream_name == 'presence': + sync_presence_definitions(api_client, stream.schema.to_dict()) + elif stream_name == 'queues': + sync_queues(api_client, stream.schema.to_dict()) + elif stream_name == 'management_unit': + sync_management_units(api_client, config, stream.schema.to_dict()) + elif stream_name == 'conversation': + sync_conversations(api_client, config, stream.schema.to_dict()) + elif stream_name == 'user_state': + sync_user_state(api_client, config, stream.schema.to_dict()) new_state = { 'start_date': datetime.date.today().strftime('%Y-%m-%d')