Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 36 additions & 20 deletions tap_purecloud/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ def stream_results(generator, entity_names: tuple, transform_record, record_name
records = [transform_record(k, v) for (k,v) in entity_page.items()]
else:
records = [transform_record(record) for record in entity_page]
valid_records = [r for r in records if r is not None]
with singer.Transformer() as transformer:
valid_records = [ transformer.transform(r, schema, {}) for r in records if r is not None]
singer.write_records(record_name, valid_records)
all_records.extend(valid_records)
return all_records
Expand All @@ -202,16 +203,16 @@ def stream_results_list(generator, entity_names: tuple, transform_record, record
singer.write_records(record_name, records)


def sync_users(api_client: ApiClient):
def sync_users(api_client: ApiClient, schema):
logger.info("Fetching users")
api_instance = UsersApi(api_client)
body = FakeBody()
entity_names = ('entities', )
gen_users = fetch_all_records(api_instance.get_users, entity_names, body, {'expand': ['locations']})
stream_results(gen_users, entity_names, handle_object, 'users', schemas.user, ['id'], True)
stream_results(gen_users, entity_names, handle_object, 'users', schema, ['id'], True)


def sync_groups(api_client: ApiClient):
def sync_groups(api_client: ApiClient, schema):
logger.info("Fetching groups")
api_instance = GroupsApi(api_client)
body = FakeBody()
Expand All @@ -220,7 +221,7 @@ def sync_groups(api_client: ApiClient):
stream_results(gen_groups, entity_names, handle_object, 'groups', schemas.group, ['id'], True)


def sync_locations(api_client: ApiClient):
def sync_locations(api_client: ApiClient, schema):
logger.info("Fetching locations")
api_instance = LocationsApi(api_client)
body = LocationSearchRequest()
Expand All @@ -229,7 +230,7 @@ def sync_locations(api_client: ApiClient):
stream_results(gen_locations, entity_names, handle_object, 'location', schemas.location, ['id'], True)


def sync_presence_definitions(api_client: ApiClient):
def sync_presence_definitions(api_client: ApiClient, schema):
logger.info("Fetching presence definitions")
api_instance = PresenceApi(api_client)
body = FakeBody()
Expand All @@ -238,7 +239,7 @@ def sync_presence_definitions(api_client: ApiClient):
stream_results(gen_presences, entity_names, handle_object, 'presence', schemas.presence, ['id'], True)


def sync_queues(api_client: ApiClient):
def sync_queues(api_client: ApiClient, schema):
logger.info("Fetching queues")
api_instance = RoutingApi(api_client)
body = FakeBody()
Expand Down Expand Up @@ -487,7 +488,7 @@ def sync_historical_adherence(api_instance: WorkforceManagementApi, config, unit
sync_date = next_date
first_page = False

def sync_management_units(api_client: ApiClient, config):
def sync_management_units(api_client: ApiClient, config, catalog):
logger.info("Fetching management units")
api_instance = WorkforceManagementApi(api_client)
body = FakeBody()
Expand Down Expand Up @@ -525,7 +526,7 @@ def sync_management_units(api_client: ApiClient, config):
sync_historical_adherence(api_instance, config, unit_id, unit_users[unit_id], first_page)


def sync_conversations(api_client: ApiClient, config):
def sync_conversations(api_client: ApiClient, config, catalog):
logger.info("Fetching conversations")
api_instance = ConversationsApi(api_client)

Expand Down Expand Up @@ -677,7 +678,7 @@ def handle_user_details(user_details_record):
return presences + statuses


def sync_user_state(api_client: ApiClient, config):
def sync_user_state(api_client: ApiClient, config, catalog):
logger.info("Fetching user state")
api_instance = UsersApi(api_client)

Expand Down Expand Up @@ -728,6 +729,7 @@ def parse_to_date(date_string: str) -> 'datetime.date':
def do_sync(args):
config: dict = args.config
state: dict = args.state
catalog: dict = args.catalog

# grab start date from state file. If not found
# default to value in config file
Expand All @@ -752,16 +754,30 @@ def do_sync(args):

logger.info(f"Successfully got access token. Starting sync from {start_date}")

# https://developer.genesys.cloud/devapps/sdk/docexplorer/purecloudpython/
sync_users(api_client)
sync_groups(api_client)
sync_locations(api_client)
sync_presence_definitions(api_client)
sync_queues(api_client)

sync_management_units(api_client, config)
sync_conversations(api_client, config)
sync_user_state(api_client, config)
for stream in catalog.streams:
stream_name = stream.tap_stream_id
mdata = singer.metadata.to_map(stream.metadata)
if not mdata.get((), {}).get('selected', False):
logger.info("%s: Skipping - not selected", stream_name)
continue

# https://developer.genesys.cloud/devapps/sdk/docexplorer/purecloudpython/
if stream_name == 'users':
sync_users(api_client, stream.schema.to_dict())
elif stream_name == 'groups':
sync_groups(api_client, stream.schema.to_dict())
elif stream_name == 'location':
sync_locations(api_client, stream.schema.to_dict())
elif stream_name == 'presence':
sync_presence_definitions(api_client, stream.schema.to_dict())
elif stream_name == 'queues':
sync_queues(api_client, stream.schema.to_dict())
elif stream_name == 'management_unit':
sync_management_units(api_client, config, stream.schema.to_dict())
elif stream_name == 'conversation':
sync_conversations(api_client, config, stream.schema.to_dict())
elif stream_name == 'user_state':
sync_user_state(api_client, config, stream.schema.to_dict())

new_state = {
'start_date': datetime.date.today().strftime('%Y-%m-%d')
Expand Down