15 changes: 0 additions & 15 deletions iotdb-core/ainode/iotdb/ainode/core/config.py
@@ -23,7 +23,6 @@
AINODE_CLUSTER_INGRESS_ADDRESS,
AINODE_CLUSTER_INGRESS_PASSWORD,
AINODE_CLUSTER_INGRESS_PORT,
AINODE_CLUSTER_INGRESS_TIME_ZONE,
AINODE_CLUSTER_INGRESS_USERNAME,
AINODE_CLUSTER_NAME,
AINODE_CONF_DIRECTORY_NAME,
@@ -69,7 +68,6 @@ def __init__(self):
self._ain_cluster_ingress_port = AINODE_CLUSTER_INGRESS_PORT
self._ain_cluster_ingress_username = AINODE_CLUSTER_INGRESS_USERNAME
self._ain_cluster_ingress_password = AINODE_CLUSTER_INGRESS_PASSWORD
self._ain_cluster_ingress_time_zone = AINODE_CLUSTER_INGRESS_TIME_ZONE

# Inference configuration
self._ain_inference_batch_interval_in_ms: int = (
@@ -287,14 +285,6 @@ def set_ain_cluster_ingress_password(
) -> None:
self._ain_cluster_ingress_password = ain_cluster_ingress_password

def get_ain_cluster_ingress_time_zone(self) -> str:
return self._ain_cluster_ingress_time_zone

def set_ain_cluster_ingress_time_zone(
self, ain_cluster_ingress_time_zone: str
) -> None:
self._ain_cluster_ingress_time_zone = ain_cluster_ingress_time_zone


@singleton
class AINodeDescriptor(object):
@@ -432,11 +422,6 @@ def _load_config_from_file(self) -> None:
file_configs["ain_cluster_ingress_password"]
)

if "ain_cluster_ingress_time_zone" in config_keys:
self._config.set_ain_cluster_ingress_time_zone(
file_configs["ain_cluster_ingress_time_zone"]
)

except BadNodeUrlException:
logger.warning("Cannot load AINode conf file, use default configuration.")

1 change: 0 additions & 1 deletion iotdb-core/ainode/iotdb/ainode/core/constant.py
@@ -39,7 +39,6 @@
AINODE_CLUSTER_INGRESS_PORT = 6667
AINODE_CLUSTER_INGRESS_USERNAME = "root"
AINODE_CLUSTER_INGRESS_PASSWORD = "root"
AINODE_CLUSTER_INGRESS_TIME_ZONE = "UTC+8"

# RPC config
AINODE_THRIFT_COMPRESSION_ENABLED = False
@@ -127,7 +127,9 @@ def _step(self):
for i in range(batch_inputs.size(0)):
batch_input_list.append({"targets": batch_inputs[i]})
batch_inputs = self._inference_pipeline.preprocess(
batch_input_list, output_length=requests[0].output_length
batch_input_list,
output_length=requests[0].output_length,
auto_adapt=True,
)
if isinstance(self._inference_pipeline, ForecastPipeline):
batch_output = self._inference_pipeline.forecast(
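For context, this hunk forwards `auto_adapt=True` so covariate-length mismatches are repaired during batching rather than rejected. A minimal sketch of the batch assembly the loop performs, with hypothetical sizes (4 queued requests, each a `[target_count=1, input_length=96]` tensor):

```python
import torch

# Hypothetical batch: 4 requests, each with a [target_count, input_length] tensor.
batch_inputs = torch.randn(4, 1, 96)

# Mirrors the loop above: wrap each row in the dict format preprocess() expects.
batch_input_list = [{"targets": batch_inputs[i]} for i in range(batch_inputs.size(0))]

# The pool then calls, as in this hunk (requests[0] is illustrative here):
# self._inference_pipeline.preprocess(
#     batch_input_list, output_length=requests[0].output_length, auto_adapt=True
# )
```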
@@ -19,13 +19,16 @@
from abc import ABC, abstractmethod

import torch
from torch.nn import functional as F

from iotdb.ainode.core.exception import InferenceModelInternalException
from iotdb.ainode.core.log import Logger
from iotdb.ainode.core.manager.device_manager import DeviceManager
from iotdb.ainode.core.model.model_info import ModelInfo
from iotdb.ainode.core.model.model_loader import load_model

BACKEND = DeviceManager()
logger = Logger()


class BasicPipeline(ABC):
@@ -70,6 +73,7 @@ def preprocess(

infer_kwargs (dict, optional): Additional keyword arguments for inference, such as:
- `output_length`(int): Used to validate 'future_covariates' if provided.
- `auto_adapt`(bool): Whether to automatically adapt the covariates.

Raises:
ValueError: If the input format is incorrect (e.g., missing keys, invalid tensor shapes).
@@ -80,6 +84,7 @@

if isinstance(inputs, list):
output_length = infer_kwargs.get("output_length", 96)
auto_adapt = infer_kwargs.get("auto_adapt", True)
for idx, input_dict in enumerate(inputs):
# Check if the dictionary contains the expected keys
if not isinstance(input_dict, dict):
@@ -121,10 +126,30 @@
raise ValueError(
f"Each value in 'past_covariates' must be torch.Tensor, but got {type(cov_value)} for key '{cov_key}' at index {idx}."
)
if cov_value.ndim != 1 or cov_value.shape[0] != input_length:
if cov_value.ndim != 1:
raise ValueError(
f"Each covariate in 'past_covariates' must have shape ({input_length},), but got shape {cov_value.shape} for key '{cov_key}' at index {idx}."
f"Individual `past_covariates` must be 1-d, found: {cov_key} with {cov_value.ndim} dimensions in element at index {idx}."
)
# If any past_covariate's length is not equal to input_length, process it accordingly.
if cov_value.shape[0] != input_length:
if auto_adapt:
if cov_value.shape[0] > input_length:
logger.warning(
f"Past covariate {cov_key} at index {idx} has length {cov_value.shape[0]} (> {input_length}), which will be truncated from the beginning."
)
past_covariates[cov_key] = cov_value[-input_length:]
else:
logger.warning(
f"Past covariate {cov_key} at index {idx} has length {cov_value.shape[0]} (< {input_length}), which will be padded with zeros at the beginning."
)
pad_size = input_length - cov_value.shape[0]
past_covariates[cov_key] = F.pad(
cov_value, (pad_size, 0)
)
else:
raise ValueError(
f"Individual `past_covariates` must be 1-d with length equal to the length of `target` (= {input_length}), found: {cov_key} with shape {tuple(cov_value.shape)} in element at index {idx}."
)

# Check 'future_covariates' if it exists (optional)
future_covariates = input_dict.get("future_covariates", {})
@@ -134,19 +159,52 @@
)
# If future_covariates exists, check if they are a subset of past_covariates
if future_covariates:
for cov_key, cov_value in future_covariates.items():
for cov_key, cov_value in list(future_covariates.items()):
# If any future_covariate is not found in past_covariates, ignore it or raise an error.
if cov_key not in past_covariates:
raise ValueError(
f"Key '{cov_key}' in 'future_covariates' is not in 'past_covariates' at index {idx}."
)
if auto_adapt:
future_covariates.pop(cov_key)
logger.warning(
f"Future covariate {cov_key} not found in past_covariates {list(past_covariates.keys())}, which will be ignored when executing forecasting."
)
if not future_covariates:
input_dict.pop("future_covariates")
continue
else:
raise ValueError(
f"Expected keys in `future_covariates` to be a subset of `past_covariates` {list(past_covariates.keys())}, "
f"but found {cov_key} in element at index {idx}."
)
if not isinstance(cov_value, torch.Tensor):
raise ValueError(
f"Each value in 'future_covariates' must be torch.Tensor, but got {type(cov_value)} for key '{cov_key}' at index {idx}."
)
if cov_value.ndim != 1 or cov_value.shape[0] != output_length:
if cov_value.ndim != 1:
raise ValueError(
f"Each covariate in 'future_covariates' must have shape ({output_length},), but got shape {cov_value.shape} for key '{cov_key}' at index {idx}."
f"Individual `future_covariates` must be 1-d, found: {cov_key} with {cov_value.ndim} dimensions in element at index {idx}."
)
# If any future_covariate's length is not equal to output_length, process it accordingly.
if cov_value.shape[0] != output_length:
if auto_adapt:
if cov_value.shape[0] > output_length:
logger.warning(
f"Future covariate {cov_key} at index {idx} has length {cov_value.shape[0]} (> {output_length}), which will be truncated from the end."
)
future_covariates[cov_key] = cov_value[
:output_length
]
else:
logger.warning(
f"Future covariate {cov_key} at index {idx} has length {cov_value.shape[0]} (< {output_length}), which will be padded with zeros at the end."
)
pad_size = output_length - cov_value.shape[0]
future_covariates[cov_key] = F.pad(
cov_value, (0, pad_size)
)
else:
raise ValueError(
f"Individual `future_covariates` must be 1-d with length equal to `output_length` (= {output_length}), found: {cov_key} with shape {tuple(cov_value.shape)} in element at index {idx}."
)
else:
raise ValueError(
f"The inputs must be a list of dictionaries, but got {type(inputs)}."
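To make the new adapt-or-raise rule concrete, here is a standalone sketch with toy tensors (the names `input_length`, `long_cov`, and `short_cov` are illustrative, not taken from the module). Past covariates are aligned to the end of the input window, so excess history is dropped from the front and short history is zero-padded on the left, exactly as the `F.pad(cov_value, (pad_size, 0))` call above does:

```python
import torch
from torch.nn import functional as F

input_length = 8

# Too long: keep the most recent input_length values (truncate from the beginning).
long_cov = torch.arange(12.0)
adapted = long_cov[-input_length:]
assert adapted.tolist() == [4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0]

# Too short: left-pad with zeros so the covariate stays aligned with the window end.
short_cov = torch.arange(5.0)
pad_size = input_length - short_cov.shape[0]
adapted = F.pad(short_cov, (pad_size, 0))  # (left, right) padding for a 1-d tensor
assert adapted.tolist() == [0.0, 0.0, 0.0, 0.0, 1.0, 2.0, 3.0, 4.0]

# Future covariates are adapted at the other end, mirroring the second branch:
# cov[:output_length] to truncate, F.pad(cov, (0, pad_size)) to right-pad.
```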
8 changes: 0 additions & 8 deletions iotdb-core/ainode/iotdb/ainode/core/ingress/iotdb.py
@@ -69,9 +69,6 @@ def __init__(
password: str = AINodeDescriptor()
.get_config()
.get_ain_cluster_ingress_password(),
time_zone: str = AINodeDescriptor()
.get_config()
.get_ain_cluster_ingress_time_zone(),
use_rate: float = 1.0,
offset_rate: float = 0.0,
):
@@ -90,7 +87,6 @@ def __init__(
node_urls=[f"{ip}:{port}"],
user=username,
password=password,
zone_id=time_zone,
use_ssl=AINodeDescriptor()
.get_config()
.get_ain_cluster_ingress_ssl_enabled(),
@@ -258,9 +254,6 @@ def __init__(
password: str = AINodeDescriptor()
.get_config()
.get_ain_cluster_ingress_password(),
time_zone: str = AINodeDescriptor()
.get_config()
.get_ain_cluster_ingress_time_zone(),
use_rate: float = 1.0,
offset_rate: float = 0.0,
):
@@ -272,7 +265,6 @@ def __init__(
node_urls=[f"{ip}:{port}"],
username=username,
password=password,
time_zone=time_zone,
use_ssl=AINodeDescriptor()
.get_config()
.get_ain_cluster_ingress_ssl_enabled(),
116 changes: 64 additions & 52 deletions iotdb-core/ainode/iotdb/ainode/core/manager/inference_manager.py
@@ -175,6 +175,66 @@ def _process_request(self, req):
with self._result_wrapper_lock:
del self._result_wrapper_map[req_id]

def _do_inference_and_construct_resp(
self,
model_id: str,
model_inputs_list: list[dict[str, torch.Tensor | dict[str, torch.Tensor]]],
output_length: int,
inference_attrs: dict,
**kwargs,
) -> list[bytes]:
auto_adapt = kwargs.get("auto_adapt", True)
if (
output_length
> AINodeDescriptor().get_config().get_ain_inference_max_output_length()
):
raise NumericalRangeException(
"output_length",
output_length,
1,
AINodeDescriptor().get_config().get_ain_inference_max_output_length(),
)

if self._pool_controller.has_running_pools(model_id):
infer_req = InferenceRequest(
req_id=generate_req_id(),
model_id=model_id,
inputs=torch.stack(
[data["targets"] for data in model_inputs_list], dim=0
),
output_length=output_length,
)
outputs = self._process_request(infer_req)
else:
model_info = self._model_manager.get_model_info(model_id)
inference_pipeline = load_pipeline(
model_info, device=self._backend.torch_device("cpu")
)
inputs = inference_pipeline.preprocess(
model_inputs_list,
output_length=output_length,
auto_adapt=auto_adapt,
)
if isinstance(inference_pipeline, ForecastPipeline):
outputs = inference_pipeline.forecast(
inputs, output_length=output_length, **inference_attrs
)
elif isinstance(inference_pipeline, ClassificationPipeline):
outputs = inference_pipeline.classify(inputs)
elif isinstance(inference_pipeline, ChatPipeline):
outputs = inference_pipeline.chat(inputs)
else:
outputs = None
logger.error("[Inference] Unsupported pipeline type.")
outputs = inference_pipeline.postprocess(outputs)

# convert tensor into tsblock for the output in each batch
resp_list = []
for batch_idx, output in enumerate(outputs):
resp = convert_tensor_to_tsblock(output)
resp_list.append(resp)
return resp_list

def _run(
self,
req,
@@ -191,65 +251,17 @@ def _run(
inference_attrs = extract_attrs(req)
output_length = int(inference_attrs.pop("output_length", 96))

# model_inputs_list: Each element is a dict, which contains the following keys:
# `targets`: The input tensor for the target variable(s), whose shape is [target_count, input_length].
model_inputs_list: list[
dict[str, torch.Tensor | dict[str, torch.Tensor]]
] = [{"targets": inputs[0]}]

if (
output_length
> AINodeDescriptor().get_config().get_ain_inference_max_output_length()
):
raise NumericalRangeException(
"output_length",
output_length,
1,
AINodeDescriptor()
.get_config()
.get_ain_inference_max_output_length(),
)

if self._pool_controller.has_running_pools(model_id):
infer_req = InferenceRequest(
req_id=generate_req_id(),
model_id=model_id,
inputs=torch.stack(
[data["targets"] for data in model_inputs_list], dim=0
),
output_length=output_length,
)
outputs = self._process_request(infer_req)
else:
model_info = self._model_manager.get_model_info(model_id)
inference_pipeline = load_pipeline(
model_info, device=self._backend.torch_device("cpu")
)
inputs = inference_pipeline.preprocess(
model_inputs_list, output_length=output_length
)
if isinstance(inference_pipeline, ForecastPipeline):
outputs = inference_pipeline.forecast(
inputs, output_length=output_length, **inference_attrs
)
elif isinstance(inference_pipeline, ClassificationPipeline):
outputs = inference_pipeline.classify(inputs)
elif isinstance(inference_pipeline, ChatPipeline):
outputs = inference_pipeline.chat(inputs)
else:
outputs = None
logger.error("[Inference] Unsupported pipeline type.")
outputs = inference_pipeline.postprocess(outputs)

# convert tensor into tsblock for the output in each batch
output_list = []
for batch_idx, output in enumerate(outputs):
output = convert_tensor_to_tsblock(output)
output_list.append(output)
resp_list = self._do_inference_and_construct_resp(
model_id, model_inputs_list, output_length, inference_attrs
)

return resp_cls(
get_status(TSStatusCode.SUCCESS_STATUS),
[output_list[0]] if single_batch else output_list,
[resp_list[0]] if single_batch else resp_list,
)

except Exception as e:
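The refactor moves the shared inference path into `_do_inference_and_construct_resp`, which `_run` now delegates to. For reference, a hedged sketch of the input shape contract the helper expects, with hypothetical values (the covariate names are illustrative):

```python
import torch

# Each element of model_inputs_list is a dict with a "targets" tensor of shape
# [target_count, input_length], plus optional "past_covariates"/"future_covariates"
# dicts of 1-d tensors (which preprocess() will auto-adapt when auto_adapt=True).
model_inputs_list = [
    {
        "targets": torch.randn(1, 96),
        "past_covariates": {"temperature": torch.randn(96)},
        "future_covariates": {"temperature": torch.randn(24)},
    }
]
```

Note that `auto_adapt` is consumed via `kwargs.get("auto_adapt", True)`, so callers that do not pass it, like `_run` here, get the adaptive behavior by default.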
4 changes: 0 additions & 4 deletions iotdb-core/ainode/resources/conf/iotdb-ainode.properties
@@ -52,10 +52,6 @@ ain_cluster_ingress_username=root
# Datatype: String
ain_cluster_ingress_password=root

# The time zone of the IoTDB cluster.
# Datatype: String
ain_cluster_ingress_time_zone=UTC+8

# The device space allocated for inference
# Datatype: Float
ain_inference_memory_usage_ratio=0.2
3 changes: 2 additions & 1 deletion iotdb-protocol/thrift-ainode/src/main/thrift/ainode.thrift
@@ -87,7 +87,8 @@ struct TForecastReq {
3: required i32 outputLength
4: optional string historyCovs
5: optional string futureCovs
6: optional map<string, string> options
6: optional bool autoAdapt
7: optional map<string, string> options
}

struct TForecastResp {
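The new optional `autoAdapt` flag (field 6) lets clients opt out of the covariate auto-adaptation. A minimal client-side sketch using the thrift-generated Python binding; the import path is an assumption, and fields 1-2 of `TForecastReq` are elided in this diff, so they are omitted here:

```python
# Hypothetical usage of the generated binding; only fields visible in this hunk
# are set, and the module path is an assumption about the generated code layout.
from iotdb.thrift.ainode.ttypes import TForecastReq

req = TForecastReq(
    outputLength=96,   # field 3 (required)
    autoAdapt=False,   # new field 6: disable zero-padding/truncation of covariates
)
```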