Incremental Batch Strategy

Bases: BatchHWMStrategy

Incremental batch strategy for :ref:`db-reader`.

.. note::

Cannot be used with :ref:`file-downloader`

Same as :obj:`IncrementalStrategy <onetl.strategy.incremental_strategy.IncrementalStrategy>`, but reads data from the source in sequential batches (1..N), for example:

.. code:: sql

1: SELECT id, data
   FROM public.mydata
   WHERE id > 1000 AND id <= 1100; -- previous HWM value is 1000, step is 100

2: WHERE id > 1100 AND id <= 1200; -- + step
3: WHERE id > 1200 AND id <= 1300; -- + step
N: WHERE id > 1300 AND id <= 1400; -- until stop

Reading in batches uses less CPU and RAM than reading all the data in a single batch, but takes proportionally more time.
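
The batch boundaries shown above can be reproduced with a few lines of plain Python. This is only a minimal sketch of the arithmetic, not onETL's implementation: ``batch_ranges`` is a hypothetical helper, and it assumes the last batch is clamped to the stop value.

```python
def batch_ranges(hwm, step, stop):
    """Yield (lower, upper) bounds for `WHERE col > lower AND col <= upper`."""
    lower = hwm
    while lower < stop:
        # The last batch is clamped so it never goes past the stop value.
        upper = min(lower + step, stop)
        yield lower, upper
        lower = upper


# Previous HWM value is 1000, step is 100, stop is 1400.
ranges = list(batch_ranges(hwm=1000, step=100, stop=1400))
for number, (lower, upper) in enumerate(ranges, start=1):
    print(f"{number}: WHERE id > {lower} AND id <= {upper}")
```

Each batch covers a half-open interval (lower bound excluded, upper bound included), so consecutive batches never overlap.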

.. warning::

Unlike :obj:`SnapshotBatchStrategy <onetl.strategy.snapshot_strategy.SnapshotBatchStrategy>`,
it **saves** the current HWM value into the :ref:`HWM Store <hwm>` after **each batch**.

So if the code inside the context manager raises an exception, for example:

.. code:: python

    with IncrementalBatchStrategy(step=100) as batches:
        for _ in batches:
            df = reader.run()  # something went wrong here
            writer.run(df)  # or here
            # or here...

DBReader will **NOT** update the HWM in the HWM Store for the failed batch.

This allows the reading process to resume from the *last successful batch*.
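
The resume behaviour can be illustrated with a small simulation. It is a sketch under stated assumptions, not onETL code: ``hwm_store`` is a plain dict standing in for a real HWM Store, and ``run_batches`` and its ``fail_on`` argument are hypothetical names used to inject a failure.

```python
# Hypothetical in-memory stand-in for an HWM Store (not the onETL API).
hwm_store = {"some_hwm_name": 1000}


def run_batches(step, stop, fail_on=None):
    """Process batches in order, saving the HWM only after a batch succeeds."""
    processed = []
    lower = hwm_store["some_hwm_name"]
    while lower < stop:
        upper = min(lower + step, stop)
        if upper == fail_on:
            # Simulated failure: the HWM for this batch is never saved.
            raise RuntimeError(f"batch ({lower}, {upper}] failed")
        processed.append((lower, upper))
        hwm_store["some_hwm_name"] = upper  # saved after each successful batch
        lower = upper
    return processed


try:
    run_batches(step=100, stop=1400, fail_on=1300)  # third batch fails
except RuntimeError:
    pass

saved_after_failure = hwm_store["some_hwm_name"]
resumed = run_batches(step=100, stop=1400)  # second run picks up where it stopped
```

After the failed run the stored value points at the end of the second batch, so the second run only processes the remaining ranges.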

.. warning::

Not every :ref:`DB connection <db-connections>`
supports batch strategies. For example, the Kafka connection does not.
Make sure the connection you use is compatible with ``IncrementalBatchStrategy``.

.. versionadded:: 0.1.0

Parameters:

  • step (Any) –

    Step size used for generating batch SQL queries like:

    .. code:: sql

    SELECT id, data
    FROM public.mydata
    WHERE id > 1000 AND id <= 1100; -- 1000 is previous HWM value, step is 100
    

    .. note::

    Step defines the range of values fetched by each batch. This is **not**
    the number of rows: the row count depends on the table content and on how values are distributed across the rows.
    

    .. note::

    The ``step`` value is added to the HWM, so it must have a compatible type.
    
    For example, for a ``TIMESTAMP`` column the ``step`` type should be :obj:`datetime.timedelta`, not :obj:`int`.
    
  • stop (Any, default: `None` ) –

    If passed, the value is used as the upper bound of the last batch when generating ``WHERE`` clauses that filter on ``hwm.expression``.

    If not set, the value is determined by a separate query:

    .. code:: sql

    SELECT MAX(id) as stop
    FROM public.mydata
    WHERE id > 1000; -- 1000 is previous HWM value (if any)
    

    .. note::

    ``stop`` should have the same type as the ``hwm.expression`` value,
    e.g. :obj:`datetime.datetime` for a ``TIMESTAMP`` column, :obj:`datetime.date` for ``DATE``, and so on.
    
  • offset (Any, default: `None` ) –

    If passed, the offset value will be used to read rows which appeared in the source after the previous read.

    For example, the previous incremental run returned these rows:

    .. code::

    898
    899
    900
    1000
    

    Current HWM value is 1000.

    But since then, a few more rows have appeared in the source:

    .. code::

    898
    899
    900
    901 # new
    902 # new
    ...
    999 # new
    1000
    

    and you need to read them too.

    Set ``offset=100`` so that the first batch of the next incremental run looks like:

    .. code:: sql

    SELECT id, data
    FROM public.mydata
    WHERE id > 900 AND id <= 1000; -- 900 = 1000 - 100 = HWM - offset
    

    and returns rows from 901 (not 900) to 1000 (the row with id 1000 is a duplicate).

    .. warning::

    This can lead to reading duplicate values from the table.
    You probably need an additional deduplication step to handle them.
    

    .. note::

    The ``offset`` value is subtracted from the HWM, so it must have a compatible type.
    
    For example, for a ``TIMESTAMP`` column the ``offset`` type should be :obj:`datetime.timedelta`, not :obj:`int`.
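
The ``offset`` scenario described in the parameter list can be traced by hand. The sketch below uses plain Python lists instead of a database. Tracking ``already_loaded`` ids is just one assumed way to deduplicate; in practice this would more likely be done in the target table or on the dataframe.

```python
# Rows currently in the source; 901, 902 and 999 appeared after the previous run.
source_ids = [898, 899, 900, 901, 902, 999, 1000]
# Ids written by the previous run (assumed to be known, for illustration only).
already_loaded = {898, 899, 900, 1000}

hwm, offset = 1000, 100
# First batch with step=100: WHERE id > 900 AND id <= 1000
lower, upper = hwm - offset, hwm

batch = [i for i in source_ids if lower < i <= upper]
new_rows = [i for i in batch if i not in already_loaded]  # drop duplicates

print(batch)     # rows returned by the first batch, including the duplicate 1000
print(new_rows)  # rows that still need to be written
```

The row with id 900 is not re-read because the lower bound is exclusive, while id 1000 is read twice and has to be filtered out.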
    

Examples:

.. tabs::

.. tab:: IncrementalBatch run

    .. code:: python

        from onetl.db import DBReader, DBWriter
        from onetl.strategy import IncrementalBatchStrategy

        reader = DBReader(
            connection=postgres,
            source="public.mydata",
            columns=["id", "data"],
            hwm=DBReader.AutoDetectHWM(name="some_hwm_name", expression="id"),
        )

        writer = DBWriter(connection=hive, target="db.newtable")

        with IncrementalBatchStrategy(step=100) as batches:
            for _ in batches:
                df = reader.run()
                writer.run(df)

    .. code:: sql

        -- previous HWM value was 1000
        -- each batch (1..N) will perform a query which returns some part of the input data

        1:  SELECT id, data
            FROM public.mydata
            WHERE id > 1000 AND id <= 1100; --- from HWM to HWM+step (EXCLUDING first row)

        2:  WHERE id > 1100 AND id <= 1200; -- + step
        ...
        N:  WHERE id > 1300 AND id <= 1400; -- until max value of HWM column

.. tab:: IncrementalBatch run with ``stop`` value

    .. code:: python

        ...

        with IncrementalBatchStrategy(step=100, stop=2000) as batches:
            for _ in batches:
                df = reader.run()
                writer.run(df)

    .. code:: sql

        -- previous HWM value was 1000
        -- each batch (1..N) will perform a query which returns some part of the input data

        1:  SELECT id, data
            FROM public.mydata
            WHERE id > 1000 AND id <= 1100; --- from HWM to HWM+step (EXCLUDING first row)

        2:  WHERE id > 1100 AND id <= 1200; -- + step
        ...
        N:  WHERE id > 1900 AND id <= 2000; -- until stop

.. tab:: IncrementalBatch run with ``offset`` value

    .. code:: python

        ...

        with IncrementalBatchStrategy(step=100, offset=100) as batches:
            for _ in batches:
                df = reader.run()
                writer.run(df)

    .. code:: sql

        -- previous HWM value was 1000
        -- each batch (1..N) will perform a query which returns some part of the input data

        1:  SELECT id, data
            FROM public.mydata
            WHERE id >  900 AND id <= 1000; --- from HWM-offset to HWM-offset+step (EXCLUDING first row)

        2:  WHERE id > 1000 AND id <= 1100; -- + step
        3:  WHERE id > 1100 AND id <= 1200; -- + step
        ...
        N:  WHERE id > 1300 AND id <= 1400; -- until max value of HWM column

.. tab:: IncrementalBatch run with all possible options

    .. code:: python

        ...

        with IncrementalBatchStrategy(
            step=100,
            stop=2000,
            offset=100,
        ) as batches:
            for _ in batches:
                df = reader.run()
                writer.run(df)

    .. code:: sql

        -- previous HWM value was 1000
        -- each batch (1..N) will perform a query which returns some part of the input data

        1:  SELECT id, data
            FROM public.mydata
            WHERE id > 900 AND id <= 1000; --- from HWM-offset to HWM-offset+step (EXCLUDING first row)

        2:  WHERE id > 1000 AND id <= 1100; -- + step
        3:  WHERE id > 1100 AND id <= 1200; -- + step
        ...
        N:  WHERE id > 1900 AND id <= 2000; -- until stop

.. tab:: IncrementalBatch run over non-integer column

    The ``hwm.expression`` column and ``stop`` can be a date or datetime rather than an integer; ``step`` and ``offset`` are then :obj:`datetime.timedelta` values:

    .. code:: python

        from datetime import date, timedelta

        from onetl.db import DBReader, DBWriter
        from onetl.strategy import IncrementalBatchStrategy

        reader = DBReader(
            connection=postgres,
            source="public.mydata",
            columns=["business_dt", "data"],
            hwm=DBReader.AutoDetectHWM(name="some_hwm_name", expression="business_dt"),
        )

        writer = DBWriter(connection=hive, target="db.newtable")

        with IncrementalBatchStrategy(
            step=timedelta(days=5),
            stop=date(2021, 1, 31),  # date() takes year, month, day as integers
            offset=timedelta(days=1),
        ) as batches:
            for _ in batches:
                df = reader.run()
                writer.run(df)

    .. code:: sql

        -- previous HWM value was '2021-01-10'
        -- each batch (1..N) will perform a query which returns some part of the input data

        1:  SELECT business_dt, data
            FROM public.mydata
            WHERE business_dt  > CAST('2021-01-09' AS DATE)  -- from HWM-offset (EXCLUDING first row)
            AND   business_dt <= CAST('2021-01-14' AS DATE); -- to HWM-offset+step

        2:  WHERE business_dt  > CAST('2021-01-14' AS DATE) -- + step
            AND   business_dt <= CAST('2021-01-19' AS DATE);

        3:  WHERE business_dt  > CAST('2021-01-19' AS DATE) -- + step
            AND   business_dt <= CAST('2021-01-24' AS DATE);

        ...

        N:  WHERE business_dt  > CAST('2021-01-29' AS DATE)
            AND   business_dt <= CAST('2021-01-31' AS DATE); -- until stop
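
The date boundaries in the last example can be checked with standard-library arithmetic alone. This is a rough sketch rather than onETL's implementation: ``date_batch_ranges`` is a hypothetical helper, and it assumes the first lower bound is HWM minus ``offset`` and that the last batch is clamped to ``stop``. It also shows why an :obj:`int` step does not work for a date column.

```python
from datetime import date, timedelta


def date_batch_ranges(hwm, step, stop, offset):
    """Yield (lower, upper] bounds, starting from hwm - offset."""
    lower = hwm - offset
    while lower < stop:
        upper = min(lower + step, stop)  # last batch is clamped to stop
        yield lower, upper
        lower = upper


ranges = list(
    date_batch_ranges(
        hwm=date(2021, 1, 10),
        step=timedelta(days=5),
        stop=date(2021, 1, 31),
        offset=timedelta(days=1),
    )
)
for lower, upper in ranges:
    print(f"WHERE business_dt > '{lower}' AND business_dt <= '{upper}'")

# An int step cannot be added to a date: Python raises TypeError.
try:
    date(2021, 1, 10) + 5
    int_step_works = True
except TypeError:
    int_step_works = False
```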