KafkaWriter

Short Description

KafkaWriter writes events (messages) to a Kafka cluster.

| Component | Data output | Input ports | Output ports | Transformation | Transf. req. | Java | CTL | Auto-propagated metadata |
|---|---|---|---|---|---|---|---|---|
| KafkaWriter | Kafka cluster | 1 | 2 | | | | | |

Ports

| Port type | Number | Required | Description | Metadata |
|---|---|---|---|---|
| Input | 0 | yes | For records to be written to a Kafka cluster | InputMetadata |
| Output | 0 | | For successfully written records | SuccessMetadata |
| Output | 1 | | For rejected records | ErrorMetadata |

If an error port is not connected and an error occurs, the component fails.

Metadata

KafkaWriter propagates metadata from left to right, i.e. from input port to output port.

KafkaWriter has a metadata template on its input and error ports.

The input port metadata template contains the fields required for the creation of a Kafka record (topic, partition, key, value, timestamp and headers).

The error port metadata template contains the same fields (topic, partition, key, value, timestamp and headers) plus the additional fields offset, errorMessage and stacktrace.

Table 46. InputMetadata - Input port 0

| Field number | Field name | Data type | Description |
|---|---|---|---|
| 1 | topic | string | Topic the event should be written to |
| 2 | partition | integer | Topic partition the event should be written to |
| 3 | key | string | Event key |
| 4 | value | string | Event value |
| 5 | timestamp | date | Event timestamp |
| 6 | headers | map[string, byte] | Optional event metadata headers |

Table 47. SuccessMetadata - Output port 0

| Field number | Field name | Data type | Description |
|---|---|---|---|
| 1 | topic | string | Topic the event was written to |
| 2 | partition | integer | Topic partition the event was written to |
| 3 | key | string | Event key |
| 4 | value | string | Event value |
| 5 | timestamp | date | Timestamp when the event was written |
| 6 | headers | map[string, byte] | Optional event metadata headers |
| 7 | offset | long | Offset of the written event in its partition |

You can use the partition, offset and timestamp fields from SuccessMetadata in the Output mapping to get the actual values of the written record. These fields are typically not present in the input data, as they are assigned by the Kafka broker.
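A minimal Output mapping along these lines might look as follows in CTL. This is a sketch, not the definitive mapping: it assumes the Kafka response record is exposed as $in.1 in the mapping dialog and that the output edge uses the SuccessMetadata field names.

```
// Sketch of an Output mapping: copy the broker-assigned values
// from the (assumed) Kafka response record $in.1 to the output edge
$out.0.partition = $in.1.partition;
$out.0.offset = $in.1.offset;
$out.0.timestamp = $in.1.timestamp;
```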

Table 48. Additional fields of ErrorMetadata - Output port 1

| Field number | Field name | Data type | Description |
|---|---|---|---|
| 8 | errorMessage | string | Text of the message related to the writing failure |
| 9 | stacktrace | string | The whole stack trace of the writing failure |

ErrorMetadata is the same as SuccessMetadata with additional fields for troubleshooting failed records: the error message and the stack trace.
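An Error mapping can forward these troubleshooting fields to the error port. A sketch in CTL, assuming the rejected record with its error details is exposed as $in.1 in the mapping dialog and the error edge uses the ErrorMetadata field names:

```
// Sketch of an Error mapping: pass the failure details through
$out.0.topic = $in.1.topic;
$out.0.errorMessage = $in.1.errorMessage;
$out.0.stacktrace = $in.1.stacktrace;
```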

KafkaWriter Attributes

Basic

| Attribute | Req | Description | Possible values |
|---|---|---|---|
| Connection | yes | A Kafka connection to be used. See Kafka Connections. | e.g. MyKafkaConnection |
| Topic | | A Kafka topic to produce events to. If not defined here, it must be provided in the Input mapping. | my-events |
| Partition | | A topic partition to use. Can be overridden in the Input mapping. If not specified, a partition is selected automatically. | 1 |
| Key type | | Event key data type to use. Corresponds to the Kafka key.serializer producer property. | String (default) \| Bytes |
| Value type | | Event value data type to use. Corresponds to the Kafka value.serializer producer property. | String (default) \| Bytes |
| Input mapping | | Defines the mapping of input metadata fields to Kafka record fields. | |
| Output mapping | | Defines the mapping of Kafka record fields to output metadata fields. | |
| Error mapping | | Defines the mapping of rejected record fields to metadata fields on the error port. | |

Advanced

| Attribute | Req | Description | Possible values |
|---|---|---|---|
| Maximum number of rejected records | | Number of records that can be rejected before the graph fails. The default (empty) means an unlimited number of rejected records. | e.g. 10 |
| Number of acknowledgments | | Number of acknowledgments the producer requires the leader to have received before considering a request complete. Corresponds to the Kafka acks producer property. | |
| Compression type | | Compression type for all data generated by the producer. Corresponds to the Kafka compression.type producer property. | |
| Producer configuration | | Additional Kafka producer properties. See the Apache Kafka documentation for details. | {batch.size=2048} |
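For example, a Producer configuration that tunes batching could set standard Kafka producer properties such as the following (the values below are illustrative, not recommendations):

```
batch.size=16384
linger.ms=5
request.timeout.ms=30000
```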

Details

KafkaWriter allows you to produce (write) events to Kafka using the Kafka Producer API.

The events coming from the input port are written to a Kafka topic specified either via the Topic attribute or in the Input mapping.

By default, the topic partition is assigned automatically; it can be overridden via the Partition attribute or in the Input mapping.

If an output port is connected, records successfully written to Kafka are also written to the connected edge. Similarly, failed records are written to the error port edge, if connected.

Besides the configuration available through the component attributes, any other Kafka producer property can be configured in the Producer configuration attribute.
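For instance, an Input mapping that sets the topic, key and value per record might look like this in CTL (a sketch; the input field names customerId and payload are illustrative):

```
// Sketch of an Input mapping: route each record and set its key/value
$out.0.topic = "my-events";
$out.0.key = $in.0.customerId;   // illustrative input field
$out.0.value = $in.0.payload;    // illustrative input field
```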

Examples

Writing headers with string values

The metadata field for event headers is of type map[string, byte]. To write headers with string values, you have to convert them to byte, for example in the component’s Input mapping.

  1. Let’s say you have metadata on the writer’s input port with fields sourceId and traceId that you want to map to the event’s headers.

  2. In the writer’s Input mapping, map these fields to the headers field map values. Convert the values to byte (use the proper charset):

     $out.0.headers = { "sourceId" -> str2byte($in.0.sourceId, "UTF-8"), "traceId" -> str2byte($in.0.traceId, "UTF-8") };