Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement fast min/max accumulator for binary / strings (now it uses the slower path) #6906

Open
alamb opened this issue Jul 10, 2023 · 23 comments
Assignees
Labels
enhancement New feature or request

Comments

@alamb
Copy link
Contributor

alamb commented Jul 10, 2023

Is your feature request related to a problem or challenge?

#6904 introduces some fancy new hashing and ways to implement aggregates

See related blog post: https://datafusion.apache.org/blog/2023/08/05/datafusion_fast_grouping/

min/max for strings (StringArray / LargeStringArray, etc) now uses the slower Accumulator implementation which could be made much faster

Describe the solution you'd like

I would like to implement a fast GroupsAccumulator for Min/Max

Describe alternatives you've considered

here is one potential way to implement it:

We could store the current minimum for all groups in the same Rows 🤔 and track an index into that Rows for the current minimum for each group.

This would require an extra copy of the input values, but it could probably be vectorized pretty well, as shown in the following diagram.

Sorry what I meant was something like the following where the accumulator only stored the current minimum values.

This approach would potentially end up with min_storage being full of "garbage" if many batches had new minumums, but I think we could heuristically "compact" min_storage (if it had 2*num_groups, for example) if it got too large

                                    ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  
                                                                           
                                    │           Accumulator             │  
                                                state                      
┌─────────┐       ┌─────────┐       │ ┌─────────┐          ┌─────────┐  │  
│ ┌─────┐ │       │ ┌─────┐ │         │ ┌─────┐ │          │ ┌─────┐ │     
│ │  A  │ │       │ │  A  │─┼────┐  │ │ │  D  │ │    ┌─────┼─│  1  │ │  │  
│ ├─────┤ │       │ ├─────┤ │    │    │ ├─────┤ │    │     │ ├─────┤ │     
│ │  B  │ │       │ │  B  │ │    └──┼─┼▶│  A  │◀┼────┘     │ │  0  │ │  │  
│ ├─────┤ │       │ ├─────┤ │         │ └─────┘ │          │ └─────┘ │     
│ │  A  │ │       │ │  A  │ │       │ │         │          │         │  │  
│ ├─────┤ │       │ ├─────┤ │         │         │          │         │     
│ │  A  │ │       │ │  A  │ │       │ │         │          │         │  │  
│ ├─────┤ │       │ ├─────┤ │         │         │          │         │     
│ │  C  │ │       │ │  C  │ │       │ │         │          │         │  │  
│ └─────┘ │       │ └─────┘ │         │         │          │         │     
└─────────┘       └─────────┘       │ └─────────┘          └─────────┘  │  
                                                                           
  input              input          │ min_storage:         min_values   │  
  values             values           Rows                                 
  (Array)            (Rows)         └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  
          step 1:           step 2: for                                    
          convert           any value                   step 3: min value  
          arguments to      that is a new               (per group) is     
          Row format        group                       tracked as an      
                            minimum, copy               index into         
                            it to a                     min_storage `Rows` 
                            second `Rows`                                  
                                                                           

See #6800 (comment) for more details

Additional context

No response

@alamb alamb added the enhancement New feature or request label Jul 10, 2023
@alamb alamb changed the title Implement fast min/max accumulator for strings (now it uses the slower path) Implement fast min/max accumulator for binary / strings (now it uses the slower path) Jan 8, 2024
@alamb
Copy link
Contributor Author

alamb commented Jan 8, 2024

One observation here is that min and max on strings is not that common of an operation from what it seems -- grouping on strings is more common.

Maybe there is some binary usecase where it is important (e.g. embeddings 🤔 )

@devanbenz
Copy link
Contributor

@alamb is there anyone working on this + is this issue still relevant? I would love to tackle it as it seems like an interesting feature/optimization.

@alamb
Copy link
Contributor Author

alamb commented Aug 26, 2024

@alamb is there anyone working on this + is this issue still relevant? I would love to tackle it as it seems like an interesting feature/optimization.

I dont know of anyone working on this @devanbenz -- but I also don't know of any benchmarks or actual queries that use min / max on string columns. The place it shows up is when computing statistics when writing parquet, but I think parquet is already pretty good at this (and has its own code to compute min/max)

@alamb
Copy link
Contributor Author

alamb commented Sep 17, 2024

It actually turns out that Min / Max on string/binary columns are in several ClickBench queries:

SELECT "SearchPhrase", MIN("URL"), COUNT(*) AS c FROM hits WHERE "URL" LIKE '%google%' AND "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY c DESC LIMIT 10;
SELECT "SearchPhrase", MIN("URL"), MIN("Title"), COUNT(*) AS c, COUNT(DISTINCT "UserID") FROM hits WHERE "Title" LIKE '%Google%' AND "URL" NOT LIKE '%.google.%' AND "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY c DESC LIMIT 10;

SELECT REGEXP_REPLACE("Referer", '^https?://(?:www\.)?([^/]+)/.*$', '\1') AS k, AVG(length("Referer")) AS l, COUNT(*) AS c, MIN("Referer") FROM hits WHERE "Referer" <> '' GROUP BY k HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;

I don't think min/max have appeared as priorities in benchmarking before because the queries in question are doing other string heavy operations that tend to dominate. Thus the use of GroupsAccumulatorAdaptor for Min/Max on strings, while bad, is overshadowed by other things

However, while working on #12092 it turns out that Min / Max on BinaryView and StringView are suuuuper slow. We can likely restore their speed to something similar to apache/arrow-rs#6408 but I also think this is a good time to actually make Min / Max on strings faster. I will write up some ideas on how to do this

@alamb
Copy link
Contributor Author

alamb commented Sep 17, 2024

Background

(I will make a PR shortly to add this to the actual datafusion docs)

GroupsAccumulator logically does this:

      ┌─────┐                            
      │  0  │───────────▶   "A"          
      ├─────┤                            
      │  1  │───────────▶   "Z"          
      └─────┘                            
        ...                 ...          
      ┌─────┐                            
      │ N-2 │               "A"          
      ├─────┤                            
      │ N-1 │───────────▶   "Q"          
      └─────┘                            
                                         
                                         
    Logical group      Current Min/Max   
       number          value for that    
                       group             
                                         
                                         
                                         
GroupsAccumulator to store N aggregate   
values: logically keepa a mapping from   
each group index to the current value                                        

Today, String / Binary min/max values are implemented using GroupsAccumulatorAdapter which results in

                                                              Individual String
                                                              (separate        
                                                              allocation)      
                                                                               
   ┌─────┐            ┌──────────────────────────┐                             
   │  0  │───────────▶│  ScalarValue::Utf8("A")  ├──────────▶   "A"            
   ├─────┤            ├──────────────────────────┤                             
   │  1  │───────────▶│  ScalarValue::Utf8("Z")  │──────────▶   "Z"            
   └─────┘            └──────────────────────────┘                             
     ...                 ...                                    ...            
   ┌─────┐            ┌──────────────────────────┐                             
   │ N-2 │            │  ScalarValue::Utf8("A")  │──────────▶   "A"            
   ├─────┤            ├──────────────────────────┤                             
   │ N-1 │───────────▶│  ScalarValue::Utf8("Q")  │──────────▶   "Q"            
   └─────┘            └──────────────────────────┘                             
                                                                               
                                                                               
 Logical group         Current Min/Max value for that group stored             
    number             as a ScalarValue which points to an                     
                       indivdually allocated String                            
                                                                               
                                                                               
                                                                               
   How GroupsAccumulatorAdaptor works today:                                   
   stores each current min/max as a                                            
   ScalarValue                                                                 
                                                                               

@alamb
Copy link
Contributor Author

alamb commented Sep 17, 2024

Potential Design

One high level idea is to build a data structure that uses the same internal format (views/buffers) as StringViewArray in Arrow:

                  ┌───────────────────────────────────────────┐  
                  │                        Stored in Vec<u8>  │  
                  │   Stored in a         ┌─────────────────┐ │  
                  │     Vec<u128>         │ ┌───────────┐   │ │  
                  │ ┌─────────────┐    ─ ─│▶│some value │   │ │  
   ┌─────┐        │ │ ┌─────────┐ │   │   │ └───────────┘   │ │  
   │  0  │────────┼─┼▶│  View   │ │       │                 │ │  
   ├─────┤        │ │ ├─────────┤ │   │   │                 │ │  
   │  1  │────────┼─┼▶│  View   │─│─ ─ ┐  │      ...        │ │  
   └─────┘        │ │ └─────────┘ │   │   │                 │ │  
     ...          │ │    ...      │    │  │                 │ │  
   ┌─────┐        │ │ ┌─────────┐ │   │   │                 │ │  
   │ N-2 │        │ │ │  View   │─│─ ─ │  │   ┌────────────┐│ │  
   ├─────┤        │ │ ├─────────┤ │     ─ ┼ ─▶│other value ││ │  
   │ N-1 │────────┼─┼▶│  View   │ │       │   └────────────┘│ │  
   └─────┘        │ │ └─────────┘ │       └─────────────────┘ │  
                  │ └─────────────┘  String values are stored │  
                  │                  inline or in extra byte  │  
 Logical group    │                  buffer                   │  
    number        └───────────────────────────────────────────┘  
                   New structure: MutableStringViewBuilder       
                                                                 
                   Current Min/Max value for that group stored in
                   same format as StringViewArray                
                                                                 

In this design, the current value for each group is stored in two parts (as described on arrow docs)

  1. a fixed size u128 view
  2. a variable length part with the string data

As new batches are updated, each View is updated if necessary

Benefits of design:

  1. Hopefully use the same code as in arrow-rs
  2. Allows Zero copy conversion to StringView / BinaryView at output
  3. Use inlined values for quick min/max comparison

I believe (though we will have to verify it) that the conversion from MutableStringViewBuilder to just StringArray (not StringViewArray) should also be better than the current GroupsAccumulatorAdapter. Both conversions need to copy the string bytes again into the packed StringArray format, but the GroupsAccumulatorAdapter also has to allocate/free owned Strings as well

Potential challenges

I think the trickiest part of this code, other the low level code optimizations is that as min/max values are replaced, data in the variable length buffer will become "garbage" (not reachable) thus consuming more memory than necessary:

                        Stored in Vec<u8>   
                     ┌────────────────────┐ 
                     │ ┌────────────────┐ │ 
┌─────────────┐      │ │prev max value 1│ │ 
│ ┌─────────┐ │      │ └────────────────┘ │ 
│ │  View   │─│─ ┐   │        ...         │ 
│ └─────────┘ │      │ ┌────────────────┐ │ 
│     ...     │  │   │ │prev max value m│ │ 
│             │      │ └────────────────┘ │ 
│             │  │   │ ┌────────────────┐ │ 
│             │   ─ ─│▶│prev max value m│ │ 
│             │      │ └────────────────┘ │ 
│             │      │        ...         │ 
│             │      └────────────────────┘ 
│             │                             
└─────────────┘      Previous min/max values
                     are not pointed to     
                     anymore and need to be 
                     cleaned up             

I think this means the code will need something GenericByteViewArray::gc run occasionally

Random Thoughts

Thoughts: maybe this structure (MutableStringViewBuilder??) could be upstreamed eventually

@alamb
Copy link
Contributor Author

alamb commented Sep 17, 2024

@alamb is there anyone working on this + is this issue still relevant? I would love to tackle it as it seems like an interesting feature/optimization.

@devanbenz I think this would be a fun and interesting project as well as valuable to DataFusion. However, I also think it is pretty advanced -- I would enjoy helping with it, but also maybe @Rachelint or @jayzhan211 are interested in helping out 🤔

@devanbenz
Copy link
Contributor

@alamb is there anyone working on this + is this issue still relevant? I would love to tackle it as it seems like an interesting feature/optimization.

@devanbenz I think this would be a fun and interesting project as well as valuable to DataFusion. However, I also think it is pretty advanced -- I would enjoy helping with it, but also maybe @Rachelint or @jayzhan211 are interested in helping out 🤔

Great! I'll get started on this later in the week/over the weekend :) Will likely bug folks as I require help haha 😆

@devanbenz
Copy link
Contributor

take

@alamb
Copy link
Contributor Author

alamb commented Sep 17, 2024

In terms of implementation, what I suggest is:

  1. Do a POC implementaiton: wire up just enough `StringView, don't worry about GC, basic unit tests
  2. Verify it makes the clickbench query faster
  3. Flesh out testing, documentation, add support for StringArrary, etc
  4. Merge and profit (bonus points for blogging about it)

For the POC here is the reproducer I recommend:

Step 1. Get hits_partitioned using bench.sh:

cd benchmarks
./bench.sh data clickbench_partitioned

Step 2: Prepare a script with reproducer query:

set datafusion.execution.parquet.schema_force_view_types = true;


SELECT REGEXP_REPLACE("Referer", '^https?://(?:www\\.)?([^/]+)/.*$', '\\1') AS k, AVG(length("Referer")) AS l, COUNT(*) AS c, MIN("Referer")
FROM hits_partitioned
WHERE "Referer" <> '' GROUP BY k HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;
andrewlamb@Andrews-MacBook-Pro-2:~/Software/datafusion2/benchmarks/data$ cat q28.sql
set datafusion.execution.parquet.schema_force_view_types = true;

SELECT REGEXP_REPLACE("Referer", '^https?://(?:www\\.)?([^/]+)/.*$', '\\1') AS k, AVG(length("Referer")) AS l, COUNT(*) AS c, MIN("Referer")
FROM hits_partitioned
WHERE "Referer" <> '' GROUP BY k HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;

Step 3: Run script (with release build of datafusion-cli):

datafusion-cli -f q28.sql
  • set datafusion.execution.parquet.schema_force_view_types = true; --> Elapsed 18.431 seconds.
  • set datafusion.execution.parquet.schema_force_view_types = false; --> Elapsed 6.427 seconds.

The goal is to get set datafusion.execution.parquet.schema_force_view_types = true; to be the same (or better) than when it is false

If you look at the flamegraph-string-view.svg, you can see most of the time is spent doing GroupsAccumulator

Screenshot 2024-09-16 at 4 44 50 PM

@Rachelint
Copy link
Contributor

@alamb is there anyone working on this + is this issue still relevant? I would love to tackle it as it seems like an interesting feature/optimization.

@devanbenz I think this would be a fun and interesting project as well as valuable to DataFusion. However, I also think it is pretty advanced -- I would enjoy helping with it, but also maybe @Rachelint or @jayzhan211 are interested in helping out 🤔

Seems really interesting, I am reading the related disscusions.

@alamb
Copy link
Contributor Author

alamb commented Sep 17, 2024

BTW in case anyone is interested, I recorded a short video on how to make these flamegraphs: https://youtu.be/2z11xtYw_xs

I will add a link to that in the docs later

@Rachelint
Copy link
Contributor

The challenge of String seems that?

  • If we just simply use a Vec<String> like primitives to keep the min/max values, it is too expensive to convert them to StringArray/StringViewArray(many many copy)
  • But if we use StringArray like approach to keep the values, we can't update the min/max values.
  • So Finally we need to use a StringViewArray like approach to make it, but still have the new challenge about gc?

@alamb
Copy link
Contributor Author

alamb commented Sep 17, 2024

The challenge of String seems that?

  • If we just simply use a Vec<String> like primitives to keep the min/max values, it is too expensive to convert them to StringArray/StringViewArray(many many copy)

I think the overhead is actually mostly that there is an additional (small) allocation for each String. For queries with a small numer of groups (like 100) an extra 100 allocations isn't all that bad. For queries with millions of groups the overhad is substantial

  • But if we use StringArray like approach to keep the values, we can't update the min/max values.

I suppose we could potentially update the values as long as the new strings were shorter 🤔

  • So Finally we need to use a StringViewArray like approach to make it, but still have the new challenge about gc?

@alamb
Copy link
Contributor Author

alamb commented Sep 17, 2024

@Rachelint your implicit idea of using Vec<String> to store the state I think is actually quite interesting and maybe we should try that one first:

It would at least avoid calling Array::slice and likely be better than using GroupsAccumulatorAdapter, even if we could improve it later with more explciit memory management 🤔

@Rachelint
Copy link
Contributor

Rachelint commented Sep 17, 2024

@Rachelint your implicit idea of using Vec<String> to store the state I think is actually quite interesting and maybe we should try that one first:

It would at least avoid calling Array::slice and likely be better than using GroupsAccumulatorAdapter, even if we could improve it later with more explciit memory management 🤔

🤔 Can we still use the string view like approach to store states #6906 (comment), but for the uninlined state, we use a single String to store it?
And when we output them to StringViewArray, we convert this single String to a tiny buffer.

For example, if all states are uninlined(len > 12), the output StringViewArray may be like:

row1: view1 buffer1(with only one string)
row2: view2 buffer2(with only one string)
...
rown: viewn buffern

I am not familiar enough with StringViewArray, is it ok to do that? And will it lead to a extremely bad performance?

@Rachelint
Copy link
Contributor

@Rachelint your implicit idea of using Vec<String> to store the state I think is actually quite interesting and maybe we should try that one first:

It would at least avoid calling Array::slice and likely be better than using GroupsAccumulatorAdapter, even if we could improve it later with more explciit memory management 🤔

Yes... At least it will be better than now, even we just use Vec<String> to impl a specific GroupsAccumulator for String type...

@alamb
Copy link
Contributor Author

alamb commented Sep 17, 2024

I am not familiar enough with StringViewArray, is it ok to do that? And will it lead to a extremely bad performance?

I think using a single Buffer for each string will be bad for performance (likely worse than storing as String and copying them at the end. StringViewArray is really optimized for a small number of buffers (even though in theory it could have 2B of them as it is indexed on i32)

@Rachelint
Copy link
Contributor

Rachelint commented Sep 17, 2024

I am not familiar enough with StringViewArray, is it ok to do that? And will it lead to a extremely bad performance?

I think using a single Buffer for each string will be bad for performance (likely worse than storing as String and copying them at the end. StringViewArray is really optimized for a small number of buffers (even though in theory it could have 2B of them as it is indexed on i32)

Ok, for StringView min/max, seems we can just start with using Vec<u128>(views) to store the inlined state(<= 12), use Vec<String> to store the unlined.

And when converting it to StringViewArray, we just copy the Vec<String> to create the buffer (GroupsAccumulatorAdapter copy the states too).

For the short strings(<=12), it can avoid allocating String, and for the long ones, it just do the same thing as GroupsAccumulatorAdapter. Seems it can have a better performance(due optimization for shorts)?

@alamb
Copy link
Contributor Author

alamb commented Sep 17, 2024

I am not familiar enough with StringViewArray, is it ok to do that? And will it lead to a extremely bad performance?

I think using a single Buffer for each string will be bad for performance (likely worse than storing as String and copying them at the end. StringViewArray is really optimized for a small number of buffers (even though in theory it could have 2B of them as it is indexed on i32)

Ok, for StringView min/max, seems we can just start with using Vec<u128>(views) to store the inlined state(<= 12), use Vec<String> to store the unlined.

And when converting it to StringViewArray, we just copy the Vec<String> to create the buffer (GroupsAccumulatorAdapter copy the states too).

For the short strings(<=12), it can avoid allocating String, and for the long ones, it just do the same thing as GroupsAccumulatorAdapter. Seems it can have a better performance(due optimization for shorts)?

Seems like a reasonable place to start in my opinion. If we want to get more sophisticated at a later time we can try something more exotic. I suspect there will be times when individual allocations will be faster and times when a buffer will be faster and there will be memory consumption tradeoffs as well.

TLDR we should implement something and as long as it is better than what is currently going on that will be good.

@XiangpengHao
Copy link
Contributor

while working on #12092 it turns out that Min / Max on BinaryView and StringView are suuuuper slow.

Fixed in #12575, now min/max on StringViewArray should be slightly faster than StringArray

@XiangpengHao
Copy link
Contributor

while working on #12092 it turns out that Min / Max on BinaryView and StringView are suuuuper slow.

Fixed in #12575, now min/max on StringViewArray should be slightly faster than StringArray

I mean, fixed for that particular clickbench query which does not leverage stats to prune parquet access.

@devanbenz
Copy link
Contributor

devanbenz commented Sep 22, 2024

while working on #12092 it turns out that Min / Max on BinaryView and StringView are suuuuper slow.

Fixed in #12575, now min/max on StringViewArray should be slightly faster than StringArray

I mean, fixed for that particular clickbench query which does not leverage stats to prune parquet access.

Cool, I'm assuming this change didn't impact the GroupsAccumulatorAdapter call in anyway since I'm still seeing a similar performance + flamegraph with the following query:

set datafusion.execution.parquet.schema_force_view_types = true;


SELECT REGEXP_REPLACE("Referer", '^https?://(?:www\\.)?([^/]+)/.*$', '\\1') AS k, AVG(length("Referer")) AS l, COUNT(*) AS c, MIN("Referer")
FROM hits_partitioned
WHERE "Referer" <> '' GROUP BY k HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;

with_changes_sv

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

4 participants