From 9ecadc4df474d9cfbfda3256f01eba1423cf5902 Mon Sep 17 00:00:00 2001 From: Bill Bejeck <bbejeck@gmail.com> Date: Tue, 12 Mar 2019 16:35:25 -0400 Subject: [PATCH] MINOR: Use Java 8 lambdas in KStreamImplTest (#6430) Just a minor cleanup to use Java 8 lambdas vs anonymous classes in this test. I ran all tests in the streams test suite Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com> --- .../scala/kafka/tools/StreamsResetter.java | 2 +- .../kstream/internals/KStreamImplTest.java | 73 ++++--------------- 2 files changed, 14 insertions(+), 61 deletions(-) diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java index 3666f6740e..71529f8ca4 100644 --- a/core/src/main/scala/kafka/tools/StreamsResetter.java +++ b/core/src/main/scala/kafka/tools/StreamsResetter.java @@ -257,7 +257,7 @@ public class StreamsResetter { CommandLineUtils.printUsageAndDie(optionParser, "Only one of --dry-run and --execute can be specified"); } - final scala.collection.immutable.HashSet<OptionSpec<?>> allScenarioOptions = new scala.collection.immutable.HashSet<OptionSpec<?>>(); + final scala.collection.immutable.HashSet<OptionSpec<?>> allScenarioOptions = new scala.collection.immutable.HashSet<>(); allScenarioOptions.$plus(toOffsetOption); allScenarioOptions.$plus(toDatetimeOption); allScenarioOptions.$plus(byDurationOption); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 76a01cde53..bd2ab5ba74 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -104,78 +104,31 @@ public class KStreamImplTest { final KStream<String, String> source2 = builder.stream(Arrays.asList("topic-3", "topic-4"), stringConsumed); - final KStream<String, String> stream1 = - source1.filter(new Predicate<String, String>() { - @Override - public boolean test(final String key, final String value) { - return true; - } - }).filterNot(new Predicate<String, String>() { - @Override - public boolean test(final String key, final String value) { - return false; - } - }); + final KStream<String, String> stream1 = source1.filter((key, value) -> true) + .filterNot((key, value) -> false); - final KStream<String, Integer> stream2 = stream1.mapValues(new ValueMapper<String, Integer>() { - @Override - public Integer apply(final String value) { - return new Integer(value); - } - }); + final KStream<String, Integer> stream2 = stream1.mapValues(Integer::new); - final KStream<String, Integer> stream3 = source2.flatMapValues(new ValueMapper<String, Iterable<Integer>>() { - @Override - public Iterable<Integer> apply(final String value) { - return Collections.singletonList(new Integer(value)); - } - }); + final KStream<String, Integer> stream3 = source2.flatMapValues((ValueMapper<String, Iterable<Integer>>) + value -> Collections.singletonList(new Integer(value))); final KStream<String, Integer>[] streams2 = stream2.branch( - new Predicate<String, Integer>() { - @Override - public boolean test(final String key, final Integer value) { - return (value % 2) == 0; - } - }, - new Predicate<String, Integer>() { - @Override - public boolean test(final String key, final Integer value) { - return true; - } - } + (key, value) -> (value % 2) == 0, + (key, value) -> true ); final KStream<String, Integer>[] streams3 = stream3.branch( - new Predicate<String, Integer>() { - @Override - public boolean test(final String key, final Integer value) { - return (value % 2) == 0; - } - }, - new Predicate<String, Integer>() { - @Override - public boolean test(final String key, final Integer value) { - return true; - } - } + (key, value) -> (value % 2) == 0, + (key, value) -> true ); final int anyWindowSize = 1; final Joined<String, Integer, Integer> joined = Joined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()); - final KStream<String, Integer> stream4 = streams2[0].join(streams3[0], new ValueJoiner<Integer, Integer, Integer>() { - @Override - public Integer apply(final Integer value1, final Integer value2) { - return value1 + value2; - } - }, JoinWindows.of(ofMillis(anyWindowSize)), joined); + final KStream<String, Integer> stream4 = streams2[0].join(streams3[0], + (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(anyWindowSize)), joined); - streams2[1].join(streams3[1], new ValueJoiner<Integer, Integer, Integer>() { - @Override - public Integer apply(final Integer value1, final Integer value2) { - return value1 + value2; - } - }, JoinWindows.of(ofMillis(anyWindowSize)), joined); + streams2[1].join(streams3[1], (value1, value2) -> value1 + value2, + JoinWindows.of(ofMillis(anyWindowSize)), joined); stream4.to("topic-5"); -- GitLab