Reading from MongoDB using DBReader¶
DBReader supports strategies for incremental data reading, but does not support custom pipelines, e.g. aggregation.
Warning
Please take MongoDB types into account
Supported DBReader features¶
- ❌ columns (for now, all document fields are read)
- ✅︎ where (passed to {"$match": ...} aggregation pipeline)
- ✅︎ hwm, supported strategies:
  - ✅︎ Snapshot strategy
  - ✅︎ Incremental strategy
  - ✅︎ Snapshot batch strategy
  - ✅︎ Incremental batch strategy
  - Note that expression field of HWM can only be a field name, not a custom expression
- ✅︎ hint (see official documentation)
- ✅︎ df_schema (mandatory)
- ✅︎ options (see MongoDB.ReadOptions)
Examples¶
Snapshot strategy:
from onetl.connection import MongoDB
from onetl.db import DBReader
from pyspark.sql.types import (
    StructType,
    StructField,
    IntegerType,
    StringType,
    TimestampType,
)

mongodb = MongoDB(...)

# mandatory
df_schema = StructType(
    [
        StructField("_id", StringType()),
        StructField("some", StringType()),
        StructField(
            "field",
            StructType(
                [
                    StructField("nested", IntegerType()),
                ],
            ),
        ),
        StructField("updated_dt", TimestampType()),
    ]
)

reader = DBReader(
    connection=mongodb,
    source="some_collection",
    df_schema=df_schema,
    where={"field": {"$eq": 123}},
    hint={"field": 1},
    options=MongoDB.ReadOptions(batchSize=10000),
)
df = reader.run()
Incremental strategy:
from onetl.connection import MongoDB
from onetl.db import DBReader
from onetl.strategy import IncrementalStrategy
from pyspark.sql.types import (
    StructType,
    StructField,
    IntegerType,
    StringType,
    TimestampType,
)

mongodb = MongoDB(...)

# mandatory
df_schema = StructType(
    [
        StructField("_id", StringType()),
        StructField("some", StringType()),
        StructField(
            "field",
            StructType(
                [
                    StructField("nested", IntegerType()),
                ],
            ),
        ),
        StructField("updated_dt", TimestampType()),
    ]
)

reader = DBReader(
    connection=mongodb,
    source="some_collection",
    df_schema=df_schema,
    where={"field": {"$eq": 123}},
    hint={"field": 1},
    hwm=DBReader.AutoDetectHWM(name="mongodb_hwm", expression="updated_dt"),
    options=MongoDB.ReadOptions(batchSize=10000),
)

with IncrementalStrategy():
    df = reader.run()
Recommendations¶
Pay attention to where value¶
Instead of filtering data on the Spark side using df.filter(df.column == 'value'), pass a proper DBReader(where={"column": {"$eq": "value"}}) clause.
This both reduces the amount of data sent from MongoDB to Spark and may improve query performance, especially if the fields used in the where clause are indexed.
Read options¶
MongoDBReadOptions¶
Bases: GenericOptions
Reading options for the MongoDB connector.
Warning
Options uri, database, collection, pipeline, hint are populated from connection attributes, and cannot be overridden by the user in ReadOptions to avoid issues.
New in version 0.7.0.
Examples:
Note
You can pass any value supported by the connector (https://www.mongodb.com/docs/spark-connector/current/batch-mode/batch-read-config/), even if it is not mentioned in this documentation. Option names should be in camelCase!
The set of supported options depends on the connector version.
from onetl.connection import MongoDB

options = MongoDB.ReadOptions(
    sampleSize=100,
)