Snapshot Batch Strategy¶
Bases: BatchHWMStrategy
Snapshot batch strategy for :ref:db-reader.
.. note::
Cannot be used with :ref:`file-downloader`
Same as :obj:SnapshotStrategy <onetl.strategy.snapshot_strategy.SnapshotStrategy>,
but reads data from the source in sequential batches (1..N) like:
.. code:: sql
1: SELECT id, data
FROM public.mydata
WHERE id >= 1000 AND id <= 1100; -- from start to start+step (INCLUDING first row)
2: WHERE id > 1100 AND id <= 1200; -- + step
3: WHERE id > 1200 AND id <= 1200; -- + step
N: WHERE id > 1300 AND id <= 1400; -- until stop
This allows to use less CPU and RAM on Spark cluster than reading all the data in parallel, but takes proportionally more time.
.. note::
This strategy uses HWM column value to filter data for each batch,
but does **NOT** save it into :ref:`HWM Store <hwm>`.
So every run starts from the beginning, not from the previous HWM value.
.. note::
If you only need to reduce number of rows read by Spark from opened cursor,
use :obj:`onetl.connection.db_connection.postgres.Postgres.ReadOptions.fetchsize` instead
.. warning::
Not every :ref:`DB connection <db-connections>`
supports batch strategy. For example, Kafka connection doesn't support it.
Make sure the connection you use is compatible with the SnapshotBatchStrategy.
.. versionadded:: 0.1.0
Parameters:
-
step(Any) –Step size used for generating batch SQL queries like:
.. code:: sql
SELECT id, data FROM public.mydata WHERE id >= 1000 AND id <= 1100; -- 1000 is start value, step is 100.. note::
Step defines a range of values will be fetched by each batch. This is **not** a number of rows, it depends on a table content and value distribution across the rows... note::
``step`` value will be added to the HWM, so it should have a proper type. For example, for ``TIMESTAMP`` column ``step`` type should be :obj:`datetime.timedelta`, not :obj:`int` -
start(Any, default:`None`) –If passed, the value will be used for generating WHERE clauses with
hwm.expressionfilter, as a start value for the first batch.If not set, the value is determined by a separated query:
.. code:: sql
SELECT MIN(id) as start FROM public.mydata WHERE id <= 1400; -- 1400 here is stop value (if set).. note::
``start`` should be the same type as ``hwm.expression`` value, e.g. :obj:`datetime.datetime` for ``TIMESTAMP`` column, :obj:`datetime.date` for ``DATE``, and so on -
stop(Any, default:`None`) –If passed, the value will be used for generating WHERE clauses with
hwm.expressionfilter, as a stop value for the last batch.If not set, the value is determined by a separated query:
.. code:: sql
SELECT MAX(id) as stop FROM public.mydata WHERE id >= 1000; -- 1000 here is start value (if set).. note::
``stop`` should be the same type as ``hwm.expression`` value, e.g. :obj:`datetime.datetime` for ``TIMESTAMP`` column, :obj:`datetime.date` for ``DATE``, and so on
Examples:
.. tabs::
.. tab:: SnapshotBatch run
.. code:: python
from onetl.db import DBReader, DBWriter
from onetl.strategy import SnapshotBatchStrategy
reader = DBReader(
connection=postgres,
source="public.mydata",
columns=["id", "data"],
hwm=DBReader.AutoDetectHWM(name="some_hwm_name", expression="id"),
)
writer = DBWriter(connection=hive, target="db.newtable")
with SnapshotBatchStrategy(step=100) as batches:
for _ in batches:
df = reader.run()
writer.run(df)
.. code:: sql
-- get start and stop values
SELECT MIN(id) as start, MAX(id) as stop
FROM public.mydata;
-- for example, start=1000 and stop=2345
-- when each batch (1..N) will perform a query which return some part of input data
1: SELECT id, data
FROM public.mydata
WHERE id >= 1000 AND id <= 1100; -- from start to start+step (INCLUDING first row)
2: WHERE id > 1100 AND id <= 1200; -- + step
3: WHERE id > 1200 AND id <= 1300; -- + step
N: WHERE id > 2300 AND id <= 2345; -- until stop
.. tab:: SnapshotBatch run with ``stop`` value
.. code:: python
...
with SnapshotBatchStrategy(step=100, stop=1234) as batches:
for _ in batches:
df = reader.run()
writer.run(df)
.. code:: sql
-- stop value is set, so there is no need to fetch it from DB
-- get start value
SELECT MIN(id) as start
FROM public.mydata
WHERE id <= 1234; -- until stop
-- for example, start=1000.
-- when each batch (1..N) will perform a query which return some part of input data
1: SELECT id, data
FROM public.mydata
WHERE id >= 1000 AND id <= 1100; -- from start to start+step (INCLUDING first row)
2: WHERE id > 1100 AND id <= 1200; -- + step
3: WHERE id > 1200 AND id <= 1300; -- + step
N: WHERE id > 1300 AND id <= 1234; -- until stop
.. tab:: SnapshotBatch run with ``start`` value
.. code:: python
...
with SnapshotBatchStrategy(step=100, start=500) as batches:
for _ in batches:
df = reader.run()
writer.run(df)
.. code:: sql
-- start value is set, so there is no need to fetch it from DB
-- get only stop value
SELECT MAX(id) as stop
FROM public.mydata
WHERE id >= 500; -- from start
-- for example, stop=2345.
-- when each batch (1..N) will perform a query which return some part of input data
1: SELECT id, data
FROM public.mydata
WHERE id >= 500 AND id <= 600; -- from start to start+step (INCLUDING first row)
2: WHERE id > 600 AND id <= 700; -- + step
3: WHERE id > 700 AND id <= 800; -- + step
...
N: WHERE id > 2300 AND id <= 2345; -- until stop
.. tab:: SnapshotBatch run with all options
.. code:: python
...
with SnapshotBatchStrategy(
start=1000,
step=100,
stop=2000,
) as batches:
for _ in batches:
df = reader.run()
writer.run(df)
.. code:: sql
-- start and stop values are set, so no need to fetch boundaries from DB
-- each batch (1..N) will perform a query which return some part of input data
1: SELECT id, data
FROM public.mydata
WHERE id >= 1000 AND id <= 1100; -- from start to start+step (INCLUDING first row)
2: WHERE id > 1100 AND id <= 1200; -- + step
3: WHERE id > 1200 AND id <= 1300; -- + step
...
N: WHERE id > 1900 AND id <= 2000; -- until stop
.. tab:: SnapshotBatch run over non-integer column
``hwm.expression``, ``start`` and ``stop`` can be a date or datetime, not only integer:
.. code:: python
from datetime import date, timedelta
reader = DBReader(
connection=postgres,
source="public.mydata",
columns=["business_dt", "data"],
hwm=DBReader.AutoDetectHWM(name="some_hwm_name", expression="business_dt"),
)
with SnapshotBatchStrategy(
start=date("2021-01-01"),
step=timedelta(days=5),
stop=date("2021-01-31"),
) as batches:
for _ in batches:
df = reader.run()
writer.run(df)
.. code:: sql
-- start and stop values are set, so no need to fetch boundaries from DB
-- each batch will perform a query which return some part of input data
-- HWM value will casted to match column type
1: SELECT business_dt, data
FROM public.mydata
WHERE business_dt >= CAST('2020-01-01' AS DATE) -- from start to start+step (INCLUDING first row)
AND business_dt <= CAST('2021-01-05' AS DATE);
2: WHERE business_dt > CAST('2021-01-05' AS DATE) -- + step
AND business_dt <= CAST('2021-01-10' AS DATE);
3: WHERE business_dt > CAST('2021-01-10' AS DATE) -- + step
AND business_dt <= CAST('2021-01-15' AS DATE);
...
N: WHERE business_dt > CAST('2021-01-30' AS DATE)
AND business_dt <= CAST('2021-01-31' AS DATE); -- until stop