Skip to content
Snippets Groups Projects
Commit 62171781 authored by Stanislav Kozlovski's avatar Stanislav Kozlovski Committed by Colin Patrick McCabe
Browse files

KAFKA-7819: Improve RoundTripWorker (#6187)

RoundTripWorker to should use a long field for maxMessages rather than an int.  The consumer group used should unique as well.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
parent 58d05729
No related branches found
No related tags found
No related merge requests found
......@@ -57,11 +57,13 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class RoundTripWorker implements TaskWorker {
private static final int THROTTLE_PERIOD_MS = 100;
......@@ -82,6 +84,10 @@ public class RoundTripWorker implements TaskWorker {
private final AtomicBoolean running = new AtomicBoolean(false);
private final Lock lock = new ReentrantLock();
private final Condition unackedSendsAreZero = lock.newCondition();
private ScheduledExecutorService executor;
private WorkerStatusTracker status;
......@@ -92,7 +98,7 @@ public class RoundTripWorker implements TaskWorker {
private KafkaConsumer<byte[], byte[]> consumer;
private CountDownLatch unackedSends;
private Long unackedSends;
private ToSendTracker toSendTracker;
......@@ -114,7 +120,7 @@ public class RoundTripWorker implements TaskWorker {
this.doneFuture = doneFuture;
this.producer = null;
this.consumer = null;
this.unackedSends = new CountDownLatch(spec.maxMessages());
this.unackedSends = spec.maxMessages();
executor.submit(new Prepare());
}
......@@ -157,29 +163,29 @@ public class RoundTripWorker implements TaskWorker {
}
private static class ToSendTrackerResult {
final int index;
final long index;
final boolean firstSend;
ToSendTrackerResult(int index, boolean firstSend) {
ToSendTrackerResult(long index, boolean firstSend) {
this.index = index;
this.firstSend = firstSend;
}
}
private static class ToSendTracker {
private final int maxMessages;
private final List<Integer> failed = new ArrayList<>();
private int frontier = 0;
private final long maxMessages;
private final List<Long> failed = new ArrayList<>();
private long frontier = 0;
ToSendTracker(int maxMessages) {
ToSendTracker(long maxMessages) {
this.maxMessages = maxMessages;
}
synchronized void addFailed(int index) {
synchronized void addFailed(long index) {
failed.add(index);
}
synchronized int frontier() {
synchronized long frontier() {
return frontier;
}
......@@ -232,7 +238,7 @@ public class RoundTripWorker implements TaskWorker {
break;
}
throttle.increment();
final int messageIndex = result.index;
final long messageIndex = result.index;
if (result.firstSend) {
toReceiveTracker.addPending(messageIndex);
uniqueMessagesSent++;
......@@ -248,7 +254,14 @@ public class RoundTripWorker implements TaskWorker {
spec.valueGenerator().generate(messageIndex));
producer.send(record, (metadata, exception) -> {
if (exception == null) {
unackedSends.countDown();
try {
lock.lock();
unackedSends -= 1;
if (unackedSends <= 0)
unackedSendsAreZero.signalAll();
} finally {
lock.unlock();
}
} else {
log.info("{}: Got exception when sending message {}: {}",
id, messageIndex, exception.getMessage());
......@@ -259,23 +272,28 @@ public class RoundTripWorker implements TaskWorker {
} catch (Throwable e) {
WorkerUtils.abort(log, "ProducerRunnable", e, doneFuture);
} finally {
log.info("{}: ProducerRunnable is exiting. messagesSent={}; uniqueMessagesSent={}; " +
"ackedSends={}.", id, messagesSent, uniqueMessagesSent,
spec.maxMessages() - unackedSends.getCount());
try {
lock.lock();
log.info("{}: ProducerRunnable is exiting. messagesSent={}; uniqueMessagesSent={}; " +
"ackedSends={}/{}.", id, messagesSent, uniqueMessagesSent,
spec.maxMessages() - unackedSends, spec.maxMessages());
} finally {
lock.unlock();
}
}
}
}
private class ToReceiveTracker {
private final TreeSet<Integer> pending = new TreeSet<>();
private final TreeSet<Long> pending = new TreeSet<>();
private int totalReceived = 0;
private long totalReceived = 0;
synchronized void addPending(int messageIndex) {
synchronized void addPending(long messageIndex) {
pending.add(messageIndex);
}
synchronized boolean removePending(int messageIndex) {
synchronized boolean removePending(long messageIndex) {
if (pending.remove(messageIndex)) {
totalReceived++;
return true;
......@@ -284,18 +302,18 @@ public class RoundTripWorker implements TaskWorker {
}
}
synchronized int totalReceived() {
synchronized long totalReceived() {
return totalReceived;
}
void log() {
int numToReceive;
List<Integer> list = new ArrayList<>(LOG_NUM_MESSAGES);
long numToReceive;
List<Long> list = new ArrayList<>(LOG_NUM_MESSAGES);
synchronized (this) {
numToReceive = pending.size();
for (Iterator<Integer> iter = pending.iterator();
for (Iterator<Long> iter = pending.iterator();
iter.hasNext() && (list.size() < LOG_NUM_MESSAGES); ) {
Integer i = iter.next();
Long i = iter.next();
list.add(i);
}
}
......@@ -311,7 +329,7 @@ public class RoundTripWorker implements TaskWorker {
this.props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer." + id);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "round-trip-consumer-group-1");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "round-trip-consumer-group-" + id);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 105000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000);
......@@ -341,9 +359,16 @@ public class RoundTripWorker implements TaskWorker {
if (toReceiveTracker.removePending(messageIndex)) {
uniqueMessagesReceived++;
if (uniqueMessagesReceived >= spec.maxMessages()) {
log.info("{}: Consumer received the full count of {} unique messages. " +
"Waiting for all sends to be acked...", id, spec.maxMessages());
unackedSends.await();
try {
lock.lock();
log.info("{}: Consumer received the full count of {} unique messages. " +
"Waiting for all {} sends to be acked...", id, spec.maxMessages(), unackedSends);
while (unackedSends > 0)
unackedSendsAreZero.await();
} finally {
lock.unlock();
}
log.info("{}: all sends have been acked.", id);
new StatusUpdater().update();
doneFuture.complete("");
......@@ -360,6 +385,8 @@ public class RoundTripWorker implements TaskWorker {
log.debug("{}: Consumer got WakeupException", id, e);
} catch (TimeoutException e) {
log.debug("{}: Consumer got TimeoutException", id, e);
} finally {
lock.unlock();
}
}
} catch (Throwable e) {
......@@ -415,9 +442,9 @@ public class RoundTripWorker implements TaskWorker {
@Override
public void stop(Platform platform) throws Exception {
if (!running.compareAndSet(true, false)) {
throw new IllegalStateException("ProduceBenchWorker is not running.");
throw new IllegalStateException("RoundTripWorker is not running.");
}
log.info("{}: Deactivating RoundTripWorkloadWorker.", id);
log.info("{}: Deactivating RoundTripWorker.", id);
doneFuture.complete("");
executor.shutdownNow();
executor.awaitTermination(1, TimeUnit.DAYS);
......@@ -428,5 +455,6 @@ public class RoundTripWorker implements TaskWorker {
this.unackedSends = null;
this.executor = null;
this.doneFuture = null;
log.info("{}: Deactivated RoundTripWorker.", id);
}
}
......@@ -36,7 +36,7 @@ public class RoundTripWorkloadSpec extends TaskSpec {
private final int targetMessagesPerSec;
private final PayloadGenerator valueGenerator;
private final TopicsSpec activeTopics;
private final int maxMessages;
private final long maxMessages;
private final Map<String, String> commonClientConf;
private final Map<String, String> producerConf;
private final Map<String, String> consumerConf;
......@@ -54,7 +54,7 @@ public class RoundTripWorkloadSpec extends TaskSpec {
@JsonProperty("targetMessagesPerSec") int targetMessagesPerSec,
@JsonProperty("valueGenerator") PayloadGenerator valueGenerator,
@JsonProperty("activeTopics") TopicsSpec activeTopics,
@JsonProperty("maxMessages") int maxMessages) {
@JsonProperty("maxMessages") long maxMessages) {
super(startMs, durationMs);
this.clientNode = clientNode == null ? "" : clientNode;
this.bootstrapServers = bootstrapServers == null ? "" : bootstrapServers;
......@@ -96,7 +96,7 @@ public class RoundTripWorkloadSpec extends TaskSpec {
}
@JsonProperty
public int maxMessages() {
public long maxMessages() {
return maxMessages;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment