Kafka SSLProtocol¶
KafkaSSLProtocol
¶
Bases: KafkaProtocol, GenericOptions
Connect to Kafka using SSL or SASL_SSL security protocols.
For more details see:
Kafka Documentation <https://kafka.apache.org/documentation/#producerconfigs_ssl.keystore.location>_IBM Documentation <https://www.ibm.com/docs/en/cloud-paks/cp-biz-automation/19.0.x?topic=fcee-kafka-using-ssl-kerberos-authentication>_How to use PEM Certificates with Kafka <https://codingharbour.com/apache-kafka/using-pem-certificates-with-apache-kafka/>_
.. versionadded:: 0.9.0
Examples:
.. tabs::
.. tab:: TLS (verify only server public certificate)
Pass PEM certificate as files located on Spark driver host:
.. code:: python
from pathlib import Path
# Just read existing files located on host, and pass key and certificates as strings
protocol = Kafka.SSLProtocol(
truststore_type="PEM",
truststore_certificates=Path("/path/to/server.crt").read_text(),
)
Pass PEM certificate as raw string:
.. code:: python
protocol = Kafka.SSLProtocol(
truststore_type="PEM",
truststore_certificates="-----BEGIN CERTIFICATE...\n...END CERTIFICATE-----",
)
.. tab:: mTLS (mutual certificate check of client and server)
Pass PEM key and certificates as files located on Spark driver host:
.. code:: python
from pathlib import Path
# Just read existing files located on host, and pass key and certificates as strings
protocol = Kafka.SSLProtocol(
keystore_type="PEM",
keystore_certificate_chain=Path("path/to/user.crt").read_text(),
keystore_key=Path("path/to/user.key").read_text(),
truststore_type="PEM",
truststore_certificates=Path("/path/to/server.crt").read_text(),
)
Pass PEM key and certificates as raw strings:
.. code:: python
protocol = Kafka.SSLProtocol(
keystore_type="PEM",
keystore_certificate_chain="-----BEGIN CERTIFICATE...\n...END CERTIFICATE-----",
keystore_key="-----BEGIN PRIVATE KEY...\n...END PRIVATE KEY-----",
truststore_type="PEM",
truststore_certificates="-----BEGIN CERTIFICATE...\n...END CERTIFICATE-----",
)
.. tab:: Custom Kafka client options
.. code:: python
protocol = Kafka.SSLProtocol.parse(
{
# Just the same options as above, but using Kafka config naming with dots
"ssl.keystore.type": "PEM",
"ssl.keystore.certificate_chain": "-----BEGIN CERTIFICATE...\n...END CERTIFICATE-----",
"ssl.keystore.key": "-----BEGIN PRIVATE KEY...\n...END PRIVATE KEY-----",
"ssl.truststore.type": "PEM",
"ssl.truststore.certificates": "-----BEGIN CERTIFICATE...\n...END CERTIFICATE-----",
# Any option starting from "ssl." is passed to Kafka client as-is
"ssl.protocol": "TLSv1.3",
}
)
.. dropdown:: Not recommended
These options are error-prone and have several drawbacks, so it is not recommended to use them.
Passing PEM certificates as files:
* ENCRYPT ``user.key`` file with password ``"some password"`` `using
PKCS#8 scheme <https://www.mkssoftware.com/docs/man1/openssl_pkcs8.1.asp>`_.
* Save encrypted key to file ``/path/to/user/encrypted_key_with_certificate_chain.pem``.
* Then append user certificate to the end of this file.
* Deploy this file (and server certificate too) to **EVERY** host Spark could run (both driver and executors).
* Then pass file locations and password for key decryption to options below.
.. code:: python
protocol = Kafka.SSLProtocol(
keystore_type="PEM",
keystore_location="/path/to/user/encrypted_key_with_certificate_chain.pem",
key_password="some password",
truststore_type="PEM",
truststore_location="/path/to/server.crt",
)
Passing JKS (Java Key Store) location:
* `Add user key and certificate to JKS keystore <https://stackoverflow.com/a/4326346>`_.
* `Add server certificate to JKS truststore <https://stackoverflow.com/a/373307>`_.
* This should be done on **EVERY** host Spark could run (both driver and executors).
* Pass keystore and truststore paths to options below, as well as passwords for accessing these stores:
.. code:: python
protocol = Kafka.SSLProtocol(
keystore_type="JKS",
keystore_location="/usr/lib/jvm/default/lib/security/keystore.jks",
keystore_password="changeit",
truststore_type="JKS",
truststore_location="/usr/lib/jvm/default/lib/security/truststore.jks",
truststore_password="changeit",
)
cleanup(kafka)
¶
This method is called while closing Kafka connection.
Implement it to cleanup resources like temporary files.
Parameters:
-
kafka(:obj:Kafka <onetl.connection.db_connection.kafka.connection.Kafka>) –Connection instance
get_options(kafka)
¶
Get options for Kafka connection
Parameters:
-
kafka(:obj:Kafka <onetl.connection.db_connection.kafka.connection.Kafka>) –Connection instance
Returns:
-
dict(dict) –Kafka client options
parse(options)
classmethod
¶
If a parameter inherited from the ReadOptions class was passed, then it will be returned unchanged. If a Dict object was passed it will be converted to ReadOptions.
Otherwise, an exception will be raised