Skip to content
Snippets Groups Projects
Commit 2aca6241 authored by cadonna's avatar cadonna Committed by Matthias J. Sax
Browse files

MINOR: Avoid double null check in KStream#transform() (#6429)

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Matthias J. Sax <matthias@confluent.io>
parent 8e975400
No related branches found
No related tags found
No related merge requests found
......@@ -439,17 +439,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
builder.addGraphNode(this.streamsGraphNode, sinkNode);
}
@Override
public <KR, VR> KStream<KR, VR> transform(final TransformerSupplier<? super K, ? super V, KeyValue<KR, VR>> transformerSupplier,
final String... stateStoreNames) {
Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
return flatTransform(new TransformerSupplierAdapter<>(transformerSupplier), stateStoreNames);
}
@Override
public <K1, V1> KStream<K1, V1> flatTransform(final TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
final String... stateStoreNames) {
Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
private <K1, V1> KStream<K1, V1> doFlatTransform(final TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
final String... stateStoreNames) {
final String name = builder.newProcessorName(TRANSFORM_NAME);
final StatefulProcessorNode<? super K, ? super V> transformNode = new StatefulProcessorNode<>(
name,
......@@ -464,6 +455,20 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
return new KStreamImpl<>(name, null, null, sourceNodes, true, transformNode, builder);
}
@Override
public <KR, VR> KStream<KR, VR> transform(final TransformerSupplier<? super K, ? super V, KeyValue<KR, VR>> transformerSupplier,
final String... stateStoreNames) {
Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
return doFlatTransform(new TransformerSupplierAdapter<>(transformerSupplier), stateStoreNames);
}
@Override
public <K1, V1> KStream<K1, V1> flatTransform(final TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
final String... stateStoreNames) {
Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
return doFlatTransform(transformerSupplier, stateStoreNames);
}
@Override
public <VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
final String... stateStoreNames) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment