Apache Kafka

tap-kafka from transferwise

Open-source distributed event streaming platform

The tap-kafka extractor pulls data from Apache Kafka that can then be sent to a destination using a loader.

Available Variants

Getting Started

Prerequisites

If you haven't already, follow the initial steps of the Getting Started guide:

  1. Install Meltano
  2. Create your Meltano project

Installation and configuration

  1. Add the tap-kafka extractor to your project using
    meltano add
    :
  2. meltano add extractor tap-kafka
  3. Configure the tap-kafka settings using
    meltano config
    :
  4. meltano config tap-kafka set --interactive
  5. Test that extractor settings are valid using
    meltano config
    :
  6. meltano config tap-kafka test

Next steps

If you run into any issues, learn how to get help.

Capabilities

The current capabilities for tap-kafka may have been automatically set when originally added to the Hub. Please review the capabilities when using this extractor. If you find they are out of date, please consider updating them by making a pull request to the YAML file that defines the capabilities for this extractor.

This plugin has the following capabilities:

  • properties
  • discover
  • state

You can override these capabilities or specify additional ones in your meltano.yml by adding the capabilities key.

Settings

The tap-kafka settings that are known to Meltano are documented below. To quickly find the setting you're looking for, click on any setting name from the list:

You can override these settings or specify additional ones in your meltano.yml by adding the settings key.

Please consider adding any settings you have defined locally to this definition on MeltanoHub by making a pull request to the YAML file that defines the settings for this plugin.

Topic (topic)

  • Environment variable: TAP_KAFKA_TOPIC

Name of kafka topics to subscribe to

Group ID (group_id)

  • Environment variable: TAP_KAFKA_GROUP_ID

The name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets.

Bootstrap Servers (bootstrap_servers)

  • Environment variable: TAP_KAFKA_BOOTSTRAP_SERVERS

host[:port] string (or list of comma separated host[:port] strings) that the consumer should contact to bootstrap initial cluster metadata.

Primary Keys (primary_keys)

  • Environment variable: TAP_KAFKA_PRIMARY_KEYS

Optionally you can define primary key for the consumed messages. It requires a column name and /slashed/paths ala xpath selector to extract the value from the kafka messages. The extracted column will be added to every output singer message.

Use Message Key (use_message_key)

  • Environment variable: TAP_KAFKA_USE_MESSAGE_KEY

(Default true) Defines whether to use Kafka message key as a primary key for the record. Note - custom primary key(s) takes precedence if such defined and use_message_key is set to true.

Initial Start Time (initial_start_time)

  • Environment variable: TAP_KAFKA_INITIAL_START_TIME

(Default latest) Start time reference of the message consumption if no bookmarked position in state.sjon. One of - latest, earliest or an ISO-8601 formatted timestamp string.

Max Runtime (ms) (max_runtime_ms)

  • Environment variable: TAP_KAFKA_MAX_RUNTIME_MS

(Default 300000) The maximum time for the tap to collect new messages from Kafka topic. If this time exceeds it will flush the batch and close kafka connection.

Commit Internal (ms) (commit_interval_ms)

  • Environment variable: TAP_KAFKA_COMMIT_INTERVAL_MS

(Default 5000) Number of milliseconds between two commits. This is different than the kafka auto commit feature. Tap-kafka sends commit messages automatically but only when the data consumed successfully and persisted to local store.

Consumer Timeout (ms) (consumer_timeout_ms)

  • Environment variable: TAP_KAFKA_CONSUMER_TIMEOUT_MS

(Default 10000) KafkaConsumer setting. Number of milliseconds to block during message iteration before raising StopIteration

Session Timeout (ms) (session_timeout_ms)

  • Environment variable: TAP_KAFKA_SESSION_TIMEOUT_MS

(Default 30000) KafkaConsumer setting. The timeout used to detect failures when using Kafka’s group management facilities.

Heartbeat Interval (ms) (heartbeat_interval_ms)

  • Environment variable: TAP_KAFKA_HEARTBEAT_INTERVAL_MS

(Default 10000) KafkaConsumer setting. The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka’s group management facilities.

Max Poll Records (max_poll_records)

  • Environment variable: TAP_KAFKA_MAX_POLL_RECORDS

(Default 500) KafkaConsumer setting. Maximum number of records to poll.

Max Poll Interval (ms) (max_poll_interval_ms)

  • Environment variable: TAP_KAFKA_MAX_POLL_INTERVAL_MS

(Default 300000) KafkaConsumer setting. The maximum delay between invocations of poll() when using consumer group management.

Message Format (message_format)

  • Environment variable: TAP_KAFKA_MESSAGE_FORMAT

(Default json) Supported message formats are json and protobuf.

Protobuf Schema (proto_schema)

  • Environment variable: TAP_KAFKA_PROTO_SCHEMA

Protobuf message format in .proto syntax. Required if the message_format is protobuf.

Protobuf Classes Directory (proto_classes_dir)

  • Environment variable: TAP_KAFKA_PROTO_CLASSES_DIR

(Default current working dir)

Something missing?

This page is generated from a YAML file that you can contribute changes to.

Edit it on GitHub!

Looking for help?

If you're having trouble getting the tap-kafka extractor to work, look for an existing issue in its repository, file a new issue, or join the Meltano Slack community and ask for help in the
#plugins-general
channel.

Install

meltano add extractor tap-kafka

Maintenance Status

  • Maintenance Status
  • Stars
  • Forks
  • Open Issues
  • Open PRs
  • Contributors
  • License

Maintainer

  • Wise

Meltano Stats

  • Total Executions (Last 3 Months)
  • Projects (Last 3 Months)

PyPI Stats

  • PyPI Downloads
  • PyPI Package Version

Keywords

  • database