Reading from Oracle using DBReader¶
DBReader supports strategies for incremental data reading, but does not support custom queries, such as JOIN.
Warning
Please take into account Oracle types
Supported DBReader features¶
- ✅︎ columns
- ✅︎ where
- ✅︎ hwm, supported strategies:
- ✅︎ hint (see official documentation)
- ❌ df_schema
- ✅︎ options (see Oracle.ReadOptions)
Examples¶
Snapshot strategy:
from onetl.connection import Oracle
from onetl.db import DBReader

oracle = Oracle(...)

reader = DBReader(
    connection=oracle,
    source="schema.table",
    columns=["id", "key", "CAST(value AS VARCHAR2(4000)) value", "updated_dt"],
    where="key = 'something'",
    hint="INDEX(schema.table key_index)",
    options=Oracle.ReadOptions(partitionColumn="id", numPartitions=10),
)
df = reader.run()
Incremental strategy:
from onetl.connection import Oracle
from onetl.db import DBReader
from onetl.strategy import IncrementalStrategy

oracle = Oracle(...)

reader = DBReader(
    connection=oracle,
    source="schema.table",
    columns=["id", "key", "CAST(value AS VARCHAR2(4000)) value", "updated_dt"],
    where="key = 'something'",
    hint="INDEX(schema.table key_index)",
    hwm=DBReader.AutoDetectHWM(name="oracle_hwm", expression="updated_dt"),
    options=Oracle.ReadOptions(partitionColumn="id", numPartitions=10),
)

with IncrementalStrategy():
    df = reader.run()
Recommendations¶
Select only required columns¶
Instead of passing "*" in DBReader(columns=[...]), prefer passing exact column names. This reduces the amount of data passed from Oracle to Spark.
Pay attention to where value¶
Instead of filtering data on the Spark side using df.filter(df.column == 'value'), pass a proper DBReader(where="column = 'value'") clause.
This both reduces the amount of data sent from Oracle to Spark and may improve query performance, especially if there are indexes or partitions for the columns used in the where clause.
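As a rough illustration of both recommendations, the sketch below uses Python's built-in sqlite3 module as a stand-in for Oracle. This is an assumption made only so the example runs anywhere, and the table and column names are hypothetical. It compares how many values cross the connection when everything is fetched and filtered on the client with the case where column selection and filtering happen in the database.

```python
import sqlite3

# In-memory SQLite database, used here only as a stand-in for Oracle.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER, key TEXT, value TEXT, updated_dt TEXT)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?, ?, ?)",
    [
        (i, "something" if i % 10 == 0 else "other", "x" * 100, "2024-01-01")
        for i in range(1000)
    ],
)

# Fetch every column of every row, then filter on the client
# (analogous to reading "*" and calling df.filter in Spark).
all_rows = conn.execute("SELECT * FROM events").fetchall()
client_filtered = [(row[0], row[1]) for row in all_rows if row[1] == "something"]

# Select only the needed columns and filter inside the database
# (analogous to DBReader(columns=[...], where="...")).
db_filtered = conn.execute(
    "SELECT id, key FROM events WHERE key = 'something'"
).fetchall()

# Count how many individual values were transferred in each case.
values_transferred_client = sum(len(row) for row in all_rows)
values_transferred_db = sum(len(row) for row in db_filtered)
conn.close()
```

Both approaches produce the same rows, but the second one transfers far fewer values over the connection.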
Options¶
OracleReadOptions¶
Bases: JDBCReadOptions
Spark JDBC reading options.
.. versionadded:: 0.5.0
Replace Oracle.Options → Oracle.ReadOptions
Examples:
.. note ::

    You can pass any value
    `supported by Spark <https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html>`_,
    even if it is not mentioned in this documentation. **Option names should be in** ``camelCase``!

    The set of supported options depends on Spark version.

.. code:: python

    from onetl.connection import Oracle

    options = Oracle.ReadOptions(
        partitioning_mode="range",
        partitionColumn="reg_id",
        numPartitions=10,
        customSparkOption="value",
    )
fetchsize = 100000 class-attribute instance-attribute¶
Fetch N rows from an opened cursor per read round.
Tuning this option can influence reading performance.

.. warning::

    Default value is different from Spark.

    Spark uses the driver's own value, which may differ between drivers
    and even between versions of the same driver. For example, Oracle has
    a default of ``fetchsize=10``, which is absolutely not usable.

    Thus we've overridden the default value with ``100_000``, which should increase reading performance.

.. versionchanged:: 0.2.0
    Set explicit default value to 100_000
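To see why the default matters, here is a back-of-the-envelope sketch. It assumes that every read round returns exactly ``fetchsize`` rows and ignores any driver-specific behaviour; the table size and the helper name ``fetch_round_trips`` are hypothetical.

```python
import math


def fetch_round_trips(total_rows: int, fetchsize: int) -> int:
    """Estimate how many read rounds are needed to fetch total_rows rows."""
    if fetchsize <= 0:
        raise ValueError("fetchsize must be positive")
    return math.ceil(total_rows / fetchsize)


total_rows = 1_000_000  # hypothetical table size

# Driver default mentioned above for Oracle.
trips_with_driver_default = fetch_round_trips(total_rows, 10)

# Default value set by these read options.
trips_with_overridden_default = fetch_round_trips(total_rows, 100_000)
```

Under these assumptions the same table needs 100,000 read rounds with ``fetchsize=10`` and only 10 with ``fetchsize=100_000``.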
lower_bound = Field(default=None, alias='lowerBound') class-attribute instance-attribute¶
See documentation for :obj:`~partitioning_mode` for more details
num_partitions = Field(default=1, alias='numPartitions') class-attribute instance-attribute¶
Number of jobs created by Spark to read the table content in parallel.
See documentation for :obj:`~partitioning_mode` for more details
partition_column = Field(default=None, alias='partitionColumn') class-attribute instance-attribute¶
Column used to parallelize reading from a table.

.. warning:: It is highly recommended to use a primary key or a column with an index to avoid performance issues.

.. note::

    Column type depends on :obj:`~partitioning_mode`.

    * ``partitioning_mode="range"`` requires column to be an integer,
      date or timestamp (can be NULL, but not recommended).
    * ``partitioning_mode="hash"`` accepts any column type (NOT NULL).
    * ``partitioning_mode="mod"`` requires column to be an integer (NOT NULL).

See documentation for :obj:`~partitioning_mode` for more details
partitioning_mode = JDBCPartitioningMode.RANGE class-attribute instance-attribute¶
Defines how Spark will parallelize reading from table.
Possible values:
* ``range`` (default)
    Allocate each executor a range of values from column passed into :obj:`~partition_column`.

    .. dropdown:: Spark generates for each executor an SQL query

        Executor 1:

        .. code:: sql

            SELECT ... FROM table
            WHERE (partition_column >= lowerBound
                    OR partition_column IS NULL)
            AND partition_column < (lowerBound + stride)

        Executor 2:

        .. code:: sql

            SELECT ... FROM table
            WHERE partition_column >= (lowerBound + stride)
            AND partition_column < (lowerBound + 2 * stride)

        ...

        Executor N:

        .. code:: sql

            SELECT ... FROM table
            WHERE partition_column >= (lowerBound + (N-1) * stride)
            AND partition_column <= upperBound

        Where ``stride=(upperBound - lowerBound) / numPartitions``.

    Column type must be integer, date or timestamp.

    .. note::

        :obj:`~lower_bound`, :obj:`~upper_bound` and :obj:`~num_partitions` are used just to calculate the partition stride, **NOT** for filtering the rows in table. So all rows in the table will be returned (unlike *Incremental* :ref:`strategy`).

    .. note::

        All queries are executed in parallel. To execute them sequentially, use *Batch* :ref:`strategy`.

* ``hash``
    Allocate each executor a set of values based on hash of the :obj:`~partition_column` column.

    .. dropdown:: Spark generates for each executor an SQL query

        Executor 1:

        .. code:: sql

            SELECT ... FROM table
            WHERE (some_hash(partition_column) mod num_partitions) = 0 -- lower_bound

        Executor 2:

        .. code:: sql

            SELECT ... FROM table
            WHERE (some_hash(partition_column) mod num_partitions) = 1 -- lower_bound + 1

        ...

        Executor N:

        .. code:: sql

            SELECT ... FROM table
            WHERE (some_hash(partition_column) mod num_partitions) = num_partitions-1 -- upper_bound

    .. note::

        The hash function implementation depends on RDBMS. It can be ``MD5`` or any other fast hash function, or expression based on this function call. Usually such functions accept any column type as an input.

* ``mod``
    Allocate each executor a set of values based on modulus of the :obj:`~partition_column` column.

    .. dropdown:: Spark generates for each executor an SQL query

        Executor 1:

        .. code:: sql

            SELECT ... FROM table
            WHERE (partition_column mod num_partitions) = 0 -- lower_bound

        Executor 2:

        .. code:: sql

            SELECT ... FROM table
            WHERE (partition_column mod num_partitions) = 1 -- lower_bound + 1

        Executor N:

        .. code:: sql

            SELECT ... FROM table
            WHERE (partition_column mod num_partitions) = num_partitions-1 -- upper_bound

    .. note::

        Can be used only with columns of integer type.
.. versionadded:: 0.5.0
Examples:
Read data in 10 parallel jobs by range of values in ``id_column`` column:

.. code:: python

    ReadOptions(
        partitioning_mode="range",  # default mode, can be omitted
        partitionColumn="id_column",
        numPartitions=10,
        # Options below can be discarded because they are
        # calculated automatically as MIN and MAX values of `partitionColumn`
        lowerBound=0,
        upperBound=100_000,
    )
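The per-executor ranges for ``range`` mode can be sketched in plain Python. This minimal example only mirrors the query shapes and the ``stride`` formula listed in this documentation; it is not the SQL that Spark actually emits. It assumes integer bounds, uses integer division for the stride, and the helper name ``range_predicates`` is hypothetical.

```python
from typing import List


def range_predicates(
    column: str, lower_bound: int, upper_bound: int, num_partitions: int
) -> List[str]:
    """Build one WHERE predicate per executor, following the documented pattern."""
    if num_partitions < 2:
        raise ValueError("this sketch only covers num_partitions >= 2")

    # Integer division is a simplification made for this sketch.
    stride = (upper_bound - lower_bound) // num_partitions

    predicates = []
    for i in range(num_partitions):
        start = lower_bound + i * stride
        end = start + stride
        if i == 0:
            # First executor also picks up NULL values.
            predicates.append(
                f"({column} >= {start} OR {column} IS NULL) AND {column} < {end}"
            )
        elif i == num_partitions - 1:
            # Last executor reads up to and including upperBound.
            predicates.append(f"{column} >= {start} AND {column} <= {upper_bound}")
        else:
            predicates.append(f"{column} >= {start} AND {column} < {end}")
    return predicates


predicates = range_predicates("id_column", 0, 100_000, 10)
```

With the values from the example above, each executor covers a range of 10,000 ids.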
Read data in 10 parallel jobs by hash of values in ``some_column`` column:

.. code:: python

    ReadOptions(
        partitioning_mode="hash",
        partitionColumn="some_column",
        numPartitions=10,
        # lowerBound and upperBound are automatically set to `0` and `9`
    )
Read data in 10 parallel jobs by modulus of values in ``id_column`` column:

.. code:: python

    ReadOptions(
        partitioning_mode="mod",
        partitionColumn="id_column",
        numPartitions=10,
        # lowerBound and upperBound are automatically set to `0` and `9`
    )
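The way ``mod`` and ``hash`` modes assign rows to executors can be simulated in plain Python. This is only a sketch of the bucketing idea: ``zlib.crc32`` is a stand-in chosen for illustration (the real hash function is picked by the RDBMS), and the ids and keys are hypothetical sample data.

```python
import zlib
from collections import Counter


def mod_bucket(value: int, num_partitions: int) -> int:
    """Bucket index for ``mod`` mode: integer value modulo number of partitions."""
    return value % num_partitions


def hash_bucket(value: str, num_partitions: int) -> int:
    """Bucket index for ``hash`` mode, with crc32 standing in for the RDBMS hash."""
    return zlib.crc32(value.encode("utf-8")) % num_partitions


num_partitions = 10
ids = range(1, 101)  # hypothetical non-negative integer ids

# How many rows each executor would read in each mode.
mod_counts = Counter(mod_bucket(i, num_partitions) for i in ids)
hash_counts = Counter(hash_bucket(f"key-{i}", num_partitions) for i in ids)
```

Every row lands in exactly one bucket between ``0`` and ``num_partitions - 1``, which matches the automatically set ``lowerBound`` and ``upperBound`` mentioned in the comments above. Sequential integer ids split evenly in ``mod`` mode, while the balance in ``hash`` mode depends on the hash function and the data.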
query_timeout = Field(default=None, alias='queryTimeout') class-attribute instance-attribute¶
The number of seconds the driver will wait for a statement to execute. Zero means there is no limit.
This option depends on the driver implementation: some drivers check the timeout of each query instead of an entire JDBC batch.
session_init_statement = Field(default=None, alias='sessionInitStatement') class-attribute instance-attribute¶
After each database session is opened to the remote DB and before starting to read data, this option executes a custom SQL statement (or a PL/SQL block).
Use this to implement session initialization code.
Example:

.. code:: python

    sessionInitStatement = """
        BEGIN
            execute immediate
            'alter session set "_serial_direct_read"=true';
        END;
    """
upper_bound = Field(default=None, alias='upperBound') class-attribute instance-attribute¶
See documentation for :obj:`~partitioning_mode` for more details
parse(options) classmethod¶
If an instance of the ReadOptions class (or of a class inherited from it) is passed, it is returned unchanged. If a dict is passed, it is converted to ReadOptions.
Otherwise, an exception is raised.
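The behaviour described above can be sketched with a small stand-in class. This is not the onETL implementation: ``SketchReadOptions`` is a hypothetical dataclass with only two fields, and raising ``TypeError`` is an assumption, since the exception type is not stated here.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class SketchReadOptions:
    """Hypothetical stand-in for ReadOptions, used only for illustration."""

    partitionColumn: Optional[str] = None
    numPartitions: int = 1

    @classmethod
    def parse(cls, options: Any) -> "SketchReadOptions":
        if isinstance(options, cls):
            # Already an options object (or a subclass): returned unchanged.
            return options
        if isinstance(options, dict):
            # A dict is converted into an options object.
            return cls(**options)
        # Anything else is rejected; the exception type is an assumption.
        raise TypeError(f"Cannot parse {type(options).__name__} as {cls.__name__}")


existing = SketchReadOptions(partitionColumn="id", numPartitions=10)
same = SketchReadOptions.parse(existing)
converted = SketchReadOptions.parse({"partitionColumn": "id", "numPartitions": 10})
```

Here ``same`` is the very object that was passed in, while ``converted`` is a new object with equal field values.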