XML¶
Bases: ReadWriteFileFormat
XML file format. |support_hooks|
Based on `Databricks Spark XML <https://github.com/databricks/spark-xml>`_ file format.
Supports reading/writing files with ``.xml`` extension.
.. versionadded:: 0.9.5
.. dropdown:: Version compatibility
* Spark versions: 3.2.x - 3.5.x
* Java versions: 8 - 20
See `official documentation <https://github.com/databricks/spark-xml>`_.
Examples:
.. note::
You can pass any option mentioned in
`official documentation <https://github.com/databricks/spark-xml>`_.
**Option names should be in** ``camelCase``!
The set of supported options depends on ``spark-xml`` version.
.. tabs::
.. code-tab:: py Reading files
from onetl.file.format import XML
from pyspark.sql import SparkSession
# Create Spark session with XML package loaded
maven_packages = XML.get_packages(spark_version="3.5.8")
spark = (
    SparkSession.builder.appName("spark-app-name")
    .config("spark.jars.packages", ",".join(maven_packages))
    .getOrCreate()
)
xml = XML(rowTag="item", mode="PERMISSIVE")
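# A hedged sketch of a possible next step; the FileDFReader usage below is an
# assumption based on the onETL API, not part of the original example:
# from onetl.file import FileDFReader
# reader = FileDFReader(connection=..., format=xml, source_path="/some/dir")
# df = reader.run()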
.. tab:: Writing files
.. warning::
Due to a `bug <https://github.com/databricks/spark-xml/issues/664>`_, written files
currently do not have the ``.xml`` extension.
.. code:: python
from onetl.file.format import XML
# Create Spark session with XML package loaded
spark = ...
xml = XML(rowTag="item", rootTag="data", compression="gzip")
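# A hedged sketch of a possible write step; the FileDFWriter usage below is an
# assumption based on the onETL API, not part of the original example:
# from onetl.file import FileDFWriter
# writer = FileDFWriter(connection=..., format=xml, target_path="/some/dir")
# writer.run(df)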
arrayElementName = None
class-attribute
instance-attribute
¶
If a DataFrame column is of ``ArrayType``, its content is written to XML
inside ``<arrayElementName>...</arrayElementName>`` tags.
Default is ``item``.
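For example, a minimal sketch (the column name and values are illustrative, and the rendered output is an assumption based on spark-xml's documented behavior):
.. code:: python
from onetl.file.format import XML
xml = XML(rowTag="item", arrayElementName="tag")
# an ArrayType column "tags" with value ["a", "b"] is then written as
# <tags><tag>a</tag><tag>b</tag></tags> instead of the default <item> wrapper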
.. note::
Used only for writing files.
attributePrefix = None
class-attribute
instance-attribute
¶
While parsing tags containing attributes, like ``<sometag attr="value">``, attributes are stored as
DataFrame schema columns with the specified prefix, e.g. ``_attr``.
Default is ``_``.
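For example, a minimal sketch (the sample tag is illustrative):
.. code:: python
from onetl.file.format import XML
# with the default prefix, <item id="1"> is parsed into a DataFrame column "_id"
xml = XML(rowTag="item", attributePrefix="_")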
.. note::
Used only for reading files or by :obj:`~parse_column` function.
charset = None
class-attribute
instance-attribute
¶
File encoding. Default is ``UTF-8``.
.. note::
Used only for reading files or by :obj:`~parse_column` function.
columnNameOfCorruptRecord = None
class-attribute
instance-attribute
¶
Name of the DataFrame column where corrupted rows are stored when ``mode=PERMISSIVE`` is used.
.. warning::
If a DataFrame schema is provided, this column should be added to the schema explicitly:
.. code:: python
from onetl.connection import SparkLocalFS
from onetl.file import FileDFReader
from onetl.file.format import XML
from pyspark.sql.types import StructType, StructField, TimestampType, StringType

spark = ...
# create connection to the local file system
connection = SparkLocalFS(spark=spark)

schema = StructType(
    [
        StructField("my_field", TimestampType()),
        StructField("_corrupt_record", StringType()),  # <-- important
    ]
)
xml = XML(rowTag="item", columnNameOfCorruptRecord="_corrupt_record")
reader = FileDFReader(
    connection=connection,
    format=xml,
    df_schema=schema,  # <-- important
)
df = reader.run(["/some/file.xml"])
.. note::
Used only for reading files or by :obj:`~parse_column` function.
compression = None
class-attribute
instance-attribute
¶
Compression codec. By default, no compression is used.
.. note::
Used only for writing files.
dateFormat = None
class-attribute
instance-attribute
¶
Format string used for parsing or serializing date values.
By default, the ISO 8601 format is used (``yyyy-MM-dd``).
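For example, a minimal sketch (the pattern is illustrative):
.. code:: python
from onetl.file.format import XML
# parse/serialize dates like 31.12.2024
xml = XML(rowTag="item", dateFormat="dd.MM.yyyy")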
declaration = None
class-attribute
instance-attribute
¶
Content of the ``<?xml ... ?>`` declaration.
Default is ``version="1.0" encoding="UTF-8" standalone="yes"``.
.. note::
Used only for writing files.
excludeAttribute = None
class-attribute
instance-attribute
¶
If ``True``, exclude attributes while parsing tags like ``<sometag attr="value">``.
Default is ``False``.
.. note::
Used only for reading files or by :obj:`~parse_column` function.
ignoreNamespace = None
class-attribute
instance-attribute
¶
If ``True``, all namespaces like ``<ns:tag>`` will be ignored and treated as just ``<tag>``.
Default is ``False``.
.. note::
Used only for reading files or by :obj:`~parse_column` function.
ignoreSurroundingSpaces = None
class-attribute
instance-attribute
¶
If ``True``, trim surrounding spaces while parsing values. Default is ``False``.
.. note::
Used only for reading files or by :obj:`~parse_column` function.
inferSchema = None
class-attribute
instance-attribute
¶
If ``True``, try to infer the input schema by reading a sample of the file (see :obj:`~samplingRatio`).
Default is ``False``, which means all parsed columns will be ``StringType()``.
.. note::
Used only for reading files. Ignored by :obj:`~parse_column` function.
mode = None
class-attribute
instance-attribute
¶
How to handle parsing errors:
* ``PERMISSIVE`` - set field value as ``null``, move raw data to the :obj:`~columnNameOfCorruptRecord` column.
* ``DROPMALFORMED`` - skip the malformed row.
* ``FAILFAST`` - throw an error immediately.
Default is ``PERMISSIVE``.
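For example, a minimal sketch:
.. code:: python
from onetl.file.format import XML
# skip malformed rows instead of storing them in a corrupt-record column
xml = XML(rowTag="item", mode="DROPMALFORMED")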
.. note::
Used only for reading files or by :obj:`~parse_column` function.
nullValue = None
class-attribute
instance-attribute
¶
String value used to represent null. Default is the string ``null``.
rootTag = None
class-attribute
instance-attribute
¶
XML tag that encloses the content of the entire DataFrame. Default is ``ROWS``.
.. note::
Used only for writing files.
row_tag = Field(alias='rowTag')
class-attribute
instance-attribute
¶
XML tag that encloses each row of the DataFrame. Required.
rowValidationXSDPath = None
class-attribute
instance-attribute
¶
Path to an XSD file which should be used to validate each row.
If a row does not match the XSD, it is treated as an error; the behavior depends on the :obj:`~mode` value.
Default is no validation.
.. note::
If the Spark session is created with ``master=yarn`` or ``master=k8s``, the XSD
file should be accessible from all Spark nodes. This can be achieved by calling:
.. code:: python
spark.sparkContext.addFile("/path/to/file.xsd")
And then by passing ``rowValidationXSDPath=file.xsd`` (relative path).
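Combined, a minimal sketch (the paths are illustrative):
.. code:: python
from onetl.file.format import XML
spark.sparkContext.addFile("/path/to/file.xsd")
xml = XML(rowTag="item", rowValidationXSDPath="file.xsd")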
.. note::
Used only for reading files or by :obj:`~parse_column` function.
samplingRatio = Field(default=None, ge=0, le=1)
class-attribute
instance-attribute
¶
For ``inferSchema=True``, read the specified fraction of rows to infer the schema.
Default is ``1``.
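For example, a minimal sketch:
.. code:: python
from onetl.file.format import XML
# infer column types from a 10% sample of rows instead of the whole file
xml = XML(rowTag="item", inferSchema=True, samplingRatio=0.1)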
.. note::
Used only for reading files. Ignored by :obj:`~parse_column` function.
timestampFormat = None
class-attribute
instance-attribute
¶
Format string used for parsing or serializing timestamp values.
By default, the ISO 8601 format is used (``yyyy-MM-ddTHH:mm:ss.SSSZ``).
valueTag = None
class-attribute
instance-attribute
¶
Value used to replace missing values while parsing attributes like ``<sometag someattr>``.
Default is ``_VALUE``.
.. note::
Used only for reading files or by :obj:`~parse_column` function.
wildcardColName = None
class-attribute
instance-attribute
¶
Name of a column (or columns) whose content should be preserved as a raw XML string instead of being parsed.
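For example, a minimal sketch (the column name is illustrative):
.. code:: python
from onetl.file.format import XML
# content of <payload>...</payload> is kept as a raw XML string, not parsed
xml = XML(rowTag="item", wildcardColName="payload")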
.. warning::
If a DataFrame schema is provided, this column should be added to the schema explicitly.
See :obj:`~columnNameOfCorruptRecord` example.
.. note::
Used only for reading files or by :obj:`~parse_column` function.
get_packages(spark_version, scala_version=None, package_version=None)
classmethod
¶
Get package names to be downloaded by Spark. |support_hooks|
.. note::
For Spark 4.x this is not required anymore.
.. versionadded:: 0.9.5
Parameters:
* ``spark_version`` (str) – Spark version in format ``major.minor.patch``.
* ``scala_version`` (str, default: ``None``) – Scala version in format ``major.minor``. If ``None``, ``spark_version`` is used to determine the Scala version.
* ``package_version`` (str, default: ``None``) – Package version in format ``major.minor.patch``. Default is ``0.18.0``. See the `Maven index <https://mvnrepository.com/artifact/com.databricks/spark-xml>`_ for the list of available versions.
.. warning::
Versions ``0.13`` and below are not supported.
.. note::
It is not guaranteed that custom package versions are supported. Tests are performed only for the default version.
Examples:
.. code:: python
from onetl.file.format import XML
XML.get_packages(spark_version="3.5.8")
XML.get_packages(spark_version="3.5.8", scala_version="2.12")
XML.get_packages(
spark_version="3.5.8",
scala_version="2.12",
package_version="0.18.0",
)
parse_column(column, schema)
¶
Parses an XML string column into a structured Spark SQL column using the ``from_xml`` function
provided by the `Databricks Spark XML library <https://github.com/databricks/spark-xml#pyspark-notes>`_,
based on the provided schema.
.. note::
This method assumes that the ``spark-xml`` package is installed: :obj:`~get_packages`.
.. note::
This method parses each DataFrame row individually. Therefore, for a specific column,
each row must contain exactly one occurrence of the ``rowTag`` specified.
If your XML data includes a root tag that encapsulates multiple row tags, you can adjust the schema
to use an ``ArrayType`` to keep all child elements under the single root.
.. code-block:: xml
<books>
<book><title>Book One</title><author>Author A</author></book>
<book><title>Book Two</title><author>Author B</author></book>
</books>
And the corresponding schema in Spark using an ``ArrayType``:
.. code-block:: python
from pyspark.sql.types import StructType, StructField, ArrayType, StringType
from onetl.file.format import XML
# each DataFrame row has exactly one <books> tag
xml = XML(rowTag="books")
# each <books> tag has multiple <book> tags, so we use ArrayType for this field
schema = StructType(
    [
        StructField(
            "book",
            ArrayType(
                StructType(
                    [
                        StructField("title", StringType(), nullable=True),
                        StructField("author", StringType(), nullable=True),
                    ],
                ),
            ),
            nullable=True,
        ),
    ],
)
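The schema can then be passed to :obj:`~parse_column` (a minimal sketch; the DataFrame and column name are illustrative):
.. code-block:: python
parsed_df = df.select(xml.parse_column("value", schema))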
.. versionadded:: 0.11.0
Parameters:
* ``column`` (str | Column) – The name of the column or the column object containing XML strings/bytes to parse.
* ``schema`` (StructType) – The schema to apply when parsing the XML data. This defines the structure of the output DataFrame column.
Returns:
Column with deserialized data, with the same structure as the provided schema.
Column name is the same as the input column.
Examples:
>>> from pyspark.sql.types import StructType, StructField, IntegerType, StringType
>>> from onetl.file.format import XML
>>> df.show()
+--+------------------------------------------------+
|id|value                                           |
+--+------------------------------------------------+
|1 |<person><name>Alice</name><age>20</age></person>|
|2 |<person><name>Bob</name><age>25</age></person>  |
+--+------------------------------------------------+
>>> df.printSchema()
root
|-- id: integer (nullable = true)
|-- value: string (nullable = true)
>>> xml = XML(rowTag="person")
>>> xml_schema = StructType(
... [
... StructField("name", StringType(), nullable=True),
... StructField("age", IntegerType(), nullable=True),
... ],
... )
>>> parsed_df = df.select("id", xml.parse_column("value", xml_schema))
>>> parsed_df.show()
+--+-----------+
|id|value      |
+--+-----------+
|1 |{Alice, 20}|
|2 |{Bob, 25}  |
+--+-----------+
>>> parsed_df.printSchema()
root
|-- id: integer (nullable = true)
|-- value: struct (nullable = true)
| |-- name: string (nullable = true)
| |-- age: integer (nullable = true)