KafkaReader
Incubation
Short Description |
Ports |
Metadata |
KafkaReader Attributes |
Details |
Examples |
Compatibility |
See also |
Short Description
KafkaReader reads events (messages) from a Kafka cluster.
Component | Data source | Input ports | Output ports | Each to all outputs | Different to different outputs | Transformation | Transf. req. | Java | CTL | Auto-propagated metadata |
---|---|---|---|---|---|---|---|---|---|---|
KafkaReader | Kafka cluster | 0 | 1 | ⨯ | ⨯ | ✓ | ⨯ | ⨯ | ✓ | ⨯ |
Ports
Port type | Number | Required | Description | Metadata |
---|---|---|---|---|
Output | 0 | ✓ | Consumed Kafka events | KafkaMetadata |
Metadata
KafkaReader does not propagate metadata.
KafkaReader has a metadata template on its output port available.
Table 65.2. KafkaMetadata - Output port 0
Field number | Field name | Data type | Description |
---|---|---|---|
1 | topic | string | Event topic |
2 | partition | integer | Topic partition of the event |
3 | key | string | Event key |
4 | value | string | Event value |
5 | timestamp | date | Event timestamp |
6 | headers | map[string, byte] | Optional event metadata headers |
7 | offset | long | Event offset (order) in its respective partition |
KafkaReader Attributes
Attribute | Req | Description | Possible values |
---|---|---|---|
Basic | |||
Connection | yes |
A Kafka connection to be used. See Kafka connection. | e.g. MyKafkaConnection |
Topic |
A Kafka topic to consume events from. A single topic name or a semicolon-separated list of topic names can be used. Either this attribute or Topic pattern attribute must be specified. If both are specified, Topic pattern takes precedence. | e.g. my-events | topic1;topic2 | |
Topic pattern |
A regular expression pattern to match Kafka topic names to be consumed from. Either this attribute or Topic attribute must be specified. If both are specified, Topic pattern takes precedence. | e.g. my-topics\d+ | |
Group | yes |
A consumer group this consumer belongs to. It is required for parition auto-assignment among multiple consumers.
Corresponds to Kafka | e.g. my-group |
Key type |
Event key data type to use.
Corresponds to Kafka | String (default) | Bytes | |
Value type |
Event value data type to use. Corresponds to Kafkavalue.deserializer consumer property.
| String (default) | Bytes | |
Auto-commit |
When enabled, the consumer's offset will be periodically committed in the background.
Corresponds to Kafka | true (default) | false | |
Maximum time for new event | The maximum waiting time for new Kafka events to appear. | e.g. 500ms | 3s | 1m | |
Requested number of events |
The maximum number of events to be consumed.
The actual number of consumed events might be higher because the events
are consumed in batches.
The size of this batch can be changed by setting the Kafka
max.poll.records consumer property.
| e.g. 1000 | |
Maximum time of reading | The total maximum time of reading. | e.g. 10s | 1m | |
Output mapping | Defines the mapping of results to a standard output port. | ||
Advanced | |||
Assigned topic partition | Topic partition to consume events from. If not specified it will be assigned automatically. | natural numbers | |
Seek to offset | A specific event offset to start consuming from. | natural numbers | |
Seek to boundary | Set to start consuming from the first or last event offset. | Seek to beginning | Seek to end | |
Seek to timestamp | Set to start consuming from a specific event offset based on specified timestamp. | e.g. 2020-12-01 06:00:00 | |
Commit position after seek | Set to commit position after seeking. Practically only affects behavior with auto-commit disabled. | true (default) | false | |
Auto-commit interval |
Time interval in which consumed offsets are committed when auto-commit is enabled.
Corresponds to Kafka | e.g. 500ms | 3s | 1m | |
Consumer configuration | Additional Kafka consumer properties. See Apache Kafka documentation for details. | {fetch.min.bytes=512} |
Details
KafkaReader allows you to subscribe to (read and process) events from Kafka using the Kafka Consumer API.
Using the Group attribute, readers can be organized into consumer groups - each reader within the group reads from a unique partition and the group as a whole consumes all messages from the entire topic.
Besides the configuration available through the component attributes, any other Kafka consumer property can be configured in the Consumer configuration attribute.
Managing committed offsets
- Auto-commit - with auto-commit option enabled (default), the offset is committed periodically in the background (with configurable time interval). The drawback of this option is that it does not allow to rollback in case of further event processing failure.
-
Manual commit - to commit processed events manually you need to disable
the auto-commit option and design your job to use offset of the last
successfully processed event (e.g. stored as a job parameter) in the
Seek to offset
attribute or use a dedicated KafkaCommit component. You can find more details in the KafkaCommit component section.
Examples
Basic reading from a Kafka topic |
Reading from a Kafka topic in parallel |
Basic reading from a Kafka topic
- Create a new Kafka connection to an existing Kafka Cluster by following Creating Kafka Connection.
- Place a new KafkaReader component, in its Connection attribute, select the previously created connection.
-
Specify the topic name to consume from in the Topic
attribute, e.g.
example-topic
. -
Specify an arbitrary consumer group ID in the Group
attribute,e.g.
example-group
. - When the job is executed, the events in the specified Topic will be consumed.
Reading from a Kafka topic in parallel
- Have a Kafka topic with multiple, let's say 3 partitions and some events produced in them.
- Add multiple KafkaReader components to your job, up to the number of partitions in the topic. Having more readers than the number of partitions has no effect, the additional readers will not read any records.
- Configure the reader components, set the Connection, Topic and Group to be the same in each of the components.
- Connect the reader output edges.
- When the job is executed, the readers will get partition assigned automatically and events will get consumed evenly.
Compatibility
Version | Compatibility Notice |
---|---|
5.9.0 |
KafkaReader is available since 5.9.0 in incubation mode. It uses Kafka Consumer API version 2.7.0. |