Loader Kafka
Loader Kafka sends input data as messages to Kafka brokers.
The component can produce two types of message payloads:
- JSON message payloads
- Avro message payloads
The component builds on the Python Kafka client confluent-kafka-python, which wraps the C/C++ library librdkafka.
For authentication and communication encryption, the component supports the following modes of operation:
- PLAINTEXT: No authentication and no encryption
- SSL: Authentication and broker communication encryption using SSL/TLS certificates
- SASL_SSL: Authentication using Kerberos (GSSAPI) with encryption using SSL/TLS certificates
- SASL_PLAINTEXT: Authentication using Kerberos (GSSAPI) without communication encryption
Configuration parameters can have a specific meaning or behavior depending on the message payload type or authentication scheme, so consult the relevant section of this documentation.
Warning: The topic must exist prior to loading. The component does not create a target topic automatically.
Warning: For Avro payloads, the component only supports schema discovery and message production.
Warning: Avro schemas are only supported for message values, not for message keys.
Warning: The component was tested against the Confluent Kafka Schema Registry. Other registry implementations were not tested but may work.
Data In/Data Out
Data In |
The component searches for input files in the directory /data/in/files.
The files should be in newline-delimited JSON format (*.ndjson).
Each JSON object found in an input file is translated into a single Kafka message.
The expected structure of the input JSON object depends on the chosen message payload format.
For the JSON message format, the input JSON object is used as the message payload as-is.
For the Avro message format, the input JSON object must provide the properties required by the Avro schema. The final message is then created according to the discovered schema by populating it with values from the JSON object.
|
Data Out |
N/A |
Learn more about the folder structure here.
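The input handling described above can be sketched as follows. This is a minimal illustration and not the component's actual code: the function name `iter_ndjson_objects` is hypothetical, and tolerating blank lines is an assumption.

```python
import json
from typing import Iterable, Iterator


def iter_ndjson_objects(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one JSON object per non-empty line of an NDJSON source."""
    for line_number, line in enumerate(lines, start=1):
        line = line.strip()
        if not line:
            continue  # assumption: blank lines between records are ignored
        obj = json.loads(line)
        if not isinstance(obj, dict):
            raise ValueError(f"line {line_number}: expected a JSON object")
        yield obj


# Stand-in for the lines of a file under /data/in/files.
sample_lines = [
    '{"phoneNumber": "607324567", "firstName": "Agatha"}\n',
    "\n",
    '{"phoneNumber": "607111222", "firstName": "J.K."}\n',
]
records = list(iter_ndjson_objects(sample_lines))
```

Each element of `records` would correspond to one Kafka message. With a real file, the same function can be fed an open file handle, since iterating a text file yields its lines.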
Parameters
Connection Settings
Server(s)
(required)
|
List of Kafka brokers the loader should attempt the initial connection with.
If multiple hosts are provided, the loader attempts to establish a connection in the order in which the brokers are specified.
Format: host:port
Example: kafka-broker1.meiro.io:9094
|
Security protocol
(required)
|
Authentication and encryption protocol the loader should use for communication with Kafka brokers.
Possible values:
- plaintext
- ssl
- sasl_ssl
- sasl_plaintext
|
Connection log level
|
Log level for diagnostic logging on the Kafka connection. The value is passed to the librdkafka configuration property debug.
Possible options:
- Default (normal level of logging)
- Security (authentication and connection initiation messages)
- All (full diagnostic logging, very granular)
|
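As an illustration of how the three connection settings could translate into librdkafka properties, here is a minimal sketch. `bootstrap.servers`, `security.protocol`, and `debug` are real librdkafka property names, and `security` and `all` are valid `debug` contexts. The function name and the exact mapping of the log-level options to `debug` contexts are assumptions, not the component's documented behavior.

```python
from typing import Dict, List

# Assumed mapping from the log-level options to librdkafka `debug` contexts.
DEBUG_CONTEXTS = {"Default": None, "Security": "security", "All": "all"}

VALID_PROTOCOLS = {"plaintext", "ssl", "sasl_ssl", "sasl_plaintext"}


def build_connection_config(
    servers: List[str], security_protocol: str, log_level: str = "Default"
) -> Dict[str, str]:
    """Build a librdkafka-style config dict from the connection settings."""
    protocol = security_protocol.lower()
    if protocol not in VALID_PROTOCOLS:
        raise ValueError(f"unsupported security protocol: {security_protocol!r}")
    for server in servers:
        host, sep, port = server.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {server!r}")
    config = {
        # Brokers are joined in the order given.
        "bootstrap.servers": ",".join(servers),
        "security.protocol": protocol,
    }
    debug = DEBUG_CONTEXTS[log_level]
    if debug is not None:
        config["debug"] = debug  # omitted entirely for the default level
    return config


config = build_connection_config(
    ["kafka-broker1.meiro.io:9094"], "sasl_ssl", "Security"
)
```

A dict of this shape is what `confluent_kafka.Producer` accepts as its configuration argument.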
Broker SSL
Broker’s CA certificate
|
CA certificate string (PEM format) for verifying the broker's key.
Passing a directory value (for multiple CA certificates) is not supported.
The value is passed to ssl.ca.pem in librdkafka.
|
Client’s private key
|
Client's private key string (PEM format) used for authentication.
The value is passed to ssl.key.pem in librdkafka.
|
Client’s public key
|
Client's public key string (PEM format) used for authentication.
The value is passed to ssl.certificate.pem in librdkafka.
|
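The three Broker SSL parameters map one-to-one onto librdkafka properties, which the following sketch makes explicit. The function name and the PEM sanity check are illustrative assumptions, and the PEM bodies are placeholders rather than real keys.

```python
from typing import Dict


def build_ssl_config(ca_pem: str, key_pem: str, certificate_pem: str) -> Dict[str, str]:
    """Map the Broker SSL parameters onto librdkafka's in-memory PEM properties."""
    values = {
        "ssl.ca.pem": ca_pem,
        "ssl.key.pem": key_pem,
        "ssl.certificate.pem": certificate_pem,
    }
    for name, pem in values.items():
        # Only PEM strings are accepted; a directory or file path is rejected.
        if not pem.strip().startswith("-----BEGIN "):
            raise ValueError(f"{name}: value does not look like PEM text")
    return {name: pem.strip() for name, pem in values.items()}


# Placeholder PEM bodies, for illustration only.
PLACEHOLDER_CA = "-----BEGIN CERTIFICATE-----\nMIIB...\n-----END CERTIFICATE-----\n"
PLACEHOLDER_KEY = "-----BEGIN PRIVATE KEY-----\nMIIE...\n-----END PRIVATE KEY-----\n"
PLACEHOLDER_CERT = "-----BEGIN CERTIFICATE-----\nMIIC...\n-----END CERTIFICATE-----\n"

ssl_config = build_ssl_config(PLACEHOLDER_CA, PLACEHOLDER_KEY, PLACEHOLDER_CERT)
```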
Kerberos
See Troubleshooting Kerberos for common issues with Kerberos configuration.
Client Principal
|
Kerberos client principal that will be used to authenticate with Kafka brokers.
The principal must have a matching entry in the provided keytab file and must have been assigned the Kafka ACL that grants write access to the selected topic.
Example: kafka_producer@TEST.CONFLUENT.IO
|
Service Name
|
Kerberos principal name that Kafka runs as, excluding /hostname@REALM.
Example: kafka
|
Keytab (Base64)
|
Base64-encoded contents of the Kerberos keytab file. The keytab file must contain an entry for the client's principal.
The contents are decoded automatically, and a corresponding binary keytab file is created at runtime for use with the built-in Kafka Kerberos authentication.
To obtain the value for this parameter, run any base64 encoder on the contents of the keytab file you exported using kadmin. Then copy and paste the resulting base64 string as the value of this parameter.
The base64 string may contain newlines and spaces. They are removed automatically during decoding.
Example: a valid entry containing extra whitespace, which decodes to a valid keytab with multiple principals
BQIAAABsAAIAEVRFU1QuQ09ORkxVRU5ULklPAA5rYWZrYV9wcm9kdWNlcgAUc3RhZ2luZy5taS5t ZWlyby5kZXYAAAABYshQYgIAEgAguT5/yyO6lRdzrMHOAnxuq3/iSmPV9Av0hREbn3CWLnQAAAAC AAAAXAACABFURVNULkNPTkZMVUVOVC5JTwAOa2Fma2FfcHJvZHVjZXIAFHN0YWdpbmcubWkubWVpcm8uZGV2AAAAAWLIUGICABEAEDKWmM8OejBQgDN3k0gE3hUAAAAC
|
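The whitespace-tolerant decoding described above can be sketched like this. It is a minimal illustration under stated assumptions: the function names are hypothetical, the component's real file location is not documented here, and `fake_keytab` is dummy data rather than a usable keytab.

```python
import base64
import binascii
import tempfile


def decode_keytab(encoded: str) -> bytes:
    """Strip all whitespace from the parameter value, then base64-decode it."""
    cleaned = "".join(encoded.split())  # drops spaces, tabs and newlines
    try:
        return base64.b64decode(cleaned, validate=True)
    except binascii.Error as exc:
        raise ValueError("keytab parameter is not valid base64") from exc


def write_keytab(encoded: str) -> str:
    """Write the decoded keytab to a temporary binary file and return its path."""
    with tempfile.NamedTemporaryFile(suffix=".keytab", delete=False) as handle:
        handle.write(decode_keytab(encoded))
        return handle.name


# Dummy bytes standing in for a keytab exported with kadmin.
fake_keytab = b"\x05\x02" + bytes(range(100))
# encodebytes wraps its output in lines; extra spaces are added on purpose.
wrapped = base64.encodebytes(fake_keytab).decode("ascii").replace("\n", " \n")
keytab_path = write_keytab(wrapped)
```

The resulting path is the kind of value librdkafka expects in `sasl.kerberos.keytab`.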
Kerberos Configuration (krb5.conf)
|
Contents of the krb5.conf file. The configuration file is passed to the built-in Kerberos client-side utilities (e.g. kinit) to perform authentication.
At a minimum, the file must contain correct definitions of the Kerberos realm and the domain-to-realm or host-to-realm mapping entries.
A corresponding krb5.conf file is created at runtime for use with the built-in Kafka Kerberos authentication.
For details on the specific command used to perform Kerberos authentication, see sasl.kerberos.kinit.cmd in librdkafka.
|
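To make the Kerberos parameters concrete, the sketch below shows a minimal krb5.conf and the librdkafka properties the other Kerberos parameters would plausibly feed. The property names (`sasl.mechanism`, `sasl.kerberos.principal`, `sasl.kerberos.service.name`, `sasl.kerberos.keytab`) and the `KRB5_CONFIG` environment variable are real. The KDC host `kdc.example.com`, the keytab path, the function names, and the way the component wires these together are assumptions for illustration.

```python
import os
import tempfile
from typing import Dict

# Hypothetical minimal krb5.conf: realm definition plus domain-to-realm mapping.
SAMPLE_KRB5_CONF = """\
[libdefaults]
    default_realm = TEST.CONFLUENT.IO

[realms]
    TEST.CONFLUENT.IO = {
        kdc = kdc.example.com
    }

[domain_realm]
    .example.com = TEST.CONFLUENT.IO
"""


def write_krb5_conf(contents: str) -> str:
    """Write the parameter value to a temporary krb5.conf and return its path."""
    with tempfile.NamedTemporaryFile("w", suffix=".conf", delete=False) as handle:
        handle.write(contents)
        return handle.name


def build_kerberos_config(principal: str, service_name: str, keytab_path: str) -> Dict[str, str]:
    """Map the Kerberos parameters onto librdkafka SASL properties."""
    return {
        "sasl.mechanism": "GSSAPI",
        "sasl.kerberos.principal": principal,
        "sasl.kerberos.service.name": service_name,
        "sasl.kerberos.keytab": keytab_path,
    }


krb5_path = write_krb5_conf(SAMPLE_KRB5_CONF)
# KRB5_CONFIG tells MIT Kerberos tools such as kinit where krb5.conf lives.
# Assumption: a loader would export it before the producer is created.
kinit_env = dict(os.environ, KRB5_CONFIG=krb5_path)
kerberos_config = build_kerberos_config(
    "kafka_producer@TEST.CONFLUENT.IO", "kafka", "/tmp/client.keytab"
)
```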
Schema Registry
Schema Registry URL
|
URL of the Avro schema registry.
Example: http(s)://kafka-registry.meiro.io:8081
|
Schema Registry CA certificate
|
CA certificate string (PEM format) for verifying the Schema Registry's key.
Required when HTTPS is enabled.
|
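A minimal sketch of how the two Schema Registry parameters could be turned into a client configuration. The keys `url` and `ssl.ca.location` are the ones accepted by `SchemaRegistryClient` in confluent-kafka-python, and `ssl.ca.location` expects a file path, so the PEM string is written to a temporary file first. The function name and the temporary-file approach are assumptions about the implementation.

```python
import tempfile
from typing import Dict, Optional


def build_schema_registry_config(url: str, ca_pem: Optional[str] = None) -> Dict[str, str]:
    """Build a Schema Registry client config from the URL and optional CA PEM."""
    config = {"url": url}
    if url.lower().startswith("https://"):
        if not ca_pem:
            raise ValueError("a CA certificate is required when HTTPS is enabled")
        # The client takes a path, so persist the PEM string to a file.
        with tempfile.NamedTemporaryFile("w", suffix=".pem", delete=False) as handle:
            handle.write(ca_pem)
        config["ssl.ca.location"] = handle.name
    return config


# Placeholder PEM body, for illustration only.
PLACEHOLDER_REGISTRY_CA = "-----BEGIN CERTIFICATE-----\nMIIB...\n-----END CERTIFICATE-----\n"

plain_config = build_schema_registry_config("http://kafka-registry.meiro.io:8081")
secure_config = build_schema_registry_config(
    "https://kafka-registry.meiro.io:8081", PLACEHOLDER_REGISTRY_CA
)
```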
Input
Path (required)
|
Path to data files to be processed. Example: data/in/files/folder_name/file_name.ndjson
|
Topic (required)
|
Kafka topic to which the messages generated from the specified file(s) are sent.
The topic can be specified for each separate input file. If topics use different schemas, schemas will be dynamically discovered and used accordingly.
|
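Because each topic can have its own schema, an Avro message has to be built from whichever schema was discovered for that topic. The sketch below illustrates the idea of populating a record from an input JSON object. The schema, the function name, and the handling of defaults are assumptions for illustration; the actual serialization is done by the Avro tooling, not by this code.

```python
from typing import Any, Dict


def populate_record(schema: Dict[str, Any], obj: Dict[str, Any]) -> Dict[str, Any]:
    """Pick the schema's fields from the input object, in schema order."""
    record = {}
    for field in schema["fields"]:
        name = field["name"]
        if name in obj:
            record[name] = obj[name]
        elif "default" in field:
            record[name] = field["default"]  # assumption: schema defaults fill gaps
        else:
            raise ValueError(f"input object lacks required property {name!r}")
    return record


# Hypothetical schema, as it might be discovered for one topic.
customer_schema = {
    "type": "record",
    "name": "Customer",
    "fields": [
        {"name": "phoneNumber", "type": "string"},
        {"name": "firstName", "type": "string"},
        {"name": "nickname", "type": ["null", "string"], "default": None},
    ],
}

record = populate_record(
    customer_schema,
    {"phoneNumber": "607324567", "firstName": "Agatha", "lastName": "Christie"},
)
```

In this sketch `lastName` is not part of the schema, so it does not appear in the record, while `nickname` falls back to its declared default.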
Message key source property
|
Reference to a property in the input JSON objects. The value of this property is used as the generated message’s key.
If the key is configured and the input JSON object does not contain the specified property, the input object is skipped and no message is sent. Skipped JSON objects are logged.
If the source property is not configured, a message is sent without a key (i.e. the key is set to null).
Example: For the input JSON object
{
  "phoneNumber": "607324567",
  "firstName": "Agatha",
  "lastName": "Christie"
}
with the key source configured to phoneNumber, the resulting Kafka message in JSON format uses 607324567 as the message key and its payload is
{
  "phoneNumber": "607324567",
  "firstName": "Agatha",
  "lastName": "Christie"
}
|
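The three cases described for the key source property (key found, key property missing, no key configured) can be sketched as follows. The function name, the logger name, and the conversion of the key to a string are assumptions, not the component's documented behavior.

```python
import logging
from typing import Any, Dict, Optional, Tuple

logger = logging.getLogger("loader_kafka_sketch")


def select_key(
    obj: Dict[str, Any], key_property: Optional[str]
) -> Optional[Tuple[Optional[str], Dict[str, Any]]]:
    """Return (key, payload), or None when the object has to be skipped."""
    if key_property is None:
        return None, obj  # no key source configured: message key is null
    if key_property not in obj:
        logger.warning("skipping object without key property %r: %s", key_property, obj)
        return None  # no message is sent for this object
    return str(obj[key_property]), obj


agatha = {"phoneNumber": "607324567", "firstName": "Agatha", "lastName": "Christie"}
keyed = select_key(agatha, "phoneNumber")
unkeyed = select_key(agatha, None)
skipped = select_key({"firstName": "Agatha"}, "phoneNumber")
```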
Remove key source property from message payload
|
Works in conjunction with Message key source property.
If set to true, the key source property is stripped from the message’s payload.
This option applies only to JSON message payloads, because in Avro mode the schema determines which input JSON properties are mapped to the Avro payload.
Example: For the input JSON object
{
  "phoneNumber": "607324567",
  "firstName": "J.K.",
  "lastName": "Rowling"
}
with the key source configured to phoneNumber and this flag set to true,
the resulting Kafka message in JSON format uses 607324567 as the message key, and phoneNumber is removed from the payload, so the final message payload is:
{
  "firstName": "J.K.",
  "lastName": "Rowling"
}
|
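A minimal sketch of the flag's effect on a JSON payload, producing the key and value as bytes the way a Kafka producer would receive them. The function name and the UTF-8 encoding are assumptions. The sketch also assumes the key property is present, since objects without it are skipped before this step.

```python
import json
from typing import Any, Dict, Optional, Tuple


def build_json_message(
    obj: Dict[str, Any],
    key_property: Optional[str],
    remove_key_from_payload: bool = False,
) -> Tuple[Optional[bytes], bytes]:
    """Return (key, value) bytes for a JSON-format Kafka message."""
    key = None
    payload = dict(obj)  # shallow copy so the caller's object stays untouched
    if key_property is not None:
        key = str(obj[key_property]).encode("utf-8")
        if remove_key_from_payload:
            del payload[key_property]
    value = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    return key, value


rowling = {"phoneNumber": "607324567", "firstName": "J.K.", "lastName": "Rowling"}
key, value = build_json_message(rowling, "phoneNumber", remove_key_from_payload=True)
```

With the flag left at its default, the payload keeps `phoneNumber` and matches the input object.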
Configuration for Common Scenarios
Authentication: Plaintext config
Example
Authentication: Kerberos with SSL
Example