Version

    KafkaReader

    Incubation

    Short Description
    Ports
    Metadata
    KafkaReader Attributes
    Details
    Examples
    Compatibility
    See also

    Short Description

    KafkaReader reads events (messages) from a Kafka cluster.

    Component Data source Input ports Output ports Each to all outputs Different to different outputs Transformation Transf. req. Java CTL Auto-propagated metadata
    KafkaReaderKafka cluster01

    Ports

    Port typeNumberRequiredDescriptionMetadata
    Output0 Consumed Kafka eventsKafkaMetadata

    Metadata

    KafkaReader does not propagate metadata.

    KafkaReader has a metadata template on its output port available.

    Table 65.2. KafkaMetadata - Output port 0

    Field numberField nameData typeDescription
    1topicstringEvent topic
    2partitioninteger Topic partition of the event
    3keystringEvent key
    4valuestringEvent value
    5timestampdateEvent timestamp
    6headersmap[string, byte]Optional event metadata headers
    7offsetlongEvent offset (order) in its respective partition

    KafkaReader Attributes

    AttributeReqDescriptionPossible values
    Basic
    Connectionyes

    A Kafka connection to be used. See Kafka connection.

    e.g. MyKafkaConnection
    Topic 

    A Kafka topic to consume events from. A single topic name or a semicolon-separated list of topic names can be used.

    Either this attribute or Topic pattern attribute must be specified. If both are specified, Topic pattern takes precedence.

    e.g. my-events | topic1;topic2
    Topic pattern 

    A regular expression pattern to match Kafka topic names to be consumed from.

    Either this attribute or Topic attribute must be specified. If both are specified, Topic pattern takes precedence.

    e.g. my-topics\d+
    Groupyes

    A consumer group this consumer belongs to. It is required for parition auto-assignment among multiple consumers.

    Corresponds to Kafka group.id consumer property.

    e.g. my-group
    Key type 

    Event key data type to use.

    Corresponds to Kafka key.deserializer consumer property.

    String (default) | Bytes
    Value type 

    Event value data type to use.

    Corresponds to Kafka value.deserializer consumer property.
    String (default) | Bytes
    Auto-commit 

    When enabled, the consumer's offset will be periodically committed in the background.

    Corresponds to Kafka enable.auto.commit consumer property.

    true (default) | false
    Maximum time for new event  The maximum waiting time for new Kafka events to appear. e.g. 500ms | 3s | 1m
    Requested number of events  The maximum number of events to be consumed. The actual number of consumed events might be higher because the events are consumed in batches. The size of this batch can be changed by setting the Kafka max.poll.records consumer property. e.g. 1000
    Maximum time of reading The total maximum time of reading.e.g. 10s | 1m
    Output mapping  Defines the mapping of results to a standard output port.  
    Advanced
    Assigned topic partition  Topic partition to consume events from. If not specified it will be assigned automatically. natural numbers
    Seek to offset  A specific event offset to start consuming from. natural numbers
    Seek to boundary Set to start consuming from the first or last event offset.Seek to beginning | Seek to end
    Seek to timestamp  Set to start consuming from a specific event offset based on specified timestamp. e.g. 2020-12-01 06:00:00
    Commit position after seek  Set to commit position after seeking. Practically only affects behavior with auto-commit disabled. true (default) | false
    Auto-commit interval 

    Time interval in which consumed offsets are committed when auto-commit is enabled.

    Corresponds to Kafka auto.commit.interval.ms consumer property.

    e.g. 500ms | 3s | 1m
    Consumer configuration  Additional Kafka consumer properties. See Apache Kafka documentation for details. {fetch.min.bytes=512}

    Details

    KafkaReader allows you to subscribe to (read and process) events from Kafka using the Kafka Consumer API.

    Using the Group attribute, readers can be organized into consumer groups - each reader within the group reads from a unique partition and the group as a whole consumes all messages from the entire topic.

    Besides the configuration available through the component attributes, any other Kafka consumer property can be configured in the Consumer configuration attribute.

    Managing committed offsets

    1. Auto-commit - with auto-commit option enabled (default), the offset is committed periodically in the background (with configurable time interval). The drawback of this option is that it does not allow to rollback in case of further event processing failure.
    2. Manual commit - to commit processed events manually you need to disable the auto-commit option and design your job to use offset of the last successfully processed event (e.g. stored as a job parameter) in the Seek to offset attribute or use a dedicated KafkaCommit component. You can find more details in the KafkaCommit component section.

    Examples

    Basic reading from a Kafka topic
    Reading from a Kafka topic in parallel

    Basic reading from a Kafka topic

    1. Create a new Kafka connection to an existing Kafka Cluster by following Creating Kafka Connection.
    2. Place a new KafkaReader component, in its Connection attribute, select the previously created connection.
    3. Specify the topic name to consume from in the Topic attribute, e.g. example-topic.
    4. Specify an arbitrary consumer group ID in the Group attribute,e.g. example-group.
    5. When the job is executed, the events in the specified Topic will be consumed.

    Reading from a Kafka topic in parallel

    1. Have a Kafka topic with multiple, let's say 3 partitions and some events produced in them.
    2. Add multiple KafkaReader components to your job, up to the number of partitions in the topic. Having more readers than the number of partitions has no effect, the additional readers will not read any records.
    3. Configure the reader components, set the Connection, Topic and Group to be the same in each of the components.
    4. Connect the reader output edges.
    5. When the job is executed, the readers will get partition assigned automatically and events will get consumed evenly.

    Compatibility

    VersionCompatibility Notice
    5.9.0

    KafkaReader is available since 5.9.0 in incubation mode. It uses Kafka Consumer API version 2.7.0.