
DB Reader

Bases: FrozenModel

Allows you to read data from a table using the specified database connection and parameters, and return its content as a Spark dataframe. |support_hooks|

.. note::

    DBReader can return different results depending on :ref:`strategy`.

.. note::

    This class operates with only one source at a time. It does NOT support executing queries
    across multiple sources, like ``SELECT ... JOIN``.

.. versionadded:: 0.1.0

.. versionchanged:: 0.8.0 Moved ``onetl.core.DBReader`` → ``onetl.db.DBReader``

Parameters:

  • connection (:obj:`onetl.connection.BaseDBConnection`) –

    Class which contains DB connection properties. See the :ref:`db-connections` section.

  • source (str) –

    Table/collection/etc name to read data from.

    If the connection has schema support, you need to specify the full name of the source including the schema, e.g. ``schema.name``.

    .. versionchanged:: 0.7.0 Renamed ``table`` → ``source``

  • columns (list[str], default: `None` ) –

    The list of columns to be read.

    If RDBMS supports any kind of expressions, you can pass them too.

    .. code:: python

        columns = [
            "mycolumn",
            "another_column as alias",
            "count(*) over ()",
            "some(function) as alias2",
        ]
    

    .. note::

        Some sources do not have columns.
    

    .. note::

        It is recommended to pass column names explicitly to avoid selecting too many columns,
        and to avoid adding unexpected columns to the dataframe if the source DDL is changed.
    

    .. deprecated:: 0.10.0

        Syntax ``DBReader(columns="col1, col2")`` (string instead of list) is not supported,
        and will be removed in v1.0.0.
    
  • where (Any, default: `None` ) –

    Custom ``where`` clause for a SQL query or MongoDB pipeline.

    The ``where`` syntax depends on the source. For example, SQL sources accept ``where`` as a string, while MongoDB sources accept it as a dictionary.

    .. code:: python

        # SQL database connection
        where = "column_1 > 2"

        # MongoDB connection
        where = {
            "col_1": {"$gt": 1, "$lt": 100},
            "col_2": {"$gt": 2},
            "col_3": {"$eq": "hello"},
        }
    

    .. note::

        Some sources do not support data filtering.
    
  • hwm (type[HWM] | None, default: `None` ) –

    HWM class to be used as :etl-entities:`HWM <hwm/index.html>` value.

    .. code:: python

        hwm = DBReader.AutoDetectHWM(
            name="some_unique_hwm_name",
            expression="hwm_column",
        )
    

    The HWM value will be fetched using the ``hwm_column`` SQL query.

    If you want to use some SQL expression as HWM value, you can use it as well:

    .. code:: python

        hwm = DBReader.AutoDetectHWM(
            name="some_unique_hwm_name",
            expression="cast(hwm_column_orig as date)",
        )
    

    .. note::

        Some sources do not support passing expressions and can be used only with column/field
        names which are present in the source.
    

    .. versionchanged:: 0.10.0 Replaces deprecated ``hwm_column`` and ``hwm_expression`` attributes

  • hint (Any, default: `None` ) –

    Hint expression used for querying the data.

    The ``hint`` syntax depends on the source. For example, SQL sources accept ``hint`` as a string, while MongoDB sources accept it as a dictionary.

    .. code:: python

        # SQL database connection
        hint = "index(myschema.mytable mycolumn)"

        # MongoDB connection
        hint = {
            "mycolumn": 1,
        }
    

    .. note::

        Some sources do not support hints.
    
  • df_schema (StructType, default: `None` ) –

    Spark DataFrame schema, used for proper type casting of the rows.

    .. code:: python

        from pyspark.sql.types import (
            DoubleType,
            IntegerType,
            StringType,
            StructField,
            StructType,
            TimestampType,
        )

        df_schema = StructType(
            [
                StructField("_id", IntegerType()),
                StructField("text_string", StringType()),
                StructField("hwm_int", IntegerType()),
                StructField("hwm_datetime", TimestampType()),
                StructField("float_value", DoubleType()),
            ],
        )

        reader = DBReader(
            connection=connection,
            source="fiddle.dummy",
            df_schema=df_schema,
        )
    

    .. note::

        Some sources do not support passing a dataframe schema.
    
  • options (dict | :obj:`onetl.connection.BaseDBConnection.ReadOptions`, default: `None` ) –

    Spark read options, like partitioning mode.

    .. code:: python

        Postgres.ReadOptions(
            partitioningMode="hash",
            partitionColumn="some_column",
            numPartitions=20,
            fetchsize=1000,
        )
    

    .. note::

        Some sources do not support reading options.
    
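To illustrate the semantics of the MongoDB-style ``where`` dictionary used in the parameters above, here is a minimal pure-Python sketch (not part of onetl) that evaluates such a filter against a single document. Only the ``$gt``, ``$lt`` and ``$eq`` operators from the example are implemented:

```python
# Evaluate a MongoDB-style filter like {"col_1": {"$gt": 1, "$lt": 100}}
# against a plain dict. Illustration only -- onetl passes the dictionary
# to MongoDB as-is, it does not evaluate it in Python.
OPS = {
    "$gt": lambda value, bound: value > bound,
    "$lt": lambda value, bound: value < bound,
    "$eq": lambda value, bound: value == bound,
}


def matches(document: dict, where: dict) -> bool:
    # every column condition must hold for the document to pass the filter
    for column, conditions in where.items():
        value = document.get(column)
        for op, bound in conditions.items():
            if not OPS[op](value, bound):
                return False
    return True


where = {
    "col_1": {"$gt": 1, "$lt": 100},
    "col_3": {"$eq": "hello"},
}

matches({"col_1": 50, "col_3": "hello"}, where)   # True
matches({"col_1": 200, "col_3": "hello"}, where)  # False
```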

Examples:

.. tabs::

    .. code-tab:: py Minimal example

        from onetl.db import DBReader
        from onetl.connection import Postgres

        postgres = Postgres(...)

        # create reader
        reader = DBReader(connection=postgres, source="fiddle.dummy")

        # read data from table "fiddle.dummy"
        df = reader.run()

    .. code-tab:: py With custom reading options

        from onetl.connection import Postgres
        from onetl.db import DBReader

        postgres = Postgres(...)
        options = Postgres.ReadOptions(sessionInitStatement="select 300", fetchsize="100")

        # create reader and pass some options to the underlying connection object
        reader = DBReader(connection=postgres, source="fiddle.dummy", options=options)

        # read data from table "fiddle.dummy"
        df = reader.run()

    .. code-tab:: py Full example

        from onetl.db import DBReader
        from onetl.connection import Postgres

        postgres = Postgres(...)
        options = Postgres.ReadOptions(sessionInitStatement="select 300", fetchsize="100")

        # create reader with specific columns, rows filter
        reader = DBReader(
            connection=postgres,
            source="default.test",
            where="d_id > 100",
            hint="NOWAIT",
            columns=["d_id", "d_name", "d_age"],
            options=options,
        )

        # read data from table "default.test"
        df = reader.run()

    .. tab:: Incremental reading

        See :ref:`strategy` for more examples.

        .. code:: python

            from onetl.strategy import IncrementalStrategy

            ...

            reader = DBReader(
                connection=postgres,
                source="fiddle.dummy",
                hwm=DBReader.AutoDetectHWM(  # mandatory for IncrementalStrategy
                    name="some_unique_hwm_name",
                    expression="d_age",
                ),
            )

            # read data from table "fiddle.dummy"
            # but only with new rows (`WHERE d_age > previous_hwm_value`)
            with IncrementalStrategy():
                df = reader.run()

run()

Reads data from the source table and returns it as a Spark dataframe. |support_hooks|

.. note::

    This method can return different results depending on :ref:`strategy`.

.. warning::

    If :etl-entities:`hwm <hwm/index.html>` is used,
    then this method should be called inside a :ref:`strategy` context.
    And vice versa, if HWM is not used, this method should not be called within a strategy.

.. versionadded:: 0.1.0

Returns:

  • df ( DataFrame ) –

    Spark dataframe

Examples:

Read data to Spark dataframe:

.. code:: python

    df = reader.run()

has_data()

Returns ``True`` if there is some data in the source, ``False`` otherwise. |support_hooks|

.. note::

    This method can return different results depending on :ref:`strategy`.

.. warning::

    If :etl-entities:`hwm <hwm/index.html>` is used,
    then this method should be called inside a :ref:`strategy` context.
    And vice versa, if HWM is not used, this method should not be called within a strategy.

.. versionadded:: 0.10.0

Raises:

  • RuntimeError

    The current strategy is not compatible with the HWM parameter.

Examples:

.. code:: python

    reader = DBReader(...)

    # handle situation when there is no data in the source
    if reader.has_data():
        df = reader.run()
    else:
        # implement your handling logic here
        ...

raise_if_no_data()

Raises a ``NoDataError`` exception if the source does not contain any data. |support_hooks|

.. note::

    This method can return different results depending on :ref:`strategy`.

.. warning::

    If :etl-entities:`hwm <hwm/index.html>` is used,
    then this method should be called inside a :ref:`strategy` context.
    And vice versa, if HWM is not used, this method should not be called within a strategy.

.. versionadded:: 0.10.0

Raises:

  • RuntimeError

    The current strategy is not compatible with the HWM parameter.

  • :obj:`onetl.exception.NoDataError`

    There is no data in source.

Examples:

.. code:: python

    reader = DBReader(...)

    # ensure that there is some data in the source before reading it using Spark
    reader.raise_if_no_data()
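For readers without a database at hand, the control flow of ``raise_if_no_data`` can be sketched in plain Python. Note that ``NoDataError`` below is a stand-in class defined for illustration, not the real ``onetl.exception.NoDataError``, and the functions are hypothetical:

```python
class NoDataError(Exception):
    """Stand-in for onetl.exception.NoDataError (illustration only)."""


def raise_if_no_data(rows) -> None:
    # mimics DBReader.raise_if_no_data(): fail fast when the source is empty
    if not rows:
        raise NoDataError("There is no data in source")


def read_or_skip(rows):
    # guard pattern: raise early, then handle the empty-source case explicitly
    try:
        raise_if_no_data(rows)
        return list(rows)  # stands in for reader.run()
    except NoDataError:
        return None  # implement your own handling logic here


read_or_skip([])  # → None
read_or_skip([1, 2])  # → [1, 2]
```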