
Avro

Bases: ReadWriteFileFormat

Avro file format. |support_hooks|

Based on the `Spark Avro <https://spark.apache.org/docs/latest/sql-data-sources-avro.html>`_ file format.

Supports reading/writing files with the ``.avro`` extension.

.. dropdown:: Version compatibility

    * Spark versions: 2.4.x - 3.5.x
    * Java versions: 8 - 20

    See the documentation linked above for details.

.. versionadded:: 0.9.0

Examples:

.. note::

    You can pass any option mentioned in the
    `official documentation <https://spark.apache.org/docs/latest/sql-data-sources-avro.html>`_.
    **Option names should be in** ``camelCase``!

    The set of supported options depends on the Spark version.

.. tabs::

    .. code-tab:: py Reading files

        from pyspark.sql import SparkSession
        from onetl.file.format import Avro

        # Create Spark session with Avro package loaded
        maven_packages = Avro.get_packages(spark_version="3.5.8")
        spark = (
            SparkSession.builder.appName("spark-app-name")
            .config("spark.jars.packages", ",".join(maven_packages))
            .getOrCreate()
        )

        schema = {
            "type": "record",
            "name": "Person",
            "fields": [
                {"name": "name", "type": "string"},
                {"name": "age", "type": "int"},
            ],
        }
        avro = Avro(avroSchema=schema)  # or avroSchemaUrl=...

    .. code-tab:: py Writing files

        # Create Spark session with Avro package loaded
        spark = ...

        from onetl.file.format import Avro

        schema = {
            "type": "record",
            "name": "Person",
            "fields": [
                {"name": "name", "type": "string"},
                {"name": "age", "type": "int"},
            ],
        }
        avro = Avro(
            avroSchema=schema,  # or avroSchemaUrl=...
            compression="snappy",
        )

schema_dict = Field(default=None, alias='avroSchema')

Avro schema in JSON representation.

.. code:: python

    avro = Avro(
        avroSchema={
            "type": "record",
            "name": "Person",
            "fields": [
                {"name": "name", "type": "string"},
                {"name": "age", "type": "int"},
            ],
        },
    )

If set, all records should match this schema.

.. warning::

    Mutually exclusive with :obj:`~schema_url`.
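To make "all records should match this schema" concrete, here is a minimal pure-Python sketch (not part of onetl or Spark) of what matching an Avro record schema means: every declared field must be present with a value of the expected type. The ``AVRO_TO_PY`` mapping and ``record_matches`` helper are illustrative assumptions, not library APIs.

```python
# Illustrative sketch only: a record "matches" the schema when every
# declared field exists and has a value of the mapped Python type.
AVRO_TO_PY = {"string": str, "int": int, "long": int, "boolean": bool, "double": float}


def record_matches(schema: dict, record: dict) -> bool:
    for field in schema["fields"]:
        expected = AVRO_TO_PY.get(field["type"])
        if expected is None or not isinstance(record.get(field["name"]), expected):
            return False
    return True


schema = {
    "type": "record",
    "name": "Person",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"},
    ],
}

print(record_matches(schema, {"name": "Alice", "age": 20}))  # True
print(record_matches(schema, {"name": "Bob", "age": "25"}))  # False: age is not an int
```

Real schema validation is performed by Spark itself during read/write; this sketch only illustrates the contract.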

schema_url = Field(default=None, alias='avroSchemaUrl')

URL of an Avro schema in JSON format. Usually points to a Schema Registry, like:

.. code:: python

    schema_registry = "http://some.schema.registry.domain"
    name = "MyAwesomeSchema"
    version = "latest"

    schema_url = f"{schema_registry}/subjects/{name}/versions/{version}/schema"
    avro = Avro(avroSchemaUrl=schema_url)

If set, the schema is fetched before any records are parsed, so all records should match this schema.

.. warning::

    Mutually exclusive with :obj:`~schema_dict`.

recordName = None

Record name in written Avro schema. Default is ``topLevelRecord``.

.. note::

    Used only for writing files and by :obj:`~serialize_column`.

recordNamespace = None

Record namespace in written Avro schema. Default is not set.

.. note::

    Used only for writing files and by :obj:`~serialize_column`.

compression = None

Compression codec. By default, Spark config value ``spark.sql.avro.compression.codec`` (``snappy``) is used.

.. note::

    Used only for writing files. Ignored by :obj:`~serialize_column`.

mode = None

How to handle parsing errors:

* ``PERMISSIVE`` - set the value to ``null`` on a parsing error.
* ``FAILFAST`` - throw an error immediately.

Default is ``FAILFAST``.

.. note::

    Used only by the :obj:`~parse_column` method.
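As a rough pure-Python analogy (not onetl or Spark code), the two modes differ only in what happens when a value cannot be parsed: ``PERMISSIVE`` substitutes ``None`` and keeps going, while ``FAILFAST`` raises on the first bad value. The ``parse_ints`` helper below is a hypothetical illustration.

```python
# Illustrative analogy of PERMISSIVE vs FAILFAST error handling.
def parse_ints(raw_values, mode="FAILFAST"):
    result = []
    for raw in raw_values:
        try:
            result.append(int(raw))
        except ValueError:
            if mode == "FAILFAST":
                raise  # fail immediately on the first bad value
            result.append(None)  # PERMISSIVE: value becomes null, parsing continues
    return result


print(parse_ints(["1", "oops", "3"], mode="PERMISSIVE"))  # [1, None, 3]
# parse_ints(["1", "oops", "3"], mode="FAILFAST") would raise ValueError
```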

datetimeRebaseMode = None

How to handle value ambiguity while converting dates/timestamps from the Julian to the Proleptic Gregorian calendar:

* ``EXCEPTION`` - fail if ancient dates/timestamps are ambiguous between the two calendars.
* ``CORRECTED`` - load dates/timestamps as-is, without rebasing.
* ``LEGACY`` - rebase ancient dates/timestamps from the Julian to the Proleptic Gregorian calendar.

By default, Spark config value ``spark.sql.avro.datetimeRebaseModeInRead`` (``CORRECTED``) is used.

.. note::

    Used only for reading files and by :obj:`~parse_column`.

positionalFieldMatching = None

If ``True``, match Avro schema fields and DataFrame columns by position. If ``False``, match by name.

Default is ``False``.

enableStableIdentifiersForUnionType = None

An Avro schema may contain union types, which are not supported by Spark. Each variant of a union is split into a separate DataFrame column of the respective type.

If ``True``, DataFrame column names are based on the Avro variant type names, e.g. ``member_int``, ``member_string``. If ``False``, column names are generated from the field position, e.g. ``member0``, ``member1``.

Default is ``False``.

.. note::

    Used only for reading files and by :obj:`~parse_column`.
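The naming difference described above can be sketched in plain Python. This is an illustrative helper (not Spark internals); the ``member_`` prefixes are the ones the option documentation describes.

```python
# Illustrative sketch: column names produced for the variants of an Avro
# union, depending on the stable-identifiers setting.
def union_column_names(variant_types, stable_identifiers):
    if stable_identifiers:
        # names derived from the variant type names
        return [f"member_{t}" for t in variant_types]
    # names derived from the variant position
    return [f"member{i}" for i in range(len(variant_types))]


union = ["int", "string"]
print(union_column_names(union, stable_identifiers=True))   # ['member_int', 'member_string']
print(union_column_names(union, stable_identifiers=False))  # ['member0', 'member1']
```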

get_packages(spark_version, scala_version=None)

Get package names to be downloaded by Spark. |support_hooks|

See the `Maven package index <https://mvnrepository.com/artifact/org.apache.spark/spark-avro>`_ for all available packages.

.. versionadded:: 0.9.0

Parameters:

* spark_version (str) -

  Spark version in format ``major.minor.patch``.

* scala_version (str, default: ``None``) -

  Scala version in format ``major.minor``.

  If ``None``, ``spark_version`` is used to determine the Scala version.

Examples:

.. code:: python

    from onetl.file.format import Avro

    Avro.get_packages(spark_version="3.5.8")
    Avro.get_packages(spark_version="3.5.8", scala_version="2.12")
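The returned names follow the usual Maven coordinate pattern ``groupId:artifactId_scalaVersion:sparkVersion``. The sketch below is a hypothetical illustration of that pattern, not onetl's implementation; the default ``"2.12"`` is an assumption matching Spark 3.5.x builds.

```python
# Illustrative sketch of the Maven coordinate pattern used by spark-avro
# packages (not onetl's actual version-resolution logic).
def spark_avro_package(spark_version: str, scala_version: str = "2.12") -> str:
    return f"org.apache.spark:spark-avro_{scala_version}:{spark_version}"


print(spark_avro_package("3.5.8"))  # org.apache.spark:spark-avro_2.12:3.5.8
```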

parse_column(column)

Parses an Avro binary column into a structured Spark SQL column using Spark's `from_avro <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.avro.functions.from_avro.html>`_ function, based on the schema provided within the class.

.. note::

    Can be used only with Spark 3.x+.

.. warning::

    If ``schema_url`` is provided, the ``requests`` library is used to fetch the schema from the URL.
    It should be installed manually, like this:

    .. code:: bash

        pip install requests

.. versionadded:: 0.11.0

Parameters:

* column (str | Column) -

  The name of the column, or the column object, containing Avro bytes to deserialize. The binary data should match the provided Avro schema.

Returns:

* Column with deserialized data. The schema matches the provided Avro schema.
* Column name is the same as the input column.

Raises:

* ValueError

  If neither ``avroSchema`` nor ``avroSchemaUrl`` is defined.

* ImportError

  If ``schema_url`` is used and the ``requests`` library is not installed.

Examples:

>>> from pyspark.sql.functions import decode
>>> from onetl.file.format import Avro
>>> df.show()
+----+----------------------+----------+---------+------+-----------------------+-------------+
|key |value                 |topic     |partition|offset|timestamp              |timestampType|
+----+----------------------+----------+---------+------+-----------------------+-------------+
|[31]|[0A 41 6C 69 63 65 28]|topicAvro |0        |0     |2024-04-24 13:02:25.911|0            |
|[32]|[06 42 6F 62 32]      |topicAvro |0        |1     |2024-04-24 13:02:25.922|0            |
+----+----------------------+----------+---------+------+-----------------------+-------------+
>>> df.printSchema()
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: integer (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
>>> avro = Avro(
...     avroSchema={  # or avroSchemaUrl=...
...         "type": "record",
...         "name": "Person",
...         "fields": [
...             {"name": "name", "type": "string"},
...             {"name": "age", "type": "int"},
...         ],
...     }
... )
>>> parsed_df = df.select(decode("key", "UTF-8").alias("key"), avro.parse_column("value"))
>>> parsed_df.show(truncate=False)
+---+-----------+
|key|value      |
+---+-----------+
|1  |{Alice, 20}|
|2  |{Bob, 25}  |
+---+-----------+
>>> parsed_df.printSchema()
root
|-- key: string (nullable = true)
|-- value: struct (nullable = true)
|    |-- name: string (nullable = true)
|    |-- age: integer (nullable = true)

serialize_column(column)

Serializes a structured Spark SQL column into an Avro binary column using Spark's `to_avro <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.avro.functions.to_avro.html#pyspark.sql.avro.functions.to_avro>`_ function.

.. note::

    Can be used only with Spark 3.x+.

.. warning::

    If ``schema_url`` is provided, the ``requests`` library is used to fetch the schema from the URL.
    It should be installed manually, like this:

    .. code:: bash

        pip install requests

.. versionadded:: 0.11.0

Parameters:

* column (str | Column) -

  The name of the column, or the column object, containing the data to serialize to Avro format.

Returns:

* Column with binary Avro data. Column name is the same as the input column.

Raises:

* ValueError

  If the Spark version is less than 3.x.

* ImportError

  If ``schema_url`` is used and the ``requests`` library is not installed.

Examples:

>>> from pyspark.sql.functions import decode
>>> from onetl.file.format import Avro
>>> df.show()
+---+-----------+
|key|value      |
+---+-----------+
|1  |{Alice, 20}|
|2  |  {Bob, 25}|
+---+-----------+
>>> df.printSchema()
root
|-- key: string (nullable = true)
|-- value: struct (nullable = true)
|    |-- name: string (nullable = true)
|    |-- age: integer (nullable = true)
>>> # serializing data into Avro format
>>> avro = Avro(
...     avroSchema={  # or avroSchemaUrl=...
...         "type": "record",
...         "name": "Person",
...         "fields": [
...             {"name": "name", "type": "string"},
...             {"name": "age", "type": "int"},
...         ],
...     }
... )
>>> serialized_df = df.select("key", avro.serialize_column("value"))
>>> serialized_df.show(truncate=False)
+---+----------------------+
|key|value                 |
+---+----------------------+
|  1|[0A 41 6C 69 63 65 28]|
|  2|[06 42 6F 62 32]      |
+---+----------------------+
>>> serialized_df.printSchema()
root
|-- key: string (nullable = true)
|-- value: binary (nullable = true)