Description
This is a follow-on issue to #8974.

In Spark we currently support min and max aggregations for nested structs and lists. From a performance standpoint, sorting is very expensive, and it would be a big help if we could speed up these types of operations. We think a hash aggregate would be a huge win. As an example, the following Spark queries are identical except that a long value is stored as a top-level column in the first and inside a struct in the second. The performance difference is about 9x.
scala> spark.time(spark.range(10000000000L).selectExpr("id % 2 as k1", "CAST(id % 3 as STRING) as k2", "id as v").groupBy("k1", "k2").agg(max(col("v"))).orderBy("k1", "k2").show())
+---+---+----------+
| k1| k2| max(v)|
+---+---+----------+
| 0| 0|9999999996|
| 0| 1|9999999994|
| 0| 2|9999999998|
| 1| 0|9999999999|
| 1| 1|9999999997|
| 1| 2|9999999995|
+---+---+----------+
Time taken: 4252 ms
scala> spark.time(spark.range(10000000000L).selectExpr("id % 2 as k1", "CAST(id % 3 as STRING) as k2", "struct(id) as v").groupBy("k1", "k2").agg(max(col("v"))).orderBy("k1", "k2").show())
+---+---+------------+
| k1| k2| max(v)|
+---+---+------------+
| 0| 0|{9999999996}|
| 0| 1|{9999999994}|
| 0| 2|{9999999998}|
| 1| 0|{9999999999}|
| 1| 1|{9999999997}|
| 1| 2|{9999999995}|
+---+---+------------+
Time taken: 39208 ms
That said, the goal is performance, not to implement this just to implement it. I also realize that this is a very contrived case, and a struct with a single value in it is not the goal. The goal is to speed up simple min/max aggregations on nested types. I also don't expect the performance of a nested min/max to match the performance of an aggregation that can use atomic operations.
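For illustration only, here is a minimal sketch of what a hash-based max per group looks like compared to a sort-based approach. This is plain Scala on the driver, not the cuDF or plugin implementation; `Row`, `HashMaxSketch`, and the sample data are hypothetical names used just for this example.

```scala
// Sketch: hash aggregate for max(v) grouped by (k1, k2), mirroring the query above.
object HashMaxSketch {
  // A stand-in for a row whose value could just as well be a single-field struct.
  final case class Row(k1: Long, k2: String, v: Long)

  def main(args: Array[String]): Unit = {
    val rows = (0L until 1000000L).map(id => Row(id % 2, (id % 3).toString, id))

    // Hash aggregate: one pass over the input, one map entry per group,
    // no sort of the full input.
    val maxPerGroup = scala.collection.mutable.HashMap.empty[(Long, String), Long]
    rows.foreach { r =>
      val key = (r.k1, r.k2)
      maxPerGroup.get(key) match {
        case Some(cur) if cur >= r.v => // keep the current max
        case _                       => maxPerGroup(key) = r.v
      }
    }

    // A sort-based aggregation would instead order every row by (key, value) and
    // take the last row per group, which is what makes the nested-type path slow today.
    maxPerGroup.toSeq.sortBy(_._1).foreach { case ((k1, k2), m) =>
      println(s"k1=$k1 k2=$k2 max=$m")
    }
  }
}
```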