Conversation
Force-pushed from d290657 to 25db829
Pull request overview
This PR attempts to introduce function caching for external functions using Rust's OnceLock to avoid repeatedly wrapping function pointers in Arc. A cache! macro is introduced to replace direct Arc::new() calls for all Spark extension functions (except the "Placeholder" function).
Changes:
- Added OnceLock import and created a cache! macro to cache function implementations
- Replaced 30+ direct Arc::new(function) calls with cache!(function) macro invocations
macro_rules! cache {
    ($func:path) => {{
        static CELL: OnceLock<ScalarFunctionImplementation> = OnceLock::new();
        CELL.get_or_init(|| Arc::new($func)).clone()
    }};
}
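For context, the call sites then look roughly like the sketch below. The two named arms are taken from the review comments that follow; the enclosing function, its name, and the fallback arm are assumptions for illustration:

fn find_ext_function(function_name: &str) -> ScalarFunctionImplementation {
    match function_name {
        // replaces the previous Arc::new(spark_null_if::spark_null_if)
        "Spark_NullIf" => cache!(spark_null_if::spark_null_if),
        "Spark_NullIfZero" => cache!(spark_null_if::spark_null_if_zero),
        // ... 30+ more arms for the other Spark ext functions
        other => unimplemented!("unknown ext function: {}", other),
    }
}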
The cache macro has a critical bug: all invocations share the same static CELL variable. This means that once any function is cached in the first call, all subsequent function lookups will return that same cached function, regardless of which function was requested.
For example, if "Spark_NullIf" is called first, it will cache spark_null_if::spark_null_if. Then when "Spark_NullIfZero" is called, it will return the same spark_null_if::spark_null_if function instead of spark_null_if::spark_null_if_zero.
To fix this, each invocation needs its own unique static variable. This can be achieved by making the static variable name unique per function path, or by using a different caching approach such as a global HashMap with function names as keys.
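For reference, the name-keyed map this comment suggests might look like the sketch below (a simplified stand-in alias is used so it compiles on its own; as the next comment explains, the per-expansion static already behaves correctly, so this indirection is not actually needed):

use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock};

// Simplified stand-in for the real ScalarFunctionImplementation alias.
type ScalarFunctionImplementation = Arc<dyn Fn(i64) -> i64 + Send + Sync>;

fn cached_by_name(
    name: &'static str,
    make: fn() -> ScalarFunctionImplementation,
) -> ScalarFunctionImplementation {
    // One global map keyed by function name, built lazily on first use.
    static CACHE: OnceLock<Mutex<HashMap<&'static str, ScalarFunctionImplementation>>> =
        OnceLock::new();
    let cache = CACHE.get_or_init(|| Mutex::new(HashMap::new()));
    cache.lock().unwrap().entry(name).or_insert_with(make).clone()
}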
In Rust, statics inside functions or blocks are not global singletons sharing the same name; they are local singletons, unique to the scope they are declared in. Because the macro body is expanded anew at each call site, every cache! invocation here gets its own CELL, scoped to its specific match arm.
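A minimal, self-contained sketch of this behavior (the macro and values here are illustrative, not from the PR):

use std::sync::OnceLock;

macro_rules! cache_str {
    ($val:expr) => {{
        // Each expansion emits its own CELL: the static is local to the
        // block generated at this call site, not shared across call sites.
        static CELL: OnceLock<&'static str> = OnceLock::new();
        *CELL.get_or_init(|| $val)
    }};
}

fn main() {
    let a = cache_str!("null_if");
    let b = cache_str!("null_if_zero");
    assert_eq!(a, "null_if");
    assert_eq!(b, "null_if_zero"); // distinct statics: no value bleeds over
}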
Just running this test in AuronQuerySuite
test("my cache test") {
withTable("my_cache_table") {
sql("""
|create table my_cache_table using parquet as
|select col1, col2 from values ('a,A', '{"a":"1", "b":"2"}'), ('b,B', '{"a":"3", "b":"4"}'), ('c,C', '{"a":"5", "b":"6"}')
|""".stripMargin)
sql("""
|select split(col1, ',')[0],
| split(col1, ',')[1],
| get_json_object(col2, '$.a'),
| get_json_object(col2, '$.b')
|from my_cache_table
|""".stripMargin).show()
}
}
we can see the following correct result:
+---------------------+---------------------+--------------------------+--------------------------+
|split(col1, ,, -1)[0]|split(col1, ,, -1)[1]|get_json_object(col2, $.a)|get_json_object(col2, $.b)|
+---------------------+---------------------+--------------------------+--------------------------+
| a| A| 1| 2|
| b| B| 3| 4|
| c| C| 5| 6|
+---------------------+---------------------+--------------------------+--------------------------+
It handles the different ext functions StringSplit, GetParsedJsonObject, and ParseJson correctly:
ProjectExec [
  (spark_ext_function_Spark_StringSplit(#2@0, ,)).[1] AS #16,
  (spark_ext_function_Spark_StringSplit(#2@0, ,)).[2] AS #17,
  spark_ext_function_Spark_GetParsedJsonObject(spark_ext_function_Spark_ParseJson(#3@1), $.a) AS #18,
  spark_ext_function_Spark_GetParsedJsonObject(spark_ext_function_Spark_ParseJson(#3@1), $.b) AS #19
], schema=[#16:Utf8;N, #17:Utf8;N, #18:Utf8;N, #19:Utf8;N]
I think a wrapper for ScalarFunctionImplementation with an overridden PartialEq/Eq implementation would be better.
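A minimal sketch of that idea, assuming ScalarFunctionImplementation is an Arc over a dyn Fn as in DataFusion (a simplified stand-in alias is used so the sketch compiles on its own):

use std::sync::Arc;

// Simplified stand-in for the real ScalarFunctionImplementation alias.
type ScalarFunctionImplementation = Arc<dyn Fn(i64) -> i64 + Send + Sync>;

// Wrapper that makes function implementations comparable by identity:
// two handles are equal iff they point at the same underlying allocation.
#[derive(Clone)]
struct FnHandle(ScalarFunctionImplementation);

impl PartialEq for FnHandle {
    fn eq(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.0, &other.0)
    }
}

impl Eq for FnHandle {}

As I read the suggestion, equality could then be defined on the wrapper directly, without caching each function in its own static.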
Force-pushed from 453831c to ec78595
Force-pushed from ec78595 to ddf31fd
Which issue does this PR close?
Closes #1902
Rationale for this change
What changes are included in this PR?
Are there any user-facing changes?
How was this patch tested?