@@ -23,51 +23,59 @@ class SparkService:
2323
def __init__(self, master=None):
    """Bind the PySpark names this service uses and start a Spark session.

    Args:
        master: Optional Spark master URL. It is stored on the instance and
            read by ``create_spark_session`` when building the session.

    Raises:
        ImportError: If the PySpark modules cannot be imported after the
            installation check. The original import error is attached as
            ``__cause__``.
    """
    self.master = master

    # Ensure pyspark is installed first
    self.ensure_installed("pyspark")

    # Import only after the installation check. The try body holds nothing
    # but the imports, so a failure elsewhere is never reported as an
    # import problem.
    try:
        from pyspark.sql import DataFrame, SparkSession
        from pyspark.sql.functions import udf
        from pyspark.sql.types import ArrayType, StringType
    except ImportError as e:
        # Chain the cause so the underlying traceback is preserved.
        raise ImportError(
            f"Failed to import PySpark modules: {e}. "
            "Make sure PySpark is installed correctly."
        ) from e

    # Keep the imported names on the instance so other methods can use them
    # without a module-level pyspark import.
    self.SparkSession = SparkSession
    self.DataFrame = DataFrame
    self.udf = udf
    self.ArrayType = ArrayType
    self.StringType = StringType

    # Create the spark session
    self.spark = self.create_spark_session()
4850
def create_spark_session(self):
    """Build the Spark session for this service via ``getOrCreate()``.

    The builder is given the app name ``"datafog"`` and two driver settings.
    The master is ``self.master`` when it is truthy; otherwise ``local[1]``
    is used when a pytest or tox environment variable is present; otherwise
    no master is set on the builder.

    Side effects:
        Sets ``PYSPARK_SUBMIT_ARGS`` in ``os.environ`` only when it is not
        already defined, so submit arguments supplied by the caller's
        environment are left untouched.

    Returns:
        Whatever ``SparkSession.builder...getOrCreate()`` returns.
    """
    # Check if we're running in a test environment
    in_test_env = any(
        marker in os.environ
        for marker in ("PYTEST_CURRENT_TEST", "TOX_ENV_NAME")
    )

    # Provide default submit args without clobbering a value the user has
    # already exported. The same spark.driver.allowMultipleContexts setting
    # is also applied through builder.config below, so it is still requested
    # when an existing value is kept.
    os.environ.setdefault(
        "PYSPARK_SUBMIT_ARGS",
        "--conf spark.driver.allowMultipleContexts=true pyspark-shell",
    )

    # Create a builder with the app name and the driver settings.
    # NOTE(review): the extraJavaOptions value is a workaround for security
    # manager issues on newer Java versions; confirm behavior on older JVMs.
    builder = (
        self.SparkSession.builder.appName("datafog")
        .config("spark.driver.allowMultipleContexts", "true")
        .config("spark.driver.extraJavaOptions", "-Djava.security.manager=allow")
    )

    # An explicit master wins; otherwise fall back to local mode under tests.
    if self.master:
        builder = builder.master(self.master)
    elif in_test_env:
        builder = builder.master("local[1]")

    # Create and return the session
    return builder.getOrCreate()
7381
@@ -86,6 +94,7 @@ def ensure_installed(self, package_name):
8694 print (f"{ package_name } installed successfully." )
8795 except subprocess .CalledProcessError as e :
8896 print (f"Failed to install { package_name } : { e } " )
89- raise ImportError (f"Could not install { package_name } . "
90- f"Please install it manually with 'pip install { package_name } '." )
91-
97+ raise ImportError (
98+ f"Could not install { package_name } . "
99+ f"Please install it manually with 'pip install { package_name } '."
100+ )
0 commit comments