Commit 741cb761 authored by Lee Dongjin, committed by Jason Gustafson

KAFKA-4514; Add Codec for ZStandard Compression (#2267)

This patch adds support for zstandard compression to Kafka as documented in KIP-110: https://cwiki.apache.org/confluence/display/KAFKA/KIP-110%3A+Add+Codec+for+ZStandard+Compression. 

Reviewers: Ivan Babrou <ibobrik@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
parent 578205ca
Showing 326 additions and 40 deletions
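For readers who want to try the new codec, a minimal producer set-up might look like the sketch below. The broker address, topic name, and serializers are placeholder assumptions, and the brokers must be on a version that supports KIP-110 (message format v2):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ZstdProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // "zstd" is the new value accepted by compression.type after this patch.
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value")); // placeholder topic
        }
    }
}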
......@@ -201,6 +201,7 @@
See the License for the specific language governing permissions and
limitations under the License.
------------------------------------------------------------------------------------
This distribution has a binary dependency on jersey, which is available under the CDDL
License as described below.
......@@ -328,3 +329,68 @@ As between Initial Developer and the Contributors, each party is responsible for
NOTICE PURSUANT TO SECTION 9 OF THE COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL)
The code released under the CDDL shall be governed by the laws of the State of California (excluding conflict-of-law provisions). Any litigation relating to this License shall be subject to the jurisdiction of the Federal Courts of the Northern District of California and the state courts of the State of California, with venue lying in Santa Clara County, California.
------------------------------------------------------------------------------------
This distribution has a binary dependency on zstd, which is available under the BSD 3-Clause License as described below.
BSD License
For Zstandard software
Copyright (c) 2016-present, Facebook, Inc. All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name Facebook nor the names of its contributors may be used to
endorse or promote products derived from this software without specific
prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
------------------------------------------------------------------------------------
This distribution has a binary dependency on zstd-jni, which is available under the BSD 2-Clause License
as described below.
Zstd-jni: JNI bindings to Zstd Library
Copyright (c) 2015-2016, Luben Karavelov/ All rights reserved.
BSD License
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice, this
list of conditions and the following disclaimer in the documentation and/or
other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
......@@ -820,6 +820,7 @@ project(':clients') {
conf2ScopeMappings.addMapping(1000, configurations.jacksonDatabindConfig, "provided")
dependencies {
compile libs.zstd
compile libs.lz4
compile libs.snappy
compile libs.slf4jApi
......
......@@ -159,7 +159,7 @@ public class ProducerConfig extends AbstractConfig {
/** <code>compression.type</code> */
public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid "
+ " values are <code>none</code>, <code>gzip</code>, <code>snappy</code>, or <code>lz4</code>. "
+ " values are <code>none</code>, <code>gzip</code>, <code>snappy</code>, <code>lz4</code>, or <code>zstd</code>. "
+ "Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).";
/** <code>metrics.sample.window.ms</code> */
......
......@@ -140,7 +140,7 @@ public class TopicConfig {
public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
public static final String COMPRESSION_TYPE_DOC = "Specify the final compression type for a given topic. " +
"This configuration accepts the standard compression codecs ('gzip', 'snappy', lz4). It additionally " +
"This configuration accepts the standard compression codecs ('gzip', 'snappy', 'lz4', 'zstd'). It additionally " +
"accepts 'uncompressed' which is equivalent to no compression; and 'producer' which means retain the " +
"original compression codec set by the producer.";
......
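At the topic level, the new value can likewise be set through the admin client. A rough sketch, assuming an existing topic named "my-topic" and a broker at localhost:9092, using the alterConfigs API of this Kafka era:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

public class ZstdTopicConfigSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic"); // placeholder topic
            Config config = new Config(Collections.singleton(
                    new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")));
            // alterConfigs replaces the topic's dynamic config with the given entries.
            admin.alterConfigs(Collections.singletonMap(topic, config)).all().get();
        }
    }
}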
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;
/**
* The requesting client does not support the compression type of given partition.
*/
public class UnsupportedCompressionTypeException extends ApiException {
private static final long serialVersionUID = 1L;
public UnsupportedCompressionTypeException(String message) {
super(message);
}
public UnsupportedCompressionTypeException(String message, Throwable cause) {
super(message, cause);
}
}
......@@ -91,6 +91,7 @@ import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedByAuthenticationException;
import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
......@@ -284,7 +285,9 @@ public enum Errors {
FENCED_LEADER_EPOCH(74, "The leader epoch in the request is older than the epoch on the broker",
FencedLeaderEpochException::new),
UNKNOWN_LEADER_EPOCH(75, "The leader epoch in the request is newer than the epoch on the broker",
UnknownLeaderEpochException::new);
UnknownLeaderEpochException::new),
UNSUPPORTED_COMPRESSION_TYPE(76, "The requesting client does not support the compression type of given partition.",
UnsupportedCompressionTypeException::new);
private static final Logger log = LoggerFactory.getLogger(Errors.class);
......
......@@ -322,6 +322,8 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
throw new InvalidRecordException("Invalid wrapper magic found in legacy deep record iterator " + wrapperMagic);
CompressionType compressionType = wrapperRecord.compressionType();
if (compressionType == CompressionType.ZSTD)
throw new InvalidRecordException("Invalid wrapper compressionType found in legacy deep record iterator " + wrapperMagic);
ByteBuffer wrapperValue = wrapperRecord.value();
if (wrapperValue == null)
throw new InvalidRecordException("Found invalid compressed record set with null value (magic = " +
......
......@@ -113,6 +113,26 @@ public enum CompressionType {
throw new KafkaException(e);
}
}
},
ZSTD(4, "zstd", 1.0f) {
@Override
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
try {
return (OutputStream) ZstdConstructors.OUTPUT.invoke(buffer);
} catch (Throwable e) {
throw new KafkaException(e);
}
}
@Override
public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
try {
return (InputStream) ZstdConstructors.INPUT.invoke(new ByteBufferInputStream(buffer));
} catch (Throwable e) {
throw new KafkaException(e);
}
}
};
public final int id;
......@@ -156,6 +176,8 @@ public enum CompressionType {
return SNAPPY;
case 3:
return LZ4;
case 4:
return ZSTD;
default:
throw new IllegalArgumentException("Unknown compression type id: " + id);
}
......@@ -170,6 +192,8 @@ public enum CompressionType {
return SNAPPY;
else if (LZ4.name.equals(name))
return LZ4;
else if (ZSTD.name.equals(name))
return ZSTD;
else
throw new IllegalArgumentException("Unknown compression name: " + name);
}
......@@ -177,7 +201,7 @@ public enum CompressionType {
// We should only have a runtime dependency on compression algorithms in case the native libraries don't support
// some platforms.
//
// For Snappy, we dynamically load the classes and rely on the initialization-on-demand holder idiom to ensure
// For Snappy and Zstd, we dynamically load the classes and rely on the initialization-on-demand holder idiom to ensure
// they're only loaded if used.
//
// For LZ4 we are using org.apache.kafka classes, which should always be in the classpath, and would not trigger
......@@ -190,6 +214,13 @@ public enum CompressionType {
MethodType.methodType(void.class, OutputStream.class));
}
private static class ZstdConstructors {
static final MethodHandle INPUT = findConstructor("com.github.luben.zstd.ZstdInputStream",
MethodType.methodType(void.class, InputStream.class));
static final MethodHandle OUTPUT = findConstructor("com.github.luben.zstd.ZstdOutputStream",
MethodType.methodType(void.class, OutputStream.class));
}
private static MethodHandle findConstructor(String className, MethodType methodType) {
try {
return MethodHandles.publicLookup().findConstructor(Class.forName(className), methodType);
......
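The comment above refers to the initialization-on-demand holder idiom. A self-contained sketch of the pattern (class and method names are illustrative, mirroring the ZstdConstructors holder in the diff) shows why zstd-jni is only touched when the codec is actually used:

import java.io.InputStream;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;

public class LazyZstdLoadingSketch {
    // The nested holder class is not initialized until one of its static fields is
    // first accessed, so the optional zstd-jni classes are never loaded unless
    // wrapForInput() is called.
    private static class Holder {
        static final MethodHandle INPUT = findConstructor("com.github.luben.zstd.ZstdInputStream",
                MethodType.methodType(void.class, InputStream.class));
    }

    private static MethodHandle findConstructor(String className, MethodType methodType) {
        try {
            return MethodHandles.publicLookup().findConstructor(Class.forName(className), methodType);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    public static InputStream wrapForInput(InputStream in) {
        try {
            // Holder (and therefore ZstdInputStream) is resolved here, on first use.
            return (InputStream) Holder.INPUT.invoke(in);
        } catch (Throwable t) {
            throw new RuntimeException(t);
        }
    }
}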
......@@ -44,6 +44,9 @@ public class LazyDownConversionRecords implements BaseRecords {
* @param firstOffset The starting offset for down-converted records. This only impacts some cases. See
* {@link RecordsUtil#downConvert(Iterable, byte, long, Time)} for an explanation.
* @param time The time instance to use
*
* @throws org.apache.kafka.common.errors.UnsupportedCompressionTypeException If the first batch to down-convert
* has a compression type which we do not support down-conversion for.
*/
public LazyDownConversionRecords(TopicPartition topicPartition, Records records, byte toMagic, long firstOffset, Time time) {
this.topicPartition = Objects.requireNonNull(topicPartition);
......@@ -150,7 +153,7 @@ public class LazyDownConversionRecords implements BaseRecords {
}
while (batchIterator.hasNext()) {
List<RecordBatch> batches = new ArrayList<>();
final List<RecordBatch> batches = new ArrayList<>();
boolean isFirstBatch = true;
long sizeSoFar = 0;
......@@ -162,6 +165,7 @@ public class LazyDownConversionRecords implements BaseRecords {
sizeSoFar += currentBatch.sizeInBytes();
isFirstBatch = false;
}
ConvertedRecords convertedRecords = RecordsUtil.downConvert(batches, toMagic, firstOffset, time);
// During conversion, it is possible that we drop certain batches because they do not have an equivalent
// representation in the message format we want to convert to. For example, V0 and V1 message formats
......
......@@ -17,6 +17,7 @@
package org.apache.kafka.common.record;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -45,35 +46,50 @@ public final class LazyDownConversionRecordsSend extends RecordsSend<LazyDownCon
convertedRecordsIterator = records().iterator(MAX_READ_SIZE);
}
private MemoryRecords buildOverflowBatch(int remaining) {
// We do not have any records left to down-convert. Construct an overflow message for the length remaining.
// This message will be ignored by the consumer because its length will be past the length of maximum
// possible response size.
// DefaultRecordBatch =>
// BaseOffset => Int64
// Length => Int32
// ...
ByteBuffer overflowMessageBatch = ByteBuffer.allocate(
Math.max(MIN_OVERFLOW_MESSAGE_LENGTH, Math.min(remaining + 1, MAX_READ_SIZE)));
overflowMessageBatch.putLong(-1L);
// Fill in the length of the overflow batch. A valid batch must be at least as long as the minimum batch
// overhead.
overflowMessageBatch.putInt(Math.max(remaining + 1, DefaultRecordBatch.RECORD_BATCH_OVERHEAD));
log.debug("Constructed overflow message batch for partition {} with length={}", topicPartition(), remaining);
return MemoryRecords.readableRecords(overflowMessageBatch);
}
@Override
public long writeTo(GatheringByteChannel channel, long previouslyWritten, int remaining) throws IOException {
if (convertedRecordsWriter == null || convertedRecordsWriter.completed()) {
MemoryRecords convertedRecords;
// Check if we have more chunks left to down-convert
if (convertedRecordsIterator.hasNext()) {
// Get next chunk of down-converted messages
ConvertedRecords<MemoryRecords> recordsAndStats = convertedRecordsIterator.next();
convertedRecords = recordsAndStats.records();
recordConversionStats.add(recordsAndStats.recordConversionStats());
log.debug("Down-converted records for partition {} with length={}", topicPartition(), convertedRecords.sizeInBytes());
} else {
// We do not have any records left to down-convert. Construct an overflow message for the length remaining.
// This message will be ignored by the consumer because its length will be past the length of maximum
// possible response size.
// DefaultRecordBatch =>
// BaseOffset => Int64
// Length => Int32
// ...
ByteBuffer overflowMessageBatch = ByteBuffer.allocate(
Math.max(MIN_OVERFLOW_MESSAGE_LENGTH, Math.min(remaining + 1, MAX_READ_SIZE)));
overflowMessageBatch.putLong(-1L);
// Fill in the length of the overflow batch. A valid batch must be at least as long as the minimum batch
// overhead.
overflowMessageBatch.putInt(Math.max(remaining + 1, DefaultRecordBatch.RECORD_BATCH_OVERHEAD));
convertedRecords = MemoryRecords.readableRecords(overflowMessageBatch);
log.debug("Constructed overflow message batch for partition {} with length={}", topicPartition(), remaining);
try {
// Check if we have more chunks left to down-convert
if (convertedRecordsIterator.hasNext()) {
// Get next chunk of down-converted messages
ConvertedRecords<MemoryRecords> recordsAndStats = convertedRecordsIterator.next();
convertedRecords = recordsAndStats.records();
recordConversionStats.add(recordsAndStats.recordConversionStats());
log.debug("Down-converted records for partition {} with length={}", topicPartition(), convertedRecords.sizeInBytes());
} else {
convertedRecords = buildOverflowBatch(remaining);
}
} catch (UnsupportedCompressionTypeException e) {
// We have encountered a compression type which does not support down-conversion (e.g. zstd).
// Since we have already sent at least one batch and we have committed to the fetch size, we
// send an overflow batch. The consumer will read the first few records and then fetch from the
// offset of the batch which has the unsupported compression type. At that time, we will
// send back the UNSUPPORTED_COMPRESSION_TYPE error, which will allow the consumer to fail gracefully.
convertedRecords = buildOverflowBatch(remaining);
}
convertedRecordsWriter = new DefaultRecordsSend(destination(), convertedRecords, Math.min(convertedRecords.sizeInBytes(), remaining));
}
return convertedRecordsWriter.writeTo(channel);
......
......@@ -102,6 +102,8 @@ public class MemoryRecordsBuilder {
throw new IllegalArgumentException("Transactional records are not supported for magic " + magic);
if (isControlBatch)
throw new IllegalArgumentException("Control records are not supported for magic " + magic);
if (compressionType == CompressionType.ZSTD)
throw new IllegalArgumentException("ZStandard compression is not supported for magic " + magic);
}
this.magic = magic;
......
......@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.record;
import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.utils.Time;
import java.nio.ByteBuffer;
......@@ -45,8 +46,14 @@ public class RecordsUtil {
long startNanos = time.nanoseconds();
for (RecordBatch batch : batches) {
if (toMagic < RecordBatch.MAGIC_VALUE_V2 && batch.isControlBatch())
continue;
if (toMagic < RecordBatch.MAGIC_VALUE_V2) {
if (batch.isControlBatch())
continue;
if (batch.compressionType() == CompressionType.ZSTD)
throw new UnsupportedCompressionTypeException("Down-conversion of zstandard-compressed batches " +
"is not supported");
}
if (batch.magic() <= toMagic) {
totalSizeEstimate += batch.sizeInBytes();
......
......@@ -191,9 +191,13 @@ public class FetchRequest extends AbstractRequest {
FETCH_REQUEST_TOPIC_V9,
FORGOTTEN_TOPIC_DATA_V7);
// V10 bumped up to indicate ZStandard capability. (see KIP-110)
private static final Schema FETCH_REQUEST_V10 = FETCH_REQUEST_V9;
public static Schema[] schemaVersions() {
return new Schema[]{FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4,
FETCH_REQUEST_V5, FETCH_REQUEST_V6, FETCH_REQUEST_V7, FETCH_REQUEST_V8, FETCH_REQUEST_V9};
FETCH_REQUEST_V5, FETCH_REQUEST_V6, FETCH_REQUEST_V7, FETCH_REQUEST_V8, FETCH_REQUEST_V9,
FETCH_REQUEST_V10};
}
// default values for older versions where a request level limit did not exist
......
......@@ -61,6 +61,8 @@ import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
* - {@link Errors#UNKNOWN_LEADER_EPOCH} If the epoch is larger than the broker's epoch
* - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION} If the broker does not have metadata for a topic or partition
* - {@link Errors#KAFKA_STORAGE_ERROR} If the log directory for one of the requested partitions is offline
* - {@link Errors#UNSUPPORTED_COMPRESSION_TYPE} If a fetched topic is using a compression type which is
* not supported by the fetch request version
* - {@link Errors#UNKNOWN_SERVER_ERROR} For any unexpected errors
*/
public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
......@@ -180,10 +182,13 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
// V9 adds the current leader epoch (see KIP-320)
private static final Schema FETCH_RESPONSE_V9 = FETCH_RESPONSE_V8;
// V10 bumped up to indicate ZStandard capability. (see KIP-110)
private static final Schema FETCH_RESPONSE_V10 = FETCH_RESPONSE_V9;
public static Schema[] schemaVersions() {
return new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2,
FETCH_RESPONSE_V3, FETCH_RESPONSE_V4, FETCH_RESPONSE_V5, FETCH_RESPONSE_V6,
FETCH_RESPONSE_V7, FETCH_RESPONSE_V8, FETCH_RESPONSE_V9};
FETCH_RESPONSE_V7, FETCH_RESPONSE_V8, FETCH_RESPONSE_V9, FETCH_RESPONSE_V10};
}
public static final long INVALID_HIGHWATERMARK = -1L;
......
......@@ -24,6 +24,7 @@ import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
......@@ -113,9 +114,14 @@ public class ProduceRequest extends AbstractRequest {
*/
private static final Schema PRODUCE_REQUEST_V6 = PRODUCE_REQUEST_V5;
/**
* V7 bumped up to indicate ZStandard capability. (see KIP-110)
*/
private static final Schema PRODUCE_REQUEST_V7 = PRODUCE_REQUEST_V6;
public static Schema[] schemaVersions() {
return new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3,
PRODUCE_REQUEST_V4, PRODUCE_REQUEST_V5, PRODUCE_REQUEST_V6};
PRODUCE_REQUEST_V4, PRODUCE_REQUEST_V5, PRODUCE_REQUEST_V6, PRODUCE_REQUEST_V7};
}
public static class Builder extends AbstractRequest.Builder<ProduceRequest> {
......@@ -151,12 +157,12 @@ public class ProduceRequest extends AbstractRequest {
return new Builder(minVersion, maxVersion, acks, timeout, partitionRecords, transactionalId);
}
private Builder(short minVersion,
short maxVersion,
short acks,
int timeout,
Map<TopicPartition, MemoryRecords> partitionRecords,
String transactionalId) {
public Builder(short minVersion,
short maxVersion,
short acks,
int timeout,
Map<TopicPartition, MemoryRecords> partitionRecords,
String transactionalId) {
super(ApiKeys.PRODUCE, minVersion, maxVersion);
this.acks = acks;
this.timeout = timeout;
......@@ -246,6 +252,10 @@ public class ProduceRequest extends AbstractRequest {
if (entry.magic() != RecordBatch.MAGIC_VALUE_V2)
throw new InvalidRecordException("Produce requests with version " + version + " are only allowed to " +
"contain record batches with magic version 2");
if (version < 7 && entry.compressionType() == CompressionType.ZSTD) {
throw new InvalidRecordException("Produce requests with version " + version + " are note allowed to " +
"use ZStandard compression");
}
if (iterator.hasNext())
throw new InvalidRecordException("Produce requests with version " + version + " are only allowed to " +
......@@ -330,6 +340,7 @@ public class ProduceRequest extends AbstractRequest {
case 4:
case 5:
case 6:
case 7:
return new ProduceResponse(responseMap, throttleTimeMs);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
......@@ -400,6 +411,7 @@ public class ProduceRequest extends AbstractRequest {
case 4:
case 5:
case 6:
case 7:
return RecordBatch.MAGIC_VALUE_V2;
default:
......
......@@ -144,9 +144,14 @@ public class ProduceResponse extends AbstractResponse {
*/
private static final Schema PRODUCE_RESPONSE_V6 = PRODUCE_RESPONSE_V5;
/**
* V7 bumped up to indicate ZStandard capability. (see KIP-110)
*/
private static final Schema PRODUCE_RESPONSE_V7 = PRODUCE_RESPONSE_V6;
public static Schema[] schemaVersions() {
return new Schema[]{PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1, PRODUCE_RESPONSE_V2, PRODUCE_RESPONSE_V3,
PRODUCE_RESPONSE_V4, PRODUCE_RESPONSE_V5, PRODUCE_RESPONSE_V6};
PRODUCE_RESPONSE_V4, PRODUCE_RESPONSE_V5, PRODUCE_RESPONSE_V6, PRODUCE_RESPONSE_V7};
}
private final Map<TopicPartition, PartitionResponse> responses;
......
......@@ -197,6 +197,9 @@ public class ProducerBatchTest {
if (compressionType == CompressionType.NONE && magic < MAGIC_VALUE_V2)
continue;
if (compressionType == CompressionType.ZSTD && magic < MAGIC_VALUE_V2)
continue;
MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), magic,
compressionType, TimestampType.CREATE_TIME, 0L);
......
......@@ -25,6 +25,7 @@ import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class AbstractLegacyRecordBatchTest {
......@@ -208,4 +209,41 @@ public class AbstractLegacyRecordBatchTest {
assertEquals(expectedTimestamp++, record.timestamp());
}
@Test
public void testZStdCompressionTypeWithV0OrV1() {
SimpleRecord[] simpleRecords = new SimpleRecord[] {
new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
new SimpleRecord(3L, "c".getBytes(), "3".getBytes())
};
// Check V0
try {
MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V0, 0L,
CompressionType.ZSTD, TimestampType.CREATE_TIME, simpleRecords);
ByteBufferLegacyRecordBatch batch = new ByteBufferLegacyRecordBatch(records.buffer());
batch.setLastOffset(1L);
batch.iterator();
fail("Can't reach here");
} catch (IllegalArgumentException e) {
assertEquals("ZStandard compression is not supported for magic 0", e.getMessage());
}
// Check V1
try {
MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, 0L,
CompressionType.ZSTD, TimestampType.CREATE_TIME, simpleRecords);
ByteBufferLegacyRecordBatch batch = new ByteBufferLegacyRecordBatch(records.buffer());
batch.setLastOffset(1L);
batch.iterator();
fail("Can't reach here");
} catch (IllegalArgumentException e) {
assertEquals("ZStandard compression is not supported for magic 1", e.getMessage());
}
}
}
......@@ -56,6 +56,9 @@ public class FileLogInputStreamTest {
@Test
public void testWriteTo() throws IOException {
if (compression == CompressionType.ZSTD && magic < MAGIC_VALUE_V2)
return;
try (FileRecords fileRecords = FileRecords.open(tempFile())) {
fileRecords.append(MemoryRecords.withRecords(magic, compression, new SimpleRecord("foo".getBytes())));
fileRecords.flush();
......@@ -81,6 +84,9 @@ public class FileLogInputStreamTest {
@Test
public void testSimpleBatchIteration() throws IOException {
if (compression == CompressionType.ZSTD && magic < MAGIC_VALUE_V2)
return;
try (FileRecords fileRecords = FileRecords.open(tempFile())) {
SimpleRecord firstBatchRecord = new SimpleRecord(3241324L, "a".getBytes(), "foo".getBytes());
SimpleRecord secondBatchRecord = new SimpleRecord(234280L, "b".getBytes(), "bar".getBytes());
......@@ -108,6 +114,9 @@ public class FileLogInputStreamTest {
if (magic < MAGIC_VALUE_V2 && compression == CompressionType.NONE)
return;
if (compression == CompressionType.ZSTD && magic < MAGIC_VALUE_V2)
return;
try (FileRecords fileRecords = FileRecords.open(tempFile())) {
SimpleRecord[] firstBatchRecords = new SimpleRecord[]{
new SimpleRecord(3241324L, "a".getBytes(), "1".getBytes()),
......@@ -185,6 +194,9 @@ public class FileLogInputStreamTest {
@Test
public void testBatchIterationIncompleteBatch() throws IOException {
if (compression == CompressionType.ZSTD && magic < MAGIC_VALUE_V2)
return;
try (FileRecords fileRecords = FileRecords.open(tempFile())) {
SimpleRecord firstBatchRecord = new SimpleRecord(100L, "foo".getBytes());
SimpleRecord secondBatchRecord = new SimpleRecord(200L, "bar".getBytes());
......
......@@ -16,10 +16,13 @@
*/
package org.apache.kafka.common.record;
import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
......@@ -30,6 +33,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Random;
import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V2;
import static org.apache.kafka.common.utils.Utils.utf8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
......@@ -39,6 +43,8 @@ import static org.junit.Assert.fail;
@RunWith(value = Parameterized.class)
public class MemoryRecordsBuilderTest {
@Rule
public ExpectedException exceptionRule = ExpectedException.none();
private final CompressionType compressionType;
private final int bufferOffset;
......@@ -52,6 +58,8 @@ public class MemoryRecordsBuilderTest {
@Test
public void testWriteEmptyRecordSet() {
expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0);
ByteBuffer buffer = ByteBuffer.allocate(128);
buffer.position(bufferOffset);
......@@ -207,6 +215,8 @@ public class MemoryRecordsBuilderTest {
@Test
public void testCompressionRateV0() {
expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0);
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.position(bufferOffset);
......@@ -262,6 +272,8 @@ public class MemoryRecordsBuilderTest {
@Test
public void testCompressionRateV1() {
expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V1);
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.position(bufferOffset);
......@@ -293,6 +305,8 @@ public class MemoryRecordsBuilderTest {
@Test
public void buildUsingLogAppendTime() {
expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V1);
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.position(bufferOffset);
......@@ -322,6 +336,8 @@ public class MemoryRecordsBuilderTest {
@Test
public void buildUsingCreateTime() {
expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V1);
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.position(bufferOffset);
......@@ -353,6 +369,8 @@ public class MemoryRecordsBuilderTest {
@Test
public void testAppendedChecksumConsistency() {
expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0);
ByteBuffer buffer = ByteBuffer.allocate(512);
for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType,
......@@ -397,6 +415,8 @@ public class MemoryRecordsBuilderTest {
@Test
public void writePastLimit() {
expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V1);
ByteBuffer buffer = ByteBuffer.allocate(64);
buffer.position(bufferOffset);
......@@ -442,6 +462,11 @@ public class MemoryRecordsBuilderTest {
@Test
public void convertV2ToV1UsingMixedCreateAndLogAppendTime() {
if (compressionType == CompressionType.ZSTD) {
exceptionRule.expect(UnsupportedCompressionTypeException.class);
exceptionRule.expectMessage("Down-conversion of zstandard-compressed batches is not supported");
}
ByteBuffer buffer = ByteBuffer.allocate(512);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2,
compressionType, TimestampType.LOG_APPEND_TIME, 0L);
......@@ -497,6 +522,8 @@ public class MemoryRecordsBuilderTest {
@Test
public void convertToV1WithMixedV0AndV2Data() {
expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0);
ByteBuffer buffer = ByteBuffer.allocate(512);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0,
compressionType, TimestampType.NO_TIMESTAMP_TYPE, 0L);
......@@ -571,6 +598,8 @@ public class MemoryRecordsBuilderTest {
@Test
public void shouldThrowIllegalStateExceptionOnBuildWhenAborted() throws Exception {
expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0);
ByteBuffer buffer = ByteBuffer.allocate(128);
buffer.position(bufferOffset);
......@@ -588,6 +617,8 @@ public class MemoryRecordsBuilderTest {
@Test
public void shouldResetBufferToInitialPositionOnAbort() throws Exception {
expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0);
ByteBuffer buffer = ByteBuffer.allocate(128);
buffer.position(bufferOffset);
......@@ -601,6 +632,8 @@ public class MemoryRecordsBuilderTest {
@Test
public void shouldThrowIllegalStateExceptionOnCloseWhenAborted() throws Exception {
expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0);
ByteBuffer buffer = ByteBuffer.allocate(128);
buffer.position(bufferOffset);
......@@ -618,6 +651,8 @@ public class MemoryRecordsBuilderTest {
@Test
public void shouldThrowIllegalStateExceptionOnAppendWhenAborted() throws Exception {
expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0);
ByteBuffer buffer = ByteBuffer.allocate(128);
buffer.position(bufferOffset);
......@@ -699,4 +734,10 @@ public class MemoryRecordsBuilderTest {
}
}
private void expectExceptionWithZStd(CompressionType compressionType, byte magic) {
if (compressionType == CompressionType.ZSTD && magic < MAGIC_VALUE_V2) {
exceptionRule.expect(IllegalArgumentException.class);
exceptionRule.expectMessage("ZStandard compression is not supported for magic " + magic);
}
}
}