Avro
Bases: ReadWriteFileFormat
Avro file format. |support_hooks|
Based on `Spark Avro <https://spark.apache.org/docs/latest/sql-data-sources-avro.html>`_ file format.
Supports reading/writing files with .avro extension.
.. dropdown:: Version compatibility
* Spark versions: 2.4.x - 3.5.x
* Java versions: 8 - 20
See documentation from link above.
.. versionadded:: 0.9.0
Examples:
.. note::
You can pass any option mentioned in
`official documentation <https://spark.apache.org/docs/latest/sql-data-sources-avro.html>`_.
**Option names should be in** ``camelCase``!
The set of supported options depends on Spark version.
.. tabs::
.. code-tab:: py Reading files
from pyspark.sql import SparkSession
from onetl.file.format import Avro
# Create Spark session with Avro package loaded
maven_packages = Avro.get_packages(spark_version="3.5.8")
spark = (
SparkSession.builder.appName("spark-app-name")
.config("spark.jars.packages", ",".join(maven_packages))
.getOrCreate()
)
schema = {
"type": "record",
"name": "Person",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"},
],
}
avro = Avro(avroSchema=schema) # or avroSchemaUrl=...
.. code-tab:: py Writing files
# Create Spark session with Avro package loaded
spark = ...
from onetl.file.format import Avro
schema = {
"type": "record",
"name": "Person",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"},
],
}
avro = Avro(
avroSchema=schema, # or avroSchemaUrl=...
compression="snappy",
)
schema_dict = Field(default=None, alias='avroSchema')
Avro schema in JSON format.
.. code:: python
avro = Avro(
avroSchema={
"type": "record",
"name": "Person",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"},
],
},
)
If set, all records should match this schema.
.. warning::
Mutually exclusive with :obj:`~schema_url`.
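What "matching this schema" means can be sketched in plain Python. This is an illustration only, not onetl code: the real enforcement is done by Spark during read/write. Here "matching" means every declared field is present with a value of the declared Avro type:

```python
# Illustrative sketch only: the real schema enforcement is done by Spark,
# not by onetl.
AVRO_TO_PY = {"string": str, "int": int}


def matches(record, schema):
    """Check a plain dict against a simple Avro record schema."""
    for field in schema["fields"]:
        expected = AVRO_TO_PY[field["type"]]
        if not isinstance(record.get(field["name"]), expected):
            return False
    return True


person_schema = {
    "type": "record",
    "name": "Person",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"},
    ],
}

assert matches({"name": "Alice", "age": 20}, person_schema)
assert not matches({"name": "Bob"}, person_schema)  # missing "age"
```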
schema_url = Field(default=None, alias='avroSchemaUrl')
URL to Avro schema in JSON format. Usually points to a Schema Registry, for example:
.. code:: python
schema_registry = "http://some.schema.registry.domain"
name = "MyAwesomeSchema"
version = "latest"
schema_url = f"{schema_registry}/subjects/{name}/versions/{version}/schema"
avro = Avro(avroSchemaUrl=schema_url)
If set, schema is fetched before any records are parsed, so all records should match this schema.
.. warning::
Mutually exclusive with :obj:`~schema_dict`.
recordName = None
Record name in written Avro schema.
Default is ``topLevelRecord``.
.. note::
Used only for writing files and by :obj:`~serialize_column`.
recordNamespace = None
Record namespace in written Avro schema. Default is not set.
.. note::
Used only for writing files and by :obj:`~serialize_column`.
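Together with ``recordName``, this option determines the record header of the written Avro schema. A minimal sketch of how Avro combines the two into the record's dotted fullname (illustration only; Spark builds the real schema):

```python
# Avro combines namespace and name into a dotted "fullname" in the
# written schema header. Illustration only, not onetl code.
def record_fullname(name="topLevelRecord", namespace=""):
    return f"{namespace}.{name}" if namespace else name


assert record_fullname() == "topLevelRecord"  # defaults
assert record_fullname("Person", "com.example") == "com.example.Person"
```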
compression = None
Compression codec.
By default, Spark config value ``spark.sql.avro.compression.codec`` (``snappy``) is used.
.. note::
Used only for writing files. Ignored by :obj:`~serialize_column`.
mode = None
How to handle parsing errors:

* ``PERMISSIVE`` - set field value as ``null``.
* ``FAILFAST`` - throw an error immediately.

Default is ``FAILFAST``.
.. note::
Used only by :obj:`~parse_column` method.
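The difference between the two modes can be sketched in plain Python. This is a simplified illustration, not onetl code; Spark performs the actual decoding:

```python
# Simplified sketch of the two parse modes; Spark does the real decoding.
def parse_record(raw, decode, mode="FAILFAST"):
    try:
        return decode(raw)
    except ValueError:
        if mode == "FAILFAST":
            raise  # fail immediately, like Spark's FAILFAST
        return None  # PERMISSIVE: malformed value becomes null


assert parse_record("42", int) == 42
assert parse_record("oops", int, mode="PERMISSIVE") is None
```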
datetimeRebaseMode = None
While converting dates/timestamps from the Julian to the Proleptic Gregorian calendar, handle value ambiguity:

* ``EXCEPTION`` - fail if ancient dates/timestamps are ambiguous between the two calendars.
* ``CORRECTED`` - load dates/timestamps as-is, without rebasing.
* ``LEGACY`` - rebase ancient dates/timestamps from the Julian to the Proleptic Gregorian calendar.

By default, Spark config value ``spark.sql.avro.datetimeRebaseModeInRead`` (``CORRECTED``) is used.
.. note::
Used only for reading files and by :obj:`~parse_column`.
positionalFieldMatching = None
If ``True``, match Avro schema fields and DataFrame columns by position.
If ``False``, match by name.
Default is ``False``.
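The effect of this option can be sketched as follows (illustration only; Spark performs the actual matching):

```python
# Sketch of how DataFrame columns are paired with Avro schema fields.
def pair_fields(df_columns, avro_fields, positional=False):
    if positional:
        # positionalFieldMatching=True: i-th column -> i-th schema field
        return list(zip(df_columns, avro_fields))
    # default: match columns and fields by identical name
    return [(col, fld) for col in df_columns for fld in avro_fields if col == fld]


df_cols = ["age", "name"]
schema_fields = ["name", "age"]
# by name, the order of the schema does not matter
assert pair_fields(df_cols, schema_fields) == [("age", "age"), ("name", "name")]
# by position, mismatched order pairs the wrong fields
assert pair_fields(df_cols, schema_fields, positional=True) == [("age", "name"), ("name", "age")]
```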
enableStableIdentifiersForUnionType = None
Avro schema may contain union types, which are not supported by Spark. Each variant of a union is split into a separate DataFrame column of the respective type.
If option value is ``True``, DataFrame column names are based on Avro variant names,
e.g. ``member_int``, ``member_string``.
If ``False``, DataFrame column names are generated using field position, e.g. ``member0``, ``member1``.
Default is ``False``.
.. note::
Used only for reading files and by :obj:`~parse_column`.
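The resulting column names can be sketched like this (illustration only; Spark generates the actual struct):

```python
# Sketch of DataFrame column names produced for an Avro union type.
def union_member_columns(variants, stable_identifiers=False):
    if stable_identifiers:
        # enableStableIdentifiersForUnionType=True: type-name based
        return [f"member_{t}" for t in variants]
    # default: position based
    return [f"member{i}" for i in range(len(variants))]


variants = ["int", "string"]  # Avro union ["int", "string"]
assert union_member_columns(variants, stable_identifiers=True) == ["member_int", "member_string"]
assert union_member_columns(variants) == ["member0", "member1"]
```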
classmethod get_packages(spark_version, scala_version=None)
Get package names to be downloaded by Spark. |support_hooks|
See `Maven package index <https://mvnrepository.com/artifact/org.apache.spark/spark-avro>`_
for all available packages.
.. versionadded:: 0.9.0
Parameters:

* ``spark_version`` (str) - Spark version in format ``major.minor.patch``.
* ``scala_version`` (str, default: ``None``) - Scala version in format ``major.minor``. If ``None``, ``spark_version`` is used to determine the Scala version.
Examples:
.. code:: python
from onetl.file.format import Avro
Avro.get_packages(spark_version="3.5.8")
Avro.get_packages(spark_version="3.5.8", scala_version="2.12")
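The returned package names are Maven coordinates for the ``spark-avro`` artifact. A sketch of the coordinate format (assuming Scala 2.12 as the default; the real classmethod also validates both versions and derives the default Scala version from the Spark version):

```python
# Sketch of the Maven coordinate format used by spark-avro packages.
# Illustration only: the real get_packages() also validates versions.
def avro_package(spark_version, scala_version="2.12"):
    return f"org.apache.spark:spark-avro_{scala_version}:{spark_version}"


assert avro_package("3.5.8") == "org.apache.spark:spark-avro_2.12:3.5.8"
```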
parse_column(column)
Parses an Avro binary column into a structured Spark SQL column using Spark's
`from_avro <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.avro.functions.from_avro.html>`_ function,
based on the schema provided within the class.
.. note::
Can be used only with Spark 3.x+
.. warning::
If ``schema_url`` is provided, the ``requests`` library is used to fetch the schema from the URL.
It should be installed manually, like this:
.. code:: bash
pip install requests
.. versionadded:: 0.11.0
Parameters:

* ``column`` (str | Column) - The name of the column or the column object containing Avro bytes to deserialize. Data should match the provided Avro schema.

Returns:

* Column with deserialized data. Schema matches the provided Avro schema. Column name is the same as the input column.

Raises:

* ``ValueError`` - If neither ``avroSchema`` nor ``avroSchemaUrl`` are defined.
* ``ImportError`` - If ``schema_url`` is used and the ``requests`` library is not installed.
Examples:
>>> from pyspark.sql.functions import decode
>>> from onetl.file.format import Avro
>>> df.show()
+----+----------------------+----------+---------+------+-----------------------+-------------+
|key |value |topic |partition|offset|timestamp |timestampType|
+----+----------------------+----------+---------+------+-----------------------+-------------+
|[31]|[0A 41 6C 69 63 65 28]|topicAvro |0 |0 |2024-04-24 13:02:25.911|0 |
|[32]|[06 42 6F 62 32] |topicAvro |0 |1 |2024-04-24 13:02:25.922|0 |
+----+----------------------+----------+---------+------+-----------------------+-------------+
>>> df.printSchema()
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: integer (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
>>> avro = Avro(
... avroSchema={ # or avroSchemaUrl=...
... "type": "record",
... "name": "Person",
... "fields": [
... {"name": "name", "type": "string"},
... {"name": "age", "type": "int"},
... ],
... }
... )
>>> parsed_df = df.select(decode("key", "UTF-8").alias("key"), avro.parse_column("value"))
>>> parsed_df.show(truncate=False)
+---+-----------+
|key|value |
+---+-----------+
|1 |{Alice, 20}|
|2 |{Bob, 25} |
+---+-----------+
>>> parsed_df.printSchema()
root
|-- key: string (nullable = true)
|-- value: struct (nullable = true)
| |-- name: string (nullable = true)
| |-- age: integer (nullable = true)
serialize_column(column)
Serializes a structured Spark SQL column into an Avro binary column using Spark's
`to_avro <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.avro.functions.to_avro.html#pyspark.sql.avro.functions.to_avro>`_ function.
.. note::
Can be used only with Spark 3.x+
.. warning::
If ``schema_url`` is provided, the ``requests`` library is used to fetch the schema from the URL.
It should be installed manually, like this:
.. code:: bash
pip install requests
.. versionadded:: 0.11.0
Parameters:

* ``column`` (str | Column) - The name of the column or the column object containing the data to serialize to Avro format.

Returns:

* Column with binary Avro data. Column name is the same as the input column.

Raises:

* ``ValueError`` - If the Spark version is less than 3.x.
* ``ImportError`` - If ``schema_url`` is used and the ``requests`` library is not installed.
Examples:
>>> from pyspark.sql.functions import decode
>>> from onetl.file.format import Avro
>>> df.show()
+---+-----------+
|key|value |
+---+-----------+
|1 |{Alice, 20}|
|2  |{Bob, 25}  |
+---+-----------+
>>> df.printSchema()
root
|-- key: string (nullable = true)
|-- value: struct (nullable = true)
| |-- name: string (nullable = true)
| |-- age: integer (nullable = true)
>>> # serializing data into Avro format
>>> avro = Avro(
... avroSchema={ # or avroSchemaUrl=...
... "type": "record",
... "name": "Person",
... "fields": [
... {"name": "name", "type": "string"},
... {"name": "age", "type": "int"},
... ],
... }
... )
>>> serialized_df = df.select("key", avro.serialize_column("value"))
>>> serialized_df.show(truncate=False)
+---+----------------------+
|key|value |
+---+----------------------+
|1  |[0A 41 6C 69 63 65 28]|
|2  |[06 42 6F 62 32]      |
+---+----------------------+
>>> serialized_df.printSchema()
root
|-- key: string (nullable = true)
|-- value: binary (nullable = true)