Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,32 @@
# Changelog

## [v3.6.5.dev0]

### Added
- [Core] Added support for variable-length parameters in functions passed to the executor.

### Changed
- [K8s] Auto-detect cluster architecture when building runtimes.
- [Runtimes] Updated runtime images and related version references across backends.
- [K8s] Added configuration for pod and container `securityContext`.
- [Docs] Corrected MinIO/Ceph config template keys and removed obsolete Kubernetes image references.

### Fixed
- [K8s] Fixed default runtime builds impacted by Debian Buster end-of-life.
- [GCP Cloud Run] Added Artifact Registry (`pkg.dev`) runtime deployment support


## [v3.6.4]

### Fixed
- [Executor] Support use of `functools.partial` with FunctionExecutor's `call_async` and `map` methods


## [v3.6.3]

### Fixed
- Fixed memory available options for aws batch: 4 cpus
- Fixed race condition and improving monitor stability
- [AWS Batch] Fixed memory available options for aws batch: 4 cpus
- [Monitor] Fixed race condition and improving monitor stability


## [v3.6.2]
Expand Down Expand Up @@ -977,7 +994,7 @@
## [v1.7.2]

### Added
- [GCR] Added Google Cloud Run Backend
- [GCP Cloud Run] Added Google Cloud Run backend


### Changed
Expand Down
25 changes: 25 additions & 0 deletions docs/source/compute_config/gcp_cloudrun.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,29 @@ python3 -m pip install lithops[gcp]

10. Enable the **Artifact Registry API**: Navigate to *APIs & services* tab on the menu. Click *ENABLE APIS AND SERVICES*. Look for "Artifact Registry API" at the search bar. Click *Enable*.

11. Create a **Docker** repository in Artifact Registry (in the same region as Cloud Run), for example named `lithops`, or set `artifact_registry_repository` in config to match your repository name. From a shell with `gcloud` and the correct project:

```bash
gcloud artifacts repositories create lithops \
--repository-format=docker \
--location=<REGION_NAME> \
--description="Lithops Cloud Run runtimes"
```

Grant the service account **Artifact Registry Writer** (or **Artifact Registry Create-on-push Writer** if you prefer) so it can push images.

Example command:

```bash
gcloud artifacts repositories add-iam-policy-binding gcf-artifacts \
--location=us-east1 \
--project=lithops-dev \
--member="serviceAccount:lithops-executor@lithops-dev.iam.gserviceaccount.com" \
--role="roles/artifactregistry.writer"
```

Replace `gcf-artifacts`, `us-east1`, `lithops-dev`, and the service account email with your own values.

## Configuration

1. Edit your lithops config and add the following keys:
Expand Down Expand Up @@ -67,6 +90,8 @@ python3 -m pip install lithops[gcp]
|gcp_cloudrun | trigger | https | no | Currently it supports 'https' trigger|
|gcp_cloudrun | invoke_pool_threads | 100 |no | Number of concurrent threads used for invocation |
|gcp_cloudrun | runtime_include_function | False | no | If set to true, Lithops will automatically build a new runtime, including the function's code, instead of transferring it through the storage backend at invocation time. This is useful when the function's code size is large (in the order of 10s of MB) and the code does not change frequently |
|gcp_cloudrun | docker_server | pkg.dev | no | Marker for [Artifact Registry](https://cloud.google.com/artifact-registry/docs/docker/names) default image names (`REGION-docker.pkg.dev/PROJECT/REPOSITORY/IMAGE`). |
|gcp_cloudrun | artifact_registry_repository | lithops | no | Docker repository name in Artifact Registry (must exist under your project in the configured region). |

## Test Lithops
Once you have your compute and storage backends configured, you can run a hello world function with:
Expand Down
196 changes: 175 additions & 21 deletions lithops/serverless/backends/gcp_cloudrun/cloudrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,19 @@
import os
import time
import json
import urllib
import yaml
import hashlib
import logging
import httplib2
import requests
import google.auth
import google.oauth2.id_token
import google.auth.transport.requests
from threading import Lock
from google.oauth2 import service_account
from google_auth_httplib2 import AuthorizedHttp
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

from lithops import utils
from lithops.constants import COMPUTE_CLI_MSG
Expand All @@ -51,6 +53,7 @@ def __init__(self, cloudrun_config, internal_storage):
self.credentials_path = cloudrun_config.get('credentials_path')

self._build_api_resource()
self._resolve_artifact_registry_repository_fallback()

self._service_url = None
self._id_token = None
Expand All @@ -75,16 +78,30 @@ def _get_default_runtime_image_name(self):
self.name, self.cr_config, 'lithops-cloudrun-default'
)

def _full_container_image_reference(self, name):
"""True if name already includes a registry hostname (user-supplied image)."""
if '/' not in name:
return False
host = name.split('/', 1)[0]
if host.startswith('./') or host.startswith('../'):
return False
# hostname or host:port (e.g. REGION-docker.pkg.dev, docker.io)
return '.' in host or (':' in host and not host.startswith('/'))

def _docker_login_registry_host(self):
"""Artifact Registry hostname for docker login / push."""
return f'{self.region}-docker.pkg.dev'

def _format_image_name(self, runtime_name):
"""
Formats GCR image name from runtime name
Formats Artifact Registry image reference from runtime name.
"""
if 'gcr.io' not in runtime_name:
country = self.region.split('-')[0]
return f'{country}.gcr.io/{self.project_name}/{runtime_name}'
else:
if self._full_container_image_reference(runtime_name):
return runtime_name

repo = self.cr_config.get('artifact_registry_repository', 'lithops')
return f'{self.region}-docker.pkg.dev/{self.project_name}/{repo}/{runtime_name}'

def _build_api_resource(self):
"""
Instantiate and authorize admin discovery API session
Expand All @@ -107,10 +124,122 @@ def _build_api_resource(self):
'api_endpoint': f'https://{self.region}-run.googleapis.com'
}
)
self._ar_resource = build(
'artifactregistry', 'v1',
http=http, cache_discovery=False
)

self.cr_config['project_name'] = self.project_name
self.cr_config['service_account'] = self.service_account

def _parse_artifact_registry_image_name(self, image_name):
"""
Parse Artifact Registry image format:
REGION-docker.pkg.dev/PROJECT/REPOSITORY/IMAGE[:TAG]
"""
parts = image_name.split('/')
if len(parts) < 4:
return None
host, project, repository = parts[0], parts[1], parts[2]
if not host.endswith('-docker.pkg.dev'):
return None
location = host.replace('-docker.pkg.dev', '')
return project, location, repository

def _artifact_registry_uploader_identity(self):
if self.credentials_path and os.path.isfile(self.credentials_path):
try:
with open(self.credentials_path, 'r') as f:
cred_data = json.load(f)
return cred_data.get('client_email')
except Exception:
return None
return self.service_account

def _list_docker_repositories(self):
parent = f'projects/{self.project_name}/locations/{self.region}'
repos = []
page_token = None
while True:
req = self._ar_resource.projects().locations().repositories().list(
parent=parent, pageToken=page_token
)
res = req.execute()
for repo in res.get('repositories', []):
if repo.get('format') == 'DOCKER':
repos.append(repo['name'].rsplit('/', 1)[-1])
page_token = res.get('nextPageToken')
if not page_token:
break
return repos

def _resolve_artifact_registry_repository_fallback(self):
"""
Minimal repository resolution:
- Use configured repository if accessible
- Otherwise fallback to an existing DOCKER repository (prefer gcf-artifacts)
"""
repository = self.cr_config.get('artifact_registry_repository', 'lithops')
name = f'projects/{self.project_name}/locations/{self.region}/repositories/{repository}'

try:
self._ar_resource.projects().locations().repositories().get(name=name).execute()
return
except Exception:
pass

try:
docker_repos = self._list_docker_repositories()
except Exception:
docker_repos = []

if not docker_repos:
return

fallback = 'gcf-artifacts' if 'gcf-artifacts' in docker_repos else sorted(docker_repos)[0]
if fallback != repository:
self.cr_config['artifact_registry_repository'] = fallback
logger.info(
f'Using Artifact Registry repository "{fallback}" '
f'(configured "{repository}" is not accessible).'
)

def _ensure_artifact_registry_upload_permission(self, image_name):
"""
Check that current principal can upload artifacts to the target repo.
"""
parsed = self._parse_artifact_registry_image_name(image_name)
if not parsed:
return

project, location, repository = parsed
resource = f'projects/{project}/locations/{location}/repositories/{repository}'

try:
result = self._ar_resource.projects().locations().repositories().testIamPermissions(
resource=resource,
body={
'permissions': ['artifactregistry.repositories.uploadArtifacts']
}
).execute()
except HttpError:
# If we cannot test permissions (forbidden/unavailable), fail later on push.
return

granted = set(result.get('permissions', []))
if 'artifactregistry.repositories.uploadArtifacts' in granted:
return

principal = self._artifact_registry_uploader_identity() or 'current credentials principal'
raise Exception(
'Missing Artifact Registry permission to push runtime image. '
f'Principal "{principal}" does not have '
f'"artifactregistry.repositories.uploadArtifacts" on "{resource}". '
'Grant role "roles/artifactregistry.writer" on the repository/project or configure '
'a repository where this principal has write access via '
'`gcp_cloudrun.artifact_registry_repository`.'
)

def _get_url_and_token(self, service_name):
"""
Generates a connection token
Expand Down Expand Up @@ -193,17 +322,24 @@ def invoke(self, runtime_name, runtime_memory, payload, return_result=False):
else:
logger.debug('Invoking function')

req = urllib.request.Request(service_url + route, data=json.dumps(payload, default=str).encode('utf-8'))
req.add_header("Authorization", f"Bearer {id_token}")
res = urllib.request.urlopen(req)
headers = {
"Authorization": f"Bearer {id_token}",
"Content-Type": "application/json"
}
response = requests.post(
service_url + route,
data=json.dumps(payload, default=str),
headers=headers,
timeout=120
)

if res.getcode() in (200, 202):
data = json.loads(res.read())
if response.status_code in (200, 202):
data = response.json()
if return_result:
return data
return data["activationId"]
else:
raise Exception(res.text)
raise Exception(response.text)

def build_runtime(self, runtime_name, dockerfile, extra_args=[]):
"""
Expand All @@ -229,21 +365,29 @@ def build_runtime(self, runtime_name, dockerfile, extra_args=[]):
finally:
os.remove(config.FH_ZIP_LOCATION)

logger.debug('Authorizing Docker client with GCR permissions')
country = self.region.split('-')[0]
cmd = f'cat {self.credentials_path} | {docker_path} login {country}.gcr.io -u _json_key --password-stdin'
registry_host = self._docker_login_registry_host()
logger.debug(f'Authorizing Docker client for registry {registry_host}')
cmd = f'cat {self.credentials_path} | {docker_path} login {registry_host} -u _json_key --password-stdin'
if logger.getEffectiveLevel() != logging.DEBUG:
cmd = cmd + f" >{os.devnull} 2>&1"
res = os.system(cmd)
if res != 0:
raise Exception('There was an error authorizing Docker for push to GCR')
raise Exception(f'There was an error authorizing Docker for push to {registry_host}')

logger.debug(f'Pushing runtime {image_name} to GCP Container Registry')
logger.debug(f'Pushing runtime {image_name} to {registry_host}')
self._ensure_artifact_registry_upload_permission(image_name)
if utils.is_podman(docker_path):
cmd = f'{docker_path} push {image_name} --format docker --remove-signatures'
else:
cmd = f'{docker_path} push {image_name}'
utils.run_command(cmd)
try:
utils.run_command(cmd)
except Exception as e:
raise Exception(
f'Unable to push runtime image to Artifact Registry ({image_name}). '
'Verify the repository exists and that your identity has Artifact Registry write permissions '
'(artifactregistry.repositories.uploadArtifacts).'
) from e

def _create_service(self, runtime_name, runtime_memory, timeout):
"""
Expand Down Expand Up @@ -278,9 +422,19 @@ def _create_service(self, runtime_name, runtime_memory, timeout):
container['resources']['requests']['cpu'] = str(self.cr_config['runtime_cpu'])

logger.debug(f"Creating service: {service_name}")
res = self._api_resource.namespaces().services().create(
parent=f'namespaces/{self.project_name}', body=svc_res
).execute()
try:
res = self._api_resource.namespaces().services().create(
parent=f'namespaces/{self.project_name}', body=svc_res
).execute()
except HttpError as e:
if e.resp.status == 409:
logger.debug(f'Service {service_name} already exists. Recreating it')
self._delete_service(service_name)
res = self._api_resource.namespaces().services().create(
parent=f'namespaces/{self.project_name}', body=svc_res
).execute()
else:
raise
logger.debug(f'Ok -- service created {service_name}')

# Wait until service is up
Expand Down
Loading
Loading