The tap-kafka Singer tap pulls data from Apache Kafka that can then be sent to a destination using a Singer target.

Alternative variants #

Multiple variants of tap-kafka are available. This document describes the default transferwise variant, which is recommended for new users.

Alternative variants are:

Standalone usage #

Install the package using pip:

pip install pipelinewise-tap-kafka

For additional instructions, refer to the README in the repository.

Usage with Meltano #

Meltano helps you manage your configuration, incremental replication, and scheduled pipelines.

View the Meltano-specific tap-kafka instructions to learn more.

Capabilities #

These capabilities can also be overridden by specifying the capabilities key in your meltano.yml file.

Settings #

tap-kafka requires the configuration of the following settings:

The tap-kafka settings that are known to Meltano are documented below. To quickly find the setting you're looking for, use the Table of Contents at the top of the page.

You can override these settings or specify additional ones in your meltano.yml by adding the settings key. Please consider adding any settings you have defined locally to this definition on MeltanoHub by making a pull request to the YAML file that defines the settings for this tap.

Topic (topic) #

Name of the Kafka topic to subscribe to.

Group ID (group_id) #

The name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets.

Bootstrap Servers (bootstrap_servers) #

A host[:port] string (or a comma-separated list of host[:port] strings) that the consumer should contact to bootstrap initial cluster metadata.
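As an illustration, the topic, group_id, and bootstrap_servers settings could be combined into a Singer-style JSON configuration as in the sketch below. All values are placeholders, and split_bootstrap_servers is a hypothetical helper that only demonstrates the shape of the comma-separated host[:port] format; it is not part of tap-kafka and does not handle IPv6 addresses.

```python
import json

# Placeholder values: replace the topic, group ID, and broker addresses with your own.
config = {
    "topic": "orders",
    "group_id": "tap-kafka-orders",
    "bootstrap_servers": "kafka-1.example.com:9092,kafka-2.example.com:9092",
}


def split_bootstrap_servers(value):
    """Split a comma-separated host[:port] string into (host, port) pairs.

    Hypothetical helper for illustration; the port is None when an entry omits it.
    """
    servers = []
    for entry in value.split(","):
        host, _, port = entry.strip().partition(":")
        servers.append((host, int(port) if port else None))
    return servers


# Serialized form, suitable for writing to a config file.
config_json = json.dumps(config, indent=2)
```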

Primary Keys (primary_keys) #

Optionally, you can define a primary key for the consumed messages. Each key requires a column name and a /slashed/path selector, similar to XPath, that extracts the value from the Kafka message. The extracted column is added to every output Singer message.
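To make the idea of a slashed path concrete, here is a minimal sketch that follows such a path through a nested JSON message. It assumes that primary_keys maps a column name to a path; the column name, the path, and the message are made up, and extract_by_path is a hypothetical helper rather than the tap's own implementation.

```python
def extract_by_path(message, path):
    """Follow a /slashed/path through nested dicts; return None if any step is missing."""
    value = message
    for part in path.strip("/").split("/"):
        if not isinstance(value, dict) or part not in value:
            return None
        value = value[part]
    return value


# Assumed shape of the setting: column name -> path inside the message.
primary_keys = {"order_id": "/order/id"}

# Hypothetical Kafka message payload.
message = {"order": {"id": 42, "status": "paid"}}

key_columns = {
    column: extract_by_path(message, path)
    for column, path in primary_keys.items()
}
print(key_columns)  # prints {'order_id': 42}
```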

Use Message Key (use_message_key) #

(Default true) Defines whether to use the Kafka message key as the primary key for the record. Note: if custom primary keys are defined in primary_keys, they take precedence even when use_message_key is set to true.
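The precedence rule can be sketched as a small function. This is only an illustration of the rule as described here: resolve_key_properties is a hypothetical name, and the "message_key" column name is an assumption used as a placeholder for wherever the Kafka message key ends up in the record.

```python
def resolve_key_properties(primary_keys=None, use_message_key=True):
    """Return the columns that would act as the primary key.

    Custom primary keys win; otherwise fall back to the Kafka message key
    if use_message_key is enabled; otherwise there is no primary key.
    """
    if primary_keys:
        return list(primary_keys)
    if use_message_key:
        return ["message_key"]  # assumed column name, for illustration only
    return []
```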

Initial Start Time (initial_start_time) #

(Default latest) The position from which to start consuming messages when there is no bookmarked position in state.json. One of latest, earliest, or an ISO-8601 formatted timestamp string.
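A quick way to sanity-check a value before running the tap is sketched below. parse_initial_start_time is a hypothetical helper, not part of tap-kafka, and it relies on Python's datetime.fromisoformat, which accepts only a subset of ISO-8601 on Python versions before 3.11 (for example, a trailing "Z" is rejected there).

```python
from datetime import datetime


def parse_initial_start_time(value="latest"):
    """Return 'latest', 'earliest', or a datetime parsed from an ISO-8601 string."""
    if value in ("latest", "earliest"):
        return value
    try:
        return datetime.fromisoformat(value)
    except ValueError:
        raise ValueError(
            "initial_start_time must be 'latest', 'earliest', "
            f"or an ISO-8601 timestamp, got {value!r}"
        )


start = parse_initial_start_time("2021-11-01T12:00:00+00:00")
```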

Max Runtime (ms) (max_runtime_ms) #

(Default 300000) The maximum time, in milliseconds, that the tap spends collecting new messages from the Kafka topic. When this time is exceeded, the tap flushes the batch and closes the Kafka connection.

Commit Interval (ms) (commit_interval_ms) #

(Default 5000) Number of milliseconds between two commits. This is different from the Kafka auto-commit feature: tap-kafka sends commit messages automatically, but only after the data has been consumed successfully and persisted to the local store.
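How max_runtime_ms and commit_interval_ms interact can be pictured with the following sketch. It is a simplified model of the behaviour described here, not the tap's actual code: poll, persist, commit, and clock are hypothetical callables supplied by the caller, and the point is only the ordering (persist first, then commit) and the final flush once the runtime is exceeded.

```python
def consume(poll, persist, commit, clock,
            max_runtime_ms=300000, commit_interval_ms=5000):
    """Collect messages until max_runtime_ms elapses, committing once per interval.

    poll() returns a message or None, persist(batch) stores buffered messages,
    commit() records the consumed position, clock() returns milliseconds.
    """
    start = last_commit = clock()
    batch = []
    while clock() - start < max_runtime_ms:
        message = poll()
        if message is not None:
            batch.append(message)
        if batch and clock() - last_commit >= commit_interval_ms:
            persist(batch)  # persist first ...
            commit()        # ... then commit, so offsets never run ahead of stored data
            batch = []
            last_commit = clock()
    if batch:
        # Runtime exceeded: flush whatever is left before the connection is closed.
        persist(batch)
        commit()
```

Injecting the clock keeps the sketch easy to exercise without waiting in real time.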

Consumer Timeout (ms) (consumer_timeout_ms) #

(Default 10000) KafkaConsumer setting. Number of milliseconds to block during message iteration before raising StopIteration.

Session Timeout (ms) (session_timeout_ms) #

(Default 30000) KafkaConsumer setting. The timeout used to detect failures when using Kafka’s group management facilities.

Heartbeat Interval (ms) (heartbeat_interval_ms) #

(Default 10000) KafkaConsumer setting. The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka’s group management facilities.
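The session timeout and heartbeat interval are related: Kafka's consumer documentation states that the heartbeat interval must be lower than the session timeout and is typically set no higher than one third of it, which the defaults above satisfy. A minimal check along those lines, using the hypothetical helper name check_heartbeat, might look like this:

```python
def check_heartbeat(session_timeout_ms=30000, heartbeat_interval_ms=10000):
    """Return a list of human-readable problems; an empty list means the pair looks fine."""
    problems = []
    if heartbeat_interval_ms >= session_timeout_ms:
        problems.append(
            "heartbeat_interval_ms must be lower than session_timeout_ms"
        )
    elif heartbeat_interval_ms * 3 > session_timeout_ms:
        problems.append(
            "heartbeat_interval_ms is usually kept at or below "
            "one third of session_timeout_ms"
        )
    return problems
```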

Max Poll Records (max_poll_records) #

(Default 500) KafkaConsumer setting. Maximum number of records to poll.

Max Poll Interval (ms) (max_poll_interval_ms) #

(Default 300000) KafkaConsumer setting. The maximum delay between invocations of poll() when using consumer group management.

Message Format (message_format) #

(Default json) Supported message formats are json and protobuf.

Protobuf Schema (proto_schema) #

Protobuf message format in .proto syntax. Required if the message_format is protobuf.
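The dependency between message_format and proto_schema can be checked up front. The sketch below is illustrative only: validate_message_format is a hypothetical helper, and the embedded .proto definition is a made-up example of what a schema string could contain.

```python
# Made-up schema, shown only to illustrate .proto syntax inside a setting value.
EXAMPLE_PROTO_SCHEMA = """
syntax = "proto3";

message Order {
  int64 id = 1;
  string status = 2;
}
"""


def validate_message_format(config):
    """Return the effective message format, or raise ValueError if the config is inconsistent."""
    message_format = config.get("message_format", "json")
    if message_format not in ("json", "protobuf"):
        raise ValueError(f"unsupported message_format: {message_format!r}")
    if message_format == "protobuf" and not config.get("proto_schema"):
        raise ValueError("proto_schema is required when message_format is protobuf")
    return message_format
```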

Protobuf Classes Directory (proto_classes_dir) #

(Default current working dir)

