XML

Bases: ReadWriteFileFormat

XML file format. |support_hooks|

Based on the `Databricks Spark XML <https://github.com/databricks/spark-xml>`_ file format.

Supports reading/writing files with the ``.xml`` extension.

.. versionadded:: 0.9.5

.. dropdown:: Version compatibility

* Spark versions: 3.2.x - 3.5.x
* Java versions: 8 - 20

See `official documentation <https://github.com/databricks/spark-xml>`_.

Examples:

.. note::

You can pass any option mentioned in the
`official documentation <https://github.com/databricks/spark-xml>`_.
**Option names should be in** ``camelCase``!

The set of supported options depends on the ``spark-xml`` version.

.. tabs::

.. code-tab:: py Reading files

    from onetl.file.format import XML
    from pyspark.sql import SparkSession

    # Create Spark session with XML package loaded
    maven_packages = XML.get_packages(spark_version="3.5.8")
    spark = (
        SparkSession.builder.appName("spark-app-name")
        .config("spark.jars.packages", ",".join(maven_packages))
        .getOrCreate()
    )

    xml = XML(rowTag="item", mode="PERMISSIVE")

.. tab:: Writing files

    .. warning::

        Due to a `bug <https://github.com/databricks/spark-xml/issues/664>`_, written files
        currently do not have the ``.xml`` extension.

    .. code:: python

        # Create Spark session with XML package loaded
        spark = ...

        from onetl.file.format import XML

        xml = XML(rowTag="item", rootTag="data", compression="gzip")

arrayElementName = None

If a DataFrame column is of ArrayType, its elements are written to XML inside ``<arrayElementName>...</arrayElementName>`` tags. Default is ``item``.

.. note::

Used only for writing files.
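
For example, a minimal sketch (the ArrayType column itself is hypothetical and comes from your DataFrame):

.. code:: python

    from onetl.file.format import XML

    # elements of an ArrayType column are written as <tag>...</tag>
    # instead of the default <item>...</item>
    xml = XML(rowTag="item", arrayElementName="tag")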

attributePrefix = None

While parsing tags containing attributes, like ``<sometag attr="value">``, attributes are stored as DataFrame schema columns with the specified prefix, e.g. ``_attr``. Default is ``_``.

.. note::

Used only for reading files or by :obj:`~parse_column` function.
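
For example, a sketch assuming input tags like ``<item id="1">``:

.. code:: python

    from onetl.file.format import XML

    # the "id" attribute is parsed into a DataFrame column named "_attr_id"
    xml = XML(rowTag="item", attributePrefix="_attr_")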

charset = None

File encoding. Default is ``UTF-8``.

.. note::

Used only for reading files or by :obj:`~parse_column` function.

columnNameOfCorruptRecord = None

Name of the DataFrame column where the corrupted row is stored when ``mode=PERMISSIVE``.

.. warning::

If a DataFrame schema is provided, this column should be added to the schema explicitly:

.. code:: python

    from onetl.connection import SparkLocalFS
    from onetl.file import FileDFReader
    from onetl.file.format import XML

    from pyspark.sql.types import StructType, StructField, TimestampType, StringType

    spark = ...
    schema = StructType(
        [
            StructField("my_field", TimestampType()),
            StructField("_corrupt_record", StringType()),  # <-- important
        ]
    )
    xml = XML(rowTag="item", columnNameOfCorruptRecord="_corrupt_record")

    reader = FileDFReader(
        connection=connection,
        format=xml,
        df_schema=schema,  # <-- important
    )
    df = reader.run(["/some/file.xml"])

.. note::

Used only for reading files or by :obj:`~parse_column` function.

compression = None

Compression codec. By default, no compression is used.

.. note::

Used only for writing files.

dateFormat = None

Format string used for parsing or serializing date values. By default, the ISO 8601 format is used (``yyyy-MM-dd``).

declaration = None

Content of the ``<?xml ... ?>`` declaration. Default is ``version="1.0" encoding="UTF-8" standalone="yes"``.

.. note::

Used only for writing files.

excludeAttribute = None

If True, exclude attributes while parsing tags like ``<sometag attr="value">``. Default is False.

.. note::

Used only for reading files or by :obj:`~parse_column` function.

ignoreNamespace = None

If True, all namespaces like ``<ns:tag>`` are ignored and treated as just ``<tag>``. Default is False.

.. note::

Used only for reading files or by :obj:`~parse_column` function.

ignoreSurroundingSpaces = None

If True, trim surrounding spaces while parsing values. Default is False.

.. note::

Used only for reading files or by :obj:`~parse_column` function.

inferSchema = None

If True, try to infer the input schema by reading a sample of the file (see :obj:`~samplingRatio`). Default is False, which means that all parsed columns will be ``StringType()``.

.. note::

Used only for reading files. Ignored by :obj:`~parse_column` function.
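
A minimal sketch combining ``inferSchema`` with :obj:`~samplingRatio`:

.. code:: python

    from onetl.file.format import XML

    # infer column types from a 10% sample of rows
    # instead of reading every column as StringType()
    xml = XML(rowTag="item", inferSchema=True, samplingRatio=0.1)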

mode = None

How to handle parsing errors:

  • ``PERMISSIVE`` - set the field value to null and move the raw data to the :obj:`~columnNameOfCorruptRecord` column.
  • ``DROPMALFORMED`` - skip the malformed row.
  • ``FAILFAST`` - throw an error immediately.

Default is ``PERMISSIVE``. See the example below.

.. note::

Used only for reading files or by :obj:`~parse_column` function.
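
For example:

.. code:: python

    from onetl.file.format import XML

    # silently skip rows which cannot be parsed,
    # instead of storing them as corrupt records
    xml = XML(rowTag="item", mode="DROPMALFORMED")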

nullValue = None

String value used to represent null. Default is the string ``null``.

rootTag = None

XML tag that encloses the content of the entire DataFrame. Default is ``ROWS``.

.. note::

Used only for writing files.

row_tag = Field(alias='rowTag')

XML tag that encloses each row in XML. Required.

rowValidationXSDPath = None

Path to an XSD file which should be used to validate each row. If a row does not match the XSD, it is treated as an error; the behavior depends on the :obj:`~mode` value.

Default is no validation.

.. note::

If the Spark session is created with ``master=yarn`` or ``master=k8s``, the XSD
file should be accessible from all Spark nodes. This can be achieved by calling:

.. code:: python

    spark.sparkContext.addFile("/path/to/file.xsd")

And then passing ``rowValidationXSDPath="file.xsd"`` (a relative path).

.. note::

Used only for reading files or by :obj:`~parse_column` function.
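
A minimal sketch putting these steps together:

.. code:: python

    from onetl.file.format import XML

    # distribute the XSD file to all Spark nodes,
    # then reference it by relative path
    spark.sparkContext.addFile("/path/to/file.xsd")
    xml = XML(rowTag="item", rowValidationXSDPath="file.xsd")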

samplingRatio = Field(default=None, ge=0, le=1)

For ``inferSchema=True``, read the specified fraction of rows to infer the schema. Default is 1.

.. note::

Used only for reading files. Ignored by :obj:`~parse_column` function.

timestampFormat = None

Format string used for parsing or serializing timestamp values. By default, the ISO 8601 format is used (``yyyy-MM-ddTHH:mm:ss.SSSZ``).
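
A minimal sketch overriding both formats (the patterns below are illustrative, not defaults):

.. code:: python

    from onetl.file.format import XML

    # parse dates like "31.12.2024" and timestamps like "2024-12-31 23:59:59"
    xml = XML(
        rowTag="item",
        dateFormat="dd.MM.yyyy",
        timestampFormat="yyyy-MM-dd HH:mm:ss",
    )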

valueTag = None

Value used to replace missing values while parsing attributes like ``<sometag someattr>``. Default is ``_VALUE``.

.. note::

Used only for reading files or by :obj:`~parse_column` function.

wildcardColName = None

Name of a column or columns which should be preserved as a raw XML string and not parsed.

.. warning::

If DataFrame schema is provided, this column should be added to schema explicitly.
See :obj:`~columnNameOfCorruptRecord` example.

.. note::

Used only for reading files or by :obj:`~parse_column` function.
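
A sketch assuming a field named ``xs_any`` should be kept as a raw XML string (the column names here are hypothetical):

.. code:: python

    from onetl.file.format import XML
    from pyspark.sql.types import StructType, StructField, StringType

    xml = XML(rowTag="item", wildcardColName="xs_any")
    # the wildcard column must be declared in the schema as StringType
    schema = StructType(
        [
            StructField("my_field", StringType()),
            StructField("xs_any", StringType()),
        ]
    )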

get_packages(spark_version, scala_version=None, package_version=None) classmethod

Get package names to be downloaded by Spark. |support_hooks|

.. note::

For Spark 4.x this is not required anymore.

.. versionadded:: 0.9.5

Parameters:

  • spark_version (str) –

    Spark version in format major.minor.patch.

  • scala_version (str, default: None ) –

    Scala version in format major.minor.

    If None, spark_version is used to determine Scala version.

  • package_version (str, default: None ) –

    Package version in format major.minor.patch. Default is 0.18.0.

    See `Maven index <https://mvnrepository.com/artifact/com.databricks/spark-xml>`_ for the list of available versions.

    .. warning::

    Versions ``0.13`` and below are not supported.
    

    .. note::

    It is not guaranteed that custom package versions are supported.
    Tests are performed only for the default version.
    

Examples:

.. code:: python

    from onetl.file.format import XML

    XML.get_packages(spark_version="3.5.8")
    XML.get_packages(spark_version="3.5.8", scala_version="2.12")
    XML.get_packages(
        spark_version="3.5.8",
        scala_version="2.12",
        package_version="0.18.0",
    )

parse_column(column, schema)

Parses an XML string column into a structured Spark SQL column using the ``from_xml`` function provided by the `Databricks Spark XML library <https://github.com/databricks/spark-xml#pyspark-notes>`_, based on the provided schema.

.. note::

This method assumes that the ``spark-xml`` package is installed: :obj:`~get_packages`.

.. note::

This method parses each DataFrame row individually. Therefore, for a specific column,
each row must contain exactly one occurrence of the ``rowTag`` specified.

If your XML data includes a root tag that encapsulates multiple row tags, you can adjust the schema
to use an ``ArrayType`` to keep all child elements under the single root.

.. code-block:: xml

    <books>
        <book><title>Book One</title><author>Author A</author></book>
        <book><title>Book Two</title><author>Author B</author></book>
    </books>

And the corresponding schema in Spark using an ``ArrayType``:

.. code-block:: python

    from pyspark.sql.types import StructType, StructField, ArrayType, StringType
    from onetl.file.format import XML

    # each DataFrame row has exactly one <books> tag
    xml = XML(rowTag="books")
    # each <books> tag has multiple <book> tags, so ArrayType is used for this field
    schema = StructType(
        [
            StructField(
                "book",
                ArrayType(
                    StructType(
                        [
                            StructField("title", StringType(), nullable=True),
                            StructField("author", StringType(), nullable=True),
                        ],
                    ),
                ),
                nullable=True,
            ),
        ],
    )
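
The parsed column can then be selected as usual (a sketch, assuming the XML strings are stored in a column named ``value``):

.. code-block:: python

    parsed_df = df.select(xml.parse_column("value", schema))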

.. versionadded:: 0.11.0

Parameters:

  • column (str | Column) –

    The name of the column or the column object containing XML strings/bytes to parse.

  • schema (StructType) –

    The schema to apply when parsing the XML data. This defines the structure of the output DataFrame column.

Returns:

  • Column with deserialized data, with the same structure as the provided schema.
  • Column name is the same as the input column name.

Examples:

>>> from pyspark.sql.types import StructType, StructField, IntegerType, StringType
>>> from onetl.file.format import XML
>>> df.show()
+--+------------------------------------------------+
|id|value                                           |
+--+------------------------------------------------+
|1 |<person><name>Alice</name><age>20</age></person>|
|2 |<person><name>Bob</name><age>25</age></person>  |
+--+------------------------------------------------+
>>> df.printSchema()
root
|-- id: integer (nullable = true)
|-- value: string (nullable = true)
>>> xml = XML(rowTag="person")
>>> xml_schema = StructType(
...     [
...         StructField("name", StringType(), nullable=True),
...         StructField("age", IntegerType(), nullable=True),
...     ],
... )
>>> parsed_df = df.select("id", xml.parse_column("value", xml_schema))
>>> parsed_df.show()
+--+-----------+
|id|value      |
+--+-----------+
|1 |{Alice, 20}|
|2 |  {Bob, 25}|
+--+-----------+
>>> parsed_df.printSchema()
root
|-- id: integer (nullable = true)
|-- value: struct (nullable = true)
|    |-- name: string (nullable = true)
|    |-- age: integer (nullable = true)