From ddf31fd4cc91896ea670fd5d18ca3cdf2eda8d48 Mon Sep 17 00:00:00 2001 From: liuyang Date: Fri, 16 Jan 2026 14:30:57 +0800 Subject: [PATCH] fix ext func caching --- native-engine/auron-planner/src/planner.rs | 9 +-- .../datafusion-ext-functions/src/lib.rs | 67 +++++++++++++++++-- 2 files changed, 64 insertions(+), 12 deletions(-) diff --git a/native-engine/auron-planner/src/planner.rs b/native-engine/auron-planner/src/planner.rs index 2a34e7cf9..128a89016 100644 --- a/native-engine/auron-planner/src/planner.rs +++ b/native-engine/auron-planner/src/planner.rs @@ -906,17 +906,12 @@ impl PhysicalPlanner { let fun_name = &e.name; let fun = datafusion_ext_functions::create_auron_ext_function( fun_name, - self.partition_id, - )?; - Arc::new(create_udf( - &format!("spark_ext_function_{fun_name}"), args.iter() .map(|e| e.data_type(input_schema)) .collect::, _>>()?, convert_required!(e.return_type)?, - Volatility::Volatile, - fun, - )) + )?; + Arc::new(ScalarUDF::from(fun)) } else { let scalar_udf: Arc = scalar_function.into(); scalar_udf diff --git a/native-engine/datafusion-ext-functions/src/lib.rs b/native-engine/datafusion-ext-functions/src/lib.rs index a65dc0d44..bcdde400b 100644 --- a/native-engine/datafusion-ext-functions/src/lib.rs +++ b/native-engine/datafusion-ext-functions/src/lib.rs @@ -13,9 +13,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; +use std::{any::Any, fmt, sync::Arc}; -use datafusion::{common::Result, logical_expr::ScalarFunctionImplementation}; +use datafusion::{ + arrow::datatypes::DataType, + logical_expr::{ + ColumnarValue, ScalarFunctionArgs, ScalarFunctionImplementation, ScalarUDFImpl, Signature, + Volatility, + }, +}; use datafusion_ext_commons::df_unimplemented_err; mod brickhouse; @@ -38,12 +44,13 @@ mod spark_unscaled_value; #[allow(clippy::panic)] // Temporarily allow panic to refactor to Result later pub fn create_auron_ext_function( name: &str, - spark_partition_id: usize, -) -> Result { + input_types: Vec, + return_type: DataType, +) -> datafusion::common::Result { // auron ext functions, if used for spark should be start with 'Spark_', // if used for flink should be start with 'Flink_', // same to other engines. - Ok(match name { + let fun: ScalarFunctionImplementation = match name { "Placeholder" => Arc::new(|_| panic!("placeholder() should never be called")), "Spark_NullIf" => Arc::new(spark_null_if::spark_null_if), "Spark_NullIfZero" => Arc::new(spark_null_if::spark_null_if_zero), @@ -86,5 +93,55 @@ pub fn create_auron_ext_function( } "Spark_IsNaN" => Arc::new(spark_isnan::spark_isnan), _ => df_unimplemented_err!("spark ext function not implemented: {name}")?, + }; + + Ok(AuronExtFunction { + name: name.to_string(), + signature: Signature::exact(input_types, Volatility::Volatile), + return_type, + fun, }) } + +pub struct AuronExtFunction { + name: String, + signature: Signature, + return_type: DataType, + fun: ScalarFunctionImplementation, +} + +impl fmt::Debug for AuronExtFunction { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("AuronExtFunction") + .field("name", &self.name) + .field("signature", &self.signature) + .field("return_type", &self.return_type) + .field("fun", &"") + .finish() + } +} + +impl ScalarUDFImpl for AuronExtFunction { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + &self.name + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> datafusion::common::Result { + Ok(self.return_type.clone()) + } + + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion::common::Result { + (self.fun)(&args.args) + } +}