From fb7336f0ae59f64847e35ac02788bb20b32c796a Mon Sep 17 00:00:00 2001 From: Liu Zhengyun Date: Mon, 2 Feb 2026 12:34:46 +0800 Subject: [PATCH] sync codes for ainode --- iotdb-core/ainode/iotdb/ainode/core/config.py | 15 --- .../ainode/iotdb/ainode/core/constant.py | 1 - .../core/inference/inference_request_pool.py | 4 +- .../core/inference/pipeline/basic_pipeline.py | 74 +++++++++-- .../ainode/iotdb/ainode/core/ingress/iotdb.py | 8 -- .../ainode/core/manager/inference_manager.py | 116 ++++++++++-------- .../resources/conf/iotdb-ainode.properties | 4 - .../src/main/thrift/ainode.thrift | 3 +- 8 files changed, 135 insertions(+), 90 deletions(-) diff --git a/iotdb-core/ainode/iotdb/ainode/core/config.py b/iotdb-core/ainode/iotdb/ainode/core/config.py index b14efa3bedffa..4995dda7bf337 100644 --- a/iotdb-core/ainode/iotdb/ainode/core/config.py +++ b/iotdb-core/ainode/iotdb/ainode/core/config.py @@ -23,7 +23,6 @@ AINODE_CLUSTER_INGRESS_ADDRESS, AINODE_CLUSTER_INGRESS_PASSWORD, AINODE_CLUSTER_INGRESS_PORT, - AINODE_CLUSTER_INGRESS_TIME_ZONE, AINODE_CLUSTER_INGRESS_USERNAME, AINODE_CLUSTER_NAME, AINODE_CONF_DIRECTORY_NAME, @@ -69,7 +68,6 @@ def __init__(self): self._ain_cluster_ingress_port = AINODE_CLUSTER_INGRESS_PORT self._ain_cluster_ingress_username = AINODE_CLUSTER_INGRESS_USERNAME self._ain_cluster_ingress_password = AINODE_CLUSTER_INGRESS_PASSWORD - self._ain_cluster_ingress_time_zone = AINODE_CLUSTER_INGRESS_TIME_ZONE # Inference configuration self._ain_inference_batch_interval_in_ms: int = ( @@ -287,14 +285,6 @@ def set_ain_cluster_ingress_password( ) -> None: self._ain_cluster_ingress_password = ain_cluster_ingress_password - def get_ain_cluster_ingress_time_zone(self) -> str: - return self._ain_cluster_ingress_time_zone - - def set_ain_cluster_ingress_time_zone( - self, ain_cluster_ingress_time_zone: str - ) -> None: - self._ain_cluster_ingress_time_zone = ain_cluster_ingress_time_zone - @singleton class AINodeDescriptor(object): @@ -432,11 +422,6 @@ def _load_config_from_file(self) -> None: file_configs["ain_cluster_ingress_password"] ) - if "ain_cluster_ingress_time_zone" in config_keys: - self._config.set_ain_cluster_ingress_time_zone( - file_configs["ain_cluster_ingress_time_zone"] - ) - except BadNodeUrlException: logger.warning("Cannot load AINode conf file, use default configuration.") diff --git a/iotdb-core/ainode/iotdb/ainode/core/constant.py b/iotdb-core/ainode/iotdb/ainode/core/constant.py index b0019722630fa..4a2ee543d1f8d 100644 --- a/iotdb-core/ainode/iotdb/ainode/core/constant.py +++ b/iotdb-core/ainode/iotdb/ainode/core/constant.py @@ -39,7 +39,6 @@ AINODE_CLUSTER_INGRESS_PORT = 6667 AINODE_CLUSTER_INGRESS_USERNAME = "root" AINODE_CLUSTER_INGRESS_PASSWORD = "root" -AINODE_CLUSTER_INGRESS_TIME_ZONE = "UTC+8" # RPC config AINODE_THRIFT_COMPRESSION_ENABLED = False diff --git a/iotdb-core/ainode/iotdb/ainode/core/inference/inference_request_pool.py b/iotdb-core/ainode/iotdb/ainode/core/inference/inference_request_pool.py index dcfa4528fce97..8121d4fecd8c6 100644 --- a/iotdb-core/ainode/iotdb/ainode/core/inference/inference_request_pool.py +++ b/iotdb-core/ainode/iotdb/ainode/core/inference/inference_request_pool.py @@ -127,7 +127,9 @@ def _step(self): for i in range(batch_inputs.size(0)): batch_input_list.append({"targets": batch_inputs[i]}) batch_inputs = self._inference_pipeline.preprocess( - batch_input_list, output_length=requests[0].output_length + batch_input_list, + output_length=requests[0].output_length, + auto_adapt=True, ) if isinstance(self._inference_pipeline, ForecastPipeline): batch_output = self._inference_pipeline.forecast( diff --git a/iotdb-core/ainode/iotdb/ainode/core/inference/pipeline/basic_pipeline.py b/iotdb-core/ainode/iotdb/ainode/core/inference/pipeline/basic_pipeline.py index ece395bf6978f..5d0026522a136 100644 --- a/iotdb-core/ainode/iotdb/ainode/core/inference/pipeline/basic_pipeline.py +++ b/iotdb-core/ainode/iotdb/ainode/core/inference/pipeline/basic_pipeline.py @@ -19,13 +19,16 @@ from abc import ABC, abstractmethod import torch +from torch.nn import functional as F from iotdb.ainode.core.exception import InferenceModelInternalException +from iotdb.ainode.core.log import Logger from iotdb.ainode.core.manager.device_manager import DeviceManager from iotdb.ainode.core.model.model_info import ModelInfo from iotdb.ainode.core.model.model_loader import load_model BACKEND = DeviceManager() +logger = Logger() class BasicPipeline(ABC): @@ -70,6 +73,7 @@ def preprocess( infer_kwargs (dict, optional): Additional keyword arguments for inference, such as: - `output_length`(int): Used to check validation of 'future_covariates' if provided. + - `auto_adapt`(bool): Whether to automatically adapt the covariates. Raises: ValueError: If the input format is incorrect (e.g., missing keys, invalid tensor shapes). @@ -80,6 +84,7 @@ def preprocess( if isinstance(inputs, list): output_length = infer_kwargs.get("output_length", 96) + auto_adapt = infer_kwargs.get("auto_adapt", True) for idx, input_dict in enumerate(inputs): # Check if the dictionary contains the expected keys if not isinstance(input_dict, dict): @@ -121,10 +126,30 @@ def preprocess( raise ValueError( f"Each value in 'past_covariates' must be torch.Tensor, but got {type(cov_value)} for key '{cov_key}' at index {idx}." ) - if cov_value.ndim != 1 or cov_value.shape[0] != input_length: + if cov_value.ndim != 1: raise ValueError( - f"Each covariate in 'past_covariates' must have shape ({input_length},), but got shape {cov_value.shape} for key '{cov_key}' at index {idx}." + f"Individual `past_covariates` must be 1-d, found: {cov_key} with {cov_value.ndim} dimensions in element at index {idx}." ) + # If any past_covariate's length is not equal to input_length, process it accordingly. + if cov_value.shape[0] != input_length: + if auto_adapt: + if cov_value.shape[0] > input_length: + logger.warning( + f"Past covariate {cov_key} at index {idx} has length {cov_value.shape[0]} (> {input_length}), which will be truncated from the beginning." + ) + past_covariates[cov_key] = cov_value[-input_length:] + else: + logger.warning( + f"Past covariate {cov_key} at index {idx} has length {cov_value.shape[0]} (< {input_length}), which will be padded with zeros at the beginning." + ) + pad_size = input_length - cov_value.shape[0] + past_covariates[cov_key] = F.pad( + cov_value, (pad_size, 0) + ) + else: + raise ValueError( + f"Individual `past_covariates` must be 1-d with length equal to the length of `target` (= {input_length}), found: {cov_key} with shape {tuple(cov_value.shape)} in element at index {idx}." + ) # Check 'future_covariates' if it exists (optional) future_covariates = input_dict.get("future_covariates", {}) @@ -134,19 +159,52 @@ def preprocess( ) # If future_covariates exists, check if they are a subset of past_covariates if future_covariates: - for cov_key, cov_value in future_covariates.items(): + for cov_key, cov_value in list(future_covariates.items()): + # If any future_covariate not found in past_covariates, ignore it or raise an error. if cov_key not in past_covariates: - raise ValueError( - f"Key '{cov_key}' in 'future_covariates' is not in 'past_covariates' at index {idx}." - ) + if auto_adapt: + future_covariates.pop(cov_key) + logger.warning( + f"Future covariate {cov_key} not found in past_covariates {list(past_covariates.keys())}, which will be ignored when executing forecasting." + ) + if not future_covariates: + input_dict.pop("future_covariates") + continue + else: + raise ValueError( + f"Expected keys in `future_covariates` to be a subset of `past_covariates` {list(past_covariates.keys())}, " + f"but found {cov_key} in element at index {idx}." + ) if not isinstance(cov_value, torch.Tensor): raise ValueError( f"Each value in 'future_covariates' must be torch.Tensor, but got {type(cov_value)} for key '{cov_key}' at index {idx}." ) - if cov_value.ndim != 1 or cov_value.shape[0] != output_length: + if cov_value.ndim != 1: raise ValueError( - f"Each covariate in 'future_covariates' must have shape ({output_length},), but got shape {cov_value.shape} for key '{cov_key}' at index {idx}." + f"Individual `future_covariates` must be 1-d, found: {cov_key} with {cov_value.ndim} dimensions in element at index {idx}." ) + # If any future_covariate's length is not equal to output_length, process it accordingly. + if cov_value.shape[0] != output_length: + if auto_adapt: + if cov_value.shape[0] > output_length: + logger.warning( + f"Future covariate {cov_key} at index {idx} has length {cov_value.shape[0]} (> {output_length}), which will be truncated from the end." + ) + future_covariates[cov_key] = cov_value[ + :output_length + ] + else: + logger.warning( + f"Future covariate {cov_key} at index {idx} has length {cov_value.shape[0]} (< {output_length}), which will be padded with zeros at the end." + ) + pad_size = output_length - cov_value.shape[0] + future_covariates[cov_key] = F.pad( + cov_value, (0, pad_size) + ) + else: + raise ValueError( + f"Individual `future_covariates` must be 1-d with length equal to `output_length` (= {output_length}), found: {cov_key} with shape {tuple(cov_value.shape)} in element at index {idx}." + ) else: raise ValueError( f"The inputs must be a list of dictionaries, but got {type(inputs)}." diff --git a/iotdb-core/ainode/iotdb/ainode/core/ingress/iotdb.py b/iotdb-core/ainode/iotdb/ainode/core/ingress/iotdb.py index 13c56ca9d2d53..be1dd9bf1c2b3 100644 --- a/iotdb-core/ainode/iotdb/ainode/core/ingress/iotdb.py +++ b/iotdb-core/ainode/iotdb/ainode/core/ingress/iotdb.py @@ -69,9 +69,6 @@ def __init__( password: str = AINodeDescriptor() .get_config() .get_ain_cluster_ingress_password(), - time_zone: str = AINodeDescriptor() - .get_config() - .get_ain_cluster_ingress_time_zone(), use_rate: float = 1.0, offset_rate: float = 0.0, ): @@ -90,7 +87,6 @@ def __init__( node_urls=[f"{ip}:{port}"], user=username, password=password, - zone_id=time_zone, use_ssl=AINodeDescriptor() .get_config() .get_ain_cluster_ingress_ssl_enabled(), @@ -258,9 +254,6 @@ def __init__( password: str = AINodeDescriptor() .get_config() .get_ain_cluster_ingress_password(), - time_zone: str = AINodeDescriptor() - .get_config() - .get_ain_cluster_ingress_time_zone(), use_rate: float = 1.0, offset_rate: float = 0.0, ): @@ -272,7 +265,6 @@ def __init__( node_urls=[f"{ip}:{port}"], username=username, password=password, - time_zone=time_zone, use_ssl=AINodeDescriptor() .get_config() .get_ain_cluster_ingress_ssl_enabled(), diff --git a/iotdb-core/ainode/iotdb/ainode/core/manager/inference_manager.py b/iotdb-core/ainode/iotdb/ainode/core/manager/inference_manager.py index 2ad25ad052954..07ca8a63bce02 100644 --- a/iotdb-core/ainode/iotdb/ainode/core/manager/inference_manager.py +++ b/iotdb-core/ainode/iotdb/ainode/core/manager/inference_manager.py @@ -175,6 +175,66 @@ def _process_request(self, req): with self._result_wrapper_lock: del self._result_wrapper_map[req_id] + def _do_inference_and_construct_resp( + self, + model_id: str, + model_inputs_list: list[dict[str, torch.Tensor | dict[str, torch.Tensor]]], + output_length: int, + inference_attrs: dict, + **kwargs, + ) -> list[bytes]: + auto_adapt = kwargs.get("auto_adapt", True) + if ( + output_length + > AINodeDescriptor().get_config().get_ain_inference_max_output_length() + ): + raise NumericalRangeException( + "output_length", + output_length, + 1, + AINodeDescriptor().get_config().get_ain_inference_max_output_length(), + ) + + if self._pool_controller.has_running_pools(model_id): + infer_req = InferenceRequest( + req_id=generate_req_id(), + model_id=model_id, + inputs=torch.stack( + [data["targets"] for data in model_inputs_list], dim=0 + ), + output_length=output_length, + ) + outputs = self._process_request(infer_req) + else: + model_info = self._model_manager.get_model_info(model_id) + inference_pipeline = load_pipeline( + model_info, device=self._backend.torch_device("cpu") + ) + inputs = inference_pipeline.preprocess( + model_inputs_list, + output_length=output_length, + auto_adapt=auto_adapt, + ) + if isinstance(inference_pipeline, ForecastPipeline): + outputs = inference_pipeline.forecast( + inputs, output_length=output_length, **inference_attrs + ) + elif isinstance(inference_pipeline, ClassificationPipeline): + outputs = inference_pipeline.classify(inputs) + elif isinstance(inference_pipeline, ChatPipeline): + outputs = inference_pipeline.chat(inputs) + else: + outputs = None + logger.error("[Inference] Unsupported pipeline type.") + outputs = inference_pipeline.postprocess(outputs) + + # convert tensor into tsblock for the output in each batch + resp_list = [] + for batch_idx, output in enumerate(outputs): + resp = convert_tensor_to_tsblock(output) + resp_list.append(resp) + return resp_list + def _run( self, req, @@ -191,65 +251,17 @@ def _run( inference_attrs = extract_attrs(req) output_length = int(inference_attrs.pop("output_length", 96)) - # model_inputs_list: Each element is a dict, which contains the following keys: - # `targets`: The input tensor for the target variable(s), whose shape is [target_count, input_length]. model_inputs_list: list[ dict[str, torch.Tensor | dict[str, torch.Tensor]] ] = [{"targets": inputs[0]}] - if ( - output_length - > AINodeDescriptor().get_config().get_ain_inference_max_output_length() - ): - raise NumericalRangeException( - "output_length", - output_length, - 1, - AINodeDescriptor() - .get_config() - .get_ain_inference_max_output_length(), - ) - - if self._pool_controller.has_running_pools(model_id): - infer_req = InferenceRequest( - req_id=generate_req_id(), - model_id=model_id, - inputs=torch.stack( - [data["targets"] for data in model_inputs_list], dim=0 - ), - output_length=output_length, - ) - outputs = self._process_request(infer_req) - else: - model_info = self._model_manager.get_model_info(model_id) - inference_pipeline = load_pipeline( - model_info, device=self._backend.torch_device("cpu") - ) - inputs = inference_pipeline.preprocess( - model_inputs_list, output_length=output_length - ) - if isinstance(inference_pipeline, ForecastPipeline): - outputs = inference_pipeline.forecast( - inputs, output_length=output_length, **inference_attrs - ) - elif isinstance(inference_pipeline, ClassificationPipeline): - outputs = inference_pipeline.classify(inputs) - elif isinstance(inference_pipeline, ChatPipeline): - outputs = inference_pipeline.chat(inputs) - else: - outputs = None - logger.error("[Inference] Unsupported pipeline type.") - outputs = inference_pipeline.postprocess(outputs) - - # convert tensor into tsblock for the output in each batch - output_list = [] - for batch_idx, output in enumerate(outputs): - output = convert_tensor_to_tsblock(output) - output_list.append(output) + resp_list = self._do_inference_and_construct_resp( + model_id, model_inputs_list, output_length, inference_attrs + ) return resp_cls( get_status(TSStatusCode.SUCCESS_STATUS), - [output_list[0]] if single_batch else output_list, + [resp_list[0]] if single_batch else resp_list, ) except Exception as e: diff --git a/iotdb-core/ainode/resources/conf/iotdb-ainode.properties b/iotdb-core/ainode/resources/conf/iotdb-ainode.properties index fc569b27807ce..5b653d678a684 100644 --- a/iotdb-core/ainode/resources/conf/iotdb-ainode.properties +++ b/iotdb-core/ainode/resources/conf/iotdb-ainode.properties @@ -52,10 +52,6 @@ ain_cluster_ingress_username=root # Datatype: String ain_cluster_ingress_password=root -# The time zone of the IoTDB cluster. -# Datatype: String -ain_cluster_ingress_time_zone=UTC+8 - # The device space allocated for inference # Datatype: Float ain_inference_memory_usage_ratio=0.2 diff --git a/iotdb-protocol/thrift-ainode/src/main/thrift/ainode.thrift b/iotdb-protocol/thrift-ainode/src/main/thrift/ainode.thrift index 68347b89203ca..0416f3c69cb18 100644 --- a/iotdb-protocol/thrift-ainode/src/main/thrift/ainode.thrift +++ b/iotdb-protocol/thrift-ainode/src/main/thrift/ainode.thrift @@ -87,7 +87,8 @@ struct TForecastReq { 3: required i32 outputLength 4: optional string historyCovs 5: optional string futureCovs - 6: optional map options + 6: optional bool autoAdapt + 7: optional map options } struct TForecastResp {