-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtransform
More file actions
executable file
·205 lines (168 loc) · 6.72 KB
/
transform
File metadata and controls
executable file
·205 lines (168 loc) · 6.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
#!/usr/bin/env python
#
# Usage:
# ./transform --transformspec /path/to/transform-spec.json --dataset /path/to/dataset.csv
import argparse
import csv
import json
from datetime import datetime
from time import mktime
from pytz import timezone
from slugify import slugify
class TransformationException(Exception):
pass
class OperationNotSupportedException(Exception):
pass
class ApplyTransformations(object):
def __init__(self, json_transform_spec, dataset):
self.json_transform_spec = json_transform_spec
self.dataset = dataset
self.column_operations = {}
self.operation_transformation_functions = {
'slugify': self.slugify_string,
'f-to-c': self.fahrenheit_to_celsius,
'hst-to-unix': self.hst_to_unix
}
def slugify_string(self, value):
"""
`value` must be a string. Removes all punctuation,
converts all whitespace into hyphens, and lowercases all letters.
Raises an exception if `value` cannot be interpolated as a float.
"""
if not isinstance(value, basestring):
raise TransformationException(
('Cannot sluggify {0} with type {1}. '
'Only strings may be converted.').format(
value, type(value)
)
)
return slugify(value, to_lower=True)
def fahrenheit_to_celsius(self, value):
"""
Assumes `value` is in Fahrenheit.
Returns the temperature converted to Celsius,
rounded to 1 decimal place.
Raises an exception if `value` cannot be interpolated as a float.
"""
try:
v = float(value)
except ValueError:
raise TransformationException(
('Cannot convert "{0}" to celsius. Unable to interpolate '
'value as a float.').format(
value
)
)
return '{:.1f}'.format(((v - 32)/1.8))
def hst_to_unix(self, date, time):
"""
Assumes `date` and `time` are in Hawaii Standard Time (UTC-10).
Returns a UNIX timestamp in the UTC time zone.
Raises an exception if `date` and `time` cannot be interpolated
as a datetime type.
"""
# combine date and time to HST
date_with_time = '{0} {1}'.format(date, time)
try:
datetime_obj_naive = datetime.strptime(
date_with_time, '%m/%d/%y %H:%M:%S'
)
except ValueError:
raise TransformationException(
('Cannot convert RecordedDate {0} and RecordedTime {1} to '
'datetime object.').format(date, time)
)
datetime_obj_hawaii = timezone('US/Hawaii').localize(
datetime_obj_naive
)
datetime_obj_utc = datetime_obj_hawaii.astimezone(timezone('UTC'))
return int(mktime(datetime_obj_utc.timetuple()))
def transform_data_set(self):
"""
Calls all necessary functions to transform `dataset` values
according to the `json_transform_spec`.
"""
self.build_column_operations(self.json_transform_spec)
return self.transform_data_and_create_new_csv(self.dataset)
def get_column_operation_func(self, column_name):
"""
Returns the operation function for `column_name` if it exists.
If no operation was specified in the JSON transform spec,
returns `None`.
"""
return self.column_operations.get(column_name)
def extract_json(self, json_file):
""" Returns the contects of `json_file` as a python dict. """
f = open(json_file)
return json.load(f)
def build_column_operations(self, json_spec):
"""
Updates self.column_operations by building a python dict mapping
column name to desired operation's function name.
If multiple operations are specified for the same column,
only the latest operation will be saved. No return value.
"""
operation_methods_not_supported = []
transforms_array = self.extract_json(json_spec).get('transforms')
if not transforms_array:
raise Exception(
'Invalid json transformation spec. Missing "transforms" array.'
)
for transform_spec in transforms_array:
column_name = transform_spec.get('column')
operation = transform_spec.get('operation')
if not column_name or not operation:
raise ValueError(
('"Column" and "Operation" are required for each '
'transform specification. Spec with issue: {0}').format(
json.dumps(transform_spec)
)
)
# find the associated function for this operation
operation_func = self.operation_transformation_functions.get(
operation
)
if operation_func:
self.column_operations[column_name] = operation_func
else:
operation_methods_not_supported.append(operation)
if operation_methods_not_supported:
raise OperationNotSupportedException(
'Transformation operations not available for: {0}'.format(
','.join(operation_methods_not_supported)
)
)
def transform_data_and_create_new_csv(self, dataset):
""" Returns a csv file with applied transformations. """
with open(dataset, 'rb') as csv_file, \
open('transformed_dataset.csv', 'wb') as modified_file:
reader = csv.DictReader(csv_file)
headers = ['RecordedUTCTimestamp'] + reader.fieldnames
writer = csv.DictWriter(modified_file, headers)
writer.writeheader() # write headers to the new file
for row in reader:
output_row = row.copy()
for column_name, value in row.iteritems():
column_operation_func = self.get_column_operation_func(
column_name
)
if column_operation_func == self.hst_to_unix:
output_row['RecordedUTCTimestamp'] = self.hst_to_unix(
row['RecordedDate'], row['RecordedTime']
)
elif column_operation_func:
output_row[column_name] = column_operation_func(value)
writer.writerow(output_row)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
'--transformspec', default=None,
required=True, help='json transform spec file'
)
parser.add_argument(
'--dataset', default=None,
required=True, help='csv dataset file'
)
args = parser.parse_args()
transformations = ApplyTransformations(args.transformspec, args.dataset)
transformations.transform_data_set()