@@ -21,19 +21,24 @@ use std::sync::Arc;
2121use arrow:: datatypes:: SchemaRef ;
2222use arrow:: pyarrow:: ToPyArrow ;
2323use async_trait:: async_trait;
24- use datafusion:: catalog:: Session ;
24+ use datafusion:: catalog:: { Session , TableProviderFactory } ;
2525use datafusion:: common:: Column ;
2626use datafusion:: datasource:: { TableProvider , TableType } ;
27- use datafusion:: logical_expr:: { Expr , LogicalPlanBuilder , TableProviderFilterPushDown } ;
27+ use datafusion:: logical_expr:: {
28+ CreateExternalTable , Expr , LogicalPlanBuilder , TableProviderFilterPushDown ,
29+ } ;
2830use datafusion:: physical_plan:: ExecutionPlan ;
2931use datafusion:: prelude:: DataFrame ;
32+ use datafusion_ffi:: proto:: logical_extension_codec:: FFI_LogicalExtensionCodec ;
3033use pyo3:: IntoPyObjectExt ;
3134use pyo3:: prelude:: * ;
3235
3336use crate :: context:: PySessionContext ;
3437use crate :: dataframe:: PyDataFrame ;
3538use crate :: dataset:: Dataset ;
36- use crate :: utils:: table_provider_from_pycapsule;
39+ use crate :: errors;
40+ use crate :: expr:: create_external_table:: PyCreateExternalTable ;
41+ use crate :: utils;
3742
3843/// This struct is used as a common method for all TableProviders,
3944/// whether they refer to an FFI provider, an internally known
@@ -91,7 +96,7 @@ impl PyTable {
9196 Some ( session) => session,
9297 None => PySessionContext :: global_ctx ( ) ?. into_bound_py_any ( obj. py ( ) ) ?,
9398 } ;
94- table_provider_from_pycapsule ( obj. clone ( ) , session) ?
99+ utils :: table_provider_from_pycapsule ( obj. clone ( ) , session) ?
95100 } {
96101 Ok ( PyTable :: from ( provider) )
97102 } else {
@@ -206,3 +211,51 @@ impl TableProvider for TempViewTable {
206211 Ok ( vec ! [ TableProviderFilterPushDown :: Exact ; filters. len( ) ] )
207212 }
208213}
214+
215+ #[ derive( Debug ) ]
216+ pub ( crate ) struct RustWrappedPyTableProviderFactory {
217+ pub ( crate ) table_provider_factory : Py < PyAny > ,
218+ pub ( crate ) codec : Arc < FFI_LogicalExtensionCodec > ,
219+ }
220+
221+ impl RustWrappedPyTableProviderFactory {
222+ pub fn new ( table_provider_factory : Py < PyAny > , codec : Arc < FFI_LogicalExtensionCodec > ) -> Self {
223+ Self {
224+ table_provider_factory,
225+ codec,
226+ }
227+ }
228+
229+ fn create_inner (
230+ & self ,
231+ cmd : CreateExternalTable ,
232+ codec : Bound < PyAny > ,
233+ ) -> PyResult < Arc < dyn TableProvider > > {
234+ Python :: attach ( |py| {
235+ let provider = self . table_provider_factory . bind ( py) ;
236+ let cmd = PyCreateExternalTable :: from ( cmd) ;
237+
238+ provider
239+ . call_method1 ( "create" , ( cmd, ) )
240+ . and_then ( |t| PyTable :: new ( t, Some ( codec) ) )
241+ . map ( |t| t. table ( ) )
242+ } )
243+ }
244+ }
245+
246+ #[ async_trait]
247+ impl TableProviderFactory for RustWrappedPyTableProviderFactory {
248+ async fn create (
249+ & self ,
250+ _: & dyn Session ,
251+ cmd : & CreateExternalTable ,
252+ ) -> datafusion:: common:: Result < Arc < dyn TableProvider > > {
253+ Python :: attach ( |py| {
254+ let codec = utils:: create_logical_extension_capsule ( py, self . codec . as_ref ( ) )
255+ . map_err ( errors:: to_datafusion_err) ?;
256+
257+ self . create_inner ( cmd. clone ( ) , codec. into_any ( ) )
258+ . map_err ( errors:: to_datafusion_err)
259+ } )
260+ }
261+ }
0 commit comments