Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions adapta/storage/delta_lake/v3/_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def load( # pylint: disable=R0913
columns: Optional[List[str]] = None,
batch_size: Optional[int] = None,
partition_filter_expressions: Optional[List[Tuple]] = None,
limit: Optional[int] = None,
) -> Union[MetaFrame, Iterator[MetaFrame]]:
"""
Loads Delta Lake table from Azure or AWS storage and converts it to a MetaFrame.
Expand All @@ -60,6 +61,7 @@ def load( # pylint: disable=R0913

:param columns: Optional list of columns to select when reading. Defaults to all columns if not provided.
:param batch_size: Optional batch size when reading in batches. If not set, whole table will be loaded into memory.
:param limit: Optional limit on number of rows to read.
:param partition_filter_expressions: Optional partitions filters. Examples:

partition_filter_expressions = [("day", "=", "3")]
Expand All @@ -82,6 +84,9 @@ def load( # pylint: disable=R0913
filesystem=auth_client.get_pyarrow_filesystem(path),
)

if limit:
pyarrow_ds = pyarrow_ds.head(limit)

row_filter = (
compile_expression(row_filter, ArrowFilterExpression) if isinstance(row_filter, Expression) else row_filter
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,8 @@ def _map_to_column(
polars.Datetime(time_unit="us"): (columns.DateTime,),
polars.Datetime(time_unit="ns"): (columns.DateTime,),
polars.Datetime(time_unit="ms"): (columns.DateTime,),
polars.Datetime(time_unit="ms", time_zone="UTC"): (columns.DateTime,),
polars.Datetime(time_unit="us", time_zone="UTC"): (columns.DateTime,),
}

column_type = mapping.get(type_to_map, None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
from dataclasses import asdict
from typing import Optional, Dict, TypeVar, Callable, Type, List, Any, Union

from polars.polars import ComputeError

try:
from _socket import IPPROTO_TCP, TCP_NODELAY, TCP_USER_TIMEOUT
except ImportError:
Expand Down Expand Up @@ -62,7 +64,7 @@
from adapta.storage.distributed_object_store.v3.datastax_astra._models import SimilarityFunction, VectorSearchQuery
from adapta.storage.models.filter_expression import Expression, AstraFilterExpression, compile_expression
from adapta.utils import chunk_list, rate_limit
from adapta.utils.metaframe import MetaFrame, concat
from adapta.utils.metaframe import MetaFrame, concat, PolarsOptions
from adapta.storage.distributed_object_store.v3.datastax_astra._model_mappers import get_mapper
from adapta.schema_management.schema_entity import PythonSchemaEntity
from adapta.storage.models.enum import QueryEnabledStoreOptions
Expand Down Expand Up @@ -241,6 +243,7 @@ def filter_entities(
deduplicate=False,
num_threads: Optional[int] = None,
options: dict[QueryEnabledStoreOptions, any] = None,
limit: Optional[int] = None,
) -> MetaFrame:
"""
Run a filter query on the entity of type TModel backed by table `table_name`.
Expand All @@ -265,6 +268,7 @@ class Test:
:param: custom_indexes: An optional list of custom indexes, if it cannot be inferred from the data model.
:param: deduplicate: Optionally deduplicate query result, for example when only the partition key part of a primary key is used to fetch results.
:param: num_threads: Optionally run filtering using multiple threads. Setting this to -1 will cause this method to automatically evaluate number of threads based on filter expression size.
:param: limit: Optionally limit the number of results returned. NOTE the limit works per call to Astra and not on the final result.
"""

@on_exception(
Expand All @@ -278,10 +282,11 @@ class Test:
raise_on_giveup=True,
)
def apply(model: Type[Model], key_column_filter: Dict[str, Any], columns_to_select: Optional[List[str]]):
model = model.filter(**key_column_filter).limit(limit)
if columns_to_select:
return model.filter(**key_column_filter).only(select_columns)
return model.only(select_columns)

return model.filter(**key_column_filter)
return model

def normalize_column_name(column_name: str) -> str:
filter_suffix = re.findall(self._filter_pattern, column_name)
Expand All @@ -293,9 +298,16 @@ def normalize_column_name(column_name: str) -> str:
def to_frame(
model: Type[Model], key_column_filter: Dict[str, Any], columns_to_select: Optional[List[str]]
) -> MetaFrame:
def convert_to_polars(x: list[dict]) -> polars.DataFrame:
try:
return polars.DataFrame(x, schema=select_columns)
except ComputeError:
# Catches errors related to incorrect schema inference and tries again with unlimited schema inference length
return polars.DataFrame(x, schema=select_columns, infer_schema_length=None)

return MetaFrame(
[dict(v.items()) for v in list(apply(model, key_column_filter, columns_to_select))],
convert_to_polars=lambda x: polars.DataFrame(x, schema=select_columns),
convert_to_polars=convert_to_polars,
convert_to_pandas=lambda x: pandas.DataFrame(x, columns=select_columns),
)

Expand Down
13 changes: 10 additions & 3 deletions adapta/storage/query_enabled_store/_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,16 @@ def _apply_filter(
filter_expression: Expression,
columns: list[str],
options: dict[QueryEnabledStoreOptions, any] | None = None,
limit: Optional[int] = 10000,
) -> Union[MetaFrame, Iterator[MetaFrame]]:
"""
Applies the provided filter expression to this Store and returns the result in a pandas DataFrame
Applies the provided filter expression to this Store and returns the result in a MetaFrame
"""

@abstractmethod
def _apply_query(self, query: str) -> Union[MetaFrame, Iterator[MetaFrame]]:
"""
Applies a plaintext query to this Store and returns the result in a pandas DataFrame
Applies a plaintext query to this Store and returns the result in a MetaFrame
"""

@classmethod
Expand Down Expand Up @@ -146,6 +147,7 @@ def __init__(self, store: QueryEnabledStore, path: DataPath):
self._filter_expression: Optional[Expression] = None
self._columns: list[str] = []
self._options: dict[QueryEnabledStoreOptions, any] = {}
self._limit = 10000

def filter(self, filter_expression: Expression) -> "QueryConfigurationBuilder":
"""
Expand All @@ -170,6 +172,11 @@ def add_options(self, option_key: QueryEnabledStoreOptions, option_value: any) -

self._options[option_key] = option_value

def limit(self, limit: int) -> "QueryConfigurationBuilder":
    """
    Set the maximum number of rows the underlying store should return.

    :param limit: Row-count cap applied when the configured query is executed.
    :return: This builder, to allow fluent chaining.
    """
    # Overrides the builder's default cap; consumed when the query is run.
    self._limit = limit
    return self

def read(self) -> Union[MetaFrame, Iterator[MetaFrame]]:
Expand All @@ -180,5 +187,5 @@ def read(self) -> Union[MetaFrame, Iterator[MetaFrame]]:
path=self._path,
filter_expression=self._filter_expression,
columns=self._columns,
options=self._options,
limit=self._limit,
)
3 changes: 3 additions & 0 deletions adapta/storage/query_enabled_store/_qes_astra.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def _apply_filter(
filter_expression: Expression,
columns: list[str],
options: dict[QueryEnabledStoreOptions, any] | None = None,
limit: Optional[int] = 10000,
) -> Union[MetaFrame, Iterator[MetaFrame]]:
assert isinstance(path, AstraPath)
astra_path: AstraPath = path
Expand All @@ -100,6 +101,7 @@ def _apply_filter(
select_columns=columns,
num_threads=-1, # auto-infer, see method documentation
options=options,
limit=limit,
)

return self._astra_client.filter_entities(
Expand All @@ -110,6 +112,7 @@ def _apply_filter(
select_columns=columns,
num_threads=-1, # auto-infer, see method documentation
options=options,
limit=limit,
)

def _apply_query(self, query: str) -> Union[MetaFrame, Iterator[MetaFrame]]:
Expand Down
2 changes: 2 additions & 0 deletions adapta/storage/query_enabled_store/_qes_delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,14 @@ def _apply_filter(
filter_expression: Expression,
columns: list[str],
options: dict[QueryEnabledStoreOptions, any] | None = None,
limit: Optional[int] = 10000,
) -> Union[MetaFrame, Iterator[MetaFrame]]:
return load(
auth_client=self.credentials.auth_client(credentials=self.credentials.auth_client_credentials()),
path=path,
row_filter=filter_expression,
columns=columns if columns else None,
limit=limit,
)

def _apply_query(self, query: str) -> Union[MetaFrame, Iterator[MetaFrame]]:
Expand Down
Loading