diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f67c851..ad8173b9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ - [Runtimes] Updated runtime images and related version references across backends. - [K8s] Added configuration for pod and container `securityContext`. - [Docs] Corrected MinIO/Ceph config template keys and removed obsolete Kubernetes image references. +- [GCP Functions] Updated `gcp_functions` backend to Google Cloud Run functions (Cloud Functions v2 API). ### Fixed - [K8s] Fixed default runtime builds impacted by Debian Buster end-of-life. diff --git a/docs/source/compute_config/gcp_functions.md b/docs/source/compute_config/gcp_functions.md index e1cc082e..c5749087 100644 --- a/docs/source/compute_config/gcp_functions.md +++ b/docs/source/compute_config/gcp_functions.md @@ -1,6 +1,6 @@ -# Google Cloud Functions +# Google Cloud Run functions (v2) -Lithops with *GCP Functions* as serverless compute backend. +Lithops with *GCP Functions* (Cloud Run functions v2 via the Cloud Functions v2 API) as serverless compute backend. ## Installation @@ -32,6 +32,26 @@ python3 -m pip install lithops[gcp] 10. Enable the **Artifact Registry API**: Navigate to *APIs & services* tab on the menu. Click *ENABLE APIS AND SERVICES*. Look for "Artifact Registry API" at the search bar. Click *Enable*. +11. If you use `trigger: pub/sub`, enable the **Eventarc API** as well. Cloud Run functions v2 event triggers are managed through Eventarc. + +```bash +gcloud services enable eventarc.googleapis.com --project <PROJECT_ID> +``` + +For first-time setup, you can enable all commonly required APIs in one command: + +```bash +gcloud services enable \ + cloudfunctions.googleapis.com \ + run.googleapis.com \ + eventarc.googleapis.com \ + pubsub.googleapis.com \ + cloudbuild.googleapis.com \ + artifactregistry.googleapis.com \ + storage.googleapis.com \ + --project <PROJECT_ID> +``` + ## Configuration 1. 
Edit your lithops config and add the following keys: @@ -54,7 +74,7 @@ python3 -m pip install lithops[gcp] |gcp | region | |yes | Region name of the GCP services (e.g. `us-east1`) | |gcp | credentials_path | |yes | **Absolute** path of your JSON key file downloaded in step 7 (e.g. `/home/myuser/lithops-invoker1234567890.json`). Alternatively you can set `GOOGLE_APPLICATION_CREDENTIALS` environment variable. If not provided it will try to load the default credentials from the environment| -### Google Cloud Functions +### Google Cloud Run functions (v2) |Group|Key|Default|Mandatory|Additional info| |---|---|---|---|---| |gcp_functions | region | |no | Region name (e.g. `us-east1`). Functions and pub/sub queues will be created in the same region. Lithops will use the region set under the `gcp` section if it is not set here | @@ -74,6 +94,11 @@ Once you have your compute and storage backends configured, you can run a hello lithops hello -b gcp_functions -s gcp_storage ``` +## References + +- [Cloud Functions v2 REST API reference](https://docs.cloud.google.com/functions/docs/reference/rest) +- [Cloud Run functions best practices](https://docs.cloud.google.com/run/docs/tips/functions-best-practices) + ## Viewing the execution logs diff --git a/docs/source/supported_clouds.rst b/docs/source/supported_clouds.rst index fd30fb62..b9c85d0a 100644 --- a/docs/source/supported_clouds.rst +++ b/docs/source/supported_clouds.rst @@ -48,8 +48,8 @@ Currently, Lithops for Google Cloud Platform supports these backends: * - Compute - Storage - * - `Google Cloud Functions `_ - - `Google Cloud Storage `_ + * - `Google Cloud Run functions (v2) `_ + - `Google Cloud Storage `_ * - `Google Cloud Run `_ - diff --git a/lithops/executors.py b/lithops/executors.py index 2cf28f9c..e8343b24 100644 --- a/lithops/executors.py +++ b/lithops/executors.py @@ -627,7 +627,18 @@ def save_data_to_clean(data): spawn_cleaner = not (CLEANER_PROCESS and CLEANER_PROCESS.poll() is None) if (jobs_to_clean or cs) 
and spawn_cleaner: cmd = [sys.executable, '-m', 'lithops.scripts.cleaner'] - CLEANER_PROCESS = sp.Popen(cmd, start_new_session=True) + env = os.environ.copy() + # Cleaner is forked while gRPC clients may already be active in the parent process. + # Reduce noisy gRPC fork logs in the detached cleaner subprocess. + env.setdefault('GRPC_ENABLE_FORK_SUPPORT', '0') + env.setdefault('GRPC_VERBOSITY', 'ERROR') + CLEANER_PROCESS = sp.Popen( + cmd, + start_new_session=True, + env=env, + stdout=sp.DEVNULL, + stderr=sp.DEVNULL + ) def job_summary(self, cloud_objects_n: Optional[int] = 0): """ diff --git a/lithops/serverless/backends/gcp_functions/config.py b/lithops/serverless/backends/gcp_functions/config.py index 4d774b1d..fdb0a04f 100644 --- a/lithops/serverless/backends/gcp_functions/config.py +++ b/lithops/serverless/backends/gcp_functions/config.py @@ -22,12 +22,11 @@ FH_ZIP_LOCATION = os.path.join(TEMP_DIR, 'lithops_gcp_functions/{}.zip') SCOPES = ('https://www.googleapis.com/auth/cloud-platform', 'https://www.googleapis.com/auth/pubsub') -FUNCTIONS_API_VERSION = 'v1' -PUBSUB_API_VERSION = 'v1' +FUNCTIONS_API_VERSION = 'v2' AUDIENCE = "https://pubsub.googleapis.com/google.pubsub.v1.Publisher" -RUNTIME_MEMORY_MAX = 8192 # 8GB -RUNTIME_MEMORY_OPTIONS = {128, 256, 512, 1024, 2048, 4096, 8192} +RUNTIME_MEMORY_MAX = 32768 # 32GB +RUNTIME_MEMORY_OPTIONS = {128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768} AVAILABLE_PY_RUNTIMES = { '3.8': 'python38', diff --git a/lithops/serverless/backends/gcp_functions/entry_point.py b/lithops/serverless/backends/gcp_functions/entry_point.py index 0419f42b..660790b5 100644 --- a/lithops/serverless/backends/gcp_functions/entry_point.py +++ b/lithops/serverless/backends/gcp_functions/entry_point.py @@ -44,7 +44,7 @@ def main(data, context=None): activation_id = uuid.uuid4().hex os.environ['__LITHOPS_ACTIVATION_ID'] = activation_id - os.environ['__LITHOPS_BACKEND'] = 'Google Cloud Functions' + os.environ['__LITHOPS_BACKEND'] = 'Google Cloud 
Run functions (v2)' if 'get_metadata' in args: runtime_meta = get_runtime_metadata() @@ -58,10 +58,10 @@ def main(data, context=None): else: return runtime_meta elif 'remote_invoker' in args: - logger.info(f"Lithops v{__version__} - Starting Google Cloud Functions invoker") + logger.info(f"Lithops v{__version__} - Starting Google Cloud Run functions (v2) invoker") function_invoker(args) else: - logger.info(f"Lithops v{__version__} - Starting Google Cloud Functions execution") + logger.info(f"Lithops v{__version__} - Starting Google Cloud Run functions (v2) execution") function_handler(args) return {"activationId": activation_id} diff --git a/lithops/serverless/backends/gcp_functions/gcp_functions.py b/lithops/serverless/backends/gcp_functions/gcp_functions.py index c0f64772..382f5460 100644 --- a/lithops/serverless/backends/gcp_functions/gcp_functions.py +++ b/lithops/serverless/backends/gcp_functions/gcp_functions.py @@ -23,9 +23,10 @@ import httplib2 import zipfile import time -import urllib +import requests import google.auth import google.oauth2.id_token +import google.auth.transport.requests from threading import Lock from google.cloud import pubsub_v1 from google.oauth2 import service_account @@ -58,13 +59,13 @@ def __init__(self, gcf_config, internal_storage): self.internal_storage = internal_storage self._build_api_resource() - - self._api_endpoint = f'https://{self.region}-{self.project_name}.cloudfunctions.net/' + self._function_url = None + self._function_name = None self._api_token = None logger.debug(f'Invocation trigger set to: {self.trigger}') - msg = COMPUTE_CLI_MSG.format('Google Cloud Functions') + msg = COMPUTE_CLI_MSG.format('Google Cloud Run functions (v2)') logger.info(f"{msg} - Region: {self.region} - Project: {self.project_name}") def _build_api_resource(self): @@ -128,6 +129,21 @@ def _get_runtime_bin_location(self, runtime_name): function_name = self._format_function_name(runtime_name) return config.USER_RUNTIMES_PREFIX + '/' + 
function_name + '_bin.zip' + def _memory_to_gcfv2(self, memory_mb): + return f'{memory_mb}Mi' + + def _memory_from_gcfv2(self, memory_value): + if isinstance(memory_value, int): + return memory_value + if isinstance(memory_value, str): + if memory_value.endswith('Mi'): + return int(memory_value[:-2]) + if memory_value.endswith('Gi'): + return int(memory_value[:-2]) * 1024 + if memory_value.endswith('M'): + return int(memory_value[:-1]) + raise ValueError(f'Unable to parse memory value: {memory_value}') + def _encode_payload(self, payload): return base64.b64encode(bytes(json.dumps(payload), 'utf-8')).decode('utf-8') @@ -137,9 +153,14 @@ def _get_token(self, function_name): """ invoke_mutex.acquire() - if not self._api_token or function_name not in self._function_url: + if not self._api_token or self._function_name != function_name: logger.debug('Getting authentication token') - self._function_url = self._api_endpoint + function_name + function_location = self._get_function_location(function_name) + response = self._api_resource.projects().locations().functions().get( + name=function_location + ).execute(num_retries=self.num_retries) + self._function_url = response['serviceConfig']['uri'] + self._function_name = function_name if self.credentials_path and os.path.isfile(self.credentials_path): os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = self.credentials_path auth_req = google.auth.transport.requests.Request() @@ -168,14 +189,10 @@ def _wait_function_deleted(self, function_location): # Wait until function is completely deleted while True: try: - response = self._api_resource.projects().locations().functions().get( + self._api_resource.projects().locations().functions().get( name=function_location ).execute(num_retries=self.num_retries) - logger.debug(f'Function status is {response["status"]}') - if response['status'] == 'DELETE_IN_PROGRESS': - time.sleep(self.retry_sleep) - else: - raise Exception(f'Unknown status: {response["status"]}') + 
time.sleep(self.retry_sleep) except Exception: logger.debug('Function status is DELETED') break @@ -218,13 +235,25 @@ def _create_function(self, runtime_name, memory, timeout=60): cloud_function = { 'name': function_location, 'description': 'Lithops Worker for Lithops v' + __version__, - 'entryPoint': 'main', - 'runtime': config.AVAILABLE_PY_RUNTIMES[utils.CURRENT_PY_VERSION], - 'timeout': str(timeout) + 's', - 'availableMemoryMb': memory, - 'serviceAccountEmail': self.service_account, - 'maxInstances': 0, - 'sourceArchiveUrl': f'gs://{self.internal_storage.bucket}/{bin_location}', + 'buildConfig': { + 'runtime': config.AVAILABLE_PY_RUNTIMES[utils.CURRENT_PY_VERSION], + 'entryPoint': 'main', + 'source': { + 'storageSource': { + 'bucket': self.internal_storage.bucket, + 'object': bin_location + } + } + }, + 'serviceConfig': { + 'timeoutSeconds': timeout, + 'availableMemory': self._memory_to_gcfv2(memory), + 'serviceAccountEmail': self.service_account, + 'maxInstanceCount': self.gcf_config['max_workers'], + 'minInstanceCount': 0, + 'maxInstanceRequestConcurrency': 1, + 'allTrafficOnLatestRevision': True + }, 'labels': { 'type': 'lithops-runtime', 'lithops_version': __version__.replace('.', '-'), @@ -233,22 +262,24 @@ def _create_function(self, runtime_name, memory, timeout=60): } if self.trigger == 'https': - cloud_function['httpsTrigger'] = {} + pass elif self.trigger == 'pub/sub': topic_name = self._format_topic_name(function_name) topic_location = self._get_topic_location(topic_name) cloud_function['eventTrigger'] = { - 'eventType': 'providers/cloud.pubsub/eventTypes/topic.publish', - 'resource': topic_location, - 'failurePolicy': {} + 'triggerRegion': self.region, + 'eventType': 'google.cloud.pubsub.topic.v1.messagePublished', + 'pubsubTopic': topic_location, + 'retryPolicy': 'RETRY_POLICY_RETRY' } logger.info(f'Deploying function {function_location}') for attempt in range(self.num_retries): try: operation = 
self._api_resource.projects().locations().functions().create( - location=self._default_location, + parent=self._default_location, + functionId=function_name, body=cloud_function ).execute() break @@ -256,18 +287,18 @@ def _create_function(self, runtime_name, memory, timeout=60): if attempt < self.num_retries - 1: time.sleep(self.retry_sleep) else: - raise Exception(f"Failed to create Cloud Function after {self.num_retries} attempts.") from e + raise Exception(f"Failed to create Cloud Run function (v2) after {self.num_retries} attempts.") from e # Wait until the function is completely deployed logger.info('Waiting for the function to be deployed') operation_name = operation['name'] while True: - op_status = self._api_resource.operations().get( + op_status = self._api_resource.projects().locations().operations().get( name=operation_name ).execute(num_retries=self.num_retries) if op_status.get('done'): if 'error' in op_status: - raise Exception(f'Error while deploying Cloud Function: {op_status["error"]}') + raise Exception(f'Error while deploying Cloud Run function (v2): {op_status["error"]}') logger.info("Deployment completed successfully.") break else: @@ -376,7 +407,7 @@ def list_runtimes(self, runtime_name='all'): fn_name = func['name'].rsplit('/', 1)[-1] version = func['labels']['lithops_version'].replace('-', '.') rt_name = func['labels']['runtime_name'] - memory = func['availableMemoryMb'] + memory = self._memory_from_gcfv2(func['serviceConfig']['availableMemory']) if runtime_name == rt_name or runtime_name == 'all': runtimes.append((rt_name, memory, version, fn_name)) @@ -401,18 +432,23 @@ def invoke(self, runtime_name, runtime_memory, payload={}, return_result=False): if self.trigger == 'pub/sub': if return_result: - function_location = self._get_function_location(function_name) - response = self._api_resource.projects().locations().functions().call( - name=function_location, - body={'data': json.dumps({'data': self._encode_payload(payload)})} - 
).execute(num_retries=self.num_retries) - if 'result' in response and response['result'] == 'OK': - object_key = '/'.join([JOBS_PREFIX, runtime_name + '.meta']) - runtime_meta = json.loads(self.internal_storage.get_data(object_key)) - self.internal_storage.storage.delete_object(self.internal_storage.bucket, object_key) - return runtime_meta - else: - raise Exception(f'Error at retrieving runtime metadata: {response}') + topic_location = self._get_topic_location(self._format_topic_name(function_name)) + fut = self._pub_client.publish( + topic_location, + bytes(json.dumps(payload, default=str).encode('utf-8')) + ) + invocation_id = fut.result() + object_key = '/'.join([JOBS_PREFIX, runtime_name + '.meta']) + for _ in range(max(1, self.num_retries * 4)): + try: + runtime_meta = json.loads(self.internal_storage.get_data(object_key)) + self.internal_storage.storage.delete_object(self.internal_storage.bucket, object_key) + return runtime_meta + except Exception: + time.sleep(self.retry_sleep) + raise Exception( + f'Timed out waiting for runtime metadata for invocation {invocation_id}' + ) else: topic_location = self._get_topic_location(self._format_topic_name(function_name)) fut = self._pub_client.publish( @@ -423,12 +459,19 @@ def invoke(self, runtime_name, runtime_memory, payload={}, return_result=False): elif self.trigger == 'https': function_url, api_token = self._get_token(function_name) - req = urllib.request.Request(function_url, data=json.dumps(payload, default=str).encode('utf-8')) - req.add_header("Authorization", f"Bearer {api_token}") - res = urllib.request.urlopen(req) + headers = { + "Authorization": f"Bearer {api_token}", + "Content-Type": "application/json" + } + res = requests.post( + function_url, + data=json.dumps(payload, default=str), + headers=headers, + timeout=120 + ) - if res.getcode() in (200, 202): - data = json.loads(res.read()) + if res.status_code in (200, 202): + data = res.json() if return_result: return data return data["activationId"] 
diff --git a/runtime/gcp_functions/README.md b/runtime/gcp_functions/README.md index 8d0525dd..d63cbb64 100644 --- a/runtime/gcp_functions/README.md +++ b/runtime/gcp_functions/README.md @@ -1,8 +1,12 @@ -# Lithops runtime for Google Cloud Functions +# Lithops runtime for Google Cloud Run functions (v2) -Google Cloud Functions operate within a runtime environment distinct from other serverless platforms like Google Cloud Run, as they do not rely on containers from the user perspective. Consequently, specifying a container image as the function's runtime isn't feasible. However, you can enhance the default package set by providing a custom `requirements.txt` file, allowing for the inclusion of additional Python modules automatically installable via `pip`. +This backend targets Google Cloud Run functions (formerly Cloud Functions 2nd gen) through the Cloud Functions v2 API. Runtimes are deployed from source and built by Google-managed buildpacks, so you provide Python dependencies in `requirements.txt` instead of a custom container image. -Currently, Google Cloud Functions supports Python >= 3.8. You can find the list of pre-installed modules [here](https://cloud.google.com/functions/docs/writing/specifying-dependencies-python#pre-installed_packages). In addition, the Lithops default runtimes are built with the packages included in this [requirements.txt](requirements.txt) file: +You can check supported runtimes and language details in the Cloud Run functions docs: +- [Runtime support](https://docs.cloud.google.com/functions/docs/runtime-support) +- [Python dependencies](https://docs.cloud.google.com/run/docs/runtimes/python-dependencies) + +In addition, the Lithops default runtimes are built with the packages included in this [requirements.txt](requirements.txt) file: The default runtime is created automatically the first time you execute a function. Lithops automatically detects the Python version of your environment and deploys the default runtime based on it. 
In this sense, to run a function with the default runtime you don't need to specify anything in the code, since everything is managed internally by Lithops: @@ -26,7 +30,7 @@ pw = lithops.FunctionExecutor(runtime_memory=512) ## Custom runtime -**Build your own Lithops runtime for Google Cloud Functions** +**Build your own Lithops runtime for Google Cloud Run functions (v2)** If you require additional Python modules not included in the default runtime, you can create your own custom Lithops runtime incorporating them. To create a custom runtime, compile all the necessary modules into a `requirements.txt` file. @@ -38,7 +42,7 @@ After updating the file accordingly, you can proceed to build the custom runtime $ lithops runtime build -b gcp_functions -f requirements.txt my_matplotlib_runtime ``` -This command will add an extra runtime called `my_matplotlib_runtime` to the available Google Cloud Function runtimes. +This command will add an extra runtime called `my_matplotlib_runtime` to the available Google Cloud Run functions (v2) runtimes. Finally, you can specify this new runtime when creating a Lithops Function Executor: