About the Kafka transport
Kafka is a distributed streaming platform. See https://kafka.apache.org/ for detailed information.
Apama provides a connectivity plug-in, the Kafka transport, which can be used to communicate with the Kafka distributed streaming platform. Kafka messages can be transformed to and from Apama events by listening for and sending events to channels such as prefix:topic
(where the prefix is configurable).
You configure the Kafka connectivity plug-in by editing the files that come with the Kafka bundle. The properties file defines the substitution variables that are used in the YAML configuration file which also comes with the bundle. See Adding the Kafka connectivity plug-in to a project for further information.
apama_project
command-line tool. See Creating and managing an Apama project from the command line for more information.This transport provides a dynamic chain manager which creates chains automatically when EPL subscribes or sends to a correlator channel with the configured prefix, typically kafka:
. For the Kafka transport, there must be exactly one chain definition provided in the dynamicChains
section of the YAML configuration file.
For more information on YAML configuration files, see Using connectivity plug-ins and especially Configuration file for connectivity plug-ins.
Loading the Kafka transport
You can load the Kafka transport by adding the Kafka connectivity bundle to your project in Apama Plugin for Eclipse (see Adding the Kafka connectivity plug-in to a project). Alternatively, you can load the transport with the following connectivityPlugins
stanza in your YAML configuration file:
kafkaTransport:
classpath: ${APAMA_HOME}/lib/kafka-clients.jar;${APAMA_HOME}/lib/connectivity-kafka.jar
class: com.apama.kafka.ChainManager
Configuring the connection to Kafka (dynamicChainManagers)
You configure one or more dynamicChainManagers
to connect to different Kafka brokers. For example:
dynamicChainManagers:
kafkaManager:
transport: kafkaTransport
managerConfig:
channelPrefix: "kafka:"
bootstrap.servers: "localhost:9092"
Connection-related configuration is specified in the managerConfig
stanza on the dynamicChainManagers
instance. The following configuration options are available for managerConfig
:
Configuration option |
Description |
---|---|
|
This is the only option in the Kafka configuration for which you must specify a value. You can either set it under
Type: |
|
Prefix for dynamic mapping. If the prefix ends with a colon (:), it needs to be enclosed in quotation marks (see also Using YAML configuration files). When the channel is mapped to a Kafka topic, the prefix is not used. For example, if the prefix is Type: Default: |
|
Keys and values of the consumer configuration options in Kafka. See the Kafka documentation at https://kafka.apache.org/documentation/ for detailed information on the consumer configs. Some default values are provided by the Kafka transport, but you can override them by specifying different values. The default values are:
Type: |
|
Keys and values of the producer configuration options in Kafka. See the Kafka documentation at https://kafka.apache.org/documentation/ for detailed information on the producer configs. Some default values are provided by the Kafka transport, but you can override them by specifying different values. The default values are:
Type: |
Kafka allows clients to connect over SSL. You use the consumerConfig
and producerConfig
configuration options of the Kafka transport to specify the SSL configuration. See the Kafka documentation at https://kafka.apache.org/ for detailed information on how to configure Kafka clients to use SSL.
Configuring message transformations (dynamicChains)
You configure exactly one dynamicChains
section to handle transforming messages from the Kafka broker into the correlator, and vice versa. For example:
dynamicChains:
kafkaChain:
- apama.eventMap:
defaultEventType: Evt
suppressLoopback: true
- jsonCodec
- kafkaTransport
We recommend use of the suppressLoopback
configuration property to prevent undesirable behavior. See Host plug-ins and configuration for further information.
Payload for the Kafka message
As with all other transports, the translation between EPL events and Kafka payloads is based on the choice of host plug-in and codecs. See Host plug-ins and configuration and Codec Connectivity Plug-ins for further information.
The default payload for the Kafka message is a string (with conversion from the underlying bytes using the classes StringDeserializer
and StringSerializer
from the org.apache.kafka.common.serialization
package).
The following is a simple example of a YAML configuration file where the payload of the Kafka message will be the string form of the Apama event:
dynamicChainManagers:
kafkaManager:
transport: kafkaTransport
managerConfig:
bootstrap.servers: "localhost:9092"
dynamicChains:
myChain:
- apama.eventString:
- kafkaTransport:
You can configure alternative serializers and deserializers using the consumerConfig
and producerConfig
options of the Kafka connectivity plug-in (see also Configuring the connection to Kafka (dynamicChainManagers) ). You can use a third-party serializer/deserializer implementation or you can create your own. You just need to include the relevant classes in the same classpath of the Kafka plug-in itself so that it can locate them. See the Kafka documentation for more information about Kafka serializers and deserializers. Additional transformations (for example, from a string containing JSON to a map) can be performed after the Kafka transport using connectivity codec plug-ins.
Metadata for the Kafka message
Messages going from/to the transport have useful pieces of information inserted into their metadata. This information is stored as a map associated with the kafka
key. This map contains the following information:
Field | Description |
---|---|
key |
Contains the Kafka record key. This works in both directions. If a message from Kafka has a key, then the metadata will contain it. If a message that is being sent to Kafka has the key in the metadata, then the Kafka record key will be set with it. |