diff --git a/docs/source/compute_backends.rst b/docs/source/compute_backends.rst index 49c52f14..0148a661 100644 --- a/docs/source/compute_backends.rst +++ b/docs/source/compute_backends.rst @@ -44,3 +44,4 @@ Compute Backends compute_config/ibm_vpc.md compute_config/aws_ec2.md compute_config/azure_vms.md + compute_config/gcp_compute_engie.md diff --git a/docs/source/compute_config/gcp_compute_engie.md b/docs/source/compute_config/gcp_compute_engie.md new file mode 100644 index 00000000..07b6670e --- /dev/null +++ b/docs/source/compute_config/gcp_compute_engie.md @@ -0,0 +1,122 @@ +# Google Compute Engine (GCE) + +Lithops includes a standalone backend named `gcp_compute_engie` for running jobs on Google Compute Engine virtual machines. + +This backend supports: + +- **consume mode**: use an existing VM instance. +- **create mode**: create the network resources and master/worker VMs on demand. +- **reuse mode**: keep and reuse workers between executions, creating only the missing delta. + +> **Note** +> The backend key is intentionally `gcp_compute_engie` to match the current Lithops backend module name. + +## Installation + +Install Google Cloud dependencies: + +```bash +python3 -m pip install lithops[gcp] +``` + +## Required IAM/API setup + +Use a service account with permissions to manage Compute Engine resources (instances, networks, subnetworks, firewalls) and enable: + +- Compute Engine API (`compute.googleapis.com`) + +Optional command: + +```bash +gcloud services enable compute.googleapis.com --project +``` + +## Create and reuse modes + +In `create` mode, Lithops automatically provisions VPC/network resources plus worker VMs during execution and dismantles workers afterward. +In `reuse` mode, Lithops reuses existing workers and only creates additional workers when needed. + +### Configuration + +```yaml +lithops: + backend: gcp_compute_engie + mode: standalone + +gcp: + credentials_path: + +gcp_compute_engie: + project_name: + zone: # e.g. us-east1-b + region: # optional, derived from zone if omitted + exec_mode: reuse # create | reuse | consume + worker_instance_type: e2-standard-2 + master_instance_type: e2-small +``` + +### Summary of configuration keys (create/reuse) + +|Group|Key|Default|Mandatory|Additional info| +|---|---|---|---|---| +|gcp_compute_engie|project_name||yes|GCP project ID | +|gcp_compute_engie|zone||yes|Compute Engine zone, for example `us-east1-b` | +|gcp_compute_engie|region|derived from zone|no|Region used for subnet creation | +|gcp_compute_engie|credentials_path||no|Service account JSON path. If omitted, ADC is used | +|gcp_compute_engie|master_instance_type|e2-small|no|Master VM machine type | +|gcp_compute_engie|worker_instance_type|e2-standard-2|no|Worker VM machine type | +|gcp_compute_engie|source_image|ubuntu-2204-lts family|no|Boot image reference | +|gcp_compute_engie|boot_disk_size|50|no|Boot disk size (GB) | +|gcp_compute_engie|boot_disk_type|pd-standard|no|Boot disk type | +|gcp_compute_engie|network_cidr|10.0.0.0/16|no|CIDR for created network | +|gcp_compute_engie|subnet_cidr|10.0.0.0/24|no|CIDR for created subnet | +|gcp_compute_engie|request_spot_instances|False|no|Use Spot VMs for workers | +|gcp_compute_engie|max_workers|100|no|Max number of workers per `FunctionExecutor()` | +|gcp_compute_engie|worker_processes|AUTO|no|Worker process count | +|gcp_compute_engie|exec_mode|reuse|no|One of `consume`, `create`, `reuse` | + +## Consume mode + +In consume mode, Lithops uses an existing VM and does not provision network/worker resources. + +### Configuration + +```yaml +lithops: + backend: gcp_compute_engie + mode: standalone + +gcp: + credentials_path: + +gcp_compute_engie: + exec_mode: consume + project_name: + zone: + instance_name: + ssh_username: ubuntu + ssh_key_filename: ~/.ssh/id_rsa +``` + +### Summary of configuration keys (consume) + +|Group|Key|Default|Mandatory|Additional info| +|---|---|---|---|---| +|gcp_compute_engie|project_name||yes|GCP project ID | +|gcp_compute_engie|zone||yes|Compute Engine zone | +|gcp_compute_engie|instance_name||yes|Existing VM instance name | +|gcp_compute_engie|ssh_username|ubuntu|no|SSH user for the VM | +|gcp_compute_engie|ssh_key_filename|~/.ssh/id_rsa|no|Path to SSH private key | +|gcp_compute_engie|worker_processes|AUTO|no|Worker process count | + +## Test Lithops + +```bash +lithops hello -b gcp_compute_engie -s gcp_storage +``` + +## Viewing execution logs + +```bash +lithops logs poll +``` diff --git a/docs/source/configuration.rst b/docs/source/configuration.rst index 273bb525..0f3df147 100644 --- a/docs/source/configuration.rst +++ b/docs/source/configuration.rst @@ -37,6 +37,7 @@ Choose your compute and storage engines from the table below: || `IBM Virtual Private Cloud `_ || | || `AWS Elastic Compute Cloud (EC2) `_ || | || `Azure Virtual Machines `_ || | +|| `Google Compute Engine `_ || | +--------------------------------------------------------------------+--------------------------------------------------------------------+ Configuration File diff --git a/docs/source/execution_modes.rst b/docs/source/execution_modes.rst index 1d280d88..5ff65c42 100644 --- a/docs/source/execution_modes.rst +++ b/docs/source/execution_modes.rst @@ -83,4 +83,4 @@ underlying infrastructure. fexec = lithops.StandaloneExecutor() -- Available backends: `IBM Virtual Private Cloud `_, `AWS Elastic Compute Cloud (EC2) `_, `Azure Virtual Machines `_, `Virtual Machine `_ +- Available backends: `IBM Virtual Private Cloud `_, `AWS Elastic Compute Cloud (EC2) `_, `Azure Virtual Machines `_, `Google Compute Engine `_, `Virtual Machine `_ diff --git a/docs/source/supported_clouds.rst b/docs/source/supported_clouds.rst index b9c85d0a..013768d1 100644 --- a/docs/source/supported_clouds.rst +++ b/docs/source/supported_clouds.rst @@ -52,6 +52,8 @@ Currently, Lithops for Google Cloud Platform supports these backends: - `Google Cloud Storage `_ * - `Google Cloud Run `_ - + * - `Google Compute Engine (GCE) `_ + - Microsoft Azure --------------- diff --git a/lithops/standalone/backends/gcp_compute_engie/__init__.py b/lithops/standalone/backends/gcp_compute_engie/__init__.py new file mode 100644 index 00000000..d7d7fb0f --- /dev/null +++ b/lithops/standalone/backends/gcp_compute_engie/__init__.py @@ -0,0 +1,3 @@ +from .gcp_compute_engie import GCPComputeEngieBackend as StandaloneBackend + +__all__ = ['StandaloneBackend'] diff --git a/lithops/standalone/backends/gcp_compute_engie/config.py b/lithops/standalone/backends/gcp_compute_engie/config.py new file mode 100644 index 00000000..e970b56c --- /dev/null +++ b/lithops/standalone/backends/gcp_compute_engie/config.py @@ -0,0 +1,90 @@ +# +# Copyright Cloudlab URV 2021 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import copy +import os +import uuid + +from lithops.constants import SA_DEFAULT_CONFIG_KEYS + + +DEFAULT_CONFIG_KEYS = { + 'master_instance_type': 'e2-small', + 'worker_instance_type': 'e2-standard-2', + 'boot_disk_size': 50, + 'boot_disk_type': 'pd-standard', + 'network_cidr': '10.0.0.0/16', + 'subnet_cidr': '10.0.0.0/24', + 'source_image': 'projects/ubuntu-os-cloud/global/images/family/ubuntu-2204-lts', + 'ssh_username': 'ubuntu', + 'ssh_password': str(uuid.uuid4()), + 'ssh_key_filename': '~/.ssh/id_rsa', + 'delete_on_dismantle': False, + 'max_workers': 100, + 'request_spot_instances': False, + 'worker_processes': 'AUTO' +} + +MANDATORY_PARAMETERS_1 = ('project_name', 'zone', 'instance_name') +MANDATORY_PARAMETERS_2 = ('project_name', 'zone') + + +def load_config(config_data): + if 'gcp_compute_engie' not in config_data or not config_data['gcp_compute_engie']: + raise Exception("'gcp_compute_engie' section is mandatory in the configuration") + + if 'gcp' not in config_data: + config_data['gcp'] = {} + + temp = copy.deepcopy(config_data['gcp_compute_engie']) + config_data['gcp_compute_engie'].update(config_data['gcp']) + config_data['gcp_compute_engie'].update(temp) + + if 'credentials_path' in config_data['gcp_compute_engie']: + config_data['gcp_compute_engie']['credentials_path'] = os.path.expanduser( + config_data['gcp_compute_engie']['credentials_path'] + ) + + for key in DEFAULT_CONFIG_KEYS: + if key not in config_data['gcp_compute_engie']: + config_data['gcp_compute_engie'][key] = DEFAULT_CONFIG_KEYS[key] + + if 'standalone' not in config_data or config_data['standalone'] is None: + config_data['standalone'] = {} + + for key in SA_DEFAULT_CONFIG_KEYS: + if key in config_data['gcp_compute_engie']: + config_data['standalone'][key] = config_data['gcp_compute_engie'].pop(key) + elif key not in config_data['standalone']: + config_data['standalone'][key] = SA_DEFAULT_CONFIG_KEYS[key] + + if config_data['standalone']['exec_mode'] == 'consume': + params_to_check = MANDATORY_PARAMETERS_1 + config_data['gcp_compute_engie']['max_workers'] = 1 + else: + params_to_check = MANDATORY_PARAMETERS_2 + + for param in params_to_check: + if param not in config_data['gcp_compute_engie']: + msg = f"'{param}' is mandatory in the 'gcp_compute_engie' section of the configuration" + raise Exception(msg) + + if 'region' not in config_data['gcp_compute_engie']: + zone = config_data['gcp_compute_engie']['zone'] + config_data['gcp_compute_engie']['region'] = '-'.join(zone.split('-')[:-1]) + + if 'region' not in config_data['gcp'] and 'region' in config_data['gcp_compute_engie']: + config_data['gcp']['region'] = config_data['gcp_compute_engie']['region'] diff --git a/lithops/standalone/backends/gcp_compute_engie/gcp_compute_engie.py b/lithops/standalone/backends/gcp_compute_engie/gcp_compute_engie.py new file mode 100644 index 00000000..b88200fa --- /dev/null +++ b/lithops/standalone/backends/gcp_compute_engie/gcp_compute_engie.py @@ -0,0 +1,587 @@ +# +# Copyright Cloudlab URV 2021 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import re +import time +import uuid +import logging +import httplib2 +import google.auth +from google.oauth2 import service_account +from google_auth_httplib2 import AuthorizedHttp +from googleapiclient.discovery import build +from googleapiclient.errors import HttpError + +from lithops.version import __version__ +from lithops.util.ssh_client import SSHClient +from lithops.constants import COMPUTE_CLI_MSG, CACHE_DIR +from lithops.config import load_yaml_config, dump_yaml_config +from lithops.standalone.utils import StandaloneMode +from lithops.standalone import LithopsValidationError + + +logger = logging.getLogger(__name__) + +INSTANCE_START_TIMEOUT = 180 + + +class GCPComputeEngieBackend: + + def __init__(self, config, mode): + logger.debug("Creating GCP Compute Engine client") + self.name = 'gcp_compute_engie' + self.config = config + self.mode = mode + self.project_name = self.config['project_name'] + self.zone = self.config['zone'] + self.region = self.config.get('region') or '-'.join(self.zone.split('-')[:-1]) + self.credentials_path = self.config.get('credentials_path') + + suffix = 'vm' if self.mode == StandaloneMode.CONSUME.value else 'vpc' + self.cache_dir = os.path.join(CACHE_DIR, self.name) + self.cache_file = os.path.join(self.cache_dir, f'{self.project_name}_{self.region}_{suffix}_data') + self.gce_data = {} + self.vpc_data_type = 'provided' if 'network_name' in self.config else 'created' + self.ssh_data_type = 'provided' if 'ssh_key_filename' in self.config else 'created' + self.network_name = self.config.get('network_name') + self.network_key = None + + self.compute_client = self._create_compute_client() + + self.master = None + self.workers = [] + + msg = COMPUTE_CLI_MSG.format('GCP Compute Engine') + logger.info(f"{msg} - Zone: {self.zone} - Project: {self.project_name}") + + def _wait_operation(self, operation_name, scope='zone'): + while True: + if scope == 'zone': + op = self.compute_client.zoneOperations().get( + project=self.project_name, zone=self.zone, operation=operation_name + ).execute() + elif scope == 'region': + op = self.compute_client.regionOperations().get( + project=self.project_name, region=self.region, operation=operation_name + ).execute() + else: + op = self.compute_client.globalOperations().get( + project=self.project_name, operation=operation_name + ).execute() + + if op['status'] == 'DONE': + if 'error' in op: + raise Exception(op['error']) + return + time.sleep(2) + + def _load_gce_data(self): + self.gce_data = load_yaml_config(self.cache_file) + if self.gce_data and 'network_name' in self.gce_data: + self.network_name = self.gce_data['network_name'] + self.network_key = self.gce_data.get('network_key') + + def _dump_gce_data(self): + dump_yaml_config(self.cache_file, self.gce_data) + + def _delete_vpc_data(self): + if os.path.exists(self.cache_file): + os.remove(self.cache_file) + + def _resource_exists(self, getter): + try: + getter() + return True + except HttpError as e: + if getattr(e.resp, 'status', None) == 404: + return False + raise + + def _create_network(self): + if 'network_name' in self.config: + return + + if 'network_name' in self.gce_data: + self.config['network_name'] = self.gce_data['network_name'] + self.config['subnet_name'] = self.gce_data['subnet_name'] + self.config['firewall_name'] = self.gce_data['firewall_name'] + return + + self.network_name = f'lithops-net-{str(uuid.uuid4())[-6:]}' + assert re.match("^[a-z0-9-]*$", self.network_name), f'Network name "{self.network_name}" not valid' + self.network_key = self.network_name[-6:] + subnet_name = f'{self.network_name}-subnet' + firewall_name = f'{self.network_name}-fw' + + body = { + 'name': self.network_name, + 'autoCreateSubnetworks': False + } + op = self.compute_client.networks().insert(project=self.project_name, body=body).execute() + self._wait_operation(op['name'], scope='global') + + body = { + 'name': subnet_name, + 'ipCidrRange': self.config['subnet_cidr'], + 'network': f'projects/{self.project_name}/global/networks/{self.network_name}', + 'region': self.region + } + op = self.compute_client.subnetworks().insert( + project=self.project_name, region=self.region, body=body + ).execute() + self._wait_operation(op['name'], scope='region') + + body = { + 'name': firewall_name, + 'network': f'projects/{self.project_name}/global/networks/{self.network_name}', + 'sourceRanges': ['0.0.0.0/0'], + 'allowed': [{'IPProtocol': 'tcp', 'ports': ['22']}] + } + op = self.compute_client.firewalls().insert(project=self.project_name, body=body).execute() + self._wait_operation(op['name'], scope='global') + + internal_fw_name = f'{self.network_name}-internal-fw' + body = { + 'name': internal_fw_name, + 'network': f'projects/{self.project_name}/global/networks/{self.network_name}', + 'sourceRanges': [self.config['network_cidr']], + 'allowed': [{'IPProtocol': 'tcp', 'ports': ['8080', '8081', '6379', '22']}] + } + op = self.compute_client.firewalls().insert(project=self.project_name, body=body).execute() + self._wait_operation(op['name'], scope='global') + + self.config['network_name'] = self.network_name + self.config['subnet_name'] = subnet_name + self.config['firewall_name'] = firewall_name + self.config['internal_firewall_name'] = internal_fw_name + + def _create_ssh_key(self): + if 'ssh_key_filename' in self.config: + return + + if 'ssh_key_filename' in self.gce_data and os.path.isfile(self.gce_data['ssh_key_filename']): + self.config['ssh_key_filename'] = self.gce_data['ssh_key_filename'] + return + + keyname = f'lithops-key-{str(uuid.uuid4())[-8:]}' + filename = os.path.join("~", ".ssh", f"{keyname}.{self.name}.id_rsa") + key_filename = os.path.expanduser(filename) + if not os.path.isfile(key_filename): + logger.debug("Generating new ssh key pair") + os.system(f'ssh-keygen -b 2048 -t rsa -f {key_filename} -q -N ""') + self.config['ssh_key_filename'] = key_filename + + def _instance_exists(self, instance_name): + return self._resource_exists( + lambda: self.compute_client.instances().get( + project=self.project_name, zone=self.zone, instance=instance_name + ).execute() + ) + + def _create_master_instance(self): + name = self.config.get('instance_name') or f'lithops-master-{self.network_key}' + self.master = GCPComputeEngieInstance(self.config, self.compute_client) + self.master.name = name + self.master.instance_type = self.config['master_instance_type'] + self.master.delete_on_dismantle = False + + if self.mode != StandaloneMode.CONSUME.value and not self._instance_exists(name): + self.master.create(public=True) + self.master.get_instance_data() + + def _create_compute_client(self): + if self.credentials_path and os.path.isfile(self.credentials_path): + credentials = service_account.Credentials.from_service_account_file( + self.credentials_path, + scopes=['https://www.googleapis.com/auth/cloud-platform'] + ) + else: + credentials, _ = google.auth.default( + scopes=['https://www.googleapis.com/auth/cloud-platform'] + ) + + http = AuthorizedHttp(credentials, http=httplib2.Http()) + return build('compute', 'v1', http=http, cache_discovery=False) + + def is_initialized(self): + if self.mode == StandaloneMode.CONSUME.value: + return True + return os.path.isfile(self.cache_file) + + def init(self): + logger.debug(f'Initializing GCP Compute Engine backend ({self.mode} mode)') + self._load_gce_data() + + if self.mode == StandaloneMode.CONSUME.value: + self._create_master_instance() + self.gce_data = { + 'mode': self.mode, + 'master_name': self.master.name, + 'master_id': self.master.get_instance_id() + } + self._dump_gce_data() + return + + self._create_network() + self._create_ssh_key() + if 'instance_name' not in self.config: + self.config['instance_name'] = f'lithops-master-{self.network_key}' + self._create_master_instance() + self.gce_data = { + 'mode': self.mode, + 'vpc_data_type': self.vpc_data_type, + 'ssh_data_type': self.ssh_data_type, + 'master_name': self.master.name, + 'master_id': self.master.get_instance_id(), + 'network_name': self.config['network_name'], + 'network_key': self.network_key, + 'subnet_name': self.config['subnet_name'], + 'firewall_name': self.config['firewall_name'], + 'internal_firewall_name': self.config['internal_firewall_name'], + 'ssh_key_filename': self.config['ssh_key_filename'] + } + self._dump_gce_data() + + def build_image(self, **kwargs): + raise NotImplementedError(f'{self.name}.build_image() is not implemented yet') + + def delete_image(self, **kwargs): + raise NotImplementedError(f'{self.name}.delete_image() is not implemented yet') + + def list_images(self, **kwargs): + raise NotImplementedError(f'{self.name}.list_images() is not implemented yet') + + def clean(self, **kwargs): + all_clean = kwargs.get('all', False) + if self.mode == StandaloneMode.CONSUME.value: + self._delete_vpc_data() + return + + self._delete_vm_instances(all=all_clean) + if all_clean: + self._delete_network_resources() + self._delete_ssh_key() + self._delete_vpc_data() + + def _delete_vm_instances(self, all=False): + prefixes = ('lithops-worker-', 'lithops-master-') if all else ('lithops-worker-',) + instances = self.compute_client.instances().list( + project=self.project_name, zone=self.zone + ).execute().get('items', []) + + for ins in instances: + name = ins.get('name', '') + if not name.startswith(prefixes): + continue + logger.debug(f"Deleting VM instance {name}") + op = self.compute_client.instances().delete( + project=self.project_name, zone=self.zone, instance=name + ).execute() + self._wait_operation(op['name'], scope='zone') + + def _delete_network_resources(self): + if self.gce_data.get('vpc_data_type') == 'provided': + return + + fw_names = [ + self.gce_data.get('firewall_name'), + self.gce_data.get('internal_firewall_name') + ] + for fw_name in fw_names: + if not fw_name: + continue + try: + op = self.compute_client.firewalls().delete( + project=self.project_name, firewall=fw_name + ).execute() + self._wait_operation(op['name'], scope='global') + except HttpError as e: + if getattr(e.resp, 'status', None) != 404: + raise + + subnet_name = self.gce_data.get('subnet_name') + if subnet_name: + try: + op = self.compute_client.subnetworks().delete( + project=self.project_name, region=self.region, subnetwork=subnet_name + ).execute() + self._wait_operation(op['name'], scope='region') + except HttpError as e: + if getattr(e.resp, 'status', None) != 404: + raise + + network_name = self.gce_data.get('network_name') + if network_name: + try: + op = self.compute_client.networks().delete( + project=self.project_name, network=network_name + ).execute() + self._wait_operation(op['name'], scope='global') + except HttpError as e: + if getattr(e.resp, 'status', None) != 404: + raise + + def _delete_ssh_key(self): + if self.gce_data.get('ssh_data_type') == 'provided': + return + key_filename = self.gce_data.get('ssh_key_filename') + if key_filename and "lithops-key-" in key_filename: + if os.path.isfile(key_filename): + os.remove(key_filename) + if os.path.isfile(f"{key_filename}.pub"): + os.remove(f"{key_filename}.pub") + + def clear(self, **kwargs): + return + + def dismantle(self, include_master=True): + if len(self.workers) > 0: + for worker in self.workers: + worker.stop() + self.workers = [] + if include_master and self.master: + self.master.stop() + + def get_instance(self, name, **kwargs): + instance = GCPComputeEngieInstance(self.config, self.compute_client) + instance.name = name + for key in kwargs: + if hasattr(instance, key) and kwargs[key] is not None: + setattr(instance, key, kwargs[key]) + return instance + + def get_worker_instance_type(self): + return self.config['worker_instance_type'] + + def get_worker_cpu_count(self): + return 1 + + def create_worker(self, name): + if self.mode == StandaloneMode.CONSUME.value: + raise NotImplementedError(f'{self.name}.create_worker() not available in consume mode') + + worker = GCPComputeEngieInstance(self.config, self.compute_client) + worker.name = name + worker.instance_type = self.config['worker_instance_type'] + worker.ssh_credentials['key_filename'] = '~/.ssh/lithops_id_rsa' + worker.ssh_credentials.pop('password', None) + worker.create(public=False) + self.workers.append(worker) + + def get_runtime_key(self, runtime_name, version=__version__): + runtime = runtime_name.replace('/', '-').replace(':', '-') + master_id = self.config['instance_name'] + return os.path.join(self.name, version, master_id, runtime) + + +class GCPComputeEngieInstance: + + def __init__(self, config, compute_client): + self.config = config + self.compute_client = compute_client + self.name = self.config['instance_name'] + self.project_name = self.config['project_name'] + self.zone = self.config['zone'] + + self.ssh_client = None + self.instance_data = None + self.instance_id = None + self.private_ip = None + self.public_ip = None + self.delete_on_dismantle = self.config['delete_on_dismantle'] + self.instance_type = self.config['worker_instance_type'] + self.home_dir = '/home/' + self.config['ssh_username'] + + self.ssh_credentials = { + 'username': self.config['ssh_username'], + 'password': self.config['ssh_password'], + 'key_filename': self.config.get('ssh_key_filename', '~/.ssh/id_rsa') + } + + def __str__(self): + ip = self.public_ip or self.private_ip + if ip: + return f'VM instance {self.name} ({ip})' + return f'VM instance {self.name}' + + def get_ssh_client(self): + ip_address = self.public_ip or self.private_ip + if not ip_address: + self.get_public_ip() + ip_address = self.public_ip or self.private_ip + + if not self.ssh_client or self.ssh_client.ip_address != ip_address: + self.ssh_client = SSHClient(ip_address, self.ssh_credentials) + return self.ssh_client + + def del_ssh_client(self): + if self.ssh_client: + try: + self.ssh_client.close() + except Exception: + pass + self.ssh_client = None + + def is_ready(self): + try: + self.get_ssh_client().run_remote_command('id') + except LithopsValidationError as err: + raise err + except Exception as err: + logger.debug(f'SSH to {self.public_ip or self.private_ip} failed: {err}') + self.del_ssh_client() + return False + return True + + def wait_ready(self, timeout=INSTANCE_START_TIMEOUT): + logger.debug(f'Waiting {self} to become ready') + start = time.time() + self.get_public_ip() + while (time.time() - start < timeout): + if self.is_ready(): + return True + time.sleep(5) + raise TimeoutError(f'Readiness probe expired on {self}') + + def get_instance_data(self): + res = self.compute_client.instances().get( + project=self.project_name, + zone=self.zone, + instance=self.name + ).execute() + self.instance_data = res + self.instance_id = str(res.get('id')) + + interfaces = res.get('networkInterfaces', []) + if interfaces: + self.private_ip = interfaces[0].get('networkIP') + access_cfg = interfaces[0].get('accessConfigs', []) + if access_cfg: + self.public_ip = access_cfg[0].get('natIP') + return self.instance_data + + def get_instance_id(self): + if not self.instance_id: + self.get_instance_data() + return self.instance_id + + def get_private_ip(self): + if not self.private_ip: + self.get_instance_data() + return self.private_ip + + def get_public_ip(self): + if not self.public_ip: + self.get_instance_data() + return self.public_ip + + def create(self, public=False, **kwargs): + if self.get_instance_data() if self._exists() else False: + return + + with open(os.path.expanduser(self.ssh_credentials['key_filename'] + '.pub'), 'r') as pkf: + public_key_data = pkf.read().strip() + + network_iface = { + 'subnetwork': ( + f'projects/{self.project_name}/regions/{self.config["region"]}' + f'/subnetworks/{self.config["subnet_name"]}' + ) + } + if public: + network_iface['accessConfigs'] = [{'name': 'External NAT', 'type': 'ONE_TO_ONE_NAT'}] + + body = { + 'name': self.name, + 'machineType': f'zones/{self.zone}/machineTypes/{self.instance_type}', + 'disks': [{ + 'boot': True, + 'autoDelete': True, + 'initializeParams': { + 'sourceImage': self.config['source_image'], + 'diskSizeGb': str(self.config['boot_disk_size']), + 'diskType': f'zones/{self.zone}/diskTypes/{self.config["boot_disk_type"]}' + } + }], + 'networkInterfaces': [network_iface], + 'metadata': { + 'items': [{ + 'key': 'ssh-keys', + 'value': f'{self.ssh_credentials["username"]}:{public_key_data}' + }] + }, + 'labels': { + 'type': 'lithops-runtime' + } + } + if self.config.get('request_spot_instances', False) and not public: + body['scheduling'] = { + 'provisioningModel': 'SPOT', + 'instanceTerminationAction': 'STOP' + } + op = self.compute_client.instances().insert( + project=self.project_name, + zone=self.zone, + body=body + ).execute() + self._wait_zone_operation(op['name']) + self.get_instance_data() + + def start(self): + op = self.compute_client.instances().start( + project=self.project_name, zone=self.zone, instance=self.name + ).execute() + self._wait_zone_operation(op['name']) + + def stop(self): + if self.delete_on_dismantle: + return self.delete() + op = self.compute_client.instances().stop( + project=self.project_name, zone=self.zone, instance=self.name + ).execute() + self._wait_zone_operation(op['name']) + + def delete(self): + op = self.compute_client.instances().delete( + project=self.project_name, zone=self.zone, instance=self.name + ).execute() + self._wait_zone_operation(op['name']) + + def validate_capabilities(self): + pass + + def _exists(self): + try: + self.compute_client.instances().get( + project=self.project_name, zone=self.zone, instance=self.name + ).execute() + return True + except HttpError as e: + if getattr(e.resp, 'status', None) == 404: + return False + raise + + def _wait_zone_operation(self, operation_name): + while True: + op = self.compute_client.zoneOperations().get( + project=self.project_name, zone=self.zone, operation=operation_name + ).execute() + if op['status'] == 'DONE': + if 'error' in op: + raise Exception(op['error']) + return + time.sleep(2)