DB Reader
Bases: FrozenModel
Allows you to read data from a table using a specified database connection and parameters, and returns its content as a Spark dataframe. |support_hooks|
.. note::

    DBReader can return different results depending on :ref:`strategy`.
.. note::

    This class operates with only one source at a time. It does NOT support executing queries
    over multiple sources, like ``SELECT ... JOIN``.
.. versionadded:: 0.1.0

.. versionchanged:: 0.8.0
    Moved ``onetl.core.DBReader`` → ``onetl.db.DBReader``
Parameters:

- connection (:obj:`onetl.connection.BaseDBConnection`) – Class which contains DB connection properties. See the :ref:`db-connections` section.
- source (str) – Table/collection/etc. name to read data from.

  If the connection has schema support, you need to specify the full name of the source including the schema, e.g. ``schema.name``.

  .. versionchanged:: 0.7.0
      Renamed ``table`` → ``source``
- columns (list[str], default: ``None``) – The list of columns to be read.

  If the RDBMS supports any kind of expressions, you can pass them too.

  .. code:: python

      columns = [
          "mycolumn",
          "another_column as alias",
          "count(*) over ()",
          "some(function) as alias2",
      ]

  .. note::
      Some sources do not have columns.

  .. note::
      It is recommended to pass column names explicitly to avoid selecting too many columns, and to avoid adding unexpected columns to the dataframe if the source DDL is changed.

  .. deprecated:: 0.10.0
      Syntax ``DBReader(columns="col1, col2")`` (string instead of list) is not supported, and will be removed in v1.0.0.
- where (Any, default: ``None``) – Custom ``where`` clause for SQL query or MongoDB pipeline.

  ``where`` syntax depends on the source. For example, SQL sources accept ``where`` as a string, but MongoDB sources accept ``where`` as a dictionary.

  .. code:: python

      # SQL database connection
      where = "column_1 > 2"

      # MongoDB connection
      where = {
          "col_1": {"$gt": 1, "$lt": 100},
          "col_2": {"$gt": 2},
          "col_3": {"$eq": "hello"},
      }

  .. note::
      Some sources do not support data filtering.
- hwm (type[HWM] | None, default: ``None``) – HWM class to be used as :etl-entities:`HWM <hwm/index.html>` value.

  .. code:: python

      hwm = DBReader.AutoDetectHWM(
          name="some_unique_hwm_name",
          expression="hwm_column",
      )

  The HWM value will be fetched using the ``hwm_column`` SQL query.

  If you want to use some SQL expression as the HWM value, you can use it as well:

  .. code:: python

      hwm = DBReader.AutoDetectHWM(
          name="some_unique_hwm_name",
          expression="cast(hwm_column_orig as date)",
      )

  .. note::
      Some sources do not support passing expressions and can be used only with column/field names which are present in the source.

  .. versionchanged:: 0.10.0
      Replaces deprecated ``hwm_column`` and ``hwm_expression`` attributes.
- hint (Any, default: ``None``) – Hint expression used for querying the data.

  ``hint`` syntax depends on the source. For example, SQL sources accept ``hint`` as a string, but MongoDB sources accept ``hint`` as a dictionary.

  .. code:: python

      # SQL database connection
      hint = "index(myschema.mytable mycolumn)"

      # MongoDB connection
      hint = {
          "mycolumn": 1,
      }

  .. note::
      Some sources do not support hints.
- df_schema (StructType, default: ``None``) – Spark DataFrame schema, used for proper type casting of the rows.

  .. code:: python

      from pyspark.sql.types import (
          DoubleType,
          IntegerType,
          StringType,
          StructField,
          StructType,
          TimestampType,
      )

      df_schema = StructType(
          [
              StructField("_id", IntegerType()),
              StructField("text_string", StringType()),
              StructField("hwm_int", IntegerType()),
              StructField("hwm_datetime", TimestampType()),
              StructField("float_value", DoubleType()),
          ],
      )

      reader = DBReader(
          connection=connection,
          source="fiddle.dummy",
          df_schema=df_schema,
      )

  .. note::
      Some sources do not support passing a dataframe schema.
- options (dict | :obj:`onetl.connection.BaseDBConnection.ReadOptions`, default: ``None``) – Spark read options, like partitioning mode.

  .. code:: python

      Postgres.ReadOptions(
          partitioningMode="hash",
          partitionColumn="some_column",
          numPartitions=20,
          fetchsize=1000,
      )

  .. note::
      Some sources do not support reading options.
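To give an intuition for what ``partitionColumn``/``numPartitions`` do, here is a rough sketch of JDBC-style range partitioning in plain Python. This is an illustration only, not onETL's or Spark's actual code: each partition reads a disjoint slice of the partition column, and the first partition also picks up ``NULL`` values.

```python
# Illustrative sketch (hypothetical helper, not onETL internals):
# build one WHERE predicate per partition over partitionColumn.


def range_partition_predicates(column: str, lower: int, upper: int, num_partitions: int) -> list[str]:
    """Build one WHERE predicate per partition, covering [lower, upper]."""
    stride = (upper - lower) // num_partitions or 1
    predicates = []
    bound = lower
    for i in range(num_partitions):
        if i == 0:
            # first partition also collects NULLs
            predicates.append(f"{column} < {bound + stride} OR {column} IS NULL")
        elif i == num_partitions - 1:
            # last partition is open-ended to cover rounding remainder
            predicates.append(f"{column} >= {bound}")
        else:
            predicates.append(f"{column} >= {bound} AND {column} < {bound + stride}")
        bound += stride
    return predicates


print(range_partition_predicates("some_column", 0, 100, 4))
```

Each resulting predicate becomes the ``WHERE`` clause of one parallel query, so more partitions means smaller, concurrent reads.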
Examples:
.. tabs::

    .. code-tab:: py Minimal example

        from onetl.db import DBReader
        from onetl.connection import Postgres

        postgres = Postgres(...)

        # create reader
        reader = DBReader(connection=postgres, source="fiddle.dummy")

        # read data from table "fiddle.dummy"
        df = reader.run()
    .. code-tab:: py With custom reading options

        from onetl.connection import Postgres
        from onetl.db import DBReader

        postgres = Postgres(...)

        options = Postgres.ReadOptions(sessionInitStatement="select 300", fetchsize="100")

        # create reader and pass some options to the underlying connection object
        reader = DBReader(connection=postgres, source="fiddle.dummy", options=options)

        # read data from table "fiddle.dummy"
        df = reader.run()
    .. code-tab:: py Full example

        from onetl.db import DBReader
        from onetl.connection import Postgres

        postgres = Postgres(...)

        options = Postgres.ReadOptions(sessionInitStatement="select 300", fetchsize="100")

        # create reader with specific columns, rows filter
        reader = DBReader(
            connection=postgres,
            source="default.test",
            where="d_id > 100",
            hint="NOWAIT",
            columns=["d_id", "d_name", "d_age"],
            options=options,
        )

        # read data from table "default.test"
        df = reader.run()
    .. tab:: Incremental reading

        See :ref:`strategy` for more examples.

        .. code:: python

            from onetl.strategy import IncrementalStrategy

            ...

            reader = DBReader(
                connection=postgres,
                source="fiddle.dummy",
                hwm=DBReader.AutoDetectHWM(  # mandatory for IncrementalStrategy
                    name="some_unique_hwm_name",
                    expression="d_age",
                ),
            )

            # read data from table "fiddle.dummy",
            # but only with new rows (`WHERE d_age > previous_hwm_value`)
            with IncrementalStrategy():
                df = reader.run()
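Conceptually, an incremental read remembers the highest value of the HWM expression seen so far and only fetches rows above it on the next run. The following stand-in (hypothetical helper names, not the onETL implementation) sketches that bookkeeping over plain dictionaries:

```python
# Minimal sketch of HWM-based incremental reading (hypothetical, not onETL internals).


def incremental_read(rows: list[dict], hwm_column: str, previous_hwm=None):
    """Return rows with hwm_column above previous_hwm, plus the updated HWM value."""
    if previous_hwm is None:
        new_rows = list(rows)  # first run: no HWM yet, read everything
    else:
        new_rows = [r for r in rows if r[hwm_column] > previous_hwm]
    # the new HWM is the maximum value seen; keep the old one if nothing new arrived
    new_hwm = max((r[hwm_column] for r in new_rows), default=previous_hwm)
    return new_rows, new_hwm


rows = [{"d_id": 1, "d_age": 10}, {"d_id": 2, "d_age": 25}]
first, hwm = incremental_read(rows, "d_age")        # first run reads both rows
rows.append({"d_id": 3, "d_age": 30})
second, hwm = incremental_read(rows, "d_age", hwm)  # second run reads only d_age > 25
```

In onETL the equivalent state is persisted in an HWM store between runs, which is why ``name`` must be unique per reader.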
run()

Reads data from the source table and returns it as a Spark dataframe. |support_hooks|
.. note::

    This method can return different results depending on :ref:`strategy`.

.. warning::

    If :etl-entities:`HWM <hwm/index.html>` is used,
    then this method should be called inside a :ref:`strategy` context.
    And vice versa, if HWM is not used, this method should not be called within a strategy.
.. versionadded:: 0.1.0
Returns:

- df (DataFrame) – Spark dataframe
Examples:
Read data to Spark dataframe:
.. code:: python

    df = reader.run()
has_data()
Returns True if there is some data in the source, False otherwise. |support_hooks|
.. note::

    This method can return different results depending on :ref:`strategy`.

.. warning::

    If :etl-entities:`HWM <hwm/index.html>` is used,
    then this method should be called inside a :ref:`strategy` context.
    And vice versa, if HWM is not used, this method should not be called within a strategy.
.. versionadded:: 0.10.0
Raises:

- RuntimeError – Current strategy is not compatible with the HWM parameter.
Examples:
.. code:: python

    reader = DBReader(...)

    # handle the situation when there is no data in the source
    if reader.has_data():
        df = reader.run()
    else:
        # implement your handling logic here
        ...
raise_if_no_data()
Raises ``NoDataError`` if the source does not contain any data. |support_hooks|
.. note::

    This method can return different results depending on :ref:`strategy`.
.. warning::

    If :etl-entities:`HWM <hwm/index.html>` is used,
    then this method should be called inside a :ref:`strategy` context.
    And vice versa, if HWM is not used, this method should not be called within a strategy.
.. versionadded:: 0.10.0
Raises:

- RuntimeError – Current strategy is not compatible with the HWM parameter.
- :obj:`onetl.exception.NoDataError` – There is no data in the source.
Examples:
.. code:: python

    reader = DBReader(...)

    # ensure that there is some data in the source before reading it using Spark
    reader.raise_if_no_data()
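The fail-fast check can also be wrapped in ``try``/``except`` to branch on an empty source instead of aborting. The sketch below uses a stand-in exception class so it runs without onETL installed; in real code you would catch ``onetl.exception.NoDataError`` raised by ``reader.raise_if_no_data()``.

```python
# Sketch of the raise-if-empty pattern; StubNoDataError stands in for
# onetl.exception.NoDataError so the snippet is self-contained.


class StubNoDataError(Exception):
    pass


def raise_if_no_data(rows):
    """Mimic DBReader.raise_if_no_data(): fail fast on an empty source."""
    if not rows:
        raise StubNoDataError("There is no data in the source")


def read_or_skip(rows):
    """Return the data, or None when the source is empty."""
    try:
        raise_if_no_data(rows)
    except StubNoDataError:
        return None  # e.g. skip this run and wait for new data
    return rows
```

This pattern is useful in scheduled pipelines where an empty source is expected occasionally and should not fail the whole job.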