ePrivacy and GPDR Cookie Consent by Cookie Consent Skip to main content

Loader Kafka

Loader Kafka sends input data as messages to Kafka brokers.

The component can produce two types of message payloads

  • JSON message payloads 

  • Avro message payloads 

The component builds upon the Python package for Kafka client. Specifically C/C++ implementation of librdkafka and confluent-python module.

In terms of authentication and communication encryption the component supports the following modes of operation

  • PLAINTEXT: No authentication and no encryption

  • SSL: Authentication and broker communication encryption using SSL/TLS certificates 

  • SASL_SSL: Authentication using Kerberos (GSSAPI) with encryption using SSL/TLS certificates

  • SASL_PLAINTEXT: Authentication using Kerberos (GSSAPI) without communication encryption

Visit the relevant section of documentation as configuration parameters  might have specific meaning or behavior depending on message payload type or authentication scheme

Warning: The topic must exist prior to loading. The component does not create a target topic automatically.

Warning: The component only supports schema discovery and message production for Avro payloads.

Warning: Avro schemas are only supported for message values, not for message keys

Warning: The component was tested against the Confluent Kafka Schema Registry. Other registry implementations were not tested but may work.

Data In/Data Out

Data In

The component searches for input files in directory  /data/in/files .


The files should be in newline-delimited JSON format (*.ndjson). 


Each input JSON object found in an input file is translated into a single Kafka message.


The expected structure of the input JSON object depends on the chosen message payload format. 


For JSON message format the input JSON object is used simply as message payload. 


For Avro message format the input JSON object must provide properties required by Avro schema. Final message is then created according to the discovered schema by populating it with values from the JSON object.

Data Out N/A

Learn more: about the folder structure here.

Parameters

Connection Settings


Server(s)

(required)

List of Kafka brokers the loader should attempt initial connection with

If multiple hosts are provided the loader will attempt to establish connection in the same order as brokers are specified.

Format: host:port

Example: kafka-broker1.meiro.io:9094

Security protocol

(required)

Authentication and encryption protocol the loader should use for communication with Kafka brokers

Possible values:

  • plaintext

  • ssl

  • sasl_ssl

  • sasl_plaintext

Connection log level

Log level for diagnostic logging on Kafka connection. Value is passed to configuration property debug for librdkafka.

Possible options:

  • Default (normal level of logging)

  • Security (authentication and connection initiation messages)

  • All (full diagnostic logging,  very granular)

Broker SSL

Broker’s CA certificate

CA certificate string (PEM format) for verifying the broker's key.

Passing a directory value (for multiple CA certificates) is not supported.

Value is passed to ssl.ca.pem in librdkafka

Client’s private key

Client's private key string (PEM format) used for authentication. 

Value is passed to ssl.key.pem in librdkafka

Client’s public key

Client's public key string (PEM format) used for authentication. 

Values is passed to ssl.certificate.pem in librdkafka

Kerberos

See Troubleshooting Kerberos for common issues with Kerberos configuration


Client Principal

Kerberos client principal that will be used to authenticate with Kafka brokers.

The principal must have a matching entry in the provided keytab file. The principal must have been assigned the necessary Kafka ACL that allows writing access to selected topic.

Example: kafka_producer@TEST.CONFLUENT.IO

Service Name

Kerberos principal name that Kafka runs as, excluding /hostname@REALM

Example: kafka

Keytab (Base64)

Base64-encoded contents of Kerberos keytab file. The keytab file must contain an entry for the client's principal.

Contents will be automatically decoded and a corresponding binary keytab file will be created at runtime to be used with built-in Kafka Kerberos authentication.

To obtain the value for this parameter, run any base64 encoder on the contents of the keytab file you exported using kadmin. Then copy-paste the resulting base64 hash as the value of this parameter.

The base64 hash may contain newlines and spaces. They will be automatically removed during the decoding process.

Example: valid entry containing extra newlines which results in a valid keytab with multiple principals

BQIAAABsAAIAEVRFU1QuQ09ORkxVRU5ULklPAA5rYWZrYV9wcm9kdWNlcgAUc3RhZ2luZy5taS5t
ZWlyby5kZXYAAAABYshQYgIAEgAguT5/yyO6lRdzrMHOAnxuq3/iSmPV9Av0hREbn3CWLnQAAAAC
AAAAXAACABFURVNULkNPTkZMVUVOVC5JTwAOa2Fma2FfcHJvZHVjZXIAFHN0YWdpbmcubWkubWVpcm8uZGV2AAAAAWLIUGICABEAEDKWmM8OejBQgDN3k0gE3hUAAAAC

Kerberos Configuration (krb5.conf)

Contents of krb5.conf file. The configuration file will be passed to built-in Kerberos client-side utilities  (i.e. kinit) to perform authentication.

At the minimum, the file needs to contain correct definitions of the Kerberos realm and domain-to-realm or host-to-realm mapping entries.

Corresponding krb5.conf file will be created at runtime to be
used with built-in Kafka Kerberos authentication.

For additional documentation about what specific command is used to perform Kerberos authentication see sasl.kerberos.kinit.cmd in librdkafka

Schema Registry


Schema Registry URL

Avro schema registry URL
http(s)://kafka-registry.meiro.io:8081

Schema Registry CA certificate

CA certificate string (PEM format) for verifying the Schema Registry 's key.

Required when HTTPS is enabled.

 Input

Path (required)

Path to data files to be processed. Example: data/in/files/folder_name/file_name.ndjson

Topic (required)

Kafka topic where messages generated from the specified file(s) will be sent to.

The topic can be specified for each separate input file. If topics use different schemas, schemas will be dynamically discovered and used accordingly. 

Message key source property

Reference to property from input JSON files. The values of this property will be used to populate the generated message’s key.

If the key is configured and the input JSON object does not contain the specified property, the input object is skipped and no message is sent. Skipped JSON objects are logged.

If the source property is not configured a message is sent without a key  (i.e. key is set to null)

Example:


For input JSON object
{
“phoneNumber”: “607324567”
“firstName”: “Agatha”
“lastName” : “Christie”
}

and key source configured to phoneNumber the resulting Kafka message in JSON format will use “607324567” as message key and its payload will be
{
“phoneNumber”: “607324567”
“firstName”: “Agatha”
“lastName” : “Christie”
}

Remove key source property from message payload

Works in connection with Message key source property.

If set to true the property is stripped from message’s payload.

This property is only applicable for JSON message payloads, as in Avro mode the schema determines which input JSON properties are mapped to Avro payload.

Example:


For input JSON object
{
“phoneNumber”: “753324567”
“firstName”: “J.K.”
“lastName” : “Rowling”
}

key source configured to phoneNumber
and this flag set to true

the resulting Kafka message in JSON format will use “607324567” as message key, the phoneNumber will be removed from the payload so the final message payload will be as follows:

{
“firstName”: “Agatha”
“lastName” : “Christie”
}