KafkaWriter
Short Description
KafkaWriter writes events (messages) to a Kafka cluster.
Component | Data output | Input ports | Output ports | Transformation | Transf. req. | Java | CTL | Auto-propagated metadata |
---|---|---|---|---|---|---|---|---|
KafkaWriter | Kafka cluster | 1 | 2 | ✓ | ⨯ | ⨯ | ✓ | ✓ |
Ports
Port type | Number | Required | Description | Metadata |
---|---|---|---|---|
Input | 0 | ✓ | For records to be written to a Kafka cluster | InputMetadata |
Output | 0 | ⨯ | For successfully written records | SuccessMetadata |
Output | 1 | ⨯ | For rejected records | ErrorMetadata |
If an error port is not connected and an error occurs, the component fails.
Metadata
KafkaWriter propagates metadata from left to right, i.e. from input port to output port.
KafkaWriter has a metadata template on its input and error ports.
The input port metadata template contains fields required for creation of a Kafka record (topic, partition, key, value and timestamp).
The error port metadata template contains fields required for creation of a Kafka record (topic, partition, key, value and timestamp) plus additional fields offset, errorMessage and stacktrace.
Table 56.4. InputMetadata - Input port 0
Field number | Field name | Data type | Description |
---|---|---|---|
1 | topic | string | Topic the event should be written to |
2 | partition | integer | Topic partition the event should be written to |
3 | key | string | Event key |
4 | value | string | Event value |
5 | timestamp | date | Event timestamp |
6 | headers | map[string, byte] | Optional event metadata headers |
Table 56.5. SuccessMetadata - Output port 0
Field number | Field name | Data type | Description |
---|---|---|---|
1 | topic | string | Topic the event was written to |
2 | partition | integer | Topic partition the event was written to |
3 | key | string | Event key |
4 | value | string | Event value |
5 | timestamp | date | Timestamp when the event was written |
6 | headers | map[string, byte] | Optional event metadata headers |
7 | offset | long | Offset of the written event in its partition |
You can use the fields partition, offset and timestamp from SuccessMetadata in the Output mapping to get the actual values of the written record. These fields will often not be present in the input data.
Table 56.6. Additional fields of ErrorMetadata - Output port 1
Field number | Field name | Data type | Description |
---|---|---|---|
8 | errorMessage | string | Text of the message related to the writing failure |
9 | stacktrace | string | The whole stacktrace of the writing failure |
ErrorMetadata contains the same fields as SuccessMetadata, plus two fields for troubleshooting failed records: errorMessage and stacktrace.
KafkaWriter Attributes
Attribute | Req | Description | Possible values |
---|---|---|---|
Basic | | | |
Connection | yes | A Kafka connection to be used. See Kafka connection. | e.g. MyKafkaConnection |
Topic | | A Kafka topic to produce events to. If not defined here, it must be provided in the Input mapping. | my-events |
Partition | | A topic partition to use. Can be overridden in the Input mapping. If not specified, a partition is selected automatically. | 1 |
Key type | | Event key data type to use. Corresponds to a Kafka producer property. | String (default), Bytes |
Value type | | Event value data type to use. Corresponds to a Kafka producer property. | String (default), Bytes |
Input mapping | | Defines the mapping of input metadata fields to Kafka record fields. | |
Output mapping | | Defines the mapping of Kafka record fields to output metadata fields. | |
Error mapping | | Defines the mapping of rejected record fields to metadata fields on the error port. | |
Advanced | | | |
Maximum number of rejected records | | Number of records that can be rejected before a graph failure. The default (empty) means an unlimited number of rejected records. | e.g. 10 |
Number of acknowledgments | | Number of acknowledgments the producer requires the leader to have received before considering a request complete. Corresponds to a Kafka producer property. | |
Compression type | | Compression type for all data generated by the producer. Corresponds to a Kafka producer property. | |
Producer configuration | | Additional Kafka producer properties. See Apache Kafka documentation for details. | {batch.size=2048} |
Details
KafkaWriter allows you to produce (write) events to Kafka using the Kafka Producer API.
The events coming from the input port are written to a Kafka topic specified either via the Topic attribute or in the Input mapping.
By default, the topic partition is assigned automatically; it can also be set explicitly via the Partition attribute or in the Input mapping.
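The precedence between the attributes and the Input mapping can be sketched in plain Java. This is only an illustration: `TargetResolver` and its methods are hypothetical names, not part of KafkaWriter. The partition rule follows the attribute description (the Input mapping overrides the attribute). For the topic, the sketch assumes that a mapped value also takes precedence over the attribute, which this page does not state explicitly.

```java
import java.util.Optional;

/** Hypothetical helper, not part of KafkaWriter: mirrors the precedence described above. */
final class TargetResolver {

    /** Returns the mapped topic if present, otherwise the Topic attribute; one of them must be set. */
    static String resolveTopic(String topicAttribute, String mappedTopic) {
        if (mappedTopic != null && !mappedTopic.isEmpty()) {
            return mappedTopic; // assumption: a value from the Input mapping wins
        }
        if (topicAttribute != null && !topicAttribute.isEmpty()) {
            return topicAttribute;
        }
        throw new IllegalStateException(
                "No topic: set the Topic attribute or map the topic field");
    }

    /** Returns the mapped partition, else the attribute; empty means automatic selection. */
    static Optional<Integer> resolvePartition(Integer partitionAttribute, Integer mappedPartition) {
        if (mappedPartition != null) {
            return Optional.of(mappedPartition);
        }
        return Optional.ofNullable(partitionAttribute);
    }
}
```

With these rules, `resolveTopic("my-events", null)` yields the attribute value, and `resolvePartition(null, null)` yields an empty result, which stands for automatic partition selection.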
If an output port is connected, records successfully written to Kafka are also written to the connected edge. Similarly, failed records are written to the error port edge, if connected.
Besides the configuration available through the component attributes, any other Kafka producer property can be configured in the Producer configuration attribute.
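For orientation, the following sketch builds the kind of property set a Kafka producer accepts, using only `java.util.Properties`. The keys `acks`, `compression.type` and `batch.size` are standard Apache Kafka producer property names. The class name `ProducerConfigSketch` and the values `all` and `gzip` are illustrative assumptions. How KafkaWriter merges its attributes with the Producer configuration internally is not described here, so treat this as an equivalent plain-Java view rather than the component's implementation.

```java
import java.util.Properties;

/** Illustrative only: a plain-Java view of producer settings like those configured on the component. */
final class ProducerConfigSketch {

    static Properties build() {
        Properties props = new Properties();
        // Assumed counterpart of "Number of acknowledgments" (value is an example)
        props.setProperty("acks", "all");
        // Assumed counterpart of "Compression type" (value is an example)
        props.setProperty("compression.type", "gzip");
        // Extra property, as in the Producer configuration example {batch.size=2048}
        props.setProperty("batch.size", "2048");
        return props;
    }

    public static void main(String[] args) {
        // Print each configured property as key=value
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```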
Examples
Writing headers with string values
The metadata field for event headers is of type map[string, byte]. To write headers with string values, you have to convert the values to bytes, for example in the component's Input mapping.
- Let's say you have metadata on the writer's input port with fields sourceId and traceId that you want to map to the event's headers.
- In the writer's Input mapping, map these fields to the values of the headers map field. Convert the values to byte (use the proper charset):
  $out.0.headers = { "sourceId" -> str2byte($in.0.sourceId, "UTF-8"), "traceId" -> str2byte($in.0.traceId, "UTF-8") };
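To show why the charset matters, here is the same string-to-bytes conversion sketched in plain Java. `HeaderBytesSketch`, `toHeaders` and `decode` are hypothetical names used only for this illustration. The sketch assumes that the consumer decodes header values with the same charset (UTF-8) that was used for writing.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: string header values encoded to bytes and decoded again. */
final class HeaderBytesSketch {

    /** Builds a header map with UTF-8 encoded values, analogous to the Input mapping above. */
    static Map<String, byte[]> toHeaders(String sourceId, String traceId) {
        Map<String, byte[]> headers = new LinkedHashMap<>();
        headers.put("sourceId", sourceId.getBytes(StandardCharsets.UTF_8));
        headers.put("traceId", traceId.getBytes(StandardCharsets.UTF_8));
        return headers;
    }

    /** Decodes a header value; the charset must match the one used when writing. */
    static String decode(byte[] value) {
        return new String(value, StandardCharsets.UTF_8);
    }
}
```

A non-ASCII character such as `\u00e9` takes two bytes in UTF-8 but one byte in ISO-8859-1, so decoding with a different charset than the one used for writing can corrupt the header value.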