Skip to content

Commit

Permalink
Merge pull request #1309 from smallrye/feat/multi-split
Browse files Browse the repository at this point in the history
Multi split operator
  • Loading branch information
jponge committed Jul 10, 2023
2 parents 43c3b61 + 7f4f398 commit 763cfec
Show file tree
Hide file tree
Showing 11 changed files with 833 additions and 66 deletions.
232 changes: 167 additions & 65 deletions documentation/Pipfile.lock

Large diffs are not rendered by default.

52 changes: 52 additions & 0 deletions documentation/docs/guides/multi-split.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
---
tags:
- guide
- intermediate
---

# Splitting a Multi into several Multi

It is possible to split a `Multi` into several `Multi` streams.

## Using the split operator

Suppose that we have a stream of strings that represent _signals_, and that we want a `Multi` for each kind of signal:

- `?foo`, `?bar` are _input_ signals,
- `!foo`, `!bar` are _output_ signals,
- `foo`, `bar` are _other_ signals.

To do that, we need a function that maps each item of the stream to its target stream.
The splitter API needs a Java enumeration to define keys, as in:

```java linenums="1"
{{ insert('java/guides/operators/SplitTest.java', 'enum') }}
```

Now we can use the `split` operator that provides a splitter object, and fetch individual `Multi` for each split stream using the `get` method:

```java linenums="1"
{{ insert('java/guides/operators/SplitTest.java', 'splits') }}
```

This prints the following console output:

```
output - a
input - b
output - c
output - d
other - 123
input - e
```

## Notes on using splits

- Items flow when all splits have a subscriber.
- The flow stops when either of the subscribers cancels, or when any subscriber has a no outstanding demand.
- The flow resumes when all splits have a subscriber again, and when all subscribers have outstanding demand.
- Only one subscriber can be active for a given split. Other subscription attempts will receive an error.
- When a subscriber cancels, then a new subscription attempt on its corresponding split can succeed.
- Subscribing to an already completed or errored split results in receiving the terminal signal (`onComplete()` or `onFailure(err)`).
- The upstream `Multi` gets subscribed to when the first split subscription happens, no matter which split it is.
- The first split subscription passes its context, if any, to the upstream `Multi`. It is expected that all split subscribers share the same context object, or the behavior of your code will most likely be incorrect.
1 change: 1 addition & 0 deletions documentation/mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ nav:
- 'guides/context-passing.md'
- 'guides/replaying-multis.md'
- 'guides/controlling-demand.md'
- 'guides/multi-split.md'
- 'Reference':
- 'reference/migrating-to-mutiny-2.md'
- 'reference/why-is-asynchronous-important.md'
Expand Down
45 changes: 45 additions & 0 deletions documentation/src/test/java/guides/operators/SplitTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package guides.operators;

import io.smallrye.mutiny.Multi;
import org.junit.jupiter.api.Test;

public class SplitTest {

// <enum>
enum Signals {
INPUT,
OUTPUT,
OTHER
}
// </enum>

@Test
public void splitDemo() {
// <splits>
Multi<String> multi = Multi.createFrom().items(
"!a", "?b", "!c", "!d", "123", "?e"
);

var splitter = multi.split(Signals.class, s -> {
if (s.startsWith("?")) {
return Signals.INPUT;
} else if (s.startsWith("!")) {
return Signals.OUTPUT;
} else {
return Signals.OTHER;
}
});

splitter.get(Signals.INPUT)
.onItem().transform(s -> s.substring(1))
.subscribe().with(signal -> System.out.println("input - " + signal));

splitter.get(Signals.OUTPUT)
.onItem().transform(s -> s.substring(1))
.subscribe().with(signal -> System.out.println("output - " + signal));

splitter.get(Signals.OTHER)
.subscribe().with(signal -> System.out.println("other - " + signal));
// </splits>
}
}
1 change: 1 addition & 0 deletions implementation/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
<sourceFileInclude>io/smallrye/mutiny/infrastructure/*.java</sourceFileInclude>
<sourceFileInclude>io/smallrye/mutiny/operators/*.java</sourceFileInclude>
<sourceFileInclude>io/smallrye/mutiny/operators/multi/processors/*.java</sourceFileInclude>
<sourceFileInclude>io/smallrye/mutiny/operators/multi/split/*.java</sourceFileInclude>
<sourceFileInclude>io/smallrye/mutiny/subscription/*.java</sourceFileInclude>
<sourceFileInclude>io/smallrye/mutiny/tuples/*.java</sourceFileInclude>
<sourceFileInclude>io/smallrye/mutiny/unchecked/*.java</sourceFileInclude>
Expand Down
31 changes: 31 additions & 0 deletions implementation/src/main/java/io/smallrye/mutiny/Multi.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
import java.util.function.*;

import io.smallrye.common.annotation.CheckReturnValue;
import io.smallrye.common.annotation.Experimental;
import io.smallrye.mutiny.groups.*;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.operators.multi.split.MultiSplitter;

public interface Multi<T> extends Publisher<T> {

Expand Down Expand Up @@ -637,4 +639,33 @@ default Multi<T> capDemandsTo(long max) {
*/
@CheckReturnValue
Multi<T> capDemandsUsing(LongFunction<Long> function);

/**
* Splits this {@link Multi} into several co-operating {@link Multi} based on an enumeration and a mapping function.
* <p>
* Here is a sample where a stream of integers is split into streams for odd and even numbers:
*
* <pre>
* {@code
* // Split someMulti into 2 streams
* var splitter = someMulti.split(OddEven.class, n -> (n % 2 == 0) ? OddEven.EVEN : OddEven.ODD);
*
* // Stream for odd numbers
* vor odd = splitter.get(OddEven.ODD).subscribe().with(...);
*
* // Stream for even numbers
* vor even = splitter.get(OddEven.EVEN).subscribe().with(...);
* }
* </pre>
*
* @param keyType the key type
* @param splitter the splitter function
* @return a splitter
* @param <K> the key type
*/
@CheckReturnValue
@Experimental("Multi splitting is an experimental API in Mutiny 2.3.0")
default <K extends Enum<K>> MultiSplitter<T, K> split(Class<K> keyType, Function<T, K> splitter) {
return new MultiSplitter<>(this, keyType, splitter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,5 +179,4 @@ public MultiDemandPacing<T> paceDemand() {
public Multi<T> capDemandsUsing(LongFunction<Long> function) {
return Infrastructure.onMultiCreation(new MultiDemandCapping<>(this, nonNull(function, "function")));
}

}
Loading

0 comments on commit 763cfec

Please sign in to comment.