Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- [Runtimes] Updated runtime images and related version references across backends.
- [K8s] Added configuration for pod and container `securityContext`.
- [Docs] Corrected MinIO/Ceph config template keys and removed obsolete Kubernetes image references.
- [GCP Functions] Updated `gcp_functions` backend to Google Cloud Run functions (Cloud Functions v2 API).

### Fixed
- [K8s] Fixed default runtime builds impacted by Debian Buster end-of-life.
Expand Down
31 changes: 28 additions & 3 deletions docs/source/compute_config/gcp_functions.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Google Cloud Functions
# Google Cloud Run functions (v2)

Lithops with *GCP Functions* as serverless compute backend.
Lithops with *Google Cloud Run functions* (deployed through the Cloud Functions v2 API) as its serverless compute backend.

## Installation

Expand Down Expand Up @@ -32,6 +32,26 @@ python3 -m pip install lithops[gcp]

10. Enable the **Artifact Registry API**: Navigate to *APIs & services* tab on the menu. Click *ENABLE APIS AND SERVICES*. Look for "Artifact Registry API" at the search bar. Click *Enable*.

11. If you use `trigger: pub/sub`, enable the **Eventarc API** as well. Cloud Run functions v2 event triggers are managed through Eventarc.

```bash
gcloud services enable eventarc.googleapis.com --project <PROJECT_ID>
```

For first-time setup, you can enable all commonly required APIs in one command:

```bash
gcloud services enable \
cloudfunctions.googleapis.com \
run.googleapis.com \
eventarc.googleapis.com \
pubsub.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
storage.googleapis.com \
--project <PROJECT_ID>
```

## Configuration

1. Edit your lithops config and add the following keys:
Expand All @@ -54,7 +74,7 @@ python3 -m pip install lithops[gcp]
|gcp | region | |yes | Region name of the GCP services (e.g. `us-east1`) |
|gcp | credentials_path | |yes | **Absolute** path of your JSON key file downloaded in step 7 (e.g. `/home/myuser/lithops-invoker1234567890.json`). Alternatively you can set `GOOGLE_APPLICATION_CREDENTIALS` environment variable. If not provided it will try to load the default credentials from the environment|

### Google Cloud Functions
### Google Cloud Run functions (v2)
|Group|Key|Default|Mandatory|Additional info|
|---|---|---|---|---|
|gcp_functions | region | |no | Region name (e.g. `us-east1`). Functions and pub/sub queues will be created in the same region. Lithops will use the region set under the `gcp` section if it is not set here |
Expand All @@ -74,6 +94,11 @@ Once you have your compute and storage backends configured, you can run a hello
lithops hello -b gcp_functions -s gcp_storage
```

## References

- [Cloud Functions v2 REST API reference](https://cloud.google.com/functions/docs/reference/rest)
- [Cloud Run functions best practices](https://cloud.google.com/run/docs/tips/functions-best-practices)


## Viewing the execution logs

Expand Down
4 changes: 2 additions & 2 deletions docs/source/supported_clouds.rst
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ Currently, Lithops for Google Cloud Platform supports these backends:

* - Compute
- Storage
* - `Google Cloud Functions <https://cloud.google.com/functions/docs>`_
- `Google Cloud Storage <ttps://cloud.google.com/storage/docs>`_
* - `Google Cloud Run functions (v2) <https://cloud.google.com/functions/docs/>`_
- `Google Cloud Storage <https://cloud.google.com/storage/docs>`_
* - `Google Cloud Run <https://cloud.google.com/run/docs>`_
-

Expand Down
13 changes: 12 additions & 1 deletion lithops/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,18 @@ def save_data_to_clean(data):
spawn_cleaner = not (CLEANER_PROCESS and CLEANER_PROCESS.poll() is None)
if (jobs_to_clean or cs) and spawn_cleaner:
cmd = [sys.executable, '-m', 'lithops.scripts.cleaner']
CLEANER_PROCESS = sp.Popen(cmd, start_new_session=True)
env = os.environ.copy()
# Cleaner is forked while gRPC clients may already be active in the parent process.
# Reduce noisy gRPC fork logs in the detached cleaner subprocess.
env.setdefault('GRPC_ENABLE_FORK_SUPPORT', '0')
env.setdefault('GRPC_VERBOSITY', 'ERROR')
CLEANER_PROCESS = sp.Popen(
cmd,
start_new_session=True,
env=env,
stdout=sp.DEVNULL,
stderr=sp.DEVNULL
)

def job_summary(self, cloud_objects_n: Optional[int] = 0):
"""
Expand Down
7 changes: 3 additions & 4 deletions lithops/serverless/backends/gcp_functions/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@
FH_ZIP_LOCATION = os.path.join(TEMP_DIR, 'lithops_gcp_functions/{}.zip')
SCOPES = ('https://www.googleapis.com/auth/cloud-platform',
'https://www.googleapis.com/auth/pubsub')
FUNCTIONS_API_VERSION = 'v1'
PUBSUB_API_VERSION = 'v1'
FUNCTIONS_API_VERSION = 'v2'
AUDIENCE = "https://pubsub.googleapis.com/google.pubsub.v1.Publisher"

RUNTIME_MEMORY_MAX = 8192 # 8GB
RUNTIME_MEMORY_OPTIONS = {128, 256, 512, 1024, 2048, 4096, 8192}
RUNTIME_MEMORY_MAX = 32768 # 32GB
RUNTIME_MEMORY_OPTIONS = {128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768}

AVAILABLE_PY_RUNTIMES = {
'3.8': 'python38',
Expand Down
6 changes: 3 additions & 3 deletions lithops/serverless/backends/gcp_functions/entry_point.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def main(data, context=None):

activation_id = uuid.uuid4().hex
os.environ['__LITHOPS_ACTIVATION_ID'] = activation_id
os.environ['__LITHOPS_BACKEND'] = 'Google Cloud Functions'
os.environ['__LITHOPS_BACKEND'] = 'Google Cloud Run functions (v2)'

if 'get_metadata' in args:
runtime_meta = get_runtime_metadata()
Expand All @@ -58,10 +58,10 @@ def main(data, context=None):
else:
return runtime_meta
elif 'remote_invoker' in args:
logger.info(f"Lithops v{__version__} - Starting Google Cloud Functions invoker")
logger.info(f"Lithops v{__version__} - Starting Google Cloud Run functions (v2) invoker")
function_invoker(args)
else:
logger.info(f"Lithops v{__version__} - Starting Google Cloud Functions execution")
logger.info(f"Lithops v{__version__} - Starting Google Cloud Run functions (v2) execution")
function_handler(args)

return {"activationId": activation_id}
133 changes: 88 additions & 45 deletions lithops/serverless/backends/gcp_functions/gcp_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
import httplib2
import zipfile
import time
import urllib
import requests
import google.auth
import google.oauth2.id_token
import google.auth.transport.requests
from threading import Lock
from google.cloud import pubsub_v1
from google.oauth2 import service_account
Expand Down Expand Up @@ -58,13 +59,13 @@ def __init__(self, gcf_config, internal_storage):
self.internal_storage = internal_storage

self._build_api_resource()

self._api_endpoint = f'https://{self.region}-{self.project_name}.cloudfunctions.net/'
self._function_url = None
self._function_name = None
self._api_token = None

logger.debug(f'Invocation trigger set to: {self.trigger}')

msg = COMPUTE_CLI_MSG.format('Google Cloud Functions')
msg = COMPUTE_CLI_MSG.format('Google Cloud Run functions (v2)')
logger.info(f"{msg} - Region: {self.region} - Project: {self.project_name}")

def _build_api_resource(self):
Expand Down Expand Up @@ -128,6 +129,21 @@ def _get_runtime_bin_location(self, runtime_name):
function_name = self._format_function_name(runtime_name)
return config.USER_RUNTIMES_PREFIX + '/' + function_name + '_bin.zip'

def _memory_to_gcfv2(self, memory_mb):
return f'{memory_mb}Mi'

def _memory_from_gcfv2(self, memory_value):
if isinstance(memory_value, int):
return memory_value
if isinstance(memory_value, str):
if memory_value.endswith('Mi'):
return int(memory_value[:-2])
if memory_value.endswith('Gi'):
return int(memory_value[:-2]) * 1024
if memory_value.endswith('M'):
return int(memory_value[:-1])
raise ValueError(f'Unable to parse memory value: {memory_value}')

def _encode_payload(self, payload):
return base64.b64encode(bytes(json.dumps(payload), 'utf-8')).decode('utf-8')

Expand All @@ -137,9 +153,14 @@ def _get_token(self, function_name):
"""
invoke_mutex.acquire()

if not self._api_token or function_name not in self._function_url:
if not self._api_token or self._function_name != function_name:
logger.debug('Getting authentication token')
self._function_url = self._api_endpoint + function_name
function_location = self._get_function_location(function_name)
response = self._api_resource.projects().locations().functions().get(
name=function_location
).execute(num_retries=self.num_retries)
self._function_url = response['serviceConfig']['uri']
self._function_name = function_name
if self.credentials_path and os.path.isfile(self.credentials_path):
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = self.credentials_path
auth_req = google.auth.transport.requests.Request()
Expand Down Expand Up @@ -168,14 +189,10 @@ def _wait_function_deleted(self, function_location):
# Wait until function is completely deleted
while True:
try:
response = self._api_resource.projects().locations().functions().get(
self._api_resource.projects().locations().functions().get(
name=function_location
).execute(num_retries=self.num_retries)
logger.debug(f'Function status is {response["status"]}')
if response['status'] == 'DELETE_IN_PROGRESS':
time.sleep(self.retry_sleep)
else:
raise Exception(f'Unknown status: {response["status"]}')
time.sleep(self.retry_sleep)
except Exception:
logger.debug('Function status is DELETED')
break
Expand Down Expand Up @@ -218,13 +235,25 @@ def _create_function(self, runtime_name, memory, timeout=60):
cloud_function = {
'name': function_location,
'description': 'Lithops Worker for Lithops v' + __version__,
'entryPoint': 'main',
'runtime': config.AVAILABLE_PY_RUNTIMES[utils.CURRENT_PY_VERSION],
'timeout': str(timeout) + 's',
'availableMemoryMb': memory,
'serviceAccountEmail': self.service_account,
'maxInstances': 0,
'sourceArchiveUrl': f'gs://{self.internal_storage.bucket}/{bin_location}',
'buildConfig': {
'runtime': config.AVAILABLE_PY_RUNTIMES[utils.CURRENT_PY_VERSION],
'entryPoint': 'main',
'source': {
'storageSource': {
'bucket': self.internal_storage.bucket,
'object': bin_location
}
}
},
'serviceConfig': {
'timeoutSeconds': timeout,
'availableMemory': self._memory_to_gcfv2(memory),
'serviceAccountEmail': self.service_account,
'maxInstanceCount': self.gcf_config['max_workers'],
'minInstanceCount': 0,
'maxInstanceRequestConcurrency': 1,
'allTrafficOnLatestRevision': True
},
'labels': {
'type': 'lithops-runtime',
'lithops_version': __version__.replace('.', '-'),
Expand All @@ -233,41 +262,43 @@ def _create_function(self, runtime_name, memory, timeout=60):
}

if self.trigger == 'https':
cloud_function['httpsTrigger'] = {}
pass

elif self.trigger == 'pub/sub':
topic_name = self._format_topic_name(function_name)
topic_location = self._get_topic_location(topic_name)
cloud_function['eventTrigger'] = {
'eventType': 'providers/cloud.pubsub/eventTypes/topic.publish',
'resource': topic_location,
'failurePolicy': {}
'triggerRegion': self.region,
'eventType': 'google.cloud.pubsub.topic.v1.messagePublished',
'pubsubTopic': topic_location,
'retryPolicy': 'RETRY_POLICY_RETRY'
}

logger.info(f'Deploying function {function_location}')
for attempt in range(self.num_retries):
try:
operation = self._api_resource.projects().locations().functions().create(
location=self._default_location,
parent=self._default_location,
functionId=function_name,
body=cloud_function
).execute()
break
except Exception as e:
if attempt < self.num_retries - 1:
time.sleep(self.retry_sleep)
else:
raise Exception(f"Failed to create Cloud Function after {self.num_retries} attempts.") from e
raise Exception(f"Failed to create Cloud Run function (v2) after {self.num_retries} attempts.") from e

# Wait until the function is completely deployed
logger.info('Waiting for the function to be deployed')
operation_name = operation['name']
while True:
op_status = self._api_resource.operations().get(
op_status = self._api_resource.projects().locations().operations().get(
name=operation_name
).execute(num_retries=self.num_retries)
if op_status.get('done'):
if 'error' in op_status:
raise Exception(f'Error while deploying Cloud Function: {op_status["error"]}')
raise Exception(f'Error while deploying Cloud Run function (v2): {op_status["error"]}')
logger.info("Deployment completed successfully.")
break
else:
Expand Down Expand Up @@ -376,7 +407,7 @@ def list_runtimes(self, runtime_name='all'):
fn_name = func['name'].rsplit('/', 1)[-1]
version = func['labels']['lithops_version'].replace('-', '.')
rt_name = func['labels']['runtime_name']
memory = func['availableMemoryMb']
memory = self._memory_from_gcfv2(func['serviceConfig']['availableMemory'])
if runtime_name == rt_name or runtime_name == 'all':
runtimes.append((rt_name, memory, version, fn_name))

Expand All @@ -401,18 +432,23 @@ def invoke(self, runtime_name, runtime_memory, payload={}, return_result=False):

if self.trigger == 'pub/sub':
if return_result:
function_location = self._get_function_location(function_name)
response = self._api_resource.projects().locations().functions().call(
name=function_location,
body={'data': json.dumps({'data': self._encode_payload(payload)})}
).execute(num_retries=self.num_retries)
if 'result' in response and response['result'] == 'OK':
object_key = '/'.join([JOBS_PREFIX, runtime_name + '.meta'])
runtime_meta = json.loads(self.internal_storage.get_data(object_key))
self.internal_storage.storage.delete_object(self.internal_storage.bucket, object_key)
return runtime_meta
else:
raise Exception(f'Error at retrieving runtime metadata: {response}')
topic_location = self._get_topic_location(self._format_topic_name(function_name))
fut = self._pub_client.publish(
topic_location,
bytes(json.dumps(payload, default=str).encode('utf-8'))
)
invocation_id = fut.result()
object_key = '/'.join([JOBS_PREFIX, runtime_name + '.meta'])
for _ in range(max(1, self.num_retries * 4)):
try:
runtime_meta = json.loads(self.internal_storage.get_data(object_key))
self.internal_storage.storage.delete_object(self.internal_storage.bucket, object_key)
return runtime_meta
except Exception:
time.sleep(self.retry_sleep)
raise Exception(
f'Timed out waiting for runtime metadata for invocation {invocation_id}'
)
else:
topic_location = self._get_topic_location(self._format_topic_name(function_name))
fut = self._pub_client.publish(
Expand All @@ -423,12 +459,19 @@ def invoke(self, runtime_name, runtime_memory, payload={}, return_result=False):

elif self.trigger == 'https':
function_url, api_token = self._get_token(function_name)
req = urllib.request.Request(function_url, data=json.dumps(payload, default=str).encode('utf-8'))
req.add_header("Authorization", f"Bearer {api_token}")
res = urllib.request.urlopen(req)
headers = {
"Authorization": f"Bearer {api_token}",
"Content-Type": "application/json"
}
res = requests.post(
function_url,
data=json.dumps(payload, default=str),
headers=headers,
timeout=120
)

if res.getcode() in (200, 202):
data = json.loads(res.read())
if res.status_code in (200, 202):
data = res.json()
if return_result:
return data
return data["activationId"]
Expand Down
Loading
Loading