Incremental Strategy
Bases: HWMStrategy
Incremental strategy for :ref:`db-reader` / :ref:`file-downloader`.
Used for fetching only new rows/files from a source
by filtering out items already covered by the previous :ref:`HWM <hwm>` value.
For :ref:`db-reader`:

The first incremental run is the same as
:obj:`SnapshotStrategy <onetl.strategy.snapshot_strategy.SnapshotStrategy>`:

.. code:: sql

    SELECT id, data FROM mydata;
Then the max value of the ``id`` column (e.g. ``1000``) will be saved as ``HWM`` to :ref:`HWM Store <hwm>`.
The next incremental run will read only new data from the source:

.. code:: sql

    SELECT id, data FROM mydata WHERE id > 1000; -- hwm value

Note that the resulting dataframe **does not include** the row with ``id=1000``, because it has been read before.
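The strict comparison above can be sketched in plain Python. This is a simplified model of the strategy's behavior, not onetl internals:

```python
# Simplified model of incremental reading: the strategy keeps only
# rows strictly greater than the stored HWM value.
rows = [
    {"id": 999, "data": "a"},
    {"id": 1000, "data": "b"},  # already read by the previous run
    {"id": 1001, "data": "c"},  # new
]

hwm_value = 1000  # saved by the previous run

# strict ">": the row with id == hwm_value is NOT read again
new_rows = [row for row in rows if row["id"] > hwm_value]
print(new_rows)  # only the row with id=1001

# the new HWM value is the max id among the rows just read
hwm_value = max(row["id"] for row in new_rows)
```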
.. warning::

    If the code inside the context manager raises an exception, like:

    .. code:: python

        with IncrementalStrategy():
            df = reader.run()  # something went wrong here
            writer.run(df)  # or here
            # or here...

    then DBReader will **NOT** update the HWM in the HWM Store.
    This allows resuming the reading process from the *last successful run*.
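The commit-on-success behavior can be modeled with a small context manager. This is a hypothetical sketch (``store`` and ``pending`` are illustrative stand-ins, not onetl API):

```python
from contextlib import contextmanager

store = {"my_hwm": 1000}  # stand-in for an HWM Store
pending = {}  # HWM values collected during the current run


@contextmanager
def incremental_strategy():
    pending.clear()
    yield pending
    # reached only if the block finished without an exception
    store.update(pending)


# failed run: the stored HWM stays at the last successful value
try:
    with incremental_strategy() as hwm:
        hwm["my_hwm"] = 2000  # reader got as far as 2000
        raise RuntimeError("writer failed")
except RuntimeError:
    pass
assert store["my_hwm"] == 1000  # untouched, next run resumes from here

# successful run: the HWM is committed on context exit
with incremental_strategy() as hwm:
    hwm["my_hwm"] = 2000
assert store["my_hwm"] == 2000
```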
For :ref:`file-downloader`:

Behavior depends on the HWM type.
.. tabs::
.. tab:: FileListHWM
The first incremental run is the same as
:obj:`SnapshotStrategy <onetl.strategy.snapshot_strategy.SnapshotStrategy>` - all files are downloaded:

.. code:: bash

    $ hdfs dfs -ls /path
    /path/my/file1
    /path/my/file2

.. code:: python

    DownloadResult(
        ...,
        successful={
            LocalFile("/downloaded/file1"),
            LocalFile("/downloaded/file2"),
        },
    )
Then the list of original file paths is saved as a ``FileListHWM`` object into :ref:`HWM Store <hwm>`:

.. code:: python

    FileListHWM(
        ...,
        directory="/path",
        value=[
            "/path/my/file1",
            "/path/my/file2",
        ],
    )
The next incremental run will download only new files which were added to the source since the previous run:

.. code:: bash

    $ hdfs dfs -ls /path
    /path/my/file1
    /path/my/file2
    /path/my/file3

.. code:: python

    # only files which are not covered by FileListHWM
    DownloadResult(
        ...,
        successful={
            LocalFile("/downloaded/file3"),
        },
    )
The value of ``FileListHWM`` will be updated and saved to :ref:`HWM Store <hwm>`:

.. code:: python

    FileListHWM(
        ...,
        directory="/path",
        value=[
            "/path/my/file1",
            "/path/my/file2",
            "/path/my/file3",
        ],
    )
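Conceptually, ``FileListHWM`` filtering is a set difference between the files on the source and the files already covered by the HWM. A simplified model, not onetl internals:

```python
# Simplified model of FileListHWM: which files to download this run.
known = {"/path/my/file1", "/path/my/file2"}  # previous HWM value
on_source = {"/path/my/file1", "/path/my/file2", "/path/my/file3"}

# only files not covered by the HWM are downloaded
to_download = sorted(on_source - known)
print(to_download)  # ['/path/my/file3']

# the updated HWM value covers everything seen so far
known |= on_source
```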
.. tab:: FileModifiedTimeHWM
The first incremental run is the same as
:obj:`SnapshotStrategy <onetl.strategy.snapshot_strategy.SnapshotStrategy>` - all files are downloaded:

.. code:: bash

    $ hdfs dfs -ls /path
    /path/my/file1
    /path/my/file2

.. code:: python

    DownloadResult(
        ...,
        successful={
            LocalFile("/downloaded/file1"),
            LocalFile("/downloaded/file2"),
        },
    )
Then the maximum modified time of the original files is saved as a
``FileModifiedTimeHWM`` object into :ref:`HWM Store <hwm>`:

.. code:: python

    FileModifiedTimeHWM(
        ...,
        directory="/path",
        value=datetime.datetime(2025, 1, 1, 11, 22, 33, 456789, tzinfo=timezone.utc),
    )
The next incremental run will download only files from the source
which were modified or created since the previous run:

.. code:: bash

    $ hdfs dfs -ls /path
    /path/my/file1
    /path/my/file2
    /path/my/file3

.. code:: python

    # only files which are not covered by FileModifiedTimeHWM
    DownloadResult(
        ...,
        successful={
            LocalFile("/downloaded/file3"),
        },
    )
The value of ``FileModifiedTimeHWM`` will be updated and saved to :ref:`HWM Store <hwm>`:

.. code:: python

    FileModifiedTimeHWM(
        ...,
        directory="/path",
        value=datetime.datetime(2025, 1, 1, 22, 33, 44, 567890, tzinfo=timezone.utc),
    )
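The comparison can be sketched as filtering by modification time, assuming a strict ``>`` check against the stored HWM value. A simplified model, not onetl internals:

```python
from datetime import datetime, timezone

# Simplified model of FileModifiedTimeHWM: max modified time seen so far.
hwm = datetime(2025, 1, 1, 11, 22, 33, 456789, tzinfo=timezone.utc)

files = {
    "/path/my/file1": datetime(2025, 1, 1, 10, 0, 0, tzinfo=timezone.utc),
    "/path/my/file2": datetime(2025, 1, 1, 11, 22, 33, 456789, tzinfo=timezone.utc),
    "/path/my/file3": datetime(2025, 1, 1, 22, 33, 44, 567890, tzinfo=timezone.utc),
}

# only files modified strictly after the stored HWM are downloaded
to_download = sorted(path for path, mtime in files.items() if mtime > hwm)
print(to_download)  # ['/path/my/file3']

# the HWM is advanced to the max modified time among downloaded files
hwm = max(hwm, *(files[path] for path in to_download))
```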
.. warning::

    FileDownloader updates the HWM in the HWM Store at the end of the ``.run()`` call,
    **NOT** while exiting the strategy context. This is because:

    * FileDownloader does not raise exceptions if some file cannot be downloaded.
    * FileDownloader creates files on the local filesystem, and file content may differ for different
      :obj:`modes <onetl.file.file_downloader.file_downloader.FileDownloader.Options.mode>`.
    * It can remove files from the source
      if :obj:`delete_source <onetl.file.file_downloader.file_downloader.FileDownloader.Options.delete_source>`
      is set to ``True``.
.. versionadded:: 0.1.0
Parameters:

* **offset** (``Any``, default: ``None``) - If passed, the offset value will be used to read rows which appeared in the source after the previous read.

  For example, a previous incremental run returned rows:

  .. code::

      898
      899
      900
      1000

  Current HWM value is ``1000``.

  But since then a few more rows appeared in the source:

  .. code::

      898
      899
      900
      901  # new
      902  # new
      ...
      999  # new
      1000

  and you need to read them too.

  So you can set ``offset=100``, and the next incremental run will generate a SQL query like:

  .. code:: sql

      SELECT id, data FROM public.mydata WHERE id > 900; -- 900 = 1000 - 100 = hwm - offset

  and return rows starting from 901 (not 900), including ``1000`` which was already captured by the HWM.

  .. warning::

      This can lead to reading duplicated values from the table.
      You probably need an additional deduplication step to handle them.

  .. warning::

      Cannot be used with :ref:`file-downloader`.

  .. note::

      The ``offset`` value will be subtracted from the HWM, so it should have a proper type.
      For example, for a ``TIMESTAMP`` column the ``offset`` type should be :obj:`datetime.timedelta`, not :obj:`int`.
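Because the offset is subtracted from the HWM, the two must have compatible types. A quick sketch with standard-library types:

```python
from datetime import date, timedelta

# Integer HWM with integer offset: plain subtraction.
int_hwm = 1000
print(int_hwm - 100)  # 900, i.e. WHERE id > 900

# Date HWM needs a timedelta offset, not an int.
date_hwm = date(2021, 1, 10)
print(date_hwm - timedelta(days=1))  # 2021-01-09

# Mismatched types fail before any query is built:
try:
    date_hwm - 1  # int offset for a date HWM
except TypeError as exc:
    print("TypeError:", exc)
```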
Examples:
.. tabs::
.. tab:: Incremental run with :ref:`db-reader`
.. code:: python

    from onetl.db import DBReader, DBWriter
    from onetl.strategy import IncrementalStrategy

    reader = DBReader(
        connection=postgres,
        source="public.mydata",
        columns=["id", "data"],
        hwm=DBReader.AutoDetectHWM(name="some_hwm_name", expression="id"),
    )

    writer = DBWriter(connection=hive, target="db.newtable")

    with IncrementalStrategy():
        df = reader.run()
        writer.run(df)

.. code:: sql

    -- previous HWM value was 1000
    -- DBReader will generate query like:

    SELECT id, data
    FROM public.mydata
    WHERE id > 1000; -- from HWM (EXCLUDING first row)
.. tab:: Incremental run with :ref:`db-reader` and ``IncrementalStrategy(offset=...)``
.. code:: python

    from onetl.db import DBReader, DBWriter
    from onetl.strategy import IncrementalStrategy

    reader = DBReader(
        connection=postgres,
        source="public.mydata",
        columns=["id", "data"],
        hwm=DBReader.AutoDetectHWM(name="some_hwm_name", expression="id"),
    )

    writer = DBWriter(connection=hive, target="db.newtable")

    with IncrementalStrategy(offset=100):
        df = reader.run()
        writer.run(df)

.. code:: sql

    -- previous HWM value was 1000
    -- DBReader will generate query like:

    SELECT id, data
    FROM public.mydata
    WHERE id > 900; -- from HWM-offset (EXCLUDING first row)
``offset`` and ``hwm.expression`` can be a date or datetime, not only an integer:

.. code:: python

    from onetl.db import DBReader, DBWriter
    from onetl.strategy import IncrementalStrategy
    from datetime import timedelta

    reader = DBReader(
        connection=postgres,
        source="public.mydata",
        columns=["business_dt", "data"],
        hwm=DBReader.AutoDetectHWM(name="some_hwm_name", expression="business_dt"),
    )

    writer = DBWriter(connection=hive, target="db.newtable")

    with IncrementalStrategy(offset=timedelta(days=1)):
        df = reader.run()
        writer.run(df)

.. code:: sql

    -- previous HWM value was '2021-01-10'
    -- DBReader will generate query like:

    SELECT business_dt, data
    FROM public.mydata
    WHERE business_dt > CAST('2021-01-09' AS DATE); -- from HWM-offset (EXCLUDING first row)
.. code-tab:: py Incremental run with :ref:`db-reader` and :ref:`kafka`

    from onetl.db import DBReader, DBWriter
    from onetl.strategy import IncrementalStrategy

    reader = DBReader(
        connection=kafka,
        source="topic_name",
        hwm=DBReader.AutoDetectHWM(name="some_hwm_name", expression="offset"),
    )

    writer = DBWriter(connection=hive, target="db.newtable")

    with IncrementalStrategy():
        df = reader.run()

    # current run will fetch only messages which were added since previous run
.. code-tab:: py Incremental run with :ref:`file-downloader` and ``hwm=FileListHWM(...)``

    from onetl.file import FileDownloader
    from onetl.strategy import IncrementalStrategy
    from etl_entities.hwm import FileListHWM

    downloader = FileDownloader(
        connection=sftp,
        source_path="/remote",
        local_path="/local",
        hwm=FileListHWM(  # mandatory for IncrementalStrategy
            name="my_unique_hwm_name",
        ),
    )

    with IncrementalStrategy():
        df = downloader.run()

    # current run will download only files which were added since previous run
.. code-tab:: py Incremental run with :ref:`file-downloader` and ``hwm=FileModifiedTimeHWM(...)``

    from onetl.file import FileDownloader
    from onetl.strategy import IncrementalStrategy
    from etl_entities.hwm import FileModifiedTimeHWM

    downloader = FileDownloader(
        connection=sftp,
        source_path="/remote",
        local_path="/local",
        hwm=FileModifiedTimeHWM(  # mandatory for IncrementalStrategy
            name="my_unique_hwm_name",
        ),
    )

    with IncrementalStrategy():
        df = downloader.run()

    # current run will download only files which were modified/created since previous run