Skip to content
Snippets Groups Projects
Commit 9ecadc4d authored by Bill Bejeck's avatar Bill Bejeck Committed by Guozhang Wang
Browse files

MINOR: Use Java 8 lambdas in KStreamImplTest (#6430)

Just a minor cleanup to use Java 8 lambdas vs anonymous classes in this test.

I ran all tests in the streams test suite

Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>
parent 2aca6241
No related branches found
No related tags found
No related merge requests found
......@@ -257,7 +257,7 @@ public class StreamsResetter {
CommandLineUtils.printUsageAndDie(optionParser, "Only one of --dry-run and --execute can be specified");
}
final scala.collection.immutable.HashSet<OptionSpec<?>> allScenarioOptions = new scala.collection.immutable.HashSet<OptionSpec<?>>();
final scala.collection.immutable.HashSet<OptionSpec<?>> allScenarioOptions = new scala.collection.immutable.HashSet<>();
allScenarioOptions.$plus(toOffsetOption);
allScenarioOptions.$plus(toDatetimeOption);
allScenarioOptions.$plus(byDurationOption);
......
......@@ -104,78 +104,31 @@ public class KStreamImplTest {
final KStream<String, String> source2 = builder.stream(Arrays.asList("topic-3", "topic-4"), stringConsumed);
final KStream<String, String> stream1 =
source1.filter(new Predicate<String, String>() {
@Override
public boolean test(final String key, final String value) {
return true;
}
}).filterNot(new Predicate<String, String>() {
@Override
public boolean test(final String key, final String value) {
return false;
}
});
final KStream<String, String> stream1 = source1.filter((key, value) -> true)
.filterNot((key, value) -> false);
final KStream<String, Integer> stream2 = stream1.mapValues(new ValueMapper<String, Integer>() {
@Override
public Integer apply(final String value) {
return new Integer(value);
}
});
final KStream<String, Integer> stream2 = stream1.mapValues(Integer::new);
final KStream<String, Integer> stream3 = source2.flatMapValues(new ValueMapper<String, Iterable<Integer>>() {
@Override
public Iterable<Integer> apply(final String value) {
return Collections.singletonList(new Integer(value));
}
});
final KStream<String, Integer> stream3 = source2.flatMapValues((ValueMapper<String, Iterable<Integer>>)
value -> Collections.singletonList(new Integer(value)));
final KStream<String, Integer>[] streams2 = stream2.branch(
new Predicate<String, Integer>() {
@Override
public boolean test(final String key, final Integer value) {
return (value % 2) == 0;
}
},
new Predicate<String, Integer>() {
@Override
public boolean test(final String key, final Integer value) {
return true;
}
}
(key, value) -> (value % 2) == 0,
(key, value) -> true
);
final KStream<String, Integer>[] streams3 = stream3.branch(
new Predicate<String, Integer>() {
@Override
public boolean test(final String key, final Integer value) {
return (value % 2) == 0;
}
},
new Predicate<String, Integer>() {
@Override
public boolean test(final String key, final Integer value) {
return true;
}
}
(key, value) -> (value % 2) == 0,
(key, value) -> true
);
final int anyWindowSize = 1;
final Joined<String, Integer, Integer> joined = Joined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer());
final KStream<String, Integer> stream4 = streams2[0].join(streams3[0], new ValueJoiner<Integer, Integer, Integer>() {
@Override
public Integer apply(final Integer value1, final Integer value2) {
return value1 + value2;
}
}, JoinWindows.of(ofMillis(anyWindowSize)), joined);
final KStream<String, Integer> stream4 = streams2[0].join(streams3[0],
(value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(anyWindowSize)), joined);
streams2[1].join(streams3[1], new ValueJoiner<Integer, Integer, Integer>() {
@Override
public Integer apply(final Integer value1, final Integer value2) {
return value1 + value2;
}
}, JoinWindows.of(ofMillis(anyWindowSize)), joined);
streams2[1].join(streams3[1], (value1, value2) -> value1 + value2,
JoinWindows.of(ofMillis(anyWindowSize)), joined);
stream4.to("topic-5");
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment