Apache Kafka

tap-kafka (airbyte variant)

Open-source distributed event streaming platform

The tap-kafka extractor pulls data from Apache Kafka that can then be sent to a destination using a loader.

Airbyte Usage Notice

This connector uses tap-airbyte-wrapper to call the underlying Airbyte source Docker container. This means you must have Docker installed and running prior to usage. We also recommend using Meltano version 2.13.0 or later.

Container-based connectors can introduce deployment challenges, including the potential need to run Docker-in-Docker, which is not currently supported by services like AWS ECS and Meltano Cloud (see the FAQ and Airbyte's ECS deployment docs for more details). Before using this variant, we recommend considering whether and how you will be able to deploy container-based connectors to production.

For more context on how this Airbyte integration works, please check out the FAQ in the Meltano Docs.
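
Since this variant depends on Docker being installed and running, it can be useful to confirm that before invoking the extractor. The following is a minimal sketch, not part of Meltano or tap-airbyte-wrapper: the helper name docker_is_running is hypothetical, and it assumes that `docker info` exits with a non-zero status when the daemon cannot be reached.

```python
import shutil
import subprocess


def docker_is_running(timeout_seconds: float = 10.0) -> bool:
    """Return True if the docker CLI is on PATH and `docker info` succeeds."""
    if shutil.which("docker") is None:
        return False  # CLI not installed or not on PATH
    try:
        result = subprocess.run(
            ["docker", "info"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=timeout_seconds,
            check=False,
        )
    except (OSError, subprocess.TimeoutExpired):
        return False  # could not launch the CLI, or the daemon did not answer
    return result.returncode == 0


if __name__ == "__main__":
    print("Docker available:", docker_is_running())
```

A check like this could run at the start of a deployment script so that a missing daemon is reported before the extractor starts.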

Getting Started

Prerequisites

If you haven't already, follow the initial steps of the Getting Started guide:

  1. Install Meltano
  2. Create your Meltano project

Installation and configuration

  1. Add the tap-kafka extractor to your project using meltano add:
     meltano add extractor tap-kafka --variant airbyte
  2. Configure the tap-kafka settings using meltano config:
     meltano config tap-kafka set --interactive
  3. Test that the extractor settings are valid using meltano config:
     meltano config tap-kafka test

Next steps

If you run into any issues, learn how to get help.

Capabilities

The current capabilities for tap-kafka may have been automatically set when originally added to the Hub. Please review the capabilities when using this extractor. If you find they are out of date, please consider updating them by making a pull request to the YAML file that defines the capabilities for this extractor.

This plugin has the following capabilities:

  • about
  • catalog
  • discover
  • schema-flattening
  • state
  • stream-maps

You can override these capabilities or specify additional ones in your meltano.yml by adding the capabilities key.

Settings

The tap-kafka settings that are known to Meltano are documented below.

You can also list these settings using meltano config with the list subcommand:

meltano config tap-kafka list

You can override these settings or specify additional ones in your meltano.yml by adding the settings key.

Please consider adding any settings you have defined locally to this definition on MeltanoHub by making a pull request to the YAML file that defines the settings for this plugin.
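
Each setting below is listed with an environment variable. Judging only from the names shown on this page, the variable appears to be the plugin name followed by the setting path, upper-cased, with hyphens and dots replaced by underscores. The sketch below encodes that observed pattern. The helper env_var_name is hypothetical and is not a Meltano API, and the rule is inferred from this page rather than taken from Meltano's source.

```python
def env_var_name(plugin: str, setting: str) -> str:
    """Derive the environment variable name for a plugin setting.

    Assumed rule: join the plugin name and each dotted path segment with
    underscores, replace hyphens with underscores, and upper-case the result.
    """
    parts = [plugin, *setting.split(".")]
    return "_".join(part.replace("-", "_").upper() for part in parts)


print(env_var_name("tap-kafka", "airbyte_config.bootstrap_servers"))
# TAP_KAFKA_AIRBYTE_CONFIG_BOOTSTRAP_SERVERS
print(env_var_name("tap-kafka", "docker_mounts"))
# TAP_KAFKA_DOCKER_MOUNTS
```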

Airbyte Config Messageformat Deserialization Strategy (airbyte_config.MessageFormat.deserialization_strategy)

  • Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_MESSAGEFORMAT_DESERIALIZATION_STRATEGY
[No description provided.]

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config MessageFormat.deserialization_strategy [value]

Airbyte Config Messageformat Deserialization Type (airbyte_config.MessageFormat.deserialization_type)

  • Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_MESSAGEFORMAT_DESERIALIZATION_TYPE

One of JSON or AVRO.


Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config MessageFormat.deserialization_type [value]

Airbyte Config Messageformat Schema Registry Password (airbyte_config.MessageFormat.schema_registry_password)

  • Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_MESSAGEFORMAT_SCHEMA_REGISTRY_PASSWORD
[No description provided.]

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config MessageFormat.schema_registry_password [value]

Airbyte Config Messageformat Schema Registry Url (airbyte_config.MessageFormat.schema_registry_url)

  • Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_MESSAGEFORMAT_SCHEMA_REGISTRY_URL
[No description provided.]

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config MessageFormat.schema_registry_url [value]

Airbyte Config Messageformat Schema Registry Username (airbyte_config.MessageFormat.schema_registry_username)

  • Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_MESSAGEFORMAT_SCHEMA_REGISTRY_USERNAME
[No description provided.]

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config MessageFormat.schema_registry_username [value]

Airbyte Config Auto Commit Interval Ms (airbyte_config.auto_commit_interval_ms)

  • Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_AUTO_COMMIT_INTERVAL_MS

The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if enable.auto.commit is set to true.


Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config auto_commit_interval_ms [value]

Airbyte Config Auto Offset Reset (airbyte_config.auto_offset_reset)

  • Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_AUTO_OFFSET_RESET

What to do when there is no initial offset in Kafka or if the current offset no longer exists on the server:

  • earliest: automatically reset the offset to the earliest offset
  • latest: automatically reset the offset to the latest offset
  • none: throw an exception to the consumer if no previous offset is found for the consumer's group
  • anything else: throw an exception to the consumer


Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config auto_offset_reset [value]
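
The reset rules described for auto_offset_reset can be read as a small decision function. The following is a simplified model for illustration only, not code from the Kafka client or the Airbyte connector. The names resolve_start_offset and NoOffsetError are hypothetical, and a committed offset is treated as valid whenever it lies between the partition's earliest and latest offsets, which is an assumption.

```python
from typing import Optional


class NoOffsetError(Exception):
    """Raised when no usable offset exists and the policy forbids a reset."""


def resolve_start_offset(
    policy: str,
    committed: Optional[int],
    earliest: int,
    latest: int,
) -> int:
    """Pick the offset a consumer would start reading from."""
    # A committed offset that still exists on the server is used as-is.
    if committed is not None and earliest <= committed <= latest:
        return committed
    if policy == "earliest":
        return earliest
    if policy == "latest":
        return latest
    # "none" and any unrecognised value both surface an error to the consumer.
    raise NoOffsetError(f"no valid offset and auto_offset_reset={policy!r}")


print(resolve_start_offset("earliest", None, earliest=5, latest=20))  # 5
print(resolve_start_offset("latest", 10, earliest=5, latest=20))      # 10
```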

Airbyte Config Bootstrap Servers (airbyte_config.bootstrap_servers)

  • Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_BOOTSTRAP_SERVERS

A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).


Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config bootstrap_servers [value]
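
To make the host1:port1,host2:port2 form concrete, here is a minimal sketch that validates such a string before it is handed to the connector. The helper parse_bootstrap_servers is hypothetical and the broker hostnames are placeholders. The connector performs its own parsing, which may be stricter or more lenient.

```python
from typing import List, Tuple


def parse_bootstrap_servers(value: str) -> List[Tuple[str, int]]:
    """Split 'host1:port1,host2:port2' into (host, port) pairs."""
    servers: List[Tuple[str, int]] = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate stray or trailing commas
        # Split on the last colon so the port is always the final component.
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        servers.append((host, int(port)))
    if not servers:
        raise ValueError("bootstrap_servers must list at least one host:port pair")
    return servers


print(parse_bootstrap_servers("broker1.example.com:9092,broker2.example.com:9092"))
```

Listing at least two brokers, as in the example, follows the note above about tolerating a server being down.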

Airbyte Config Client Dns Lookup (airbyte_config.client_dns_lookup)

  • Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_CLIENT_DNS_LOOKUP

Controls how the client uses DNS lookups. If set to use_all_dns_ips, connect to each returned IP address in sequence until a successful connection is established. After a disconnection, the next IP is used. Once all IPs have been used once, the client resolves the IP(s) from the hostname again. If set to resolve_canonical_bootstrap_servers_only, resolve each bootstrap address into a list of canonical names. After the bootstrap phase, this behaves the same as use_all_dns_ips. If set to default (deprecated), attempt to connect to the first IP address returned by the lookup, even if the lookup returns multiple IP addresses.


Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config client_dns_lookup [value]

Airbyte Config Client Id (airbyte_config.client_id)

  • Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_CLIENT_ID

An ID string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just IP/port by allowing a logical application name to be included in server-side request logging.


Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config client_id [value]

Airbyte Config Enable Auto Commit (airbyte_config.enable_auto_commit)

  • Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_ENABLE_AUTO_COMMIT

If true, the consumer's offset will be periodically committed in the background.


Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config enable_auto_commit [value]

Airbyte Config Group Id (airbyte_config.group_id)

  • Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_GROUP_ID

The Group ID is how you distinguish different consumer groups.


Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config group_id [value]

Airbyte Config Max Poll Records (airbyte_config.max_poll_records)

  • Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_MAX_POLL_RECORDS

The maximum number of records returned in a single call to poll(). Note that max_poll_records does not impact the underlying fetching behavior. The consumer will cache the records from each fetch request and return them incrementally from each poll.


Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config max_poll_records [value]
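
The distinction between fetching and polling can be illustrated with a toy model. The sketch below is not the Kafka consumer implementation: the BufferedConsumer class and its fetch callback are hypothetical stand-ins that only show how a cap on records per poll() can sit on top of larger fetches.

```python
from collections import deque
from typing import Callable, Deque, List


class BufferedConsumer:
    """Toy consumer: fetches in large batches, hands records out in small ones."""

    def __init__(self, fetch: Callable[[], List[str]], max_poll_records: int) -> None:
        self._fetch = fetch
        self._max_poll_records = max_poll_records
        self._buffer: Deque[str] = deque()

    def poll(self) -> List[str]:
        # Only go back to the broker when the local cache is empty.
        if not self._buffer:
            self._buffer.extend(self._fetch())
        count = min(self._max_poll_records, len(self._buffer))
        return [self._buffer.popleft() for _ in range(count)]


batches = iter([["a", "b", "c", "d", "e"]])
consumer = BufferedConsumer(lambda: next(batches, []), max_poll_records=2)
print(consumer.poll())  # ['a', 'b']
print(consumer.poll())  # ['c', 'd']
print(consumer.poll())  # ['e']
```

In this model a single fetch of five records is served across three polls, so lowering max_poll_records changes the batch size seen by the caller without changing how much is fetched.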

Airbyte Config Max Records Process (airbyte_config.max_records_process)

  • Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_MAX_RECORDS_PROCESS

The maximum number of records to be processed per execution.


Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config max_records_process [value]

Airbyte Config Polling Time (airbyte_config.polling_time)

  • Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_POLLING_TIME

Amount of time the Kafka connector should try to poll for messages.


Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config polling_time [value]

Airbyte Config Protocol Sasl Jaas Config (airbyte_config.protocol.sasl_jaas_config)

  • Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_PROTOCOL_SASL_JAAS_CONFIG

The JAAS login context parameters for SASL connections in the format used by JAAS configuration files.


Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config protocol.sasl_jaas_config [value]
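
As an illustration of the JAAS format, the sketch below builds a login line for the PLAIN mechanism using Kafka's PlainLoginModule. The helper plain_jaas_config is hypothetical, the credentials are placeholders, and the quote-escaping rule is an assumption. Other mechanisms use different login modules and options.

```python
def plain_jaas_config(username: str, password: str) -> str:
    """Build a JAAS login line for Kafka's PLAIN SASL mechanism."""

    def quote(value: str) -> str:
        # Assumed escaping: backslashes first, then double quotes.
        return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'

    return (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f"username={quote(username)} password={quote(password)};"
    )


print(plain_jaas_config("alice", "example-password"))
```

Because the resulting string contains a credential, it is better supplied through the environment variable above or a secrets manager than committed to meltano.yml.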

Airbyte Config Protocol Sasl Mechanism (airbyte_config.protocol.sasl_mechanism)

  • Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_PROTOCOL_SASL_MECHANISM

The SASL mechanism used for client connections, for example PLAIN. This may be any mechanism for which a security provider is available.


Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config protocol.sasl_mechanism [value]

Airbyte Config Protocol Security Protocol (airbyte_config.protocol.security_protocol)

  • Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_PROTOCOL_SECURITY_PROTOCOL

One of PLAINTEXT, SASL_PLAINTEXT, or SASL_SSL.


Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config protocol.security_protocol [value]

Airbyte Config Receive Buffer Bytes (airbyte_config.receive_buffer_bytes)

  • Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_RECEIVE_BUFFER_BYTES

The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used.


Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config receive_buffer_bytes [value]

Airbyte Config Repeated Calls (airbyte_config.repeated_calls)

  • Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_REPEATED_CALLS

The number of repeated calls to poll() if no messages were received.


Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config repeated_calls [value]
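
The polling_time, repeated_calls, and max_records_process settings all bound how long a single run keeps reading. The sketch below shows one plausible way they could interact. It is an assumption for illustration and not the connector's actual loop: read_records and the poll callback are hypothetical, and counting only consecutive empty polls is a design choice made here.

```python
from typing import Callable, List


def read_records(
    poll: Callable[[int], List[str]],
    polling_time: int,
    repeated_calls: int,
    max_records_process: int,
) -> List[str]:
    """Poll until the record cap is hit or too many polls in a row are empty."""
    records: List[str] = []
    empty_polls = 0
    while len(records) < max_records_process:
        batch = poll(polling_time)
        if not batch:
            empty_polls += 1
            if empty_polls >= repeated_calls:
                break  # give up after this many consecutive empty polls
            continue
        empty_polls = 0
        remaining = max_records_process - len(records)
        records.extend(batch[:remaining])  # never exceed the per-run cap
    return records


batches = iter([["a", "b"], [], ["c"], [], []])
print(read_records(lambda _t: next(batches, []), 100, 2, 10))  # ['a', 'b', 'c']
```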

Airbyte Config Request Timeout Ms (airbyte_config.request_timeout_ms)

  • Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_REQUEST_TIMEOUT_MS

The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.


Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config request_timeout_ms [value]

Airbyte Config Retry Backoff Ms (airbyte_config.retry_backoff_ms)

  • Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_RETRY_BACKOFF_MS

The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.


Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config retry_backoff_ms [value]

Airbyte Config Subscription Subscription Type (airbyte_config.subscription.subscription_type)

  • Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_SUBSCRIPTION_SUBSCRIPTION_TYPE

One of assign or subscribe.


Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config subscription.subscription_type [value]

Airbyte Config Subscription Topic Partitions (airbyte_config.subscription.topic_partitions)

  • Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_SUBSCRIPTION_TOPIC_PARTITIONS
[No description provided.]

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config subscription.topic_partitions [value]

Airbyte Config Subscription Topic Pattern (airbyte_config.subscription.topic_pattern)

  • Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_SUBSCRIPTION_TOPIC_PATTERN
[No description provided.]

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config subscription.topic_pattern [value]

Airbyte Config Test Topic (airbyte_config.test_topic)

  • Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_TEST_TOPIC

The topic to use when testing whether Airbyte can consume messages.


Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config test_topic [value]

Airbyte Spec Image (airbyte_spec.image)

  • Environment variable: TAP_KAFKA_AIRBYTE_SPEC_IMAGE
  • Default Value: airbyte/source-kafka

Airbyte image to run


Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_spec image [value]

Airbyte Spec Tag (airbyte_spec.tag)

  • Environment variable: TAP_KAFKA_AIRBYTE_SPEC_TAG
  • Default Value: latest

Airbyte image tag


Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_spec tag [value]

Docker Mounts (docker_mounts)

  • Environment variable: TAP_KAFKA_DOCKER_MOUNTS

Docker mounts to make available to the Airbyte container. Expects a list of maps containing source, target, and type, as documented in the docker --mount documentation.


Configure this setting directly using the following Meltano command:

meltano config tap-kafka set docker_mounts [value]
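
The expected shape of docker_mounts, a list of maps with source, target, and type, can be checked before it is passed along. The following is a minimal sketch: build_docker_mounts is a hypothetical helper, the paths are placeholders, and serializing the list as a JSON string is an assumption about how the value is supplied.

```python
import json
from typing import Dict, List


def build_docker_mounts(mounts: List[Dict[str, str]]) -> str:
    """Validate mount entries and serialize them as a JSON array."""
    required = {"source", "target", "type"}
    for index, mount in enumerate(mounts):
        missing = required - set(mount)
        if missing:
            raise ValueError(f"mount {index} is missing keys: {sorted(missing)}")
    return json.dumps(mounts)


# Hypothetical bind mount exposing a local certificate directory to the container.
print(build_docker_mounts([
    {"source": "/tmp/certs", "target": "/certs", "type": "bind"},
]))
```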

Flattening Enabled (flattening_enabled)

  • Environment variable: TAP_KAFKA_FLATTENING_ENABLED

'True' to enable schema flattening and automatically expand nested properties.


Configure this setting directly using the following Meltano command:

meltano config tap-kafka set flattening_enabled [value]

Flattening Max Depth (flattening_max_depth)

  • Environment variable: TAP_KAFKA_FLATTENING_MAX_DEPTH

The max depth to flatten schemas.


Configure this setting directly using the following Meltano command:

meltano config tap-kafka set flattening_max_depth [value]
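
To show what flattening to a maximum depth means in practice, here is a simplified sketch applied to a single record. It is not the Meltano SDK's implementation: flatten_record is a hypothetical helper, the "__" separator is an assumption, and objects below the depth limit are left untouched here, which may differ from what the SDK does.

```python
from typing import Any, Dict


def flatten_record(
    record: Dict[str, Any],
    max_depth: int,
    separator: str = "__",
    _prefix: str = "",
    _depth: int = 0,
) -> Dict[str, Any]:
    """Expand nested objects into top-level keys, up to max_depth levels."""
    flat: Dict[str, Any] = {}
    for key, value in record.items():
        name = f"{_prefix}{separator}{key}" if _prefix else key
        if isinstance(value, dict) and _depth < max_depth:
            flat.update(flatten_record(value, max_depth, separator, name, _depth + 1))
        else:
            flat[name] = value  # scalars, lists, and objects past the depth limit
    return flat


record = {"id": 1, "user": {"name": "a", "address": {"city": "x"}}}
print(flatten_record(record, max_depth=1))
# {'id': 1, 'user__name': 'a', 'user__address': {'city': 'x'}}
print(flatten_record(record, max_depth=2))
# {'id': 1, 'user__name': 'a', 'user__address__city': 'x'}
```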

Stream Map Config (stream_map_config)

  • Environment variable: TAP_KAFKA_STREAM_MAP_CONFIG

User-defined config values to be used within map expressions.


Configure this setting directly using the following Meltano command:

meltano config tap-kafka set stream_map_config [value]

Stream Maps (stream_maps)

  • Environment variable: TAP_KAFKA_STREAM_MAPS

Config object for stream maps capability. For more information check out Stream Maps.


Configure this setting directly using the following Meltano command:

meltano config tap-kafka set stream_maps [value]

Something missing?

This page is generated from a YAML file that you can contribute changes to.

Looking for help?

If you're having trouble getting the tap-kafka extractor to work, read the Airbyte connector FAQ, look for an existing issue in the Airbyte repository, file a new issue, or join the Meltano Slack community and ask for help in the #plugins-general channel.

Maintenance Status

  • Built with the Meltano SDK
  • Based on an Airbyte Connector

Repo

https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-kafka

Maintainer

  • Airbyte

Keywords

  • airbyte_protocol
  • meltano_sdk