 
 import importlib
 import json
+import os
 import subprocess
 import sys
 from typing import Any, List
@@ -20,25 +21,63 @@ class SparkService:
     data reading and package installation.
     """
 
-    def __init__(self):
-        # First import necessary modules
-        from pyspark.sql import DataFrame, SparkSession
-        from pyspark.sql.functions import udf
-        from pyspark.sql.types import ArrayType, StringType
+    def __init__(self, master=None):
+        self.master = master
 
-        # Assign fields
-        self.SparkSession = SparkSession
-        self.DataFrame = DataFrame
-        self.udf = udf
-        self.ArrayType = ArrayType
-        self.StringType = StringType
-
-        # Now create spark session and ensure pyspark is installed
+        # Ensure pyspark is installed first
         self.ensure_installed("pyspark")
-        self.spark = self.create_spark_session()
+
+        # Now import necessary modules after ensuring pyspark is installed
+        try:
+            from pyspark.sql import DataFrame, SparkSession
+            from pyspark.sql.functions import udf
+            from pyspark.sql.types import ArrayType, StringType
+
+            # Assign fields
+            self.SparkSession = SparkSession
+            self.DataFrame = DataFrame
+            self.udf = udf
+            self.ArrayType = ArrayType
+            self.StringType = StringType
+
+            # Create the spark session
+            self.spark = self.create_spark_session()
+        except ImportError as e:
+            raise ImportError(
+                f"Failed to import PySpark modules: {e}. "
+                f"Make sure PySpark is installed correctly."
+            )
 
     def create_spark_session(self):
-        return self.SparkSession.builder.appName("datafog").getOrCreate()
+        # Check if we're running in a test environment
+        in_test_env = (
+            "PYTEST_CURRENT_TEST" in os.environ or "TOX_ENV_NAME" in os.environ
+        )
+
+        # Set Java system properties to handle security manager issues
+        # This is needed for newer Java versions
+        os.environ["PYSPARK_SUBMIT_ARGS"] = (
+            "--conf spark.driver.allowMultipleContexts=true pyspark-shell"
+        )
+
+        # Create a builder with the app name
+        builder = self.SparkSession.builder.appName("datafog")
+
+        # Add configuration to work around security manager issues
+        builder = builder.config("spark.driver.allowMultipleContexts", "true")
+        builder = builder.config(
+            "spark.driver.extraJavaOptions", "-Djava.security.manager=allow"
+        )
+
+        # If master is specified, use it
+        if self.master:
+            builder = builder.master(self.master)
+        # Otherwise, if we're in a test environment, use local mode
+        elif in_test_env:
+            builder = builder.master("local[1]")
+
+        # Create and return the session
+        return builder.getOrCreate()
 
     def read_json(self, path: str) -> List[dict]:
         return self.spark.read.json(path).collect()
@@ -47,6 +86,15 @@ def ensure_installed(self, package_name):
         try:
             importlib.import_module(package_name)
         except ImportError:
-            subprocess.check_call(
-                [sys.executable, "-m", "pip", "install", package_name]
-            )
+            print(f"Installing {package_name}...")
+            try:
+                subprocess.check_call(
+                    [sys.executable, "-m", "pip", "install", package_name]
+                )
+                print(f"{package_name} installed successfully.")
+            except subprocess.CalledProcessError as e:
+                print(f"Failed to install {package_name}: {e}")
+                raise ImportError(
+                    f"Could not install {package_name}. "
+                    f"Please install it manually with 'pip install {package_name}'."
+                )
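
For context, a minimal usage sketch of the updated constructor and read_json (the master URL, input path, and __main__ guard below are illustrative assumptions, not part of this commit):

# Usage sketch: exercise SparkService with an explicit local master.
# "local[1]" and the input path are hypothetical example values.
if __name__ == "__main__":
    service = SparkService(master="local[1]")  # overrides test-env autodetection
    rows = service.read_json("examples/sample.json")  # returns collected rows
    print(rows)
    service.spark.stop()  # shut the session down when finished

With master=None (the default), no .master() is applied outside of pytest/tox, so getOrCreate() falls back to Spark's normal master resolution (e.g., whatever spark-submit supplies) and existing callers keep the old behavior.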