From 4ca8e40e2f4ab415657d1e07ee868448802f4565 Mon Sep 17 00:00:00 2001
From: Lee Dongjin <dongjin@apache.org>
Date: Fri, 15 Mar 2019 10:42:07 +0900
Subject: [PATCH] KAFKA-7502: Cleanup KTable materialization logic in a single
 place (#6174)

This is a draft cleanup for KAFKA-7502. Here is the details:

* Make KTableKTableJoinNode abstract, and define its child classes ([NonMaterialized,Materialized]KTableKTableJoinNode) instead: now, all materialization-related routines are separated into the other classes.

* KTableKTableJoinNodeBuilder#build now instantiates [NonMaterialized,Materialized]KTableKTableJoinNode classes instead of KTableKTableJoinNode.

Reviewers: Guozhang Wang <wangguoz@gmail.com>,  Bill Bejeck <bbejeck@gmail.com>
---
 .../streams/kstream/internals/KTableImpl.java | 106 +++++------
 .../internals/KTableKTableJoinMerger.java     |  17 +-
 .../internals/graph/KTableKTableJoinNode.java | 172 +++++++++++-------
 3 files changed, 170 insertions(+), 125 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 68f940c905..d97213671e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -43,6 +43,7 @@ import org.apache.kafka.streams.kstream.internals.suppress.NamedSuppressed;
 import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -465,28 +466,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
                                           final boolean rightOuter) {
         Objects.requireNonNull(other, "other can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
-        final String internalQueryableName = materializedInternal == null ? null : materializedInternal.storeName();
-        final String joinMergeName = builder.newProcessorName(MERGE_NAME);
 
-        return buildJoin(
-            (AbstractStream<K, VO>) other,
-            joiner,
-            leftOuter,
-            rightOuter,
-            joinMergeName,
-            internalQueryableName,
-            materializedInternal
-        );
-    }
-
-    private <V1, R> KTable<K, R> buildJoin(final AbstractStream<K, V1> other,
-                                           final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
-                                           final boolean leftOuter,
-                                           final boolean rightOuter,
-                                           final String joinMergeName,
-                                           final String internalQueryableName,
-                                           final MaterializedInternal<K, R, KeyValueStore<Bytes, byte[]>> materializedInternal) {
-        final Set<String> allSourceNodes = ensureJoinableWith(other);
+        final String joinMergeName = builder.newProcessorName(MERGE_NAME);
+        final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K, VO>) other);
 
         if (leftOuter) {
             enableSendingOldValues();
@@ -495,57 +477,67 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
             ((KTableImpl) other).enableSendingOldValues();
         }
 
-        final String joinThisName = builder.newProcessorName(JOINTHIS_NAME);
-        final String joinOtherName = builder.newProcessorName(JOINOTHER_NAME);
-
-
-        final KTableKTableAbstractJoin<K, R, V, V1> joinThis;
-        final KTableKTableAbstractJoin<K, R, V1, V> joinOther;
+        final KTableKTableAbstractJoin<K, VR, V, VO> joinThis;
+        final KTableKTableAbstractJoin<K, VR, VO, V> joinOther;
 
         if (!leftOuter) { // inner
-            joinThis = new KTableKTableInnerJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
-            joinOther = new KTableKTableInnerJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
+            joinThis = new KTableKTableInnerJoin<>(this, (KTableImpl<K, ?, VO>) other, joiner);
+            joinOther = new KTableKTableInnerJoin<>((KTableImpl<K, ?, VO>) other, this, reverseJoiner(joiner));
         } else if (!rightOuter) { // left
-            joinThis = new KTableKTableLeftJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
-            joinOther = new KTableKTableRightJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
+            joinThis = new KTableKTableLeftJoin<>(this, (KTableImpl<K, ?, VO>) other, joiner);
+            joinOther = new KTableKTableRightJoin<>((KTableImpl<K, ?, VO>) other, this, reverseJoiner(joiner));
         } else { // outer
-            joinThis = new KTableKTableOuterJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
-            joinOther = new KTableKTableOuterJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
+            joinThis = new KTableKTableOuterJoin<>(this, (KTableImpl<K, ?, VO>) other, joiner);
+            joinOther = new KTableKTableOuterJoin<>((KTableImpl<K, ?, VO>) other, this, reverseJoiner(joiner));
         }
 
-        final KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>(joinThis, joinOther, internalQueryableName);
+        final String joinThisName = builder.newProcessorName(JOINTHIS_NAME);
+        final String joinOtherName = builder.newProcessorName(JOINOTHER_NAME);
+
+        final ProcessorParameters<K, Change<V>> joinThisProcessorParameters = new ProcessorParameters<>(joinThis, joinThisName);
+        final ProcessorParameters<K, Change<VO>> joinOtherProcessorParameters = new ProcessorParameters<>(joinOther, joinOtherName);
 
-        final KTableKTableJoinNode.KTableKTableJoinNodeBuilder<K, V, V1, R> kTableJoinNodeBuilder = KTableKTableJoinNode.kTableKTableJoinNodeBuilder();
+        final Serde<K> keySerde;
+        final Serde<VR> valueSerde;
+        final String queryableStoreName;
+        final StoreBuilder<KeyValueStore<K, VR>> storeBuilder;
 
-        // only materialize if specified in Materialized
         if (materializedInternal != null) {
-            kTableJoinNodeBuilder.withMaterializedInternal(materializedInternal);
+            keySerde = materializedInternal.keySerde() != null ? materializedInternal.keySerde() : this.keySerde;
+            valueSerde = materializedInternal.valueSerde();
+            queryableStoreName = materializedInternal.storeName();
+            storeBuilder = new KeyValueStoreMaterializer<>(materializedInternal).materialize();
+        } else {
+            keySerde = this.keySerde;
+            valueSerde = null;
+            queryableStoreName = null;
+            storeBuilder = null;
         }
-        kTableJoinNodeBuilder.withNodeName(joinMergeName);
 
-        final ProcessorParameters<K, Change<V>> joinThisProcessorParameters = new ProcessorParameters<>(joinThis, joinThisName);
-        final ProcessorParameters<K, Change<V1>> joinOtherProcessorParameters = new ProcessorParameters<>(joinOther, joinOtherName);
-        final ProcessorParameters<K, Change<R>> joinMergeProcessorParameters = new ProcessorParameters<>(joinMerge, joinMergeName);
-
-        kTableJoinNodeBuilder.withJoinMergeProcessorParameters(joinMergeProcessorParameters)
-                             .withJoinOtherProcessorParameters(joinOtherProcessorParameters)
-                             .withJoinThisProcessorParameters(joinThisProcessorParameters)
-                             .withJoinThisStoreNames(valueGetterSupplier().storeNames())
-                             .withJoinOtherStoreNames(((KTableImpl) other).valueGetterSupplier().storeNames())
-                             .withOtherJoinSideNodeName(((KTableImpl) other).name)
-                             .withThisJoinSideNodeName(name);
-
-        final KTableKTableJoinNode<K, V, V1, R> kTableKTableJoinNode = kTableJoinNodeBuilder.build();
+        final KTableKTableJoinNode<K, V, VO, VR> kTableKTableJoinNode =
+            KTableKTableJoinNode.<K, V, VO, VR>kTableKTableJoinNodeBuilder()
+                .withNodeName(joinMergeName)
+                .withJoinThisProcessorParameters(joinThisProcessorParameters)
+                .withJoinOtherProcessorParameters(joinOtherProcessorParameters)
+                .withThisJoinSideNodeName(name)
+                .withOtherJoinSideNodeName(((KTableImpl) other).name)
+                .withJoinThisStoreNames(valueGetterSupplier().storeNames())
+                .withJoinOtherStoreNames(((KTableImpl) other).valueGetterSupplier().storeNames())
+                .withKeySerde(keySerde)
+                .withValueSerde(valueSerde)
+                .withQueryableStoreName(queryableStoreName)
+                .withStoreBuilder(storeBuilder)
+                .build();
         builder.addGraphNode(this.streamsGraphNode, kTableKTableJoinNode);
 
         // we can inherit parent key serde if user do not provide specific overrides
-        return new KTableImpl<K, Change<R>, R>(
-            joinMergeName,
-            materializedInternal != null && materializedInternal.keySerde() != null ? materializedInternal.keySerde() : keySerde,
-            materializedInternal != null ? materializedInternal.valueSerde() : null,
+        return new KTableImpl<K, Change<VR>, VR>(
+            kTableKTableJoinNode.nodeName(),
+            kTableKTableJoinNode.keySerde(),
+            kTableKTableJoinNode.valueSerde(),
             allSourceNodes,
-            internalQueryableName,
-            joinMerge,
+            kTableKTableJoinNode.queryableStoreName(),
+            kTableKTableJoinNode.joinMerger(),
             kTableKTableJoinNode,
             builder
         );
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
index 78c1dc6f48..de38042461 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
@@ -25,7 +25,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
-class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> {
+public class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> {
 
     private final KTableProcessorSupplier<K, ?, V> parent1;
     private final KTableProcessorSupplier<K, ?, V> parent2;
@@ -40,6 +40,10 @@ class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> {
         this.queryableName = queryableName;
     }
 
+    public String getQueryableName() {
+        return queryableName;
+    }
+
     @Override
     public Processor<K, Change<V>> get() {
         return new KTableKTableJoinMergeProcessor();
@@ -78,6 +82,17 @@ class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> {
         sendOldValues = true;
     }
 
+    public static <K, V> KTableKTableJoinMerger<K, V> of(final KTableProcessorSupplier<K, ?, V> parent1,
+                                                         final KTableProcessorSupplier<K, ?, V> parent2) {
+        return of(parent1, parent2, null);
+    }
+
+    public static <K, V> KTableKTableJoinMerger<K, V> of(final KTableProcessorSupplier<K, ?, V> parent1,
+                                                         final KTableProcessorSupplier<K, ?, V> parent2,
+                                                         final String queryableName) {
+        return new KTableKTableJoinMerger<>(parent1, parent2, queryableName);
+    }
+
     private class KTableKTableJoinMergeProcessor extends AbstractProcessor<K, Change<V>> {
         private KeyValueStore<K, V> store;
         private TupleForwarder<K, V> tupleForwarder;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
index aeda0d9e19..03bdda0b2d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
@@ -17,11 +17,10 @@
 
 package org.apache.kafka.streams.kstream.internals.graph;
 
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.kstream.internals.Change;
-import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
-import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
+import org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger;
+import org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -33,32 +32,64 @@ import java.util.Arrays;
  */
 public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K, Change<V1>, Change<V2>, Change<VR>> {
 
+    private final Serde<K> keySerde;
+    private final Serde<VR> valueSerde;
     private final String[] joinThisStoreNames;
     private final String[] joinOtherStoreNames;
-    private final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal;
+    private final StoreBuilder<KeyValueStore<K, VR>> storeBuilder;
 
     KTableKTableJoinNode(final String nodeName,
-                         final ValueJoiner<? super Change<V1>, ? super Change<V2>, ? extends Change<VR>> valueJoiner,
                          final ProcessorParameters<K, Change<V1>> joinThisProcessorParameters,
                          final ProcessorParameters<K, Change<V2>> joinOtherProcessorParameters,
                          final ProcessorParameters<K, Change<VR>> joinMergeProcessorParameters,
-                         final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal,
                          final String thisJoinSide,
                          final String otherJoinSide,
+                         final Serde<K> keySerde,
+                         final Serde<VR> valueSerde,
                          final String[] joinThisStoreNames,
-                         final String[] joinOtherStoreNames) {
+                         final String[] joinOtherStoreNames,
+                         final StoreBuilder<KeyValueStore<K, VR>> storeBuilder) {
 
         super(nodeName,
-              valueJoiner,
-              joinThisProcessorParameters,
-              joinOtherProcessorParameters,
-              joinMergeProcessorParameters,
-              thisJoinSide,
-              otherJoinSide);
-
+            null,
+            joinThisProcessorParameters,
+            joinOtherProcessorParameters,
+            joinMergeProcessorParameters,
+            thisJoinSide,
+            otherJoinSide);
+
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
         this.joinThisStoreNames = joinThisStoreNames;
         this.joinOtherStoreNames = joinOtherStoreNames;
-        this.materializedInternal = materializedInternal;
+        this.storeBuilder = storeBuilder;
+    }
+
+    public Serde<K> keySerde() {
+        return keySerde;
+    }
+
+    public Serde<VR> valueSerde() {
+        return valueSerde;
+    }
+
+    public String[] joinThisStoreNames() {
+        return joinThisStoreNames;
+    }
+
+    public String[] joinOtherStoreNames() {
+        return joinOtherStoreNames;
+    }
+
+    public String queryableStoreName() {
+        return ((KTableKTableJoinMerger) mergeProcessorParameters().processorSupplier()).getQueryableName();
+    }
+
+    /**
+     * The supplier which provides processor with KTable-KTable join merge functionality.
+     */
+    public KTableKTableJoinMerger<K, VR> joinMerger() {
+        return (KTableKTableJoinMerger<K, VR>) mergeProcessorParameters().processorSupplier();
     }
 
     @Override
@@ -68,26 +99,24 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
         final String mergeProcessorName = mergeProcessorParameters().processorName();
 
         topologyBuilder.addProcessor(thisProcessorName,
-                                     thisProcessorParameters().processorSupplier(),
-                                     thisJoinSideNodeName());
+            thisProcessorParameters().processorSupplier(),
+            thisJoinSideNodeName());
 
         topologyBuilder.addProcessor(otherProcessorName,
-                                     otherProcessorParameters().processorSupplier(),
-                                     otherJoinSideNodeName());
+            otherProcessorParameters().processorSupplier(),
+            otherJoinSideNodeName());
 
         topologyBuilder.addProcessor(mergeProcessorName,
-                                     mergeProcessorParameters().processorSupplier(),
-                                     thisProcessorName,
-                                     otherProcessorName);
+            mergeProcessorParameters().processorSupplier(),
+            thisProcessorName,
+            otherProcessorName);
 
         topologyBuilder.connectProcessorAndStateStores(thisProcessorName,
-                                                       joinOtherStoreNames);
+            joinOtherStoreNames);
         topologyBuilder.connectProcessorAndStateStores(otherProcessorName,
-                                                       joinThisStoreNames);
+            joinThisStoreNames);
 
-        if (materializedInternal != null) {
-            final StoreBuilder<KeyValueStore<K, VR>> storeBuilder =
-                new KeyValueStoreMaterializer<>(materializedInternal).materialize();
+        if (storeBuilder != null) {
             topologyBuilder.addStateStore(storeBuilder, mergeProcessorName);
         }
     }
@@ -95,10 +124,9 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
     @Override
     public String toString() {
         return "KTableKTableJoinNode{" +
-               "joinThisStoreNames=" + Arrays.toString(joinThisStoreNames) +
-               ", joinOtherStoreNames=" + Arrays.toString(joinOtherStoreNames) +
-               ", materializedInternal=" + materializedInternal +
-               "} " + super.toString();
+            "joinThisStoreNames=" + Arrays.toString(joinThisStoreNames()) +
+            ", joinOtherStoreNames=" + Arrays.toString(joinOtherStoreNames()) +
+            "} " + super.toString();
     }
 
     public static <K, V1, V2, VR> KTableKTableJoinNodeBuilder<K, V1, V2, VR> kTableKTableJoinNodeBuilder() {
@@ -106,23 +134,23 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
     }
 
     public static final class KTableKTableJoinNodeBuilder<K, V1, V2, VR> {
-
         private String nodeName;
-        private String[] joinThisStoreNames;
         private ProcessorParameters<K, Change<V1>> joinThisProcessorParameters;
-        private String[] joinOtherStoreNames;
-        private MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal;
         private ProcessorParameters<K, Change<V2>> joinOtherProcessorParameters;
-        private ProcessorParameters<K, Change<VR>> joinMergeProcessorParameters;
-        private ValueJoiner<? super Change<V1>, ? super Change<V2>, ? extends Change<VR>> valueJoiner;
         private String thisJoinSide;
         private String otherJoinSide;
+        private Serde<K> keySerde;
+        private Serde<VR> valueSerde;
+        private String[] joinThisStoreNames;
+        private String[] joinOtherStoreNames;
+        private String queryableStoreName;
+        private StoreBuilder<KeyValueStore<K, VR>> storeBuilder;
 
         private KTableKTableJoinNodeBuilder() {
         }
 
-        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withJoinThisStoreNames(final String[] joinThisStoreNames) {
-            this.joinThisStoreNames = joinThisStoreNames;
+        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withNodeName(final String nodeName) {
+            this.nodeName = nodeName;
             return this;
         }
 
@@ -131,59 +159,69 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
             return this;
         }
 
-        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withNodeName(final String nodeName) {
-            this.nodeName = nodeName;
+        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withJoinOtherProcessorParameters(final ProcessorParameters<K, Change<V2>> joinOtherProcessorParameters) {
+            this.joinOtherProcessorParameters = joinOtherProcessorParameters;
             return this;
         }
 
-        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withJoinOtherStoreNames(final String[] joinOtherStoreNames) {
-            this.joinOtherStoreNames = joinOtherStoreNames;
+        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withThisJoinSideNodeName(final String thisJoinSide) {
+            this.thisJoinSide = thisJoinSide;
             return this;
         }
 
-        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withJoinOtherProcessorParameters(final ProcessorParameters<K, Change<V2>> joinOtherProcessorParameters) {
-            this.joinOtherProcessorParameters = joinOtherProcessorParameters;
+        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withOtherJoinSideNodeName(final String otherJoinSide) {
+            this.otherJoinSide = otherJoinSide;
             return this;
         }
 
-        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withJoinMergeProcessorParameters(final ProcessorParameters<K, Change<VR>> joinMergeProcessorParameters) {
-            this.joinMergeProcessorParameters = joinMergeProcessorParameters;
+        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withKeySerde(final Serde<K> keySerde) {
+            this.keySerde = keySerde;
             return this;
         }
 
-        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withValueJoiner(final ValueJoiner<? super Change<V1>, ? super Change<V2>, ? extends Change<VR>> valueJoiner) {
-            this.valueJoiner = valueJoiner;
+        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withValueSerde(final Serde<VR> valueSerde) {
+            this.valueSerde = valueSerde;
             return this;
         }
 
-        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withThisJoinSideNodeName(final String thisJoinSide) {
-            this.thisJoinSide = thisJoinSide;
+        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withJoinThisStoreNames(final String[] joinThisStoreNames) {
+            this.joinThisStoreNames = joinThisStoreNames;
             return this;
         }
 
-        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withOtherJoinSideNodeName(final String otherJoinSide) {
-            this.otherJoinSide = otherJoinSide;
+        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withJoinOtherStoreNames(final String[] joinOtherStoreNames) {
+            this.joinOtherStoreNames = joinOtherStoreNames;
             return this;
         }
 
-        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withMaterializedInternal(
-                final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal) {
-            this.materializedInternal = materializedInternal;
+        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withQueryableStoreName(final String queryableStoreName) {
+            this.queryableStoreName = queryableStoreName;
             return this;
         }
 
-        public KTableKTableJoinNode<K, V1, V2, VR> build() {
+        public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withStoreBuilder(final StoreBuilder<KeyValueStore<K, VR>> storeBuilder) {
+            this.storeBuilder = storeBuilder;
+            return this;
+        }
 
+        @SuppressWarnings("unchecked")
+        public KTableKTableJoinNode<K, V1, V2, VR> build() {
             return new KTableKTableJoinNode<>(nodeName,
-                                              valueJoiner,
-                                              joinThisProcessorParameters,
-                                              joinOtherProcessorParameters,
-                                              joinMergeProcessorParameters,
-                                              materializedInternal,
-                                              thisJoinSide,
-                                              otherJoinSide,
-                                              joinThisStoreNames,
-                                              joinOtherStoreNames);
+                joinThisProcessorParameters,
+                joinOtherProcessorParameters,
+                new ProcessorParameters<>(
+                    KTableKTableJoinMerger.of(
+                        (KTableProcessorSupplier<K, V1, VR>) (joinThisProcessorParameters.processorSupplier()),
+                        (KTableProcessorSupplier<K, V2, VR>) (joinOtherProcessorParameters.processorSupplier()),
+                        queryableStoreName),
+                    nodeName),
+                thisJoinSide,
+                otherJoinSide,
+                keySerde,
+                valueSerde,
+                joinThisStoreNames,
+                joinOtherStoreNames,
+                storeBuilder);
         }
     }
 }
-- 
GitLab