The tap-kafka Meltano extractor pulls data from Apache Kafka that can then be sent to a destination using a loader.

Alternative variants

Multiple variants of tap-kafka are available. This document describes the default transferwise variant, which is recommended for new users.

Alternative variants are:

Getting Started

Prerequisites

If you haven't already, follow the initial steps of the Getting Started guide:

  1. Install Meltano
  2. Create your Meltano project

Installation and configuration

  1. Add the tap-kafka extractor to your project using meltano add:

    meltano add extractor tap-kafka
  2. Configure the settings below using meltano config.
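As a minimal sketch, the topic, consumer group and broker settings documented below can also be supplied as environment variables instead of through meltano config. The topic name, consumer group and broker address here are hypothetical placeholders; replace them with your own values.

```shell
# Hypothetical values: replace with your own topic, consumer group and brokers.
export TAP_KAFKA_TOPIC=orders
export TAP_KAFKA_GROUP_ID=meltano-orders
export TAP_KAFKA_BOOTSTRAP_SERVERS=localhost:9092

# List the tap-kafka variables now set in this shell.
env | grep '^TAP_KAFKA_' | sort
```

To store the same values in the project instead, use meltano config tap-kafka set topic orders, and likewise for group_id and bootstrap_servers.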

Next steps

Follow the remaining steps of the Getting Started guide:

  1. Select entities and attributes to extract
  2. Add a loader to send data to a destination
  3. Run a data integration (EL) pipeline
If you run into any issues, learn how to get help.

Capabilities

Settings

tap-kafka requires the configuration of the following settings: topic, group_id and bootstrap_servers.

The settings for extractor tap-kafka that are known to Meltano are documented below.

Topic (topic)

Name of the Kafka topic to subscribe to.

How to use

Manage this setting using meltano config or an environment variable:

meltano config tap-kafka set topic <topic>

export TAP_KAFKA_TOPIC=<topic>

Group ID (group_id)

The name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets.

How to use

Manage this setting using meltano config or an environment variable:

meltano config tap-kafka set group_id <group_id>

export TAP_KAFKA_GROUP_ID=<group_id>

Bootstrap Servers (bootstrap_servers)

A host[:port] string, or a comma-separated list of host[:port] strings, that the consumer contacts to bootstrap the initial cluster metadata.
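For example, a cluster with several brokers can be listed as one comma-separated value. The broker hostnames below are hypothetical, and the third entry omits the port to show that it is optional. The loop only splits the value locally to check each entry; it does not contact Kafka.

```shell
# Hypothetical brokers; the port is optional in each host[:port] entry.
export TAP_KAFKA_BOOTSTRAP_SERVERS="broker1.example.com:9092,broker2.example.com:9092,broker3.example.com"

# Split the value on commas (POSIX sh) and print each entry.
old_ifs=$IFS
IFS=','
set -- $TAP_KAFKA_BOOTSTRAP_SERVERS
IFS=$old_ifs
server_count=$#
first_server=$1
for server in "$@"; do
  echo "bootstrap server: $server"
done
```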

How to use

Manage this setting using meltano config or an environment variable:

meltano config tap-kafka set bootstrap_servers <bootstrap_servers>

export TAP_KAFKA_BOOTSTRAP_SERVERS=<bootstrap_servers>

Primary Keys (primary_keys)

Optionally, define a primary key for the consumed messages. Each entry maps a column name to a /slashed/path, similar to an XPath selector, that extracts the value from the Kafka message. The extracted column is added to every output Singer message.
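As an illustration, assuming JSON messages shaped like {"order": {"id": 42, "status": "paid"}}, the setting below would map the nested value at /order/id to an output column named order_id. Both the message shape and the column name are hypothetical.

```shell
# Hypothetical mapping: output column "order_id" <- value at /order/id in each JSON message.
export TAP_KAFKA_PRIMARY_KEYS='{"order_id": "/order/id"}'

# Equivalent project-level setting:
#   meltano config tap-kafka set primary_keys '{"order_id": "/order/id"}'
echo "$TAP_KAFKA_PRIMARY_KEYS"
```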

How to use

Manage this setting using meltano config or an environment variable:

meltano config tap-kafka set primary_keys '{...}'

export TAP_KAFKA_PRIMARY_KEYS='{...}'

Use Message Key (use_message_key)

(Default true) Whether to use the Kafka message key as the primary key for the record. Note: custom primary keys, if defined, take precedence even when use_message_key is set to true.

How to use

Manage this setting using meltano config or an environment variable:

meltano config tap-kafka set use_message_key true

export TAP_KAFKA_USE_MESSAGE_KEY=true

Initial Start Time (initial_start_time)

(Default latest) Where to start consuming messages when there is no bookmarked position in state.json. One of latest, earliest, or an ISO-8601 formatted timestamp string.
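A minimal sketch of the three accepted forms, set through the environment variable. In standard Kafka terms, earliest starts from the oldest message still retained in the topic, while latest skips existing messages. The timestamp shown is a hypothetical example; check that your value matches the ISO-8601 form the tap expects.

```shell
# Used only when state.json has no bookmarked position for the topic.

# Start from the oldest message still retained by Kafka:
export TAP_KAFKA_INITIAL_START_TIME=earliest

# Or start from a point in time (hypothetical ISO-8601 timestamp):
# export TAP_KAFKA_INITIAL_START_TIME=2021-06-01T00:00:00+00:00

# Or keep the default and skip messages already in the topic:
# export TAP_KAFKA_INITIAL_START_TIME=latest
```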

How to use

Manage this setting using meltano config or an environment variable:

meltano config tap-kafka set initial_start_time <initial_start_time>

export TAP_KAFKA_INITIAL_START_TIME=<initial_start_time>

Max Runtime (ms) (max_runtime_ms)

(Default 300000) The maximum time, in milliseconds, that the tap spends collecting new messages from the Kafka topic. When this limit is exceeded, the tap flushes the batch and closes the Kafka connection.
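Because the value is in milliseconds, shell arithmetic helps avoid conversion mistakes. The sketch below raises the limit from the default 300000 ms (5 minutes) to a hypothetical 10 minutes.

```shell
# 10 minutes * 60 seconds * 1000 ms = 600000 ms
export TAP_KAFKA_MAX_RUNTIME_MS=$((10 * 60 * 1000))
echo "max runtime: ${TAP_KAFKA_MAX_RUNTIME_MS} ms"
```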

How to use

Manage this setting using meltano config or an environment variable:

meltano config tap-kafka set max_runtime_ms 1234

export TAP_KAFKA_MAX_RUNTIME_MS=1234

Commit Interval (ms) (commit_interval_ms)

(Default 5000) Number of milliseconds between two commits. This is different from Kafka's auto-commit feature: tap-kafka sends commit messages automatically, but only after the data has been consumed successfully and persisted to the local store.

How to use

Manage this setting using meltano config or an environment variable:

meltano config tap-kafka set commit_interval_ms 1234

export TAP_KAFKA_COMMIT_INTERVAL_MS=1234

Consumer Timeout (ms) (consumer_timeout_ms)

(Default 10000) KafkaConsumer setting. Number of milliseconds to block during message iteration before raising StopIteration.

How to use

Manage this setting using meltano config or an environment variable:

meltano config tap-kafka set consumer_timeout_ms 1234

export TAP_KAFKA_CONSUMER_TIMEOUT_MS=1234

Session Timeout (ms) (session_timeout_ms)

(Default 30000) KafkaConsumer setting. The timeout used to detect failures when using Kafka’s group management facilities.

How to use

Manage this setting using meltano config or an environment variable:

meltano config tap-kafka set session_timeout_ms 1234

export TAP_KAFKA_SESSION_TIMEOUT_MS=1234

Heartbeat Interval (ms) (heartbeat_interval_ms)

(Default 10000) KafkaConsumer setting. The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka’s group management facilities.
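Kafka's consumer documentation advises setting the heartbeat interval lower than the session timeout, and typically no higher than one third of it; the defaults here (10000 and 30000) sit exactly at that ratio. The check below is a hypothetical pre-flight helper, not part of tap-kafka or Meltano.

```shell
# Values in milliseconds (the documented defaults; replace with your own).
session_timeout_ms=30000
heartbeat_interval_ms=10000

# Heartbeat must be below the session timeout and is
# typically no more than one third of it.
if [ "$heartbeat_interval_ms" -ge "$session_timeout_ms" ]; then
  heartbeat_check=error
  echo "error: heartbeat_interval_ms must be lower than session_timeout_ms" >&2
elif [ $((heartbeat_interval_ms * 3)) -gt "$session_timeout_ms" ]; then
  heartbeat_check=warning
  echo "warning: heartbeat_interval_ms is above one third of session_timeout_ms" >&2
else
  heartbeat_check=ok
fi

export TAP_KAFKA_SESSION_TIMEOUT_MS=$session_timeout_ms
export TAP_KAFKA_HEARTBEAT_INTERVAL_MS=$heartbeat_interval_ms
echo "heartbeat check: $heartbeat_check"
```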

How to use

Manage this setting using meltano config or an environment variable:

meltano config tap-kafka set heartbeat_interval_ms 1234

export TAP_KAFKA_HEARTBEAT_INTERVAL_MS=1234

Max Poll Records (max_poll_records)

(Default 500) KafkaConsumer setting. Maximum number of records returned in a single call to poll().

How to use

Manage this setting using meltano config or an environment variable:

meltano config tap-kafka set max_poll_records 1234

export TAP_KAFKA_MAX_POLL_RECORDS=1234

Max Poll Interval (ms) (max_poll_interval_ms)

(Default 300000) KafkaConsumer setting. The maximum delay between invocations of poll() when using consumer group management.
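In Kafka consumers, if poll() is not called again within max_poll_interval_ms, the consumer is considered failed and the group rebalances. As a rough rule of thumb, assuming a full batch of max_poll_records is handled between polls, a quick worked calculation with the defaults gives the average time available per record. If records take longer, lowering max_poll_records or raising max_poll_interval_ms are the usual remedies.

```shell
# Documented defaults.
max_poll_records=500
max_poll_interval_ms=300000

# Average time available per record if a full batch must be handled
# before the next poll() (integer division, in milliseconds).
per_record_budget_ms=$((max_poll_interval_ms / max_poll_records))
echo "per-record budget: ${per_record_budget_ms} ms"
```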

How to use

Manage this setting using meltano config or an environment variable:

meltano config tap-kafka set max_poll_interval_ms 1234

export TAP_KAFKA_MAX_POLL_INTERVAL_MS=1234

Message Format (message_format)

(Default json) Supported message formats are json and protobuf.

How to use

Manage this setting using meltano config or an environment variable:

meltano config tap-kafka set message_format <message_format>

export TAP_KAFKA_MESSAGE_FORMAT=<message_format>

Protobuf Schema (proto_schema)

Protobuf message schema in .proto syntax. Required if message_format is protobuf.
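A minimal sketch of switching to protobuf. The Order message is a hypothetical schema standing in for your own .proto definition; a multi-line value can be passed through the environment variable as-is. When the schema lives in a file, meltano config tap-kafka set proto_schema "$(cat path/to/schema.proto)" passes its contents the same way (the path is a placeholder).

```shell
# Read messages as protobuf instead of the default json.
export TAP_KAFKA_MESSAGE_FORMAT=protobuf

# Hypothetical schema; replace with the .proto definition of your messages.
export TAP_KAFKA_PROTO_SCHEMA='syntax = "proto3";

message Order {
  int64 id = 1;
  string status = 2;
}'

printf '%s\n' "$TAP_KAFKA_PROTO_SCHEMA"
```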

How to use

Manage this setting using meltano config or an environment variable:

meltano config tap-kafka set proto_schema <proto_schema>

export TAP_KAFKA_PROTO_SCHEMA=<proto_schema>

Protobuf Classes Directory (proto_classes_dir)

(Default current working dir) Directory where the tap stores the protobuf classes it compiles at runtime from proto_schema.

How to use

Manage this setting using meltano config or an environment variable:

meltano config tap-kafka set proto_classes_dir <proto_classes_dir>

export TAP_KAFKA_PROTO_CLASSES_DIR=<proto_classes_dir>

Looking for help?

If you're having trouble getting the tap-kafka extractor to work, look for an existing issue in its repository, file a new issue, or join the Meltano Slack community and ask for help in the #plugins-general channel.

Found an issue on this page?

This page is generated from a YAML file that you can contribute changes to.