Apache Kafka

tap-kafka (airbyte variant)🥈

Open-source distributed event streaming platform

The tap-kafka extractor pulls data from Apache Kafka that can then be sent to a destination using a loader.

Airbyte Usage Notice

This connector uses tap-airbyte-wrapper to call the underlying Airbyte source Docker container. This means you must have Docker installed and running prior to usage. We also recommend using Meltano version 2.13.0 or later.

Container-based connectors can introduce deployment challenges including the potential need to run Docker-in-Docker (not currently supported by services like AWS ECS, Meltano Cloud, etc. see FAQ and Airbyte's ECS deployment docs for more details). Before using this variant we recommend considering if/how you will be able to deploy container-based connectors to production.

For more context on how this Airbyte integration works please checkout out the FAQ in the Meltano Docs.

Alternate Implementations

Getting Started


If you haven't already, follow the initial steps of the Getting Started guide:

  1. Install Meltano
  2. Create your Meltano project

Installation and configuration

  1. Add the tap-kafka extractor to your project using
    meltano add
  2. meltano add extractor tap-kafka --variant airbyte
  3. Configure the tap-kafka settings using
    meltano config
  4. meltano config tap-kafka set --interactive
  5. Test that extractor settings are valid using
    meltano config
  6. meltano config tap-kafka test

Next steps

If you run into any issues, learn how to get help.


The current capabilities for tap-kafka may have been automatically set when originally added to the Hub. Please review the capabilities when using this extractor. If you find they are out of date, please consider updating them by making a pull request to the YAML file that defines the capabilities for this extractor.

This plugin has the following capabilities:

  • about
  • catalog
  • discover
  • schema-flattening
  • state
  • stream-maps

You can override these capabilities or specify additional ones in your meltano.yml by adding the capabilities key.


The tap-kafka settings that are known to Meltano are documented below. To quickly find the setting you're looking for, click on any setting name from the list:

You can also list these settings using

meltano config
with the list subcommand:

meltano config tap-kafka list

You can override these settings or specify additional ones in your meltano.yml by adding the settings key.

Please consider adding any settings you have defined locally to this definition on MeltanoHub by making a pull request to the YAML file that defines the settings for this plugin.

Airbyte Config Messageformat Deserialization Strategy (airbyte_config.MessageFormat.deserialization_strategy)

[No description provided.]

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config MessageFormat.deserialization_strategy [value]

Airbyte Config Messageformat Deserialization Type (airbyte_config.MessageFormat.deserialization_type)



Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config MessageFormat.deserialization_type [value]

Airbyte Config Messageformat Schema Registry Password (airbyte_config.MessageFormat.schema_registry_password)

[No description provided.]

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config MessageFormat.schema_registry_password [value]

Airbyte Config Messageformat Schema Registry Url (airbyte_config.MessageFormat.schema_registry_url)

[No description provided.]

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config MessageFormat.schema_registry_url [value]

Airbyte Config Messageformat Schema Registry Username (airbyte_config.MessageFormat.schema_registry_username)

[No description provided.]

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config MessageFormat.schema_registry_username [value]

Airbyte Config Auto Commit Interval Ms (airbyte_config.auto_commit_interval_ms)


The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if enable.auto.commit is set to true.

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config auto_commit_interval_ms [value]

Airbyte Config Auto Offset Reset (airbyte_config.auto_offset_reset)


What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server - earliest: automatically reset the offset to the earliest offset, latest: automatically reset the offset to the latest offset, none: throw exception to the consumer if no previous offset is found for the consumer's group, anything else: throw exception to the consumer.

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config auto_offset_reset [value]

Airbyte Config Bootstrap Servers (airbyte_config.bootstrap_servers)


A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config bootstrap_servers [value]

Airbyte Config Client Dns Lookup (airbyte_config.client_dns_lookup)


Controls how the client uses DNS lookups. If set to use_all_dns_ips, connect to each returned IP address in sequence until a successful connection is established. After a disconnection, the next IP is used. Once all IPs have been used once, the client resolves the IP(s) from the hostname again. If set to resolve_canonical_bootstrap_servers_only, resolve each bootstrap address into a list of canonical names. After the bootstrap phase, this behaves the same as use_all_dns_ips. If set to default (deprecated), attempt to connect to the first IP address returned by the lookup, even if the lookup returns multiple IP addresses.

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config client_dns_lookup [value]

Airbyte Config Client Id (airbyte_config.client_id)

  • Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_CLIENT_ID

An ID string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config client_id [value]

Airbyte Config Enable Auto Commit (airbyte_config.enable_auto_commit)


If true, the consumer's offset will be periodically committed in the background.

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config enable_auto_commit [value]

Airbyte Config Group Id (airbyte_config.group_id)

  • Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_GROUP_ID

The Group ID is how you distinguish different consumer groups.

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config group_id [value]

Airbyte Config Max Poll Records (airbyte_config.max_poll_records)


The maximum number of records returned in a single call to poll(). Note, that max_poll_records does not impact the underlying fetching behavior. The consumer will cache the records from each fetch request and returns them incrementally from each poll.

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config max_poll_records [value]

Airbyte Config Max Records Process (airbyte_config.max_records_process)


The Maximum to be processed per execution

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config max_records_process [value]

Airbyte Config Polling Time (airbyte_config.polling_time)


Amount of time Kafka connector should try to poll for messages.

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config polling_time [value]

Airbyte Config Protocol Sasl Jaas Config (airbyte_config.protocol.sasl_jaas_config)


The JAAS login context parameters for SASL connections in the format used by JAAS configuration files., The JAAS login context parameters for SASL connections in the format used by JAAS configuration files.

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config protocol.sasl_jaas_config [value]

Airbyte Config Protocol Sasl Mechanism (airbyte_config.protocol.sasl_mechanism)


PLAIN, The SASL mechanism used for client connections. This may be any mechanism for which a security provider is available.

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config protocol.sasl_mechanism [value]

Airbyte Config Protocol Security Protocol (airbyte_config.protocol.security_protocol)



Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config protocol.security_protocol [value]

Airbyte Config Receive Buffer Bytes (airbyte_config.receive_buffer_bytes)


The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used.

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config receive_buffer_bytes [value]

Airbyte Config Repeated Calls (airbyte_config.repeated_calls)


The number of repeated calls to poll() if no messages were received.

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config repeated_calls [value]

Airbyte Config Request Timeout Ms (airbyte_config.request_timeout_ms)


The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config request_timeout_ms [value]

Airbyte Config Retry Backoff Ms (airbyte_config.retry_backoff_ms)


The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config retry_backoff_ms [value]

Airbyte Config Subscription Subscription Type (airbyte_config.subscription.subscription_type)


assign, subscribe

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config subscription.subscription_type [value]

Airbyte Config Subscription Topic Partitions (airbyte_config.subscription.topic_partitions)

[No description provided.]

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config subscription.topic_partitions [value]

Airbyte Config Subscription Topic Pattern (airbyte_config.subscription.topic_pattern)

[No description provided.]

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config subscription.topic_pattern [value]

Airbyte Config Test Topic (airbyte_config.test_topic)


The Topic to test in case the Airbyte can consume messages.

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_config test_topic [value]

Airbyte Spec Image (airbyte_spec.image)

  • Environment variable: TAP_KAFKA_AIRBYTE_SPEC_IMAGE
  • Default Value: airbyte/source-kafka

Airbyte image to run

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_spec image [value]

Airbyte Spec Tag (airbyte_spec.tag)

  • Environment variable: TAP_KAFKA_AIRBYTE_SPEC_TAG
  • Default Value: latest

Airbyte image tag

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set airbyte_spec tag [value]

Docker Mounts (docker_mounts)

  • Environment variable: TAP_KAFKA_DOCKER_MOUNTS

Docker mounts to make available to the Airbyte container. Expects a list of maps containing source, target, and type as is documented in the docker --mount documentation

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set docker_mounts [value]
Expand To Show SDK Settings

Flattening Enabled (flattening_enabled)

  • Environment variable: TAP_KAFKA_FLATTENING_ENABLED

'True' to enable schema flattening and automatically expand nested properties.

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set flattening_enabled [value]

Flattening Max Depth (flattening_max_depth)

  • Environment variable: TAP_KAFKA_FLATTENING_MAX_DEPTH

The max depth to flatten schemas.

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set flattening_max_depth [value]

Stream Map Config (stream_map_config)

  • Environment variable: TAP_KAFKA_STREAM_MAP_CONFIG

User-defined config values to be used within map expressions.

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set stream_map_config [value]

Stream Maps (stream_maps)

  • Environment variable: TAP_KAFKA_STREAM_MAPS

Config object for stream maps capability. For more information check out Stream Maps.

Configure this setting directly using the following Meltano command:

meltano config tap-kafka set stream_maps [value]

Something missing?

This page is generated from a YAML file that you can contribute changes to.

Edit it on GitHub!

Looking for help?

If you're having trouble getting the tap-kafka extractor to work, read the Airbyte connector FAQ, look for an existing issue in the Airbyte repository, file a new issue, or join the Meltano Slack community and ask for help in the


meltano add extractor tap-kafka --variant airbyte

Maintenance Status

  • Maintenance Status
  • Built with the Meltano SDK
  • Based on an Airbyte Connector


  • License


  • Airbyte

Meltano Stats

  • Total Executions (Last 3 Months)
  • Projects (Last 3 Months)


  • airbyte_protocolmeltano_sdk