-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
42 lines (33 loc) · 1.44 KB
/
main.py
File metadata and controls
42 lines (33 loc) · 1.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import os
from utils.shared_utils import spark_session, read_data_csv, save_df_as_csv
from preprocessing.generate_city_offsets import generate_city_offsets
from tasks import task1, task2, task3
def main():
    """Entry point: load the weather CSVs, run tasks 1-3, and persist results.

    Side effects:
        - May generate ``preprocessing/city_offsets.json`` (needed by task 3).
        - Prints each task's result DataFrame to stdout.
        - Writes ``results/task1.csv`` .. ``results/task3.csv`` under the CWD.
    """
    # Instantiate spark session
    spark = spark_session()
    try:
        # Read input data (project helper; presumably returns Spark DataFrames
        # — confirm against utils.shared_utils).
        cities = read_data_csv(spark, "city_attributes.csv")
        humidity = read_data_csv(spark, "humidity.csv")
        pressure = read_data_csv(spark, "pressure.csv")
        temperature = read_data_csv(spark, "temperature.csv")
        weather = read_data_csv(spark, "weather_description.csv")

        # Preprocessing step: task 3 needs the city-offset mapping; build it
        # only once and reuse the cached JSON on later runs.
        offsets_path = os.path.join(os.getcwd(), "preprocessing", "city_offsets.json")
        if not os.path.exists(offsets_path):
            generate_city_offsets()

        # Run tasks (order matters only for the numbering of the outputs).
        results = [
            task1.run(weather_df=weather),
            task2.run(temperature_df=temperature, pressure_df=pressure,
                      humidity_df=humidity, city_attr_df=cities),
            task3.run(temperature_df=temperature, city_attr_df=cities),
        ]

        # Output: show each result and persist it as CSV under ./results.
        for i, res in enumerate(results, start=1):
            print(f"<===== RESULTS TASK {i} =====>")
            res.show()
            save_df_as_csv(res, os.path.join(os.getcwd(), "results", f"task{i}.csv"))
    finally:
        # Release the Spark session even if a read/task/save raised — the
        # original leaked the session on any mid-run failure.
        spark.stop()


if __name__ == '__main__':
    main()