CSV¶
Bases: ReadWriteFileFormat
CSV file format. |support_hooks|
Based on `Spark CSV <https://spark.apache.org/docs/latest/sql-data-sources-csv.html>`_ file format.
Supports reading/writing files with .csv extension with content like:
.. code-block:: csv :caption: example.csv
"some","value"
"another","value"
.. versionadded:: 0.9.0
Examples:
.. note ::
You can pass any option mentioned in
`official documentation <https://spark.apache.org/docs/latest/sql-data-sources-csv.html>`_.
**Option names should be in** ``camelCase``!
The set of supported options depends on Spark version.
.. tabs::
.. code-tab:: py Reading files
from onetl.file.format import CSV
csv = CSV(header=True, inferSchema=True, mode="PERMISSIVE")
.. code-tab:: py Writing files
from onetl.file.format import CSV
csv = CSV(header=True, compression="gzip")
charToEscapeQuoteEscaping = Field(default=None, max_length=1)
class-attribute
instance-attribute
¶
If CSV field value contains :obj:`~escape` character, it should be escaped as well.
For example, if the escape character is a backslash (escape="\\"), then the lines:
.. code:: csv
"some \" quoted value",other
"some \\ backslashed value",another
will be parsed as:
.. code:: python
[
('some " quoted value', "other"),
("some \\ backslashed value", "another"),
]
And vice-versa, for writing CSV rows to file.
Default is same as :obj:`~escape`.
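The unescaping rule described above can be reproduced with Python's standard ``csv`` module. This is only an illustrative sketch: it does not call Spark or onETL, and it assumes that ``escapechar`` in the ``csv`` module behaves like the ``escape`` option for these two rows.

```python
import csv
import io

# The two raw CSV lines from the example above; the escape character is a backslash.
raw = '"some \\" quoted value",other\n"some \\\\ backslashed value",another\n'

# doublequote=False makes the reader rely on escapechar instead of doubled quotes.
reader = csv.reader(io.StringIO(raw), escapechar="\\", doublequote=False)
rows = [tuple(row) for row in reader]

for row in rows:
    print(row)
```

Each escaped quote and each escaped backslash comes back as a single literal character in the parsed value.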
columnNameOfCorruptRecord = Field(default=None, min_length=1)
class-attribute
instance-attribute
¶
Name of column to put corrupt records in.
Default is _corrupt_record.
.. warning::
If DataFrame schema is provided, this column should be added to schema explicitly:
.. code:: python
from onetl.connection import SparkLocalFS
from onetl.file import FileDFReader
from onetl.file.format import CSV
from pyspark.sql.types import StructType, StructField, TimestampType, StringType
spark = ...  # an existing SparkSession
connection = SparkLocalFS(spark=spark)
schema = StructType(
[
StructField("my_field", TimestampType()),
StructField("_corrupt_record", StringType()), # <-- important
]
)
csv = CSV(mode="PERMISSIVE", columnNameOfCorruptRecord="_corrupt_record")
reader = FileDFReader(
connection=connection,
format=csv,
df_schema=schema, # <-- important
)
df = reader.run(["/some/file.csv"])
.. note::
Used only for reading files and :obj:`~parse_column` method.
comment = Field(default=None, max_length=1)
class-attribute
instance-attribute
¶
If set, all lines starting with specified character (e.g. #) are considered a comment, and skipped.
Default is not set, meaning that CSV lines should not contain comments.
.. note::
Used only for reading files and :obj:`~parse_column` method.
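As a rough plain-Python model of the ``comment`` option (not Spark's parser; ``drop_comment_lines`` is a hypothetical helper used only for illustration), lines starting with the comment character are dropped before any parsing happens:

```python
def drop_comment_lines(lines, comment="#"):
    """Return only the lines that do not start with the comment character."""
    return [line for line in lines if not line.startswith(comment)]


lines = ["# this line is a comment", "id,name", "1,Alice", "# another comment", "2,Bob"]
kept = drop_comment_lines(lines)
print(kept)
```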
compression = None
class-attribute
instance-attribute
¶
Compression codec of the CSV file.
Default none.
.. note::
Used only for writing files. Ignored by :obj:`~parse_column` method.
dateFormat = Field(default=None, min_length=1)
class-attribute
instance-attribute
¶
String format for DateType() representation.
Default is yyyy-MM-dd.
delimiter = Field(default=',', alias=(avoid_alias('sep')))
class-attribute
instance-attribute
¶
Character used to separate fields in CSV row.
emptyValue = None
class-attribute
instance-attribute
¶
Value used for empty string fields.
Defaults:
- empty string for reading.
- "" for writing.
enforceSchema = None
class-attribute
instance-attribute
¶
If True, inferred or user-provided schema has higher priority than CSV file headers.
This means that all input files should have the same structure.
If False, CSV headers are used as a primary source of information about column names and their position.
Default True.
.. note::
Used only for reading files. Ignored by :obj:`~parse_column` function.
escapeQuotes = None
class-attribute
instance-attribute
¶
If True, escape quotes within CSV field.
.. code:: csv
any,field with \"quote,123,
If False, wrap fields containing :obj:`~quote` symbols with quotes.
.. code:: csv
any,"field with ""quote ",123,
Default True.
.. note::
Used only for writing files.
header = None
class-attribute
instance-attribute
¶
If True, the first row of the file is considered a header.
Default False.
ignoreLeadingWhiteSpace = None
class-attribute
instance-attribute
¶
If True, trim leading whitespaces in field value.
Defaults:
- False for reading.
- True for writing.
ignoreTrailingWhiteSpace = None
class-attribute
instance-attribute
¶
If True, trim trailing whitespaces in field value.
Defaults:
- False for reading.
- True for writing.
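A plain-Python sketch of what the two whitespace options do to a single field value. ``trim_field`` is a hypothetical helper, not part of onETL or Spark, and both flags are passed explicitly so the sketch makes no claim about defaults.

```python
def trim_field(value, ignore_leading, ignore_trailing):
    """Strip leading and/or trailing whitespace from one field value."""
    if ignore_leading:
        value = value.lstrip()
    if ignore_trailing:
        value = value.rstrip()
    return value


raw = "  Alice  "
both = trim_field(raw, ignore_leading=True, ignore_trailing=True)
leading_only = trim_field(raw, ignore_leading=True, ignore_trailing=False)
untouched = trim_field(raw, ignore_leading=False, ignore_trailing=False)
print(repr(both), repr(leading_only), repr(untouched))
```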
inferSchema = None
class-attribute
instance-attribute
¶
If True, try to infer the input schema by reading a sample of the file (see :obj:`~samplingRatio`).
Default False which means that all parsed columns will be StringType().
.. note::
Used only for reading files, and only if the user has not provided an explicit DataFrame schema.
Ignored by :obj:`~parse_column` function.
locale = Field(default=None, min_length=1)
class-attribute
instance-attribute
¶
Locale name used to parse dates and timestamps.
Default is en-US.
.. note::
Used only for reading files and :obj:`~parse_column` method.
maxCharsPerColumn = None
class-attribute
instance-attribute
¶
Maximum number of characters to read per column.
Default is -1, which means no limit.
.. note::
Used only for reading files and :obj:`~parse_column` method.
mode = None
class-attribute
instance-attribute
¶
How to handle parsing errors:
- PERMISSIVE - set field value as null, move raw data to :obj:`~columnNameOfCorruptRecord` column.
- DROPMALFORMED - skip the malformed row.
- FAILFAST - throw an error immediately.
Default is PERMISSIVE.
.. note::
Used only for reading files and :obj:`~parse_column` method.
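The three modes can be modelled in plain Python as follows. This is a simplified sketch of the semantics only, not Spark's implementation: ``parse_ints`` is a hypothetical helper that treats each line as a single integer column.

```python
def parse_ints(lines, mode="PERMISSIVE", corrupt_column="_corrupt_record"):
    """Parse one integer per line, handling bad lines according to ``mode``."""
    rows = []
    for line in lines:
        try:
            rows.append({"value": int(line), corrupt_column: None})
        except ValueError:
            if mode == "PERMISSIVE":
                # Keep the row: null field, raw text preserved in the corrupt column.
                rows.append({"value": None, corrupt_column: line})
            elif mode == "DROPMALFORMED":
                continue  # skip the malformed row
            elif mode == "FAILFAST":
                raise  # abort on the first malformed row
            else:
                raise ValueError(f"Unknown mode: {mode}")
    return rows


lines = ["1", "oops", "3"]
permissive = parse_ints(lines, mode="PERMISSIVE")
dropped = parse_ints(lines, mode="DROPMALFORMED")
print(permissive)
print(dropped)
```

With ``mode="FAILFAST"`` the same input raises ``ValueError`` as soon as the malformed line is reached.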
multiLine = None
class-attribute
instance-attribute
¶
If True, fields may contain line separators.
If False, the input is expected to have one record per line.
Default is False.
.. note::
Used only for reading files.
Ignored by :obj:`~parse_column` method, as it expects that each DataFrame row will contain exactly one CSV line.
nanValue = Field(default=None)
class-attribute
instance-attribute
¶
If set, this string will be considered as Not-A-Number (NaN) value for FloatType() and DoubleType().
Default is NaN.
.. note::
Used only for reading files and :obj:`~parse_column` method.
negativeInf = Field(default=None, min_length=1)
class-attribute
instance-attribute
¶
If set, this string will be considered as negative infinity value for FloatType() and DoubleType().
Default is -Inf.
.. note::
Used only for reading files and :obj:`~parse_column` method.
nullValue = None
class-attribute
instance-attribute
¶
If set, this value will be converted to null.
Default is empty string.
positiveInf = Field(default=None, min_length=1)
class-attribute
instance-attribute
¶
If set, this string will be considered as positive infinity value for FloatType() and DoubleType().
Default is Inf.
.. note::
Used only for reading files and :obj:`~parse_column` method.
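The special-value options (``nanValue``, ``negativeInf``, ``positiveInf``) can be pictured as a lookup that runs before normal number parsing. The sketch below is a plain-Python model under that assumption; ``parse_double`` is a hypothetical helper, not an onETL or Spark function.

```python
import math


def parse_double(raw, nan_value="NaN", positive_inf="Inf", negative_inf="-Inf"):
    """Map the configured marker strings to special floats, else parse normally."""
    if raw == nan_value:
        return math.nan
    if raw == positive_inf:
        return math.inf
    if raw == negative_inf:
        return -math.inf
    return float(raw)


values = [parse_double(raw) for raw in ["1.5", "Inf", "-Inf", "NaN"]]
custom = parse_double("n/a", nan_value="n/a")  # custom NaN marker
print(values, custom)
```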
preferDate = None
class-attribute
instance-attribute
¶
If True and inferSchema=True and column matches :obj:`~dateFormat`, consider it as DateType().
For columns matching both :obj:`~timestampFormat` and :obj:`~dateFormat`, consider it as TimestampType().
If False, date and timestamp columns will be considered as StringType().
Default True.
.. note::
Used only for reading files. Ignored by :obj:`~parse_column` function.
quote = Field(default='"', max_length=1)
class-attribute
instance-attribute
¶
Character used to quote field values within CSV field.
Empty string is considered as \u0000 (NUL) character.
quoteAll = None
class-attribute
instance-attribute
¶
If True, all fields are quoted:
.. code:: csv
"some","field with \"quote","123",""
If False, only quote fields containing :obj:`~quote` symbols.
.. code:: csv
any,"field with \"quote",123,
Default False.
.. note::
Used only for writing files.
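The ``quoteAll=True`` output shown above can be reproduced with Python's standard ``csv`` writer. This is an analogy rather than Spark's writer, and it assumes a backslash escape character, as in the example row.

```python
import csv
import io

row = ["some", 'field with "quote', "123", ""]

buffer = io.StringIO()
writer = csv.writer(
    buffer,
    quoting=csv.QUOTE_ALL,  # quote every field, similar to quoteAll=True
    escapechar="\\",  # assumed escape character
    doublequote=False,  # escape quotes instead of doubling them
    lineterminator="\n",
)
writer.writerow(row)
line = buffer.getvalue().rstrip("\n")
print(line)
```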
samplingRatio = Field(default=None, ge=0, le=1)
class-attribute
instance-attribute
¶
For inferSchema=True, read the specified fraction of rows to infer the schema.
Default 1.
.. note::
Used only for reading files. Ignored by :obj:`~parse_column` function.
timestampFormat = Field(default=None, min_length=1)
class-attribute
instance-attribute
¶
String format for TimestampType() representation.
Default is yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX].
timestampNTZFormat = Field(default=None, min_length=1)
class-attribute
instance-attribute
¶
String format for TimestampNTZType() representation.
Default is yyyy-MM-dd'T'HH:mm:ss[.SSS].
.. note::
Added in Spark 3.2.0
unescapedQuoteHandling = None
class-attribute
instance-attribute
¶
Define how to handle unescaped quotes within CSV field.
- STOP_AT_CLOSING_QUOTE - collect all characters until closing quote.
- BACK_TO_DELIMITER - collect all characters until delimiter or line end.
- STOP_AT_DELIMITER - collect all characters until delimiter or line end. If quotes are not closed, this may produce incorrect results (e.g. including delimiter inside field value).
- SKIP_VALUE - skip field and consider it as :obj:`~nullValue`.
- RAISE_ERROR - raise error immediately.
Default STOP_AT_DELIMITER.
.. note::
Used only for reading files and :obj:`~parse_column` method.
parse_column(column, schema)
¶
Parses a CSV string column to a structured Spark SQL column using Spark's
`from_csv <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.from_csv.html>`_ function,
based on the provided schema.
.. note::
Can be used only with Spark 3.x+
.. versionadded:: 0.11.0
Parameters:
- column (str | Column) – The name of the column or the Column object containing CSV strings/bytes to parse.
- schema (StructType) – The schema to apply when parsing the CSV data. This defines the structure of the output DataFrame column.
Returns:
- Column with deserialized data, with the same structure as the provided schema. Column name is the same as input column.
Examples:
>>> from pyspark.sql.types import StructType, StructField, IntegerType, StringType
>>> from onetl.file.format import CSV
>>> df.show()
+--+--------+
|id|value |
+--+--------+
|1 |Alice;20|
|2 |Bob;25 |
+--+--------+
>>> df.printSchema()
root
|-- id: integer (nullable = true)
|-- value: string (nullable = true)
>>> csv = CSV(delimiter=";")
>>> csv_schema = StructType(
... [
... StructField("name", StringType(), nullable=True),
... StructField("age", IntegerType(), nullable=True),
... ],
... )
>>> parsed_df = df.select("id", csv.parse_column("value", csv_schema))
>>> parsed_df.show()
+--+-----------+
|id|value |
+--+-----------+
|1 |{Alice, 20}|
|2 | {Bob, 25}|
+--+-----------+
>>> parsed_df.printSchema()
root
|-- id: integer (nullable = true)
|-- value: struct (nullable = true)
| |-- name: string (nullable = true)
| |-- age: integer (nullable = true)
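To make the transformation concrete without a Spark session, here is a plain-Python model of what happens to each string value. It is a sketch only: ``parse_csv_value`` and the ``fields`` list are hypothetical stand-ins for ``parse_column`` and the ``StructType`` schema, and Python callables stand in for Spark types.

```python
import csv


def parse_csv_value(value, fields, delimiter=";"):
    """Parse one CSV line into a dict, casting each cell with the given callable."""
    (cells,) = csv.reader([value], delimiter=delimiter)
    return {name: cast(cell) for (name, cast), cell in zip(fields, cells)}


# Stand-in for the StructType schema: (field name, cast function) pairs.
fields = [("name", str), ("age", int)]

parsed = [parse_csv_value(value, fields) for value in ["Alice;20", "Bob;25"]]
print(parsed)
```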
serialize_column(column)
¶
Serializes a structured Spark SQL column into a CSV string column using Spark's
`to_csv <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.to_csv.html>`_ function.
.. note::
Can be used only with Spark 3.x+
.. versionadded:: 0.11.0
Parameters:
- column (str | Column) – The name of the column or the Column object containing the data to serialize to CSV.
Returns:
- Column with string CSV data. Column name is the same as input column.
Examples:
>>> from onetl.file.format import CSV
>>> df.show()
+--+-----------+
|id|value |
+--+-----------+
|1 |{Alice, 20}|
|2 | {Bob, 25}|
+--+-----------+
>>> df.printSchema()
root
|-- id: integer (nullable = true)
|-- value: struct (nullable = true)
| |-- name: string (nullable = true)
| |-- age: integer (nullable = true)
>>> # serializing data into CSV format
>>> csv = CSV(delimiter=";")
>>> serialized_df = df.select("id", csv.serialize_column("value"))
>>> serialized_df.show(truncate=False)
+--+--------+
|id|value |
+--+--------+
|1 |Alice;20|
|2 |Bob;25 |
+--+--------+
>>> serialized_df.printSchema()
root
|-- id: integer (nullable = true)
|-- value: string (nullable = true)
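The reverse direction can be modelled in plain Python in the same spirit. ``serialize_row`` is a hypothetical helper built on the standard ``csv`` module, shown only to illustrate the shape of the result; it is not how onETL or Spark implement ``serialize_column``.

```python
import csv
import io


def serialize_row(values, delimiter=";"):
    """Join the values of one struct-like row into a single CSV string."""
    buffer = io.StringIO()
    csv.writer(buffer, delimiter=delimiter, lineterminator="\n").writerow(values)
    return buffer.getvalue()[:-1]  # drop the trailing line terminator


serialized = [serialize_row(row) for row in [("Alice", 20), ("Bob", 25)]]
print(serialized)
```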