
Loader Kafka

Kafka is an open-source distributed event streaming platform designed to handle real-time data feeds and provide a scalable, fault-tolerant infrastructure for building data pipelines and streaming applications. It allows producers to publish streams of records and consumers to subscribe to those streams, facilitating the efficient transfer of data between applications and services.

Business value in CDP

In the context of CDP, Kafka can be a valuable loader for distributing customer data across various systems. This is particularly useful for use cases such as personalized marketing, analytics, and business intelligence, where up-to-date and accurate customer information is essential for informed decision-making.


Steps to set up Kafka

Follow this step-by-step guide to configure the component:

  1. The first step is to set up the loader within Meiro Integration by following the instructions given in this article.
  2. Set up the loader as a destination in CDP.
  3. Continue with exporting your segmented customer profiles.

Setting up the loader in MI

Loader Kafka sends input data as messages to Kafka brokers.

The component can produce two types of message payloads:

  • JSON message payloads 

  • Avro message payloads 

The component builds upon the Confluent Python client for Kafka (the confluent-kafka module), which wraps the C/C++ librdkafka library.
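
For orientation, the snippet below is a minimal sketch (not the loader's own code) of how this underlying client is typically used; the broker address, topic name, and payload are placeholders.

from confluent_kafka import Producer

# Plaintext connection: no authentication, no encryption
producer = Producer({
    "bootstrap.servers": "kafka-broker1.meiro.io:9094",
    "security.protocol": "PLAINTEXT",
})

def on_delivery(err, msg):
    # librdkafka invokes this callback once the broker acknowledges (or rejects) the message
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")

producer.produce("customer_profiles", key="607324567",
                 value='{"firstName": "Agatha"}', on_delivery=on_delivery)
producer.flush()  # block until all queued messages are delivered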

In terms of authentication and communication encryption, the component supports the following modes of operation:

  • PLAINTEXT: No authentication and no encryption

  • SASL_SSL: Authentication using Kerberos (GSSAPI) with encryption using SSL/TLS certificates

  • SASL_PLAINTEXT: Authentication using Kerberos (GSSAPI) without communication encryption

Visit the relevant section of the documentation, as configuration parameters may have a specific meaning or behavior depending on the message payload type or authentication scheme.

Known Limitations

  • The topic must exist prior to loading. The component does not create a target topic automatically.
  • Schema discovery and schema-based message production are only supported for Avro payloads.
  • Avro schemas are only supported for message values, not for message keys.
  • The component was tested against the Confluent Kafka Schema Registry. Other registry implementations were not tested but may work.

Data In/Data Out

Data In

The component searches for input files in the directory /data/in/files.

The files should be in newline-delimited JSON format (*.ndjson). Each JSON object found in an input file is translated into a single Kafka message.

The expected structure of the input JSON object depends on the chosen message payload format.

For the JSON message format, the input JSON object is used directly as the message payload.

For the Avro message format, the input JSON object must provide the properties required by the Avro schema. The final message is then created according to the discovered schema by populating it with values from the JSON object.
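
The sketch below illustrates this Data In contract for JSON message payloads; it is not the component's actual implementation, and the file name, topic, and phoneNumber key property are assumptions made for the example.

import json
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "kafka-broker1.meiro.io:9094"})
key_source = "phoneNumber"  # corresponds to the Message key source property

# Each line of the .ndjson file holds one JSON object, which becomes one Kafka message
with open("/data/in/files/profiles.ndjson", encoding="utf-8") as ndjson_file:
    for line in ndjson_file:
        record = json.loads(line)
        if key_source and key_source not in record:
            continue  # objects missing the key property are skipped (and logged)
        producer.produce("customer_profiles", key=record.get(key_source),
                         value=json.dumps(record))

producer.flush()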

Data Out

N/A

Learn more about the folder structure here.

Parameters

Connection Settings


Server(s)

(required)

List of Kafka brokers the loader should attempt the initial connection with.

If multiple hosts are provided, the loader will attempt to establish a connection in the same order as the brokers are specified.

Format: host:port

Example: kafka-broker1.meiro.io:9094

Security protocol

(required)

Authentication and encryption protocol the loader should use for communication with Kafka brokers

Possible values:

  • plaintext

  • sasl_ssl

  • sasl_plaintext

SASL Mechanism

SASL mechanism to use for authentication.

Possible values:

  • PLAIN

  • GSSAPI

  • OAUTHBEARER

  • SCRAM-SHA-256

  • SCRAM-SHA-512

Connection log level

Log level for diagnostic logging on the Kafka connection. The value is passed to the debug configuration property of librdkafka.

Possible options:

  • Default (normal level of logging)

  • Security (authentication and connection initiation messages)

  • All (full diagnostic logging,  very granular)

Broker SSL

Broker’s CA certificate

CA certificate string (PEM format) for verifying the broker's key.

Passing a directory value (for multiple CA certificates) is not supported.

Value is passed to ssl.ca.pem in librdkafka

Client’s private key

Client's private key string (PEM format) used for authentication. 

Value is passed to ssl.key.pem in librdkafka

Client’s public key

Client's public key string (PEM format) used for authentication. 

Value is passed to ssl.certificate.pem in librdkafka

Broker SASL


Username

SASL username for the SASL PLAIN authentication mechanism for the Kafka broker

Password

SASL password for the SASL PLAIN authentication mechanism for the Kafka broker

Kerberos

See Troubleshooting Kerberos below for common issues with Kerberos configuration.


Client Principal

Kerberos client principal that will be used to authenticate with Kafka brokers.

The principal must have a matching entry in the provided keytab file, and must have been assigned the necessary Kafka ACLs that allow write access to the selected topic.

Example: kafka_producer@TEST.CONFLUENT.IO

Service Name

Kerberos principal name that Kafka runs as, excluding /hostname@REALM

Example: kafka

Keytab (Base64)

Base64-encoded contents of Kerberos keytab file. The keytab file must contain an entry for the client's principal.

Contents will be automatically decoded and a corresponding binary keytab file will be created at runtime to be used with built-in Kafka Kerberos authentication.

To obtain the value for this parameter, run any base64 encoder on the contents of the keytab file you exported using kadmin, then copy-paste the resulting base64 string as the value of this parameter.

The base64 string may contain newlines and spaces; they will be removed automatically during decoding.

Example: a valid entry containing extra newlines which decodes into a valid keytab with multiple principals

BQIAAABsAAIAEVRFU1QuQ09ORkxVRU5ULklPAA5rYWZrYV9wcm9kdWNlcgAUc3RhZ2luZy5taS5t
ZWlyby5kZXYAAAABYshQYgIAEgAguT5/yyO6lRdzrMHOAnxuq3/iSmPV9Av0hREbn3CWLnQAAAAC
AAAAXAACABFURVNULkNPTkZMVUVOVC5JTwAOa2Fma2FfcHJvZHVjZXIAFHN0YWdpbmcubWkubWVpcm8uZGV2AAAAAWLIUGICABEAEDKWmM8OejBQgDN3k0gE3hUAAAAC
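
One possible way to produce this value, sketched in Python (the keytab file name is an example):

import base64

# Read the binary keytab exported with kadmin and encode it as a single base64 string
with open("kafka_producer.keytab", "rb") as keytab_file:
    encoded = base64.b64encode(keytab_file.read()).decode("ascii")

print(encoded)  # paste this string into the Keytab (Base64) parameter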

Kerberos Configuration (krb5.conf)

Contents of the krb5.conf file. The configuration file will be passed to the built-in Kerberos client-side utilities (i.e. kinit) to perform authentication.

At a minimum, the file needs to contain correct definitions of the Kerberos realm and domain-to-realm or host-to-realm mapping entries.

A corresponding krb5.conf file will be created at runtime to be used with the built-in Kafka Kerberos authentication.

For additional documentation about the specific command used to perform Kerberos authentication, see sasl.kerberos.kinit.cmd in librdkafka.

Schema Registry


SASL username

SASL username for the SASL PLAIN authentication mechanism for the Schema Registry

SASL password

SASL password for the SASL PLAIN authentication mechanism for the Schema Registry

Schema Registry URL

Avro Schema Registry URL.

Example: http(s)://kafka-registry.meiro.io:8081

Schema Registry CA certificate

CA certificate string (PEM format) for verifying the Schema Registry's key.

Required when HTTPS is enabled.

Input

Path (required)

Path to the data files to be processed. Example: file_name.ndjson, or leave the default *.ndjson.

Topic (required)

Kafka topic to which messages generated from the specified file(s) will be sent.

The topic can be specified separately for each input file. If topics use different schemas, the schemas will be discovered dynamically and used accordingly.

Message key source property

Reference to a property in the input JSON objects. The values of this property will be used to populate the generated message's key.

If the key is configured and the input JSON object does not contain the specified property, the input object is skipped and no message is sent. Skipped JSON objects are logged.

If the source property is not configured, messages are sent without a key (i.e. the key is set to null).

Example: For the input JSON object

{
  "phoneNumber": "607324567",
  "firstName": "Agatha",
  "lastName": "Christie"
}

and the key source configured to phoneNumber, the resulting Kafka message in JSON format will use 607324567 as the message key and its payload will be

{
  "phoneNumber": "607324567",
  "firstName": "Agatha",
  "lastName": "Christie"
}

Remove key source property from message payload

Works in combination with Message key source property.

If set to true, the key source property is stripped from the message's payload.

This option is only applicable to JSON message payloads, as in Avro mode the schema determines which input JSON properties are mapped to the Avro payload.

Example: For the input JSON object

{
  "phoneNumber": "607324567",
  "firstName": "Agatha",
  "lastName": "Christie"
}

with the key source configured to phoneNumber and this flag set to true, the resulting Kafka message in JSON format will use 607324567 as the message key. The phoneNumber property is removed from the payload, so the final message payload will be as follows:

{
  "firstName": "Agatha",
  "lastName": "Christie"
}

Configuration for Common Scenarios

Authentication: Plaintext config

Example
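
As an illustration, a plaintext setup only needs the broker list and the security protocol. The broker address is a placeholder, and the field names below are shorthand for the parameters described above, not the loader's exact configuration keys:

plaintext_connection = {
    "servers": ["kafka-broker1.meiro.io:9092"],   # Server(s)
    "security_protocol": "plaintext",             # Security protocol
}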

Authentication: Kerberos with SSL

Example
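
A sketch of the librdkafka properties that the loader parameters roughly map to in this scenario; the principal, keytab path, and certificate are placeholders, not real credentials:

kerberos_ssl_connection = {
    "bootstrap.servers": "kafka-broker1.meiro.io:9094",             # Server(s)
    "security.protocol": "SASL_SSL",                                # Security protocol
    "sasl.mechanism": "GSSAPI",                                     # SASL Mechanism
    "sasl.kerberos.principal": "kafka_producer@TEST.CONFLUENT.IO",  # Client Principal
    "sasl.kerberos.service.name": "kafka",                          # Service Name
    "sasl.kerberos.keytab": "/data/in/files/sasl_kerberos.keytab",  # decoded from Keytab (Base64)
    "ssl.ca.pem": "-----BEGIN CERTIFICATE-----\n...",               # Broker's CA certificate
}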


Message production: JSON payload

Example
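
A hypothetical Input configuration for this scenario; the field names are shorthand for the Input parameters above, and the topic and key property are illustrative:

json_payload_input = {
    "path": "*.ndjson",                            # Path
    "topic": "customer_profiles",                  # Topic
    "message_key_source_property": "phoneNumber",  # Message key source property
    "remove_key_source_property": True,            # Remove key source property from message payload
}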

Message production: Avro payload

Example
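
A minimal sketch of Avro message production against a Confluent Schema Registry, similar in spirit to what the component does (the value schema is discovered from the registry per topic); the registry URL, topic, and record values are placeholders:

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

registry = SchemaRegistryClient({"url": "https://kafka-registry.meiro.io:8081"})

# Discover the latest value schema registered for the target topic
schema_str = registry.get_latest_version("customer_profiles-value").schema.schema_str

producer = SerializingProducer({
    "bootstrap.servers": "kafka-broker1.meiro.io:9094",
    "value.serializer": AvroSerializer(registry, schema_str),
})

# The input JSON object must provide every field the discovered schema requires
producer.produce("customer_profiles", key="607324567",
                 value={"phoneNumber": "607324567", "firstName": "Agatha", "lastName": "Christie"})
producer.flush()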

Troubleshooting Kerberos

Invalid keytab file

In order to validate that the provided base64 string decodes into a valid binary keytab file, the component runs the following command on the decoded file:

klist -k -t /data/in/files/sasl_kerberos.keytab

The output of the command lists the entries in the decoded keytab file. A valid keytab file should yield the following log entries:

main: Writing SASL Kerberos Keytab to file: /data/in/files/sasl_kerberos.keytab 
main: Running klist over the newly created keytab file: 
Keytab name: FILE:/data/in/files/sasl_kerberos.keytab 

KVNO Timestamp Principal 
----------------------------------------------------------------------------- 
3 01/01/70 00:00:00 meiro_writer@TEST.CONFLUENT.IO

In case the command encounters an invalid keytab file, the logs contain the following message: klist: Unsupported key table format version number while starting keytab scan

Loader fails to contact KDC or find Kerberos realm

The loader fails with one of the following log messages: kinit: Cannot find KDC for realm "TEST.CONFLUENT.IO" while getting initial credentials or kinit: Cannot contact any KDC for realm 'TEST.CONFLUENT.IO' while getting initial credentials.

In this case, you have likely provided an invalid krb5.conf configuration. The kinit command that librdkafka uses to obtain a Kerberos ticket is failing. Make sure you provided a valid Kerberos Configuration, specifically the 'default_realm', 'realms' and 'domain_realm' entries.

Loader fails during Kerberos authentication or authorization

Make sure you configured the correct client principal, keytab and krb5.conf. Specifically:

  • Investigate the component's log to see whether the provided keytab was correctly parsed and whether a corresponding entry for the client's principal is present in the keytab.

  • Set Connection log level to security or all. These log levels record granular interactions while the connection is being established; in particular, requests to and responses from Kerberos servers are all logged. If SSL encryption is chosen, you can also observe a switch to SSL-encrypted traffic after Kerberos authentication succeeds.

  • If authentication is successful but the client fails to send a message, make sure the Kerberos client principal has the correct ACLs configured.

    Example of granting ACLs for the Kerberos principal meiro_writer that allow the principal to write to all available Kafka topics. The command output, which lists additional privileges, is shown as well:

kafka-acls --bootstrap-server broker:9092 --command-config /etc/kafka/command.properties --add \
  --allow-principal User:meiro_writer --producer --topic=*
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=*, patternType=LITERAL)`: 
 	(principal=User:meiro_writer, host=*, operation=DESCRIBE, permissionType=ALLOW)
	(principal=User:meiro_writer, host=*, operation=WRITE, permissionType=ALLOW)
	(principal=User:meiro_writer, host=*, operation=CREATE, permissionType=ALLOW) 

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=*, patternType=LITERAL)`: 
 	(principal=User:kafka_consumer, host=*, operation=DESCRIBE, permissionType=ALLOW)
	(principal=User:kafka_producer, host=*, operation=CREATE, permissionType=ALLOW)
	(principal=User:meiro_writer, host=*, operation=WRITE, permissionType=ALLOW)
	(principal=User:connect, host=*, operation=WRITE, permissionType=ALLOW)
	(principal=User:connect, host=*, operation=CREATE, permissionType=ALLOW)
	(principal=User:connect, host=*, operation=READ, permissionType=ALLOW)
	(principal=User:kafka_producer, host=*, operation=DESCRIBE, permissionType=ALLOW)
	(principal=User:meiro_writer, host=*, operation=CREATE, permissionType=ALLOW)
	(principal=User:kafka_producer, host=*, operation=WRITE, permissionType=ALLOW)
	(principal=User:kafka_consumer, host=*, operation=READ, permissionType=ALLOW)
	(principal=User:meiro_writer, host=*, operation=DESCRIBE, permissionType=ALLOW)
	(principal=User:connect, host=*, operation=DESCRIBE, permissionType=ALLOW)

Troubleshooting Kerberos credentials with 'kinit'

The underlying librdkafka library uses the regular kinit utility to obtain Kerberos tickets. The kinit command is executed automatically by librdkafka when configured to authenticate with Kerberos.

In case of problems, you can therefore perform a quick validation of your Kerberos configuration using regular kinit on any machine with Kerberos utilities installed by running kinit -k -t "/path/to/your/keytab.file" principal@REALM.COM and then inspecting the resulting ticket cache with klist.

The kinit command is executed inside the running container of the Kafka Loader component; kinit is preinstalled as a dependency when the container image for the component is built.

Invalid Kerberos credentials will result in the following message:

meiro@kafka-meiro# kinit -k -t  /var/lib/secret/kafka-client.keytab kafka_producer && klist
kinit: Password incorrect while getting initial credentials

Valid credentials will result in the contents of the ticket cache getting printed:

meiro@kafka-meiro# kinit -k -t  /var/lib/secret/kafka-client.keytab kafka_consumer && klist
Ticket cache: FILE:/tmp/krb5cc_1000
Default principal: kafka_consumer@TEST.CONFLUENT.IO
Valid starting     Expires            Service principal
07/11/22 19:48:48  07/12/22 19:48:48  krbtgt/TEST.CONFLUENT.IO@TEST.CONFLUENT.IO
	renew until 07/18/22 19:48:48