Loader Kafka
Loader Kafka sends input data as messages to Kafka brokers.
The component can produce two types of message payloads:

- JSON message payloads
- Avro message payloads
The component builds upon the Python client package for Kafka: specifically, the confluent-kafka-python module, which wraps the C/C++ implementation of librdkafka.
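For orientation, here is a minimal confluent-kafka producer sketch showing the underlying stack in action; the broker address and topic name are placeholders, not the component's parameters:

```python
from confluent_kafka import Producer

# confluent-kafka wraps librdkafka; configuration uses librdkafka property names.
producer = Producer({"bootstrap.servers": "broker:9092"})

# Enqueue a message asynchronously, then block until it is delivered.
producer.produce("my-topic", value=b'{"id": 1}')
producer.flush()
```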
In terms of authentication and communication encryption, the component supports the following modes of operation:

- PLAINTEXT: No authentication and no encryption
- SSL: Authentication and broker communication encryption using SSL/TLS certificates
- SASL_SSL: Authentication using Kerberos (GSSAPI) with encryption using SSL/TLS certificates
- SASL_PLAINTEXT: Authentication using Kerberos (GSSAPI) without communication encryption
Visit the relevant section of the documentation, as configuration parameters might have a specific meaning or behavior depending on the message payload type or authentication scheme.
Warning: The topic must exist prior to loading. The component does not create a target topic automatically.
Warning: The component only supports schema discovery and message production for Avro payloads.
Warning: Avro schemas are only supported for message values, not for message keys.
Warning: The component was tested against the Confluent Kafka Schema Registry. Other registry implementations were not tested but may work.
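Since the target topic must exist before loading (see the first warning above), create it ahead of time. Below is a minimal sketch using confluent-kafka's AdminClient, with placeholder broker address, topic name, and sizing:

```python
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "broker:9092"})

# create_topics() returns a dict mapping topic name to a future;
# calling result() blocks until creation succeeds or raises on failure.
futures = admin.create_topics([NewTopic("my-topic", num_partitions=3, replication_factor=2)])
for topic, future in futures.items():
    future.result()
```

The same can be done with the kafka-topics CLI on the broker side.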
Data In/Data Out
| Data In | The component searches for input files in the input directory. The files should be in newline-delimited JSON format (*.ndjson). Each JSON object found in an input file is translated into a single Kafka message. The expected structure of the input JSON object depends on the chosen message payload format. For the Avro message format, the input JSON object must provide the properties required by the Avro schema; the final message is then created according to the discovered schema by populating it with values from the JSON object. |
| Data Out | N/A |
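To illustrate the translation, here is a sketch of the per-line mapping for the JSON payload format; the file path, broker address, and topic name are placeholders:

```python
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "broker:9092"})

# Each non-empty line of an *.ndjson file holds one JSON object and
# becomes the value of exactly one Kafka message.
with open("input.ndjson", "rb") as ndjson_file:
    for line in ndjson_file:
        line = line.strip()
        if line:
            producer.produce("my-topic", value=line)
producer.flush()
```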
Learn more about the folder structure here.
Parameters
Connection Settings
Broker SSL
Kerberos
See Troubleshooting Kerberos for common issues with Kerberos configuration.
Schema Registry
Input
Configuration for Common Scenarios
Authentication: Plaintext config
Example
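The component's parameter names may differ from the raw client properties; in librdkafka terms, the plaintext scenario corresponds to the following (host names are placeholders):

```
bootstrap.servers=broker1:9092,broker2:9092
security.protocol=PLAINTEXT
```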
Authentication: Kerberos with SSL
Example
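In librdkafka terms, the Kerberos-with-SSL scenario corresponds to properties like the following (principal, realm, host names, and file paths are placeholders):

```
bootstrap.servers=broker1:9093
security.protocol=SASL_SSL
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.kerberos.principal=meiro_writer@TEST.CONFLUENT.IO
sasl.kerberos.keytab=/data/in/files/sasl_kerberos.keytab
ssl.ca.location=/data/in/files/ca.pem
```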
Authentication: SSL-only
Example
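In librdkafka terms, the SSL-only scenario corresponds to properties like the following (host names and file paths are placeholders):

```
bootstrap.servers=broker1:9093
security.protocol=SSL
ssl.ca.location=/data/in/files/ca.pem
ssl.certificate.location=/data/in/files/client-cert.pem
ssl.key.location=/data/in/files/client-key.pem
```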
Message production: JSON payload
Example
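For instance, given a hypothetical input file with the two lines below, the component would produce two Kafka messages whose values are exactly these two JSON objects:

```
{"user_id": "u-1", "event": "page_view"}
{"user_id": "u-2", "event": "purchase"}
```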
Message production: Avro payload
Example
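As a sketch of a matching pair, consider a hypothetical Avro value schema and an input object that provides every property the schema requires:

```
{
  "type": "record",
  "name": "Event",
  "fields": [
    {"name": "user_id", "type": "string"},
    {"name": "event", "type": "string"}
  ]
}
```

An input line such as {"user_id": "u-1", "event": "purchase"} would then be serialized into the message value according to this schema; per the warnings above, the message key is not Avro-encoded.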
Troubleshooting Kerberos
Invalid keytab file
In order to validate that the provided base64 string decodes into a valid binary keytab file, the component runs the following command on the decoded file:

```
klist -k -t /data/in/files/sasl_kerberos.keytab
```
The output of the command lists the entries in the decoded keytab file. A valid keytab file should yield the following log entries:

```
main: Writing SASL Kerberos Keytab to file: /data/in/files/sasl_kerberos.keytab
main: Running klist over the newly created keytab file:
Keytab name: FILE:/data/in/files/sasl_kerberos.keytab
KVNO Timestamp         Principal
---- ----------------- -----------------------------------------------------------
   3 01/01/70 00:00:00 kcpappmeiro@DTI.CO.ID
```
In case the command encounters an invalid keytab file, the logs contain the following message:

```
klist: Unsupported key table format version number while starting keytab scan
```
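Since the component expects the keytab as a base64-encoded string, a malformed encoding is one likely cause of this error. To re-encode a binary keytab as a single-line base64 string (the input path is a placeholder):

```
base64 -w0 /path/to/client.keytab
```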
Loader fails to contact KDC or find Kerberos realm
The loader fails with one of the following log messages:

```
kinit: Cannot find KDC for realm "TEST.CONFLUENT.IO" while getting initial credentials
```

or:

```
kinit: Cannot contact any KDC for realm 'TEST.CONFLUENT.IO' while getting initial credentials
```
In this case, you likely provided an invalid krb5.conf configuration: the kinit command that librdkafka uses to obtain a Kerberos ticket is failing. Make sure you provided a valid Kerberos configuration, specifically the 'default_realm', 'realms', and 'domain_realm' entries.
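A minimal krb5.conf sketch containing these entries (the realm is taken from the example above; the KDC host name is a placeholder):

```
[libdefaults]
    default_realm = TEST.CONFLUENT.IO

[realms]
    TEST.CONFLUENT.IO = {
        kdc = kdc.test.confluent.io
    }

[domain_realm]
    .test.confluent.io = TEST.CONFLUENT.IO
    test.confluent.io = TEST.CONFLUENT.IO
```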
Loader fails during Kerberos authentication or authorization
Make sure you configured the correct client principal, keytab, and krb5.conf. Specifically:

- Investigate the component's log to see whether the provided keytab was correctly parsed and whether a corresponding entry for the client's principal is present in the keytab.
- Set Connection Log Level to security or all. These log levels record granular interactions while the connection is being established; specifically, requests and responses from Kerberos servers are all logged. If SSL encryption is chosen, you can also observe a switch to SSL-encrypted traffic after Kerberos authentication succeeds.
- If authentication is successful but the client fails to send a message, make sure the Kerberos client principal has the correct ACLs configured (see the example below).
Below is an example of granting ACLs for the Kerberos principal meiro_writer that allow the principal to write to all available Kafka topics. The command output, which lists additional privileges, is provided as well:

```
kafka-acls --bootstrap-server broker:9092 --command-config /etc/kafka/command.properties --add --allow-principal User:meiro_writer --producer --topic="*"
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=*, patternType=LITERAL)`:
(principal=User:meiro_writer, host=*, operation=DESCRIBE, permissionType=ALLOW)
(principal=User:meiro_writer, host=*, operation=WRITE, permissionType=ALLOW)
(principal=User:meiro_writer, host=*, operation=CREATE, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=*, patternType=LITERAL)`:
(principal=User:kafka_consumer, host=*, operation=DESCRIBE, permissionType=ALLOW)
(principal=User:kafka_producer, host=*, operation=CREATE, permissionType=ALLOW)
(principal=User:meiro_writer, host=*, operation=WRITE, permissionType=ALLOW)
(principal=User:connect, host=*, operation=WRITE, permissionType=ALLOW)
(principal=User:connect, host=*, operation=CREATE, permissionType=ALLOW)
(principal=User:connect, host=*, operation=READ, permissionType=ALLOW)
(principal=User:kafka_producer, host=*, operation=DESCRIBE, permissionType=ALLOW)
(principal=User:meiro_writer, host=*, operation=CREATE, permissionType=ALLOW)
(principal=User:kafka_producer, host=*, operation=WRITE, permissionType=ALLOW)
(principal=User:kafka_consumer, host=*, operation=READ, permissionType=ALLOW)
(principal=User:meiro_writer, host=*, operation=DESCRIBE, permissionType=ALLOW)
(principal=User:connect, host=*, operation=DESCRIBE, permissionType=ALLOW)
```
Troubleshooting Kerberos credentials with 'kinit'
The underlying librdkafka library uses regular kinit to obtain Kerberos tickets. The kinit command is executed automatically by librdkafka when configured to authenticate with Kerberos.

In case of problems, you can therefore perform a quick validation of your Kerberos configuration using regular kinit on any machine with Kerberos utilities installed by running:

```
kinit -R -t "/path/to/your/keytab.file" -k principal@REALM.COM
```

and then inspecting the resulting ticket cache with klist.

In the component itself, the kinit command is executed inside the running container of the Kafka Loader; kinit is preinstalled as a dependency when the container for the component is built.
Invalid Kerberos credentials will result in the following message:

```
meiro@kafka-meiro# kinit -k -t /var/lib/secret/kafka-client.keytab kafka_producer && klist
kinit: Password incorrect while getting initial credentials
```
Valid credentials will result in the contents of the ticket cache getting printed:

```
meiro@kafka-meiro# kinit -k -t /var/lib/secret/kafka-client.keytab kafka_consumer && klist
Ticket cache: FILE:/tmp/krb5cc_1000
Default principal: kafka_consumer@TEST.CONFLUENT.IO

Valid starting     Expires            Service principal
07/11/22 19:48:48  07/12/22 19:48:48  krbtgt/TEST.CONFLUENT.IO@TEST.CONFLUENT.IO
        renew until 07/18/22 19:48:48
```