    KafkaWriter

    Short Description

    Ports

    Metadata

    KafkaWriter Attributes

    Details

    See also

    Short Description

    KafkaWriter writes events (messages) to a Kafka cluster.

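    Data output   | Input ports | Output ports | Transformation | Transf. req. | Java | CTL | Auto-propagated metadata
    --------------|-------------|--------------|----------------|--------------|------|-----|-------------------------
    Kafka cluster | 1           | 2            |                |              |      |     |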

    Ports

    Port type | Number | Required | Description                                  | Metadata
    ----------|--------|----------|----------------------------------------------|----------------
    Input     | 0      |          | For records to be written to a Kafka cluster | InputMetadata
    Output    | 0      |          | For successfully written records             | SuccessMetadata
    Output    | 1      |          | For rejected records                         | ErrorMetadata

    If the error port is not connected and an error occurs, the component fails.

    Metadata

    KafkaWriter propagates metadata from left to right, i.e. from input port to output port.

    KafkaWriter has a metadata template on its input and error ports.

    The input port metadata template contains fields required for creation of a Kafka record (topic, partition, key, value and timestamp).

    The error port metadata template contains fields required for creation of a Kafka record (topic, partition, key, value and timestamp) plus additional fields offset, errorMessage and stacktrace.

    Table 46. InputMetadata - Input port 0

    Field number | Field name | Data type         | Description
    -------------|------------|-------------------|-----------------------------------------------
    1            | topic      | string            | Topic the event should be written to
    2            | partition  | integer           | Topic partition the event should be written to
    3            | key        | string            | Event key
    4            | value      | string            | Event value
    5            | timestamp  | date              | Event timestamp
    6            | headers    | map[string, byte] | Optional event metadata headers

    Table 47. SuccessMetadata - Output port 0

    Field number | Field name | Data type         | Description
    -------------|------------|-------------------|---------------------------------------------
    1            | topic      | string            | Topic the event was written to
    2            | partition  | integer           | Topic partition the event was written to
    3            | key        | string            | Event key
    4            | value      | string            | Event value
    5            | timestamp  | date              | Timestamp when the event was written
    6            | headers    | map[string, byte] | Optional event metadata headers
    7            | offset     | long              | Offset of the written event in its partition

    You can use the fields partition, offset and timestamp from SuccessMetadata in the Output mapping to get the actual values of the written record. These fields will often not be present in the input data.

    Table 48. Additional fields of ErrorMetadata - Output port 1

    Field number | Field name   | Data type | Description
    -------------|--------------|-----------|---------------------------------------------------
    8            | errorMessage | string    | Text of the message related to the writing failure
    9            | stacktrace   | string    | The whole stacktrace of the writing failure

    ErrorMetadata has the same fields as SuccessMetadata, plus two fields for troubleshooting failed records: errorMessage and stacktrace.

    KafkaWriter Attributes

    Attribute | Req | Description | Possible values
    ----------|-----|-------------|----------------
    Basic | | |
    Connection | yes | A Kafka connection to be used. See Kafka Connections. | e.g. MyKafkaConnection
    Topic | | A Kafka topic to produce events to. If not defined here, it must be provided in the Input mapping. | e.g. my-events
    Partition | | A topic partition to use. Can be overridden in the Input mapping. If not specified, a partition is selected automatically. | e.g. 1
    Key type | | Event key data type to use. Corresponds to Kafka key.serializer producer property. | String (default) or Bytes
    Value type | | Event value data type to use. Corresponds to Kafka value.serializer producer property. | String (default) or Bytes
    Input mapping | | Defines the mapping of input metadata fields to Kafka record fields. |
    Output mapping | | Defines the mapping of Kafka record fields to output metadata fields. |
    Error mapping | | Defines the mapping of rejected record fields to metadata fields on the error port. |
    Advanced | | |
    Maximum number of rejected records | | Number of records that can be rejected before a graph failure. The default (empty) means an unlimited number of rejected records. | e.g. 10
    Number of acknowledgments | | Number of acknowledgments the producer requires the leader to have received before considering a request complete. Corresponds to Kafka acks producer property. |
    Compression type | | Compression type for all data generated by the producer. Corresponds to Kafka compression.type producer property. |
    Producer configuration | | Additional Kafka producer properties. See Apache Kafka documentation for details. | e.g. {batch.size=2048}

    Details

    KafkaWriter allows you to produce (write) events to Kafka using the Kafka Producer API.

    The events coming from the input port are written to a Kafka topic specified either via the Topic attribute or in the Input mapping.

    By default, the topic partition is assigned automatically. It can be overridden via the Partition attribute or in the Input mapping.

    If an output port is connected, records successfully written to Kafka are also written to the connected edge. Similarly, failed records are written to the error port edge, if connected.

    Besides the configuration available through the component attributes, any other Kafka producer property can be configured in the Producer configuration attribute.
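    As a rough illustration of how the attributes line up with Kafka producer properties, the following Java sketch builds an equivalent java.util.Properties object. The class and method names (ProducerConfigSketch, buildProducerProperties) are hypothetical, and the values chosen for acks and compression.type are assumptions for illustration, not KafkaWriter defaults. The property names and serializer class names are those of the standard Kafka Java client. Connection settings, which come from the Kafka connection, are not shown.

```java
import java.util.Properties;

public class ProducerConfigSketch {

    // Builds producer properties that mirror the component attributes.
    // All values are illustrative assumptions, not KafkaWriter defaults.
    static Properties buildProducerProperties() {
        Properties props = new Properties();
        // Key type = String
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Value type = Bytes
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Number of acknowledgments (assumed value)
        props.setProperty("acks", "all");
        // Compression type (assumed value)
        props.setProperty("compression.type", "gzip");
        // Producer configuration attribute: {batch.size=2048}
        props.setProperty("batch.size", "2048");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProducerProperties();
        // Print every property as name=value
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + "=" + props.getProperty(name));
        }
    }
}
```

    In the component itself, only properties without a dedicated attribute (such as batch.size here) would need to go into Producer configuration.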

    Examples

    Writing headers with string values

    The metadata field for event headers is of type map[string, byte]. To write headers with string values you have to do the type conversion, for example in the component’s Input mapping.

    1. Suppose the metadata on the writer’s input port has fields sourceId and traceId that you want to map to the event’s headers.

    2. In the writer’s Input mapping, map these fields to the headers field map values. Convert the values to byte (use the proper charset):

      $out.0.headers = {
          "sourceId" -> str2byte($in.0.sourceId, "UTF-8"),
          "traceId" -> str2byte($in.0.traceId, "UTF-8")
      };
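    To show why the charset matters, here is a minimal plain-Java sketch of the same string-to-byte conversion and of decoding on the reading side. It does not use CloverDX or the Kafka client; the class and method names (HeaderBytesSketch, encodeHeaders, decodeHeader) and the sample values are hypothetical. A reader of the event has to decode header values with the same charset that was used when writing them.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class HeaderBytesSketch {

    // Encodes string header values as UTF-8 bytes, analogous to
    // str2byte(value, "UTF-8") in the Input mapping.
    static Map<String, byte[]> encodeHeaders(Map<String, String> headers) {
        Map<String, byte[]> encoded = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : headers.entrySet()) {
            encoded.put(entry.getKey(),
                    entry.getValue().getBytes(StandardCharsets.UTF_8));
        }
        return encoded;
    }

    // Decodes a header value; the charset must match the one used for writing.
    static String decodeHeader(byte[] value) {
        return new String(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("sourceId", "crm-01");          // hypothetical sample value
        headers.put("traceId", "caf\u00e9-42");     // contains a non-ASCII character

        Map<String, byte[]> encoded = encodeHeaders(headers);

        // Decoding with the matching charset restores the original text.
        System.out.println(decodeHeader(encoded.get("traceId")));
        // Decoding with a different charset garbles the non-ASCII character.
        System.out.println(
                new String(encoded.get("traceId"), StandardCharsets.ISO_8859_1));
    }
}
```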