    KafkaWriter

    Incubation

    Short Description
    Ports
    Metadata
    KafkaWriter Attributes
    Details
    Compatibility
    See also

    Short Description

    KafkaWriter writes events (messages) to a Kafka cluster.

    Component | Data output | Input ports | Output ports | Transformation | Transf. req. | Java | CTL | Auto-propagated metadata
    KafkaWriter | Kafka cluster | 1 | 2 | | | | |

    Ports

    Port type | Number | Required | Description | Metadata
    Input | 0 | | For records to be written to a Kafka cluster | InputMetadata
    Output | 0 | | For successfully written records | SuccessMetadata
    Output | 1 | | For rejected records | ErrorMetadata

    If an error port is not connected and an error occurs, the component fails.

    Metadata

    KafkaWriter propagates metadata from left to right, i.e. from input port to output port.

    KafkaWriter has metadata templates on its input and error ports.

    The input port metadata template contains fields required for creation of a Kafka record (topic, partition, key, value and timestamp).

    The error port metadata template contains fields required for creation of a Kafka record (topic, partition, key, value and timestamp) plus additional fields offset, errorMessage and stacktrace.

    Table 65.3. InputMetadata - Input port 0

    Field number | Field name | Data type | Description
    1 | topic | string | Topic the event should be written to
    2 | partition | integer | Topic partition the event should be written to
    3 | key | string | Event key
    4 | value | string | Event value
    5 | timestamp | date | Event timestamp
    6 | headers | map[string, byte] | Optional event metadata headers

    Table 65.4. SuccessMetadata - Output port 0

    Field number | Field name | Data type | Description
    1 | topic | string | Topic the event was written to
    2 | partition | integer | Topic partition the event was written to
    3 | key | string | Event key
    4 | value | string | Event value
    5 | timestamp | date | Timestamp when the event was written
    6 | headers | map[string, byte] | Optional event metadata headers
    7 | offset | long | Offset of the written event in its partition

    You can use the fields partition, offset and timestamp from SuccessMetadata in the Output mapping to get the actual values of the written record. These fields will often not be present in the input data.

    Table 65.5. Additional fields of ErrorMetadata - Output port 1

    Field number | Field name | Data type | Description
    8 | errorMessage | string | Text of the message related to the writing failure
    9 | stacktrace | string | The whole stacktrace of the writing failure

    ErrorMetadata contains the same fields as SuccessMetadata plus two fields for troubleshooting failed records: the error message and the stacktrace.

    KafkaWriter Attributes

    Attribute | Req | Description | Possible values
    Basic
    Connection | yes | A Kafka connection to be used. See Kafka connection. | e.g. MyKafkaConnection
    Topic | | A Kafka topic to produce events to. If not defined here, it must be provided in the Input mapping. | my-events
    Partition | | A topic partition to use. Can be overridden in the Input mapping. If not specified, a partition is selected automatically. | 1
    Key type | | Event key data type to use. Corresponds to Kafka key.serializer producer property. | String (default) or Bytes
    Value type | | Event value data type to use. Corresponds to Kafka value.serializer producer property. | String (default) or Bytes
    Input mapping | | Defines the mapping of input metadata fields to Kafka record fields. |
    Output mapping | | Defines the mapping of Kafka record fields to output metadata fields. |
    Error mapping | | Defines the mapping of rejected record fields to metadata fields on the error port. |
    Advanced
    Maximum number of rejected records | | Number of records that can be rejected before a graph failure. The default (empty) means unlimited number of rejected records. | e.g. 10
    Number of acknowledgments | | Number of acknowledgments the producer requires the leader to have received before considering a request complete. Corresponds to Kafka acks producer property. |
    Compression type | | Compression type for all data generated by the producer. Corresponds to Kafka compression.type producer property. |
    Producer configuration | | Additional Kafka producer properties. See Apache Kafka documentation for details. | {batch.size=2048}

    Details

    KafkaWriter allows you to produce (write) events to Kafka using the Kafka Producer API.

    The events coming from the input port are written to a Kafka topic specified either via the Topic attribute or in the Input mapping.

    By default, the topic partition is assigned automatically. It can be overridden via the Partition attribute or in the Input mapping.
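
    The partition precedence described here can be sketched in plain Java. This is only an illustration of the documented rule (Input mapping over Partition attribute, otherwise automatic assignment); the class and method names are hypothetical and are not part of CloverDX or the Kafka client.

```java
// Hypothetical helper, not part of CloverDX: models the documented precedence
// for choosing the partition of an outgoing record.
class PartitionResolutionSketch {

    // Returns the partition to set on the record, or null to leave the
    // choice to Kafka (automatic assignment).
    static Integer resolvePartition(Integer fromInputMapping, Integer fromAttribute) {
        if (fromInputMapping != null) {
            return fromInputMapping; // the Input mapping overrides the attribute
        }
        return fromAttribute; // may be null: the partition is selected automatically
    }

    public static void main(String[] args) {
        System.out.println(resolvePartition(3, 1));       // mapping wins
        System.out.println(resolvePartition(null, 1));    // attribute is used
        System.out.println(resolvePartition(null, null)); // left to Kafka
    }
}
```

    A null result stands for "no partition set on the record", which is how the automatic case is modeled in this sketch.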

    If an output port is connected, records successfully written to Kafka are also written to the connected edge. Similarly, failed records are written to the error port edge, if connected.
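
    The routing of written and rejected records, together with the Maximum number of rejected records attribute, can be modeled roughly as follows. This is a simplified sketch, not CloverDX source code: all names are hypothetical, records are plain strings, and the exact point at which the limit triggers a failure (here, on the first record beyond the limit) is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the routing described above; not CloverDX source code.
class RejectRoutingSketch {
    final List<String> successEdge = new ArrayList<>();
    final List<String> errorEdge = new ArrayList<>();

    private final boolean errorPortConnected;
    private final Integer maxRejected; // null models the empty default (unlimited)
    private int rejected = 0;

    RejectRoutingSketch(boolean errorPortConnected, Integer maxRejected) {
        this.errorPortConnected = errorPortConnected;
        this.maxRejected = maxRejected;
    }

    // failure == null means the record was written successfully.
    void onWriteResult(String record, Exception failure) {
        if (failure == null) {
            successEdge.add(record);
            return;
        }
        if (!errorPortConnected) {
            // Documented behavior: without an error port, an error fails the component.
            throw new IllegalStateException("Write failed and no error port is connected", failure);
        }
        rejected++;
        // Assumption: the failure is raised by the first record beyond the limit.
        if (maxRejected != null && rejected > maxRejected) {
            throw new IllegalStateException("Too many rejected records: " + rejected, failure);
        }
        errorEdge.add(record + " -> " + failure.getMessage());
    }

    public static void main(String[] args) {
        RejectRoutingSketch writer = new RejectRoutingSketch(true, 10);
        writer.onWriteResult("event-1", null);
        writer.onWriteResult("event-2", new RuntimeException("broker unavailable"));
        System.out.println(writer.successEdge);
        System.out.println(writer.errorEdge);
    }
}
```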

    Besides the configuration available through the component attributes, any other Kafka producer property can be configured in the Producer configuration attribute.
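
    To make the relationship between the attributes and the Kafka producer properties concrete, the following plain-Java sketch assembles a properties set by hand. The property keys (key.serializer, value.serializer, acks, compression.type, batch.size) are standard Kafka producer settings; the class name, the chosen values "all" and "gzip", and the choice of ByteArraySerializer for the Bytes type are assumptions made for illustration, not documented KafkaWriter behavior.

```java
import java.util.Properties;

// Illustrative only: a plain-Java view of the producer properties that the
// KafkaWriter attributes are documented to correspond to. Not CloverDX code.
class ProducerConfigSketch {

    static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";
    // Assumption: the Bytes type is modeled with Kafka's ByteArraySerializer.
    static final String BYTES_SERIALIZER =
            "org.apache.kafka.common.serialization.ByteArraySerializer";

    static Properties build(boolean stringKeys, boolean stringValues) {
        Properties props = new Properties();
        // Key type / Value type attributes
        props.setProperty("key.serializer", stringKeys ? STRING_SERIALIZER : BYTES_SERIALIZER);
        props.setProperty("value.serializer", stringValues ? STRING_SERIALIZER : BYTES_SERIALIZER);
        // Number of acknowledgments attribute (example value)
        props.setProperty("acks", "all");
        // Compression type attribute (example value)
        props.setProperty("compression.type", "gzip");
        // Producer configuration attribute, e.g. {batch.size=2048}
        props.setProperty("batch.size", "2048");
        return props;
    }

    public static void main(String[] args) {
        build(true, false).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```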

    Examples

    Writing headers with string values

    The metadata field for event headers is of type map[string, byte]. To write headers with string values, convert the strings to bytes, for example in the component's Input mapping.

    1. Suppose the metadata on the writer's input port has fields sourceId and traceId that you want to map to the event's headers.
    2. In the writer's Input mapping, map these fields to values of the headers map field. Convert the values to byte, using the appropriate charset:
      $out.0.headers = { "sourceId" -> str2byte($in.0.sourceId, "UTF-8"), "traceId" -> str2byte($in.0.traceId, "UTF-8") };
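
    For readers more familiar with Java than CTL, the same conversion can be sketched with the JDK alone. This is an analogy to the mapping above, not CloverDX code: the class and method names are hypothetical, and the decode step only shows why the consumer must use the same charset that the writer used.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical Java analogy of the CTL mapping above: string header values
// are encoded to bytes with an explicit charset.
class HeaderBytesSketch {

    static Map<String, byte[]> toHeaders(Map<String, String> fields) {
        Map<String, byte[]> headers = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : fields.entrySet()) {
            // Counterpart of str2byte(value, "UTF-8")
            headers.put(e.getKey(), e.getValue().getBytes(StandardCharsets.UTF_8));
        }
        return headers;
    }

    // A reader of the event has to decode with the same charset.
    static String decode(byte[] value) {
        return new String(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("sourceId", "crm-01");
        fields.put("traceId", "caf\u00e9-42"); // contains a non-ASCII character
        Map<String, byte[]> headers = toHeaders(fields);
        // The non-ASCII character takes two bytes in UTF-8, so byte length
        // and string length differ.
        System.out.println(headers.get("traceId").length);
        System.out.println(decode(headers.get("traceId")));
    }
}
```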

    Compatibility

    Version | Compatibility Notice
    5.9.0 | KafkaWriter is available since 5.9.0 in incubation mode. It uses Kafka Producer API version 2.7.0.