Apache Kafka
Table of Contents
- Alternative variants
- Getting Started
- Capabilities
- Settings
  - Topic (topic)
  - Group ID (group_id)
  - Bootstrap Servers (bootstrap_servers)
  - Primary Keys (primary_keys)
  - Use Message Key (use_message_key)
  - Initial Start Time (initial_start_time)
  - Max Runtime (ms) (max_runtime_ms)
  - Commit Interval (ms) (commit_interval_ms)
  - Consumer Timeout (ms) (consumer_timeout_ms)
  - Session Timeout (ms) (session_timeout_ms)
  - Heartbeat Interval (ms) (heartbeat_interval_ms)
  - Max Poll Records (max_poll_records)
  - Max Poll Interval (ms) (max_poll_interval_ms)
  - Message Format (message_format)
  - Protobuf Schema (proto_schema)
  - Protobuf Classes Directory (proto_classes_dir)
- Looking for help?
The tap-kafka Meltano extractor pulls data from Apache Kafka that can then be sent to a destination using a loader.
Alternative variants
Multiple variants of tap-kafka are available. This document describes the default transferwise variant, which is recommended for new users.
Alternative variants are:
Getting Started
Prerequisites
If you haven't already, follow the initial steps of the Getting Started guide:
Installation and configuration
- Add the tap-kafka extractor to your project using meltano add:
  meltano add extractor tap-kafka
- Configure the settings below using meltano config.
Next steps
Follow the remaining steps of the Getting Started guide:
- Select entities and attributes to extract
- Add a loader to send data to a destination
- Run a data integration (EL) pipeline
Capabilities
Settings
tap-kafka requires the configuration of the following settings:
The settings for extractor tap-kafka that are known to Meltano are documented below. To quickly find the setting you're looking for, use the Table of Contents at the top of the page.
Topic (topic)
- Environment variable: TAP_KAFKA_TOPIC
Name of the Kafka topic(s) to subscribe to.
How to use
Manage this setting using meltano config or an environment variable:
meltano config tap-kafka set topic <topic>
export TAP_KAFKA_TOPIC=<topic>
Group ID (group_id)
- Environment variable: TAP_KAFKA_GROUP_ID
The name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets.
How to use
Manage this setting using meltano config or an environment variable:
meltano config tap-kafka set group_id <group_id>
export TAP_KAFKA_GROUP_ID=<group_id>
Bootstrap Servers (bootstrap_servers)
- Environment variable: TAP_KAFKA_BOOTSTRAP_SERVERS
A host[:port] string, or a comma-separated list of host[:port] strings, that the consumer should contact to bootstrap initial cluster metadata.
How to use
Manage this setting using meltano config or an environment variable:
meltano config tap-kafka set bootstrap_servers <bootstrap_servers>
export TAP_KAFKA_BOOTSTRAP_SERVERS=<bootstrap_servers>
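A bootstrap_servers value can hold a single host[:port] entry or several, separated by commas. The sketch below splits such a value into (host, port) pairs. It is a minimal illustration only. It assumes port 9092, Kafka's conventional broker port, whenever an entry has no port. parse_bootstrap_servers is a hypothetical helper, not part of tap-kafka.

```python
from typing import List, Tuple

DEFAULT_PORT = 9092  # assumption: Kafka's conventional default broker port


def parse_bootstrap_servers(value: str) -> List[Tuple[str, int]]:
    """Split 'host[:port]' or 'host1[:port1],host2[:port2]' into (host, port) pairs.

    Hypothetical helper for illustration; bracketed IPv6 literals are not handled.
    """
    servers = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate trailing or doubled commas
        host, sep, port = entry.partition(":")
        servers.append((host, int(port) if sep else DEFAULT_PORT))
    return servers


print(parse_bootstrap_servers("broker1:19092, broker2"))
# → [('broker1', 19092), ('broker2', 9092)]
```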
Primary Keys (primary_keys)
- Environment variable: TAP_KAFKA_PRIMARY_KEYS
Optionally define primary keys for the consumed messages. Each entry needs a column name and a /slashed/path, an XPath-like selector that extracts the value from the Kafka message. The extracted column is added to every output Singer message.
How to use
Manage this setting using meltano config or an environment variable:
meltano config tap-kafka set primary_keys '{...}'
export TAP_KAFKA_PRIMARY_KEYS='{...}'
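The sketch below shows how a /slashed/path selector could extract a primary-key value from a JSON message. It is a minimal illustration that assumes primary_keys maps each output column name to a path such as /id or /payload/order_id. The helpers extract_by_path and extract_primary_keys are hypothetical. The tap's own implementation may differ, for example in how it handles missing paths.

```python
import json
from typing import Any, Dict, Optional


def extract_by_path(message: Dict[str, Any], path: str) -> Optional[Any]:
    """Follow an XPath-like '/a/b/c' path through nested dicts; None if absent."""
    node: Any = message
    for part in path.strip("/").split("/"):
        if not isinstance(node, dict) or part not in node:
            return None
        node = node[part]
    return node


def extract_primary_keys(message: Dict[str, Any], primary_keys: Dict[str, str]) -> Dict[str, Any]:
    """Build the extra output columns: one per configured column name."""
    return {column: extract_by_path(message, path) for column, path in primary_keys.items()}


raw = b'{"payload": {"order_id": 42, "status": "paid"}}'
message = json.loads(raw)
# Hypothetical primary_keys value: output column name -> slashed path
primary_keys = {"order_id": "/payload/order_id"}
print(extract_primary_keys(message, primary_keys))  # → {'order_id': 42}
```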
Use Message Key (use_message_key)
- Environment variable: TAP_KAFKA_USE_MESSAGE_KEY
(Default true) Whether to use the Kafka message key as the primary key of the record. Note: custom primary keys, if defined, take precedence over the message key even when use_message_key is set to true.
How to use
Manage this setting using meltano config or an environment variable:
meltano config tap-kafka set use_message_key true
export TAP_KAFKA_USE_MESSAGE_KEY=true
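The precedence rule above fits in a small function. This sketch rests on two assumptions. First, the Kafka message key is exposed as a column named message_key, a hypothetical name used only here. Second, an empty or missing primary_keys counts as "not defined".

```python
from typing import Dict, List, Optional


def key_properties(primary_keys: Optional[Dict[str, str]], use_message_key: bool = True) -> List[str]:
    """Return the column(s) used as the record's primary key.

    Custom primary_keys win whenever they are defined, even if use_message_key is true.
    'message_key' is an assumed column name for the Kafka message key.
    """
    if primary_keys:
        return list(primary_keys)
    if use_message_key:
        return ["message_key"]
    return []  # neither custom keys nor the message key: no primary key


print(key_properties({"order_id": "/payload/order_id"}, use_message_key=True))  # → ['order_id']
print(key_properties(None))  # → ['message_key']
```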
Initial Start Time (initial_start_time)
- Environment variable: TAP_KAFKA_INITIAL_START_TIME
(Default latest) Where message consumption starts when there is no bookmarked position in state.json. One of latest, earliest, or an ISO-8601 formatted timestamp string.
How to use
Manage this setting using meltano config or an environment variable:
meltano config tap-kafka set initial_start_time <initial_start_time>
export TAP_KAFKA_INITIAL_START_TIME=<initial_start_time>
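Kafka looks up offsets by timestamp in milliseconds since the Unix epoch. kafka-python exposes this lookup as KafkaConsumer.offsets_for_times. The sketch below normalises an initial_start_time value into either one of the two keywords or such a millisecond timestamp. resolve_start_time is a hypothetical helper, and treating timestamps without a timezone as UTC is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Union

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def resolve_start_time(value: str = "latest") -> Union[str, int]:
    """Return 'latest', 'earliest', or an epoch timestamp in milliseconds."""
    if value in ("latest", "earliest"):
        return value
    # fromisoformat accepts offsets like '+00:00'; a trailing 'Z' needs Python 3.11+.
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)  # assumption: naive means UTC
    # Integer division of timedeltas gives exact whole milliseconds.
    return (parsed - EPOCH) // timedelta(milliseconds=1)


print(resolve_start_time("2021-01-01T00:00:00+00:00"))  # → 1609459200000
```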
Max Runtime (ms) (max_runtime_ms)
- Environment variable: TAP_KAFKA_MAX_RUNTIME_MS
(Default 300000) The maximum time, in milliseconds, for the tap to collect new messages from the Kafka topic. When this time is exceeded, the tap flushes the batch and closes the Kafka connection.
How to use
Manage this setting using meltano config or an environment variable:
meltano config tap-kafka set max_runtime_ms 1234
export TAP_KAFKA_MAX_RUNTIME_MS=1234
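The sketch below is a deadline-bounded consume loop that mirrors the behaviour described above. It keeps collecting messages until max_runtime_ms has elapsed, then flushes the buffered batch. The message source and the flush callback are hypothetical stand-ins, so the loop runs without a Kafka broker. The tap's actual loop may differ.

```python
import time
from typing import Callable, Iterable, List


def consume_until_deadline(
    messages: Iterable[str],
    flush: Callable[[List[str]], None],
    max_runtime_ms: int = 300000,
) -> int:
    """Buffer messages until the source is exhausted or max_runtime_ms has elapsed."""
    deadline = time.monotonic() + max_runtime_ms / 1000
    batch: List[str] = []
    for message in messages:
        batch.append(message)
        if time.monotonic() >= deadline:
            break  # runtime exceeded: stop collecting new messages
    flush(batch)  # a real tap would also close the Kafka connection here
    return len(batch)


def slow_source():
    # Hypothetical stand-in for a topic that yields one message every 50 ms.
    for i in range(100):
        time.sleep(0.05)
        yield f"message-{i}"


flushed: List[List[str]] = []
count = consume_until_deadline(slow_source(), flushed.append, max_runtime_ms=200)
print(count)  # a handful of messages, well under 100
```

In this sketch the deadline is checked only after a message arrives, so a quiet topic could block the loop. With kafka-python, consumer_timeout_ms bounds that wait.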
Commit Interval (ms) (commit_interval_ms)
- Environment variable: TAP_KAFKA_COMMIT_INTERVAL_MS
(Default 5000) Number of milliseconds between two commits. This is different from Kafka's auto-commit feature: tap-kafka sends commit messages automatically, but only after the consumed data has been successfully persisted to the local store.
How to use
Manage this setting using meltano config or an environment variable:
meltano config tap-kafka set commit_interval_ms 1234
export TAP_KAFKA_COMMIT_INTERVAL_MS=1234
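The difference from Kafka's auto-commit can be sketched as a small scheduler. A commit happens only when commit_interval_ms has passed and new data has been persisted. CommitScheduler, mark_persisted and the injected clock are hypothetical names used for illustration. With kafka-python, the real commit would be a KafkaConsumer.commit() call on a consumer created with enable_auto_commit=False.

```python
import time
from typing import Callable, Optional


class CommitScheduler:
    """Decide when to commit offsets: interval elapsed AND new data persisted locally."""

    def __init__(self, commit_interval_ms: int = 5000,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.interval_s = commit_interval_ms / 1000
        self.clock = clock
        self.last_commit = clock()
        self.last_persisted_offset: Optional[int] = None
        self.committed_offset: Optional[int] = None

    def mark_persisted(self, offset: int) -> None:
        # Called once the message at `offset` has been written to the local store.
        self.last_persisted_offset = offset

    def maybe_commit(self) -> bool:
        now = self.clock()
        if now - self.last_commit < self.interval_s:
            return False  # too soon since the previous commit
        if self.last_persisted_offset is None or self.last_persisted_offset == self.committed_offset:
            return False  # nothing new has been persisted
        # A real consumer would commit here; Kafka stores the offset of the
        # next message to read, i.e. last_persisted_offset + 1.
        self.committed_offset = self.last_persisted_offset
        self.last_commit = now
        return True


fake_now = [0.0]
scheduler = CommitScheduler(commit_interval_ms=5000, clock=lambda: fake_now[0])
scheduler.mark_persisted(10)
fake_now[0] = 1.0
print(scheduler.maybe_commit())  # → False (only 1 s elapsed)
fake_now[0] = 6.0
print(scheduler.maybe_commit())  # → True (interval elapsed, offset 10 persisted)
fake_now[0] = 12.0
print(scheduler.maybe_commit())  # → False (nothing new persisted)
```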
Consumer Timeout (ms) (consumer_timeout_ms)
- Environment variable: TAP_KAFKA_CONSUMER_TIMEOUT_MS
(Default 10000) KafkaConsumer setting. Number of milliseconds to block during message iteration before raising StopIteration.
How to use
Manage this setting using meltano config or an environment variable:
meltano config tap-kafka set consumer_timeout_ms 1234
export TAP_KAFKA_CONSUMER_TIMEOUT_MS=1234
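The settings marked "KafkaConsumer setting" on this page share their names with keyword arguments of kafka-python's KafkaConsumer. That class is where the StopIteration behaviour of consumer_timeout_ms comes from. The sketch below maps tap settings to those arguments. It assumes the tap passes the values through unchanged and disables auto-commit because it commits offsets itself. consumer_kwargs is a hypothetical helper, and the defaults are the ones listed on this page.

```python
from typing import Any, Dict

# Defaults as documented on this page.
DEFAULTS = {
    "consumer_timeout_ms": 10000,
    "session_timeout_ms": 30000,
    "heartbeat_interval_ms": 10000,
    "max_poll_records": 500,
    "max_poll_interval_ms": 300000,
}


def consumer_kwargs(config: Dict[str, Any]) -> Dict[str, Any]:
    """Map tap-kafka settings to kafka-python KafkaConsumer keyword arguments."""
    # int() also accepts string values, e.g. ones read from environment variables.
    kwargs: Dict[str, Any] = {key: int(config.get(key, default)) for key, default in DEFAULTS.items()}
    kwargs.update(
        bootstrap_servers=[s.strip() for s in config["bootstrap_servers"].split(",")],
        group_id=config["group_id"],
        enable_auto_commit=False,  # assumption: the tap commits offsets itself
    )
    return kwargs


config = {"bootstrap_servers": "broker1:9092,broker2:9092", "group_id": "my_group",
          "consumer_timeout_ms": "5000"}
kwargs = consumer_kwargs(config)
print(kwargs["consumer_timeout_ms"], kwargs["max_poll_records"])  # → 5000 500

# With kafka-python installed and a reachable broker, iteration would look like:
#   from kafka import KafkaConsumer
#   consumer = KafkaConsumer("my_topic", **kwargs)
#   for message in consumer:  # ends once no message arrives within consumer_timeout_ms
#       ...
```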
Session Timeout (ms) (session_timeout_ms)
- Environment variable: TAP_KAFKA_SESSION_TIMEOUT_MS
(Default 30000) KafkaConsumer setting. The timeout used to detect failures when using Kafka’s group management facilities.
How to use
Manage this setting using meltano config or an environment variable:
meltano config tap-kafka set session_timeout_ms 1234
export TAP_KAFKA_SESSION_TIMEOUT_MS=1234
Heartbeat Interval (ms) (heartbeat_interval_ms)
- Environment variable: TAP_KAFKA_HEARTBEAT_INTERVAL_MS
(Default 10000) KafkaConsumer setting. The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka’s group management facilities.
How to use
Manage this setting using meltano config or an environment variable:
meltano config tap-kafka set heartbeat_interval_ms 1234
export TAP_KAFKA_HEARTBEAT_INTERVAL_MS=1234
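Apache Kafka's consumer configuration reference states that heartbeat.interval.ms must be lower than session.timeout.ms. It also says the heartbeat interval should typically be no higher than one third of the session timeout. The defaults above (10000 and 30000) sit exactly at that ratio. The sketch below sanity-checks custom values. check_heartbeat is a hypothetical helper, and the one-third figure is guidance rather than a hard limit.

```python
from typing import List


def check_heartbeat(session_timeout_ms: int = 30000, heartbeat_interval_ms: int = 10000) -> List[str]:
    """Return warnings for a heartbeat/session timeout combination."""
    warnings = []
    if heartbeat_interval_ms >= session_timeout_ms:
        warnings.append("heartbeat_interval_ms must be lower than session_timeout_ms")
    elif heartbeat_interval_ms * 3 > session_timeout_ms:
        warnings.append("heartbeat_interval_ms is above one third of session_timeout_ms")
    return warnings


print(check_heartbeat())  # → [] (10000 * 3 == 30000, so the defaults pass)
print(check_heartbeat(session_timeout_ms=20000))
# → ['heartbeat_interval_ms is above one third of session_timeout_ms']
```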
Max Poll Records (max_poll_records)
- Environment variable: TAP_KAFKA_MAX_POLL_RECORDS
(Default 500) KafkaConsumer setting. Maximum number of records returned in a single call to poll().
How to use
Manage this setting using meltano config or an environment variable:
meltano config tap-kafka set max_poll_records 1234
export TAP_KAFKA_MAX_POLL_RECORDS=1234
Max Poll Interval (ms) (max_poll_interval_ms)
- Environment variable: TAP_KAFKA_MAX_POLL_INTERVAL_MS
(Default 300000) KafkaConsumer setting. The maximum delay between invocations of poll() when using consumer group management.
How to use
Manage this setting using meltano config or an environment variable:
meltano config tap-kafka set max_poll_interval_ms 1234
export TAP_KAFKA_MAX_POLL_INTERVAL_MS=1234
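max_poll_records and max_poll_interval_ms interact. If poll() is not called again within max_poll_interval_ms, Kafka treats the consumer as failed and rebalances its partitions to other group members. The worked check below assumes a constant processing time per record, which is a simplification. With the defaults, each of the 500 records may take at most 300000 / 500 = 600 ms on average. Both helper functions are hypothetical.

```python
def max_ms_per_record(max_poll_records: int = 500, max_poll_interval_ms: int = 300000) -> float:
    """Largest average per-record processing time that keeps a full batch within the poll interval."""
    return max_poll_interval_ms / max_poll_records


def batch_fits(per_record_ms: float, max_poll_records: int = 500,
               max_poll_interval_ms: int = 300000) -> bool:
    """True if processing a full batch finishes before the next poll() is due."""
    return per_record_ms * max_poll_records < max_poll_interval_ms


print(max_ms_per_record())                    # → 600.0
print(batch_fits(50))                         # → True  (500 * 50 ms = 25 s)
print(batch_fits(700))                        # → False (500 * 700 ms = 350 s > 300 s)
print(batch_fits(700, max_poll_records=100))  # → True  (100 * 700 ms = 70 s)
```

When processing is slow, either lowering max_poll_records or raising max_poll_interval_ms restores the margin. Lowering max_poll_records keeps failure detection fast.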
Message Format (message_format)
- Environment variable: TAP_KAFKA_MESSAGE_FORMAT
(Default json) Supported message formats are json and protobuf.
How to use
Manage this setting using meltano config or an environment variable:
meltano config tap-kafka set message_format <message_format>
export TAP_KAFKA_MESSAGE_FORMAT=<message_format>
Protobuf Schema (proto_schema)
- Environment variable: TAP_KAFKA_PROTO_SCHEMA
Protobuf message format in .proto syntax. Required if the message_format is protobuf.
How to use
Manage this setting using meltano config or an environment variable:
meltano config tap-kafka set proto_schema <proto_schema>
export TAP_KAFKA_PROTO_SCHEMA=<proto_schema>
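Two rules from the settings above can be checked before a run. message_format must be json or protobuf, and proto_schema is required when the format is protobuf. The sketch below checks both. validate_format_settings is a hypothetical helper, and its error messages are illustrative rather than the tap's own.

```python
from typing import Any, Dict

SUPPORTED_FORMATS = ("json", "protobuf")


def validate_format_settings(config: Dict[str, Any]) -> str:
    """Return the effective message format or raise ValueError on a bad combination."""
    message_format = config.get("message_format", "json")  # documented default
    if message_format not in SUPPORTED_FORMATS:
        raise ValueError(f"unsupported message_format: {message_format!r}")
    if message_format == "protobuf" and not config.get("proto_schema"):
        raise ValueError("proto_schema is required when message_format is protobuf")
    return message_format


schema = 'syntax = "proto3"; message Order { int64 order_id = 1; }'
print(validate_format_settings({}))  # → json
print(validate_format_settings({"message_format": "protobuf", "proto_schema": schema}))  # → protobuf
```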
Protobuf Classes Directory (proto_classes_dir)
- Environment variable: TAP_KAFKA_PROTO_CLASSES_DIR
(Default current working directory) Directory in which the Protobuf classes compiled at runtime from proto_schema are stored.
How to use
Manage this setting using meltano config or an environment variable:
meltano config tap-kafka set proto_classes_dir <proto_classes_dir>
export TAP_KAFKA_PROTO_CLASSES_DIR=<proto_classes_dir>
Looking for help?
If you're having trouble getting the tap-kafka extractor to work, look for an existing issue in its repository, file a new issue, or join the Meltano Slack community and ask for help in the #plugins-general channel.
Found an issue on this page?
This page is generated from a YAML file that you can contribute changes to on GitHub.