GroupBy using only one group after some time #7544

Open
@PiotrDuz

Description

Hello,
I have run into strange behaviour in my code and wrote unit tests that demonstrate it.
I am running RxJava 3, version 3.1.5.

The tests are written in Groovy with Spock:

void 'test groupBy'() {
       given:
       def scheduler = Schedulers.from(Executors.newFixedThreadPool(5))
       when:
       Flowable.range(0, 5)
               .flatMap(partition -> {
                   return Flowable.range(0, 10_000_000)
                           .subscribeOn(Schedulers.io())
                           .filter(item -> item % 5 == partition)
                           .delay(10, TimeUnit.MILLISECONDS)
                           .map(item -> new Container(partition, item))
               })
               .doOnNext(container -> System.out.println("Next :" + container.partition))
               .groupBy(item -> item.getPartition())
               .flatMap(grouped -> {
                   grouped.flatMap(container -> {
                       Flowable.fromRunnable(() -> {
                           Thread.sleep(100)
                           System.out.println("Partition " + container.partition + " " + Thread.currentThread().getName())
                       })
                               .subscribeOn(scheduler)
                   }, 1)
               })
               .blockingSubscribe()


       then:
       1
   }


   void 'without groupBy'() {
       given:
       def scheduler = Schedulers.from(Executors.newFixedThreadPool(5))
       when:
       Flowable.range(0, 5)
               .flatMap(partition -> {
                   return Flowable.range(0, 10_000_000)
                           .subscribeOn(Schedulers.io())
                           .filter(item -> item % 5 == partition)
                           .observeOn(scheduler)
                           .delay(10, TimeUnit.MILLISECONDS)
                           .map(item -> new Container(partition, item))
                           .map(container -> {
                               Thread.sleep(100)
                               System.out.println("Partition " + container.partition + " " + Thread.currentThread().getName())
                               return container
                           })
               })
               .blockingSubscribe()
       then:
       1
   }

public class Container {
   private final int partition;

   private final int value;

   public Container(int partition, int value) {
       this.partition = partition;
       this.value = value;
   }

   public int getPartition() {
       return partition;
   }

   public int getValue() {
       return value;
   }
}

The first test, with groupBy, fetches values from a source concurrently (let's assume that fetching is slow, so we do it in parallel for different values).
The fetched values must be processed sequentially within each partition, so after merging them into a common stream we group them by "partition" and then process the groups in parallel, while still keeping the processing of each group sequential.
Unfortunately, after a few seconds only one partition is being processed. The threads change, but data is fetched only from partition X. I would expect each group to be processed in parallel, so the output log should show many partitions interleaving.
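
To make the expected ordering concrete (sequential within a partition, parallel across partitions), here is a minimal sketch that does not use RxJava at all. It is only an illustration under assumptions: the class name `PartitionedProcessing` and the method `process` are hypothetical, and it uses one single-threaded executor per partition instead of `groupBy`. It neither reproduces nor explains the behaviour reported above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration, not part of the failing tests.
public class PartitionedProcessing {

    // Routes each value in 0..total-1 to partition (value % partitions) and
    // returns, per partition, the values in the order they were processed.
    static Map<Integer, List<Integer>> process(int partitions, int total)
            throws InterruptedException {
        // One single-threaded executor per partition: tasks of one partition
        // run one at a time in submission order, while different partitions
        // can run side by side on their own threads.
        List<ExecutorService> workers = new ArrayList<>();
        Map<Integer, List<Integer>> processed = new ConcurrentHashMap<>();
        for (int p = 0; p < partitions; p++) {
            workers.add(Executors.newSingleThreadExecutor());
            processed.put(p, new ArrayList<>());
        }

        for (int value = 0; value < total; value++) {
            final int partition = value % partitions;
            final int item = value;
            workers.get(partition).submit(() -> {
                // Only this partition's worker thread appends to this list.
                processed.get(partition).add(item);
            });
        }

        // Wait for all queued work to finish before reading the results.
        for (ExecutorService worker : workers) {
            worker.shutdown();
            worker.awaitTermination(10, TimeUnit.SECONDS);
        }
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<Integer, List<Integer>> result = process(5, 20);
        result.forEach((partition, items) ->
                System.out.println("Partition " + partition + " " + items));
    }
}
```

Within each partition the items keep their submission order, which is the guarantee the groupBy test is meant to provide, while the partitions themselves make progress independently.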

The other test shows what the result should look like. It allows us to fetch data in parallel and process the streams together, each one sequentially. So I treat it as a workaround, but I still wonder what happened in the first example.

Could you help me find the cause of this behaviour?
