
Incremental Strategy

Bases: HWMStrategy

Incremental strategy for :ref:`db-reader`/:ref:`file-downloader`.

Used to fetch only new rows/files from a source, skipping items already covered by the previous :ref:`HWM` value.

For :ref:`db-reader`: the first incremental run is the same as :obj:`SnapshotStrategy <onetl.strategy.snapshot_strategy.SnapshotStrategy>`:

.. code:: sql

    SELECT id, data FROM mydata;

Then the maximum value of the ``id`` column (e.g. ``1000``) is saved as ``HWM`` to the :ref:`HWM Store <hwm>`.

The next incremental run reads only new data from the source:

.. code:: sql

    SELECT id, data FROM mydata WHERE id > 1000; -- hwm value

Note that the resulting dataframe **does not include** the row with ``id=1000``, because it was already read by the previous run.
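
The two runs described above can be imitated in plain Python. This is a minimal sketch of the filtering logic only, not onETL code: ``incremental_read`` and the in-memory ``rows`` list are hypothetical stand-ins for DBReader and the source table.

```python
from typing import Optional


def incremental_read(rows, hwm: Optional[int]):
    """Return rows with id above the stored HWM, plus the new HWM value."""
    # First run: there is no HWM yet, so every row is read (same as a snapshot).
    new_rows = [row for row in rows if hwm is None or row["id"] > hwm]
    # The new HWM is the max id that was read; keep the old one if nothing new arrived.
    new_hwm = max((row["id"] for row in new_rows), default=hwm)
    return new_rows, new_hwm


rows = [{"id": i, "data": f"row {i}"} for i in (998, 999, 1000)]
first, hwm = incremental_read(rows, None)  # first run reads everything

rows.append({"id": 1001, "data": "row 1001"})
second, hwm = incremental_read(rows, hwm)  # second run skips id <= 1000
```

The strict ``>`` comparison is what keeps the row equal to the stored HWM out of the second result.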

.. warning::

    If code inside the context manager raised an exception, like:

    .. code:: python

        with IncrementalStrategy():
            df = reader.run()  # something went wrong here
            writer.run(df)  # or here
            # or here...

    then DBReader will **NOT** update HWM in HWM Store.
    This allows resuming the reading process from the *last successful run*.
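
The "save only on success" behavior can be illustrated with a small context manager. This is a sketch under assumptions, not the onETL implementation: ``hwm_store`` is a hypothetical in-memory dict and ``incremental`` is a hypothetical helper that persists the pending value only when the ``with`` block exits without an exception.

```python
from contextlib import contextmanager

hwm_store = {"some_hwm_name": 1000}  # hypothetical in-memory HWM store


@contextmanager
def incremental(name):
    """Yield a mutable holder; persist its value only if the block succeeds."""
    pending = {"value": hwm_store.get(name)}
    yield pending
    # This line is reached only when the with-block finished without raising.
    hwm_store[name] = pending["value"]


try:
    with incremental("some_hwm_name") as state:
        state["value"] = 2000  # the reader saw rows up to id=2000
        raise RuntimeError("write failed")  # something went wrong afterwards
except RuntimeError:
    pass
hwm_after_failure = hwm_store["some_hwm_name"]  # store was not touched

with incremental("some_hwm_name") as state:
    state["value"] = 2000
hwm_after_success = hwm_store["some_hwm_name"]  # store was updated
```

Because the stored value is unchanged after the failed block, a retry starts from the same HWM and reads the same rows again.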

For :ref:`file-downloader`: behavior depends on the HWM type.

.. tabs::

    .. tab:: FileListHWM

        First incremental run is just the same as
        :obj:`SnapshotStrategy <onetl.strategy.snapshot_strategy.SnapshotStrategy>` - all files are downloaded:

        .. code:: bash

            $ hdfs dfs -ls /path

            /path/my/file1
            /path/my/file2

        .. code:: python

            DownloadResult(
                ...,
                successful={
                    LocalFile("/downloaded/file1"),
                    LocalFile("/downloaded/file2"),
                },
            )

        Then the list of original file paths is saved as ``FileListHWM`` object into :ref:`HWM Store <hwm>`:

        .. code:: python

            FileListHWM(
                ...,
                entity="/path",
                value=[
                    "/path/my/file1",
                    "/path/my/file2",
                ],
            )

        Next incremental run will download only new files which were added to the source since previous run:

        .. code:: bash

            $ hdfs dfs -ls /path

            /path/my/file1
            /path/my/file2
            /path/my/file3

        .. code:: python

            # only files which are not covered by FileListHWM
            DownloadResult(
                ...,
                successful={
                    LocalFile("/downloaded/file3"),
                },
            )

        Value of ``FileListHWM`` will be updated and saved to :ref:`HWM Store <hwm>`:

        .. code:: python

            FileListHWM(
                ...,
                directory="/path",
                value=[
                    "/path/my/file1",
                    "/path/my/file2",
                    "/path/my/file3",
                ],
            )

    .. tab:: FileModifiedTimeHWM

        First incremental run is just the same as
        :obj:`SnapshotStrategy <onetl.strategy.snapshot_strategy.SnapshotStrategy>` - all files are downloaded:

        .. code:: bash

            $ hdfs dfs -ls /path

            /path/my/file1
            /path/my/file2

        .. code:: python

            DownloadResult(
                ...,
                successful={
                    LocalFile("/downloaded/file1"),
                    LocalFile("/downloaded/file2"),
                },
            )

        Then the maximum modified time of original files is saved as
        ``FileModifiedTimeHWM`` object into :ref:`HWM Store <hwm>`:

        .. code:: python

            FileModifiedTimeHWM(
                ...,
                directory="/path",
                value=datetime.datetime(2025, 1, 1, 11, 22, 33, 456789, tzinfo=datetime.timezone.utc),
            )

        Next incremental run will download only files from the source
        which were modified or created since previous run:

        .. code:: bash

            $ hdfs dfs -ls /path

            /path/my/file1
            /path/my/file2
            /path/my/file3

        .. code:: python

            # only files which are not covered by FileModifiedTimeHWM
            DownloadResult(
                ...,
                successful={
                    LocalFile("/downloaded/file3"),
                },
            )

        Value of ``FileModifiedTimeHWM`` will be updated and saved to :ref:`HWM Store <hwm>`:

        .. code:: python

            FileModifiedTimeHWM(
                ...,
                directory="/path",
                value=datetime.datetime(2025, 1, 1, 22, 33, 44, 567890, tzinfo=datetime.timezone.utc),
            )
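
The difference between the two HWM types can be sketched in plain Python. The helpers ``new_by_file_list`` and ``new_by_modified_time`` and the ``files`` mapping are hypothetical, and the strict ``>`` comparison on modification time is an assumption made for illustration, not a statement about onETL internals.

```python
from datetime import datetime, timezone


def new_by_file_list(files, known_paths):
    """FileListHWM-style filter: keep paths not seen in previous runs."""
    return [path for path in files if path not in known_paths]


def new_by_modified_time(files, last_modified):
    """FileModifiedTimeHWM-style filter: keep files modified after the stored time."""
    return [path for path, mtime in files.items() if mtime > last_modified]


# Hypothetical source listing: path -> modification time
files = {
    "/path/my/file1": datetime(2025, 1, 1, 10, 0, tzinfo=timezone.utc),
    "/path/my/file2": datetime(2025, 1, 1, 11, 22, 33, 456789, tzinfo=timezone.utc),
    "/path/my/file3": datetime(2025, 1, 1, 22, 33, 44, 567890, tzinfo=timezone.utc),
}

# HWM values saved by the previous run
by_list = new_by_file_list(files, {"/path/my/file1", "/path/my/file2"})
by_mtime = new_by_modified_time(
    files, datetime(2025, 1, 1, 11, 22, 33, 456789, tzinfo=timezone.utc)
)
```

In this sketch both filters select only ``/path/my/file3``. They diverge when an already downloaded file is changed in place: the path-based filter skips it, while the time-based filter picks it up again.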

.. warning::

    FileDownloader updates HWM in HWM Store at the end of ``.run()`` call,
    **NOT** while exiting strategy context. This is because:

    * FileDownloader does not raise exceptions if some file cannot be downloaded.
    * FileDownloader creates files on local filesystem, and file content may differ for different
      :obj:`modes <onetl.file.file_downloader.file_downloader.FileDownloader.Options.mode>`.
    * It can remove files from the source
      if :obj:`delete_source <onetl.file.file_downloader.file_downloader.FileDownloader.Options.delete_source>`
      is set to ``True``.

.. versionadded:: 0.1.0

Parameters:

  • offset (Any, default: `None` ) –

    If passed, the offset value will be used to read rows which appeared in the source after the previous read.

    For example, previous incremental run returned rows:

    .. code::

        898
        899
        900
        1000
    

    Current HWM value is 1000.

    But since then, a few more rows have appeared in the source:

    .. code::

        898
        899
        900
        901 # new
        902 # new
        ...
        999 # new
        1000
    

    and you need to read them too.

    So you can set ``offset=100``, and the next incremental run will generate a SQL query like:

    .. code:: sql

        SELECT id, data FROM public.mydata WHERE id > 900;
        -- 900 = 1000 - 100 = hwm - offset

    and return rows starting from 901 (not 900), including 1000, which was already captured by the HWM.

    .. warning::

        This can lead to reading duplicated values from the table.
        You probably need an additional deduplication step to handle them.

    .. warning::

        Cannot be used with :ref:`file-downloader`.
    

    .. note::

        The ``offset`` value is subtracted from the HWM, so it must have a compatible type.

        For example, for a ``TIMESTAMP`` column the ``offset`` type should be :obj:`datetime.timedelta`, not :obj:`int`.
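
The ``hwm - offset`` arithmetic and the type requirement can be checked with a few lines of standard-library Python. ``lower_bound`` is a hypothetical helper used only for illustration; it is not part of onETL.

```python
from datetime import date, timedelta


def lower_bound(hwm, offset=None):
    """Return the exclusive lower bound for the next read: hwm - offset."""
    return hwm if offset is None else hwm - offset


int_bound = lower_bound(1000, 100)  # integer HWM, integer offset
date_bound = lower_bound(date(2021, 1, 10), timedelta(days=1))  # date HWM, timedelta offset

# An int offset against a date HWM is a type mismatch in Python.
try:
    lower_bound(date(2021, 1, 10), 1)
    mismatch_raises = False
except TypeError:
    mismatch_raises = True
```

Here ``int_bound`` is ``900`` and ``date_bound`` is ``2021-01-09``, matching the ``WHERE`` clauses shown in the examples, while subtracting an ``int`` from a ``date`` raises ``TypeError``.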
    

Examples:

.. tabs::

.. tab:: Incremental run with :ref:`db-reader`

    .. code:: python

        from onetl.db import DBReader, DBWriter
        from onetl.strategy import IncrementalStrategy

        reader = DBReader(
            connection=postgres,
            source="public.mydata",
            columns=["id", "data"],
            hwm=DBReader.AutoDetectHWM(name="some_hwm_name", expression="id"),
        )

        writer = DBWriter(connection=hive, target="db.newtable")

        with IncrementalStrategy():
            df = reader.run()
            writer.run(df)

    .. code:: sql

        -- previous HWM value was 1000
        -- DBReader will generate query like:

        SELECT id, data
        FROM public.mydata
        WHERE id > 1000; -- from HWM (EXCLUDING first row)

.. tab:: Incremental run with :ref:`db-reader` and ``IncrementalStrategy(offset=...)``

    .. code:: python

        from onetl.db import DBReader, DBWriter
        from onetl.strategy import IncrementalStrategy

        reader = DBReader(
            connection=postgres,
            source="public.mydata",
            columns=["id", "data"],
            hwm=DBReader.AutoDetectHWM(name="some_hwm_name", expression="id"),
        )

        writer = DBWriter(connection=hive, target="db.newtable")

        with IncrementalStrategy(offset=100):
            df = reader.run()
            writer.run(df)

    .. code:: sql

        -- previous HWM value was 1000
        -- DBReader will generate query like:

        SELECT id, data
        FROM public.mydata
        WHERE id > 900; -- from HWM-offset (EXCLUDING first row)

    ``hwm.expression`` can refer to a date or datetime column, not only an integer one. In that case ``offset`` should be a :obj:`datetime.timedelta`:

    .. code:: python

        from datetime import timedelta

        from onetl.db import DBReader, DBWriter
        from onetl.strategy import IncrementalStrategy

        reader = DBReader(
            connection=postgres,
            source="public.mydata",
            columns=["business_dt", "data"],
            hwm=DBReader.AutoDetectHWM(name="some_hwm_name", expression="business_dt"),
        )

        writer = DBWriter(connection=hive, target="db.newtable")

        with IncrementalStrategy(offset=timedelta(days=1)):
            df = reader.run()
            writer.run(df)

    .. code:: sql

        -- previous HWM value was '2021-01-10'
        -- DBReader will generate query like:

        SELECT business_dt, data
        FROM public.mydata
        WHERE business_dt > CAST('2021-01-09' AS DATE); -- from HWM-offset (EXCLUDING first row)

.. code-tab:: py Incremental run with :ref:`db-reader` and :ref:`kafka`

    from onetl.db import DBReader, DBWriter
    from onetl.strategy import IncrementalStrategy

    reader = DBReader(
        connection=kafka,
        source="topic_name",
        hwm=DBReader.AutoDetectHWM(name="some_hwm_name", expression="offset"),
    )

    writer = DBWriter(connection=hive, target="db.newtable")

    with IncrementalStrategy():
        df = reader.run()
        writer.run(df)

    # current run will fetch only messages which were added since previous run

.. code-tab:: py Incremental run with :ref:`file-downloader` and ``hwm=FileListHWM(...)``

    from onetl.file import FileDownloader
    from onetl.strategy import IncrementalStrategy
    from etl_entities.hwm import FileListHWM

    downloader = FileDownloader(
        connection=sftp,
        source_path="/remote",
        local_path="/local",
        hwm=FileListHWM(  # mandatory for IncrementalStrategy
            name="my_unique_hwm_name",
        ),
    )

    with IncrementalStrategy():
        result = downloader.run()  # DownloadResult, not a dataframe

    # current run will download only files which were added since previous run

.. code-tab:: py Incremental run with :ref:`file-downloader` and ``hwm=FileModifiedTimeHWM(...)``

    from onetl.file import FileDownloader
    from onetl.strategy import IncrementalStrategy
    from etl_entities.hwm import FileModifiedTimeHWM

    downloader = FileDownloader(
        connection=sftp,
        source_path="/remote",
        local_path="/local",
        hwm=FileModifiedTimeHWM(  # mandatory for IncrementalStrategy
            name="my_unique_hwm_name",
        ),
    )

    with IncrementalStrategy():
        result = downloader.run()  # DownloadResult, not a dataframe

    # current run will download only files which were modified/created since previous run