forked from anirudhagar13/chicago-crime-analysis-bigdata
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathingestion.py
More file actions
77 lines (64 loc) · 1.92 KB
/
ingestion.py
File metadata and controls
77 lines (64 loc) · 1.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
'''
Modules to perform data ingestion
'''
import threading
from couchbase.cluster import Cluster
from couchbase.cluster import PasswordAuthenticator
def push_data(data, cb_conn, thread_id):
    '''
    Push one chunk of documents into Couchbase as a single bulk upsert.

    data      : iterable of dict documents to ingest
    cb_conn   : open bucket connection exposing upsert_multi()
    thread_id : string tag appended to generated keys so ids stay
                unique across worker threads
    '''
    # Build the doc-id -> document mapping required by the bulk API
    bulk_data = dict()
    for index, doc in enumerate(data):
        if 'case_number' in doc:
            # Hardcoded ID, as model requires this as doc-id
            doc_id = doc.get('case_number')
        else:
            # Fallback key: position within chunk + thread tag
            doc_id = 'k' + str(index) + '_' + thread_id
        bulk_data[doc_id] = doc
    try:
        # Bulk uploading of documents
        cb_conn.upsert_multi(bulk_data)
    except Exception as exc:
        # BUG FIX: was a bare `except:` which also swallowed SystemExit /
        # KeyboardInterrupt and hid the failure cause. Keep the original
        # best-effort behaviour (no re-raise) but report what went wrong.
        print ('Data Insertion failed!', exc)
    print ('Thread completed ingestion: ', thread_id)
def sequential_run(cb_conn, data):
    '''
    Pushes data sequentially
    '''
    # Transpose the frame so each row becomes one document dict
    rows = data.T.to_dict().values()
    # Key every document by its positional index ('k0', 'k1', ...)
    bulk_data = {'k' + str(pos): row for pos, row in enumerate(rows)}
    # API for bulk ingestion
    cb_conn.upsert_multi(bulk_data)
def parallel_run(cb_conn, data, threads=10):
    '''
    Pushes data to Couchbase in parallel worker threads.

    cb_conn : open bucket connection shared by all workers
    data    : pandas DataFrame of documents to ingest
    threads : number of worker threads (each handles one fixed chunk)
    '''
    jobs = list()
    # Hardcoded chunk of data by each thread
    chunk_size = 1000
    for i in range(0, threads):
        chunk = data[chunk_size*i:chunk_size*(i+1)].T.to_dict().values()
        # BUG FIX: the original used target=push_data(chunk, ...), which
        # CALLED push_data synchronously here and handed its None return
        # to Thread, so ingestion actually ran sequentially and the
        # started threads did nothing. Pass the callable plus args instead.
        thread = threading.Thread(target=push_data,
                                  args=(chunk, cb_conn, str(i+1)))
        jobs.append(thread)
    # Start the threads (i.e. calculate the random number lists)
    for j in jobs:
        j.start()
    # Ensure all of the threads have finished
    for j in jobs:
        j.join()
def data_ingestion(data, cb_user, cb_pwd, cb_host, cb_bucket):
    '''
    Ingests data into pipeline
    '''
    # Connect to the cluster, authenticate, then open the target bucket
    cb_cluster = Cluster('couchbase://' + cb_host)
    cb_cluster.authenticate(PasswordAuthenticator(cb_user, cb_pwd))
    bucket_conn = cb_cluster.open_bucket(cb_bucket)
    # Pushing entire data sequentially
    # sequential_run(bucket_conn, data)
    # Pushing entire data parallely
    parallel_run(bucket_conn, data)