Writing to MySQL using DBWriter

For writing data to MySQL, use DBWriter.

Warning

Please take into account how MySQL types are mapped to Spark types.

Warning

It is always recommended to create the table explicitly using MySQL.execute instead of relying on Spark's table DDL generation.

This is because Spark's DDL generator can create columns with different precision and types than expected, causing precision loss or other issues.
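For example, a minimal sketch of creating the table up front (the table name, columns, and types are illustrative, not prescribed by onETL):

from onetl.connection import MySQL

mysql = MySQL(...)

# hypothetical DDL; choose precision and types explicitly,
# so Spark does not have to guess them
mysql.execute(
    """
    CREATE TABLE IF NOT EXISTS schema.table (
        id BIGINT PRIMARY KEY,
        business_dt DATETIME(6),
        value DECIMAL(20, 10)
    )
    ENGINE = InnoDB
    """
)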

Examples

from onetl.connection import MySQL
from onetl.db import DBWriter

mysql = MySQL(...)

df = ...  # data is here

writer = DBWriter(
    connection=mysql,
    target="schema.table",
    options=MySQL.WriteOptions(
        if_exists="append",
        # ENGINE is optional in MySQL; InnoDB is the default
        createTableOptions="ENGINE = InnoDB",
    ),
)

writer.run(df)

Options

The method above accepts MySQL.WriteOptions.

MySQLWriteOptions

Bases: JDBCWriteOptions

Spark JDBC writing options.

.. versionadded:: 0.5.0 Replace ``MySQL.Options`` → ``MySQL.WriteOptions``

Examples:

.. note ::

You can pass any value
`supported by Spark <https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html>`_,
even if it is not mentioned in this documentation. **Option names should be in** ``camelCase``!

The set of supported options depends on the Spark version.

.. code:: python

from onetl.connection import MySQL

options = MySQL.WriteOptions(
    if_exists="append",
    batchsize=20_000,
    customSparkOption="value",
)


batchsize = 20000

How many rows can be inserted per round trip.

Tuning this option can influence write performance.

.. warning::

The default value is different from Spark's.

Spark uses the quite small value ``1000``, which is not usable
in the Big Data world.

Thus we've overridden the default value with ``20_000``,
which should increase writing performance.

You can increase it even more, up to ``50_000``,
but it depends on your database load and the number of columns in each row.
Higher values do not increase performance.

.. versionchanged:: 0.4.0 Changed default value from 1000 to 20_000
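For instance, a small sketch of raising the batch size (the value is illustrative and workload-dependent):

.. code:: python

from onetl.connection import MySQL

options = MySQL.WriteOptions(
    if_exists="append",
    # fewer round trips per partition; tune against your database load
    # and row width, as values above 50_000 rarely help
    batchsize=50_000,
)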

if_exists = Field(default=JDBCTableExistBehavior.APPEND, alias=avoid_alias('mode'))

Behavior of writing data into existing table.

Possible values:

  • append (default) Adds new rows into existing table.

    .. dropdown:: Behavior in detail

    * Table does not exist
        Table is created using options provided by user
        (``createTableOptions``, ``createTableColumnTypes``, etc).
    
    * Table exists
        Data is appended to the table. The table keeps the same DDL it had before writing data.

        .. warning::

            This mode does not check whether the table already contains
            rows from the dataframe, so duplicated rows can be created.

            Also, Spark does not support passing custom options to the
            insert statement, like ``ON CONFLICT``, so don't try to
            implement deduplication using unique indexes or constraints.

            Instead, write to a staging table and perform deduplication
            using the :obj:`~execute` method (see the sketch after the list of modes below).
    
  • replace_entire_table Table is dropped and then created, or truncated.

    .. dropdown:: Behavior in detail

    * Table does not exist
        Table is created using options provided by user
        (``createTableOptions``, ``createTableColumnTypes``, etc).
    
    * Table exists
        Table content is replaced with dataframe content.

        After writing completes, the target table either keeps the same DDL as
        before writing data (``truncate=True``) or is recreated (``truncate=False``,
        or when the source does not support truncation).
    
  • ignore Ignores the write operation if the table already exists.

    .. dropdown:: Behavior in detail

    * Table does not exist
        Table is created using options provided by user
        (``createTableOptions``, ``createTableColumnTypes``, etc).
    
    * Table exists
        The write operation is ignored, and no data is written to the table.
    
  • error Raises an error if the table already exists.

    .. dropdown:: Behavior in detail

    * Table does not exist
        Table is created using options provided by user
        (``createTableOptions``, ``createTableColumnTypes``, etc).
    
    * Table exists
        An error is raised, and no data is written to the table.
    

.. versionchanged:: 0.9.0 Renamed ``mode`` → ``if_exists``
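As mentioned in the ``append`` warning above, a minimal sketch of staging-table deduplication (the staging table ``schema.table_staging`` and the unique index on ``id`` are assumptions for illustration):

.. code:: python

from onetl.connection import MySQL
from onetl.db import DBWriter

mysql = MySQL(...)

# 1. write the dataframe into a separate staging table
writer = DBWriter(
    connection=mysql,
    target="schema.table_staging",
    options=MySQL.WriteOptions(if_exists="replace_entire_table"),
)
writer.run(df)

# 2. move only non-duplicate rows into the target table;
# INSERT IGNORE skips rows violating the unique index on ``id``
mysql.execute(
    """
    INSERT IGNORE INTO schema.table
    SELECT * FROM schema.table_staging
    """
)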

isolation_level = Field(default='READ_UNCOMMITTED', alias='isolationLevel')

The transaction isolation level, which applies to the current connection.

Possible values:

  • NONE (as string, not Python's None)
  • READ_COMMITTED
  • READ_UNCOMMITTED
  • REPEATABLE_READ
  • SERIALIZABLE

Values correspond to the transaction isolation levels defined by the JDBC standard. Please refer to the documentation for `java.sql.Connection <https://docs.oracle.com/javase/8/docs/api/java/sql/Connection.html>`_.

query_timeout = Field(default=None, alias='queryTimeout')

The number of seconds the driver will wait for a statement to execute. Zero means there is no limit.

This option depends on the driver implementation; some drivers check the timeout of each query instead of the entire JDBC batch.
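A small sketch combining both connection-level options (values are illustrative):

.. code:: python

from onetl.connection import MySQL

options = MySQL.WriteOptions(
    if_exists="append",
    # abort statements running longer than 5 minutes
    queryTimeout=300,
    # stricter than the READ_UNCOMMITTED default
    isolationLevel="READ_COMMITTED",
)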


parse(options) classmethod

If an object inheriting from WriteOptions is passed, it is returned unchanged. If a dict is passed, it is converted to WriteOptions.

Otherwise, an exception is raised.
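A minimal sketch of both accepted inputs:

.. code:: python

from onetl.connection import MySQL

# a dict is converted to WriteOptions
options = MySQL.WriteOptions.parse({"if_exists": "append", "batchsize": 20_000})

# an existing WriteOptions object is returned unchanged
same_options = MySQL.WriteOptions.parse(options)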