The Kafka Transport Connectivity Plug-in

About the Kafka transport

Info
The Kafka transport is deprecated and will be removed in a future release.

Kafka is a distributed streaming platform. See https://kafka.apache.org/ for detailed information.

Apama provides a connectivity plug-in, the Kafka transport, which can be used to communicate with the Kafka distributed streaming platform. Kafka messages can be transformed to and from Apama events by listening for and sending events to channels such as prefix:topic (where the prefix is configurable).

You configure the Kafka connectivity plug-in by editing the files that come with the Kafka bundle. The properties file defines the substitution variables that are used in the YAML configuration file, which also comes with the bundle. See Adding the Kafka connectivity plug-in to a project for further information.

Info
In addition to using Apama Plugin for Eclipse to add bundles, you can also do this using the apama_project command-line tool. See Creating and managing an Apama project from the command line for more information.

This transport provides a dynamic chain manager which creates chains automatically when EPL subscribes or sends to a correlator channel with the configured prefix, typically kafka:. For the Kafka transport, there must be exactly one chain definition provided in the dynamicChains section of the YAML configuration file.

For more information on YAML configuration files, see Using connectivity plug-ins and especially Configuration file for connectivity plug-ins.

Info
The Kafka connectivity plug-in does not support reliable messaging.

Loading the Kafka transport

You can load the Kafka transport by adding the Kafka connectivity bundle to your project in Apama Plugin for Eclipse (see Adding the Kafka connectivity plug-in to a project). Alternatively, you can load the transport with the following connectivityPlugins stanza in your YAML configuration file:

connectivityPlugins:
  kafkaTransport:
    classpath: ${APAMA_HOME}/lib/kafka-clients.jar;${APAMA_HOME}/lib/connectivity-kafka.jar
    class: com.apama.kafka.ChainManager

Configuring the connection to Kafka (dynamicChainManagers)

You configure one or more dynamicChainManagers to connect to different Kafka brokers. For example:

dynamicChainManagers:
  kafkaManager:
    transport: kafkaTransport
    managerConfig:
      channelPrefix: "kafka:"
      bootstrap.servers: "localhost:9092"
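
If you connect to more than one broker, you define several chain managers, each with its own channel prefix. The following is a minimal sketch, assuming a second broker on localhost:9093 and the illustrative prefixes "kafkaA:" and "kafkaB:":

dynamicChainManagers:
  kafkaManagerA:
    transport: kafkaTransport
    managerConfig:
      channelPrefix: "kafkaA:"
      bootstrap.servers: "localhost:9092"
  kafkaManagerB:
    transport: kafkaTransport
    managerConfig:
      channelPrefix: "kafkaB:"
      bootstrap.servers: "localhost:9093"

With this configuration, the channel kafkaA:orders maps to the topic orders on the first broker, and kafkaB:orders maps to the same topic name on the second.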

Connection-related configuration is specified in the managerConfig stanza on the dynamicChainManagers instance. The following configuration options are available for managerConfig:

Configuration option

Description

bootstrap.servers

This is the only option in the Kafka configuration for which you must specify a value. You can either set it under managerConfig, where it serves as the default for all consumers and producers, or in the configuration of a specific consumer or producer, which overrides any default given in the parent chain manager. The value needs to be enclosed in quotation marks. Example:

bootstrap.servers: "localhost:62618"

Type: string.

channelPrefix

Prefix for dynamic mapping. If the prefix ends with a colon (:), it needs to be enclosed in quotation marks (see also Using YAML configuration files). When a channel is mapped to a Kafka topic, the prefix is removed. For example, if the prefix is "kafka:", then the channel kafka:test/a maps to the Kafka topic test/a.

Type: string.

Default: "kafka:".

consumerConfig

Keys and values of the consumer configuration options in Kafka. See the Kafka documentation at https://kafka.apache.org/documentation/ for detailed information on the consumer configs. Some default values are provided by the Kafka transport, but you can override them by specifying different values (see the sketch after this table).

The default values are:

  • group.id: A unique identifier for every instance.
  • session.timeout.ms: "30000"
  • key.deserializer: "org.apache.kafka.common.serialization.StringDeserializer"
  • value.deserializer: "org.apache.kafka.common.serialization.StringDeserializer"

Type: map.

producerConfig

Keys and values of the producer configuration options in Kafka. See the Kafka documentation at https://kafka.apache.org/documentation/ for detailed information on the producer configs. Some default values are provided by the Kafka transport, but you can override them by specifying different values (see the sketch after this table).

The default values are:

  • linger.ms: 0
  • key.serializer: "org.apache.kafka.common.serialization.StringSerializer"
  • value.serializer: "org.apache.kafka.common.serialization.StringSerializer"

Type: map.
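
For example, the following is a minimal sketch of how consumerConfig and producerConfig settings are nested under managerConfig to override the defaults listed above. The group name, timeout and linger values are illustrative, not recommendations:

dynamicChainManagers:
  kafkaManager:
    transport: kafkaTransport
    managerConfig:
      channelPrefix: "kafka:"
      bootstrap.servers: "localhost:9092"
      consumerConfig:
        # Illustrative: fixed consumer group instead of the generated unique identifier
        group.id: "apama-sample-group"
        # Illustrative: shorter session timeout than the default "30000"
        session.timeout.ms: "10000"
      producerConfig:
        # Illustrative: batch sends for up to 5 ms instead of the default 0
        linger.ms: 5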

Kafka allows clients to connect over SSL. You use the consumerConfig and producerConfig configuration options of the Kafka transport to specify the SSL configuration. See the Kafka documentation at https://kafka.apache.org/ for detailed information on how to configure Kafka clients to use SSL.
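
For example, the following is a sketch of an SSL-enabled connection using the standard Kafka client properties. The port, truststore path and password are placeholders, and the exact set of ssl.* properties you need depends on how your brokers are configured:

dynamicChainManagers:
  kafkaManager:
    transport: kafkaTransport
    managerConfig:
      bootstrap.servers: "localhost:9093"
      consumerConfig:
        security.protocol: "SSL"
        ssl.truststore.location: "/path/to/client.truststore.jks"
        ssl.truststore.password: "changeit"
      producerConfig:
        security.protocol: "SSL"
        ssl.truststore.location: "/path/to/client.truststore.jks"
        ssl.truststore.password: "changeit"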

Configuring message transformations (dynamicChains)

You configure exactly one chain definition in the dynamicChains section to handle transforming messages from the Kafka broker into the correlator, and vice versa. For example:

dynamicChains:
  kafkaChain:
    - apama.eventMap:
        defaultEventType: Evt
        suppressLoopback: true
    - jsonCodec
    - kafkaTransport

We recommend using the suppressLoopback configuration property to prevent undesirable behavior. See Host plug-ins and configuration for further information.

Payload for the Kafka message

As with all other transports, the translation between EPL events and Kafka payloads is based on the choice of host plug-in and codecs. See Host plug-ins and configuration and Codec Connectivity Plug-ins for further information.

The default payload for the Kafka message is a string (with conversion between the underlying bytes and strings performed by the StringDeserializer and StringSerializer classes from the org.apache.kafka.common.serialization package).

The following is a simple example of a YAML configuration file where the payload of the Kafka message will be the string form of the Apama event:

dynamicChainManagers:
  kafkaManager:
    transport: kafkaTransport
    managerConfig:
      bootstrap.servers: "localhost:9092"

dynamicChains:
  myChain:
    - apama.eventString:
    - kafkaTransport:

You can configure alternative serializers and deserializers using the consumerConfig and producerConfig options of the Kafka connectivity plug-in (see also Configuring the connection to Kafka (dynamicChainManagers)). You can use a third-party serializer/deserializer implementation, or you can create your own. You just need to include the relevant classes in the classpath of the Kafka plug-in itself so that it can locate them. See the Kafka documentation for more information about Kafka serializers and deserializers.

Additional transformations (for example, from a string containing JSON to a map) can be performed after the Kafka transport using connectivity codec plug-ins.
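
As an illustration of configuring alternative serializers, the following sketch replaces the default string classes with a custom pair. com.example.CustomDeserializer and com.example.CustomSerializer are hypothetical classes that would have to be added to the classpath of the Kafka plug-in:

dynamicChainManagers:
  kafkaManager:
    transport: kafkaTransport
    managerConfig:
      bootstrap.servers: "localhost:9092"
      consumerConfig:
        # Hypothetical custom deserializer for incoming message values
        value.deserializer: "com.example.CustomDeserializer"
      producerConfig:
        # Hypothetical custom serializer for outgoing message values
        value.serializer: "com.example.CustomSerializer"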

Metadata for the Kafka message

Messages going from/to the transport have useful pieces of information inserted into their metadata. This information is stored as a map associated with the kafka key. This map contains the following information:

Field

Description

key

Contains the Kafka record key. This works in both directions. If a message from Kafka has a key, then the metadata will contain it. If a message that is being sent to Kafka has the key in the metadata, then the Kafka record key will be set from it.
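
If you want to work with the record key from EPL, one approach is to copy it between the metadata and a field of the event using the Mapper codec (see Codec Connectivity Plug-ins). The following is a sketch only, assuming an event type with a string field named kafkaKey, and assuming the Mapper codec's dotted-path syntax to address the key entry of the kafka metadata map:

dynamicChains:
  kafkaChain:
    - apama.eventMap:
        defaultEventType: Evt
        suppressLoopback: true
    - mapperCodec:
        "*":
          towardsHost:
            mapFrom:
              # Copy the Kafka record key into the event's kafkaKey field
              - payload.kafkaKey: metadata.kafka.key
          towardsTransport:
            mapFrom:
              # Use the event's kafkaKey field as the Kafka record key
              - metadata.kafka.key: payload.kafkaKey
    - jsonCodec
    - kafkaTransport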