Commit 527b98d8 authored by Liquan Pei, committed by Ewen Cheslack-Postava

KAFKA-3421: Connect developer guide update and several fixes

This is a follow-up of KAFKA-3421 to update the Connect developer guide to include the configuration validation. Also includes a couple of minor fixes.

Author: Liquan Pei <liquanpei@gmail.com>

Reviewers: Jason Gustafson <jason@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1366 from Ishiihara/connect-dev-doc
parent 56361193
@@ -18,6 +18,7 @@
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092
# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-cluster
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
@@ -36,8 +37,15 @@ internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# Topic to use for storing offsets. This topic should have many partitions and be replicated.
offset.storage.topic=connect-offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated topic.
# You may need to manually create the topic to ensure a single partition for the config topic as auto-created topics may have multiple partitions.
config.storage.topic=connect-configs
status.storage.topic=connect-status
\ No newline at end of file
# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated.
status.storage.topic=connect-status
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
\ No newline at end of file
@@ -53,15 +53,17 @@ Distributed mode handles automatic balancing of work, allows you to scale up (or
&gt; bin/connect-distributed.sh config/connect-distributed.properties
</pre>
The difference is in the class which is started and the configuration parameters which change how the Kafka Connect process decides where to store configurations, how to assign work, and where to store offsets. In particular, the following configuration parameters are critical to set before starting your cluster:
The difference is in the class which is started and the configuration parameters which change how the Kafka Connect process decides where to store configurations, how to assign work, and where to store offsets and task statuses. In distributed mode, Kafka Connect stores the offsets, configs and task statuses in Kafka topics. It is recommended to manually create the topics for offsets, configs and statuses in order to achieve the desired number of partitions and replication factor (see the example commands after the list below). If the topics are not yet created when starting Kafka Connect, the topics will be auto-created with the default number of partitions and replication factor, which may not be best suited for their usage.
In particular, the following configuration parameters are critical to set before starting your cluster:
<ul>
<li><code>group.id</code> (default <code>connect-cluster</code>) - unique name for the cluster, used in forming the Connect cluster group; note that this <b>must not conflict</b> with consumer group IDs</li>
<li><code>config.storage.topic</code> (default <code>connect-configs</code>) - topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated topic</li>
<li><code>offset.storage.topic</code> (default <code>connect-offsets</code>) - topic to use for ; this topic should have many partitions and be replicated</li>
<li><code>config.storage.topic</code> (default <code>connect-configs</code>) - topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated topic. You may need to manually create the topic to ensure a single partition for the config topic as auto-created topics may have multiple partitions.</li>
<li><code>offset.storage.topic</code> (default <code>connect-offsets</code>) - topic to use for storing offsets; this topic should have many partitions and be replicated</li>
<li><code>status.storage.topic</code> (default <code>connect-status</code>) - topic to use for storing statuses; this topic can have multiple partitions and should be replicated</li>
</ul>
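As an example, the three internal topics could be created manually before starting the workers. The partition counts and replication factor below are only illustrative and should be chosen to match your cluster:
<pre>
&gt; bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-configs --partitions 1 --replication-factor 3 --config cleanup.policy=compact
&gt; bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-offsets --partitions 50 --replication-factor 3 --config cleanup.policy=compact
&gt; bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-status --partitions 10 --replication-factor 3 --config cleanup.policy=compact
</pre>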
Note that in distributed mode the connector configurations are not passed on the command line. Instead, use the REST API described below to create, modify, and destroy connectors.
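For example, a connector instance could be created by submitting its configuration to a worker's REST interface (the default REST port is 8083; the file and topic values below are just placeholders):
<pre>
&gt; curl -X POST -H "Content-Type: application/json" \
    --data '{"name": "local-file-source", "config": {"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector", "tasks.max": "1", "file": "/tmp/test.txt", "topic": "connect-test"}}' \
    http://localhost:8083/connectors
</pre>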
<h4><a id="connect_configuring" href="#connect_configuring">Configuring Connectors</a></h4>
@@ -158,7 +160,7 @@ The easiest method to fill in is <code>getTaskClass()</code>, which defines the
<pre>
@Override
public Class<? extends Task> getTaskClass() {
public Class&lt;? extends Task&gt; getTaskClass() {
return FileStreamSourceTask.class;
}
</pre>
@@ -179,7 +181,7 @@ public void stop() {
}
</pre>
Finally, the real core of the implementation is in <code>getTaskConfigs()</code>. In this case we're only
Finally, the real core of the implementation is in <code>getTaskConfigs()</code>. In this case we are only
handling a single file, so even though we may be permitted to generate more tasks as per the
<code>maxTasks</code> argument, we return a list with only one entry:
@@ -225,7 +227,7 @@ public class FileStreamSourceTask extends SourceTask&lt;Object, Object&gt; {
@Override
public synchronized void stop() {
stream.close()
stream.close();
}
</pre>
@@ -241,8 +243,8 @@ public List&lt;SourceRecord&gt; poll() throws InterruptedException {
while (streamValid(stream) &amp;&amp; records.isEmpty()) {
LineAndOffset line = readToNextLine(stream);
if (line != null) {
Map<String, Object> sourcePartition = Collections.singletonMap("filename", filename);
Map<String, Object> sourceOffset = Collections.singletonMap("position", streamOffset);
Map&lt;String, Object&gt; sourcePartition = Collections.singletonMap("filename", filename);
Map&lt;String, Object&gt; sourceOffset = Collections.singletonMap("position", streamOffset);
records.add(new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line));
} else {
Thread.sleep(1);
@@ -267,11 +269,13 @@ The previous section described how to implement a simple <code>SourceTask</code>
<pre>
public abstract class SinkTask implements Task {
public void initialize(SinkTaskContext context) { ... }
public abstract void put(Collection&lt;SinkRecord&gt; records);
public void initialize(SinkTaskContext context) {
this.context = context;
}
public abstract void flush(Map&lt;TopicPartition, Long&gt; offsets);
public abstract void put(Collection&lt;SinkRecord&gt; records);
public abstract void flush(Map&lt;TopicPartition, Long&gt; offsets);
</pre>
The <code>SinkTask</code> documentation contains full details, but this interface is nearly as simple as the <code>SourceTask</code>. The <code>put()</code> method should contain most of the implementation, accepting sets of <code>SinkRecords</code>, performing any required translation, and storing them in the destination system. This method does not need to ensure the data has been fully written to the destination system before returning. In fact, in many cases internal buffering will be useful so an entire batch of records can be sent at once, reducing the overhead of inserting events into the downstream data store. The <code>SinkRecords</code> contain essentially the same information as <code>SourceRecords</code>: Kafka topic, partition, offset and the event key and value.
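As a minimal sketch of this interface (using the <code>flush()</code> signature shown above), a hypothetical sink task that simply appends each record's value to a local file might look like the following; a real connector would typically buffer records and handle errors more carefully. The <code>file</code> configuration key is just an assumption for this example.
<pre>
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Collection;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class FileAppendSinkTask extends SinkTask {
    private PrintWriter writer;

    @Override
    public void start(Map&lt;String, String&gt; props) {
        try {
            // "file" is a hypothetical configuration key for this example.
            writer = new PrintWriter(new FileWriter(props.get("file"), true));
        } catch (IOException e) {
            throw new ConnectException("Couldn't open output file", e);
        }
    }

    @Override
    public void put(Collection&lt;SinkRecord&gt; records) {
        // Translation is trivial here: just write out each record's value.
        for (SinkRecord record : records)
            writer.println(record.value());
    }

    @Override
    public void flush(Map&lt;TopicPartition, Long&gt; offsets) {
        // Make sure everything delivered so far is durable before returning.
        writer.flush();
    }

    @Override
    public void stop() {
        if (writer != null)
            writer.close();
    }

    @Override
    public String version() {
        return "0.1";
    }
}
</pre>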
@@ -305,8 +309,8 @@ Kafka Connect is intended to define bulk data copying jobs, such as copying an e
Source connectors need to monitor the source system for changes, e.g. table additions/deletions in a database. When they pick up changes, they should notify the framework via the <code>ConnectorContext</code> object that reconfiguration is necessary. For example, in a <code>SourceConnector</code>:
<pre>
if (inputsChanged())
this.context.requestTaskReconfiguration();
if (inputsChanged())
this.context.requestTaskReconfiguration();
</pre>
The framework will promptly request new configuration information and update the tasks, allowing them to gracefully commit their progress before reconfiguring them. Note that in the <code>SourceConnector</code> this monitoring is currently left up to the connector implementation. If an extra thread is required to perform this monitoring, the connector must allocate it itself.
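For instance, a hypothetical <code>SourceConnector</code> might allocate such a monitoring thread in <code>start()</code>; the polling interval and the <code>inputsChanged()</code> check below are placeholders for whatever change detection the source system supports:
<pre>
private Thread monitorThread;
private volatile boolean running = true;

@Override
public void start(Map&lt;String, String&gt; props) {
    // ... parse and store the connector configuration ...
    monitorThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while (running) {
                try {
                    Thread.sleep(60000); // hypothetical polling interval
                } catch (InterruptedException e) {
                    return;
                }
                // e.g. compare the current set of tables against the last snapshot
                if (inputsChanged())
                    context.requestTaskReconfiguration();
            }
        }
    }, "source-change-monitor");
    monitorThread.setDaemon(true);
    monitorThread.start();
}

@Override
public void stop() {
    running = false;
    if (monitorThread != null)
        monitorThread.interrupt();
}
</pre>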
@@ -315,6 +319,26 @@ Ideally this code for monitoring changes would be isolated to the <code>Connecto
<code>SinkConnectors</code> usually only have to handle the addition of streams, which may translate to new entries in their outputs (e.g., a new database table). The framework manages any changes to the Kafka input, such as when the set of input topics changes because of a regex subscription. <code>SinkTasks</code> should expect new input streams, which may require creating new resources in the downstream system, such as a new table in a database. The trickiest situation to handle in these cases may be conflicts between multiple <code>SinkTasks</code> seeing a new input stream for the first time and simultaneously trying to create the new resource. <code>SinkConnectors</code>, on the other hand, will generally require no special code for handling a dynamic set of streams.
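One common way to tolerate the conflict described above, assuming a relational destination (the JDBC usage and table schema here are only illustrative), is to make the resource creation idempotent so that whichever task loses the race simply finds the table already present:
<pre>
private final Set&lt;String&gt; knownTopics = new HashSet&lt;String&gt;();
private Connection connection; // assumed to be opened in start()

private void ensureTableExists(String topic) throws SQLException {
    if (knownTopics.contains(topic))
        return;
    try (Statement stmt = connection.createStatement()) {
        // Idempotent: safe even if another task created the table first.
        stmt.executeUpdate("CREATE TABLE IF NOT EXISTS \"" + topic + "\" (key TEXT, value TEXT)");
    }
    knownTopics.add(topic);
}

@Override
public void put(Collection&lt;SinkRecord&gt; records) {
    for (SinkRecord record : records) {
        try {
            ensureTableExists(record.topic());
            // ... insert the record into the table ...
        } catch (SQLException e) {
            throw new ConnectException("Failed to write record", e);
        }
    }
}
</pre>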
<h4><a id="connect_configs" href="#connect_configs">Connect Configuration Validation</a></h4>
Kafka Connect allows you to validate connector configurations before submitting a connector to be executed and can provide feedback about errors and recommended values. To take advantage of this, connector developers need to provide an implementation of <code>config()</code> to expose the configuration definition to the framework.
The following code in <code>FileStreamSourceConnector</code> defines the configuration and exposes it to the framework.
<pre>
private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source filename.")
.define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to");
public ConfigDef config() {
return CONFIG_DEF;
}
</pre>
The <code>ConfigDef</code> class is used for specifying the set of expected configurations. For each configuration, you can specify the name, the type, the default value, the documentation, the group information, the order in the group, the width of the configuration value and the name suitable for display in the UI. In addition, you can provide special validation logic used for single configuration validation by providing an implementation of <code>Validator</code>. Moreover, there may be dependencies between configurations; for example, the valid values and visibility of a configuration may change according to the values of other configurations. To handle this, <code>ConfigDef</code> allows you to specify the dependents of a configuration and to provide an implementation of <code>Recommender</code> to get valid values and set visibility of a configuration given the current configuration values.
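As a sketch of how these pieces fit together (the configuration names, group and recommender below are hypothetical), the following definition places two configurations in a group, declares a dependency between them, and uses a <code>Recommender</code> to hide one of them unless the other has a particular value:
<pre>
private static final ConfigDef CONFIG_DEF = new ConfigDef()
    .define("mode", Type.STRING, "bulk", ConfigDef.ValidString.in("bulk", "incrementing"),
            Importance.HIGH, "Whether to copy the whole table or only new rows.",
            "Mode", 1, Width.SHORT, "Mode",
            Collections.singletonList("incrementing.column"), null)
    .define("incrementing.column", Type.STRING, "", null,
            Importance.MEDIUM, "Column used to detect new rows (only used when mode is incrementing).",
            "Mode", 2, Width.MEDIUM, "Incrementing column",
            Collections.&lt;String&gt;emptyList(), new IncrementingColumnRecommender());

private static class IncrementingColumnRecommender implements ConfigDef.Recommender {
    @Override
    public List&lt;Object&gt; validValues(String name, Map&lt;String, Object&gt; parsedConfig) {
        // Could, for example, query the source system for candidate columns.
        return Collections.emptyList();
    }

    @Override
    public boolean visible(String name, Map&lt;String, Object&gt; parsedConfig) {
        // Only show the column setting when incremental copying is selected.
        return "incrementing".equals(parsedConfig.get("mode"));
    }
}
</pre>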
Also, the <code>validate()</code> method in <code>Connector</code> provides a default validation implementation which returns a list of allowed configurations together with configuration errors and recommended values for each configuration. However, it does not use the recommended values for configuration validation. You may override the default implementation for customized configuration validation, which may use the recommended values.
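For example, an overriding implementation might add a cross-field error on top of the per-field checks performed by the default implementation (the configuration names below are the hypothetical ones from the previous sketch):
<pre>
@Override
public Config validate(Map&lt;String, String&gt; connectorConfigs) {
    Config config = super.validate(connectorConfigs);
    String mode = connectorConfigs.get("mode");
    String column = connectorConfigs.get("incrementing.column");
    if ("incrementing".equals(mode) &amp;&amp; (column == null || column.trim().isEmpty())) {
        for (ConfigValue value : config.configValues()) {
            if (value.name().equals("incrementing.column"))
                value.addErrorMessage("'incrementing.column' must be set when mode is incrementing.");
        }
    }
    return config;
}
</pre>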
<h4><a id="connect_schemas" href="#connect_schemas">Working with Schemas</a></h4>
The FileStream connectors are good examples because they are simple, but they also have trivially structured data -- each line is just a string. Almost all practical connectors will need schemas with more complex data formats.
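For example, a source task reading user records from a hypothetical database table might declare the schema of its values and build a <code>Struct</code> for each row:
<pre>
Schema schema = SchemaBuilder.struct().name("com.example.User")
    .field("name", Schema.STRING_SCHEMA)
    .field("age", Schema.INT32_SCHEMA)
    .field("admin", SchemaBuilder.bool().defaultValue(false).build())
    .build();

Struct struct = new Struct(schema)
    .put("name", "Barbara Liskov")
    .put("age", 75);
</pre>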