
Reading from MongoDB using ``MongoDB.pipeline``
===============================================

``MongoDB.pipeline`` allows passing a custom pipeline, but does not support incremental strategies.

.. warning::

    Please take into account MongoDB types.

Recommendations
---------------

Pay attention to ``pipeline`` value
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Instead of filtering data on the Spark side using ``df.filter(df.column == 'value')``, pass a proper ``MongoDB.pipeline(..., pipeline={"$match": {"column": {"$eq": "value"}}})`` value. This both reduces the amount of data sent from MongoDB to Spark and may also improve query performance, especially if there are indexes on the columns used in the ``pipeline`` value.
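As a sketch of the idea above (the ``connection`` object and the column name are placeholders, and the ``match_eq`` helper is hypothetical, not part of onetl), the pushed-down filter is just an ordinary Python dict built before the call:

```python
# Hypothetical helper: build a "$match" stage equivalent to
# df.filter(df.column == "value"), so filtering happens inside MongoDB
# instead of on the Spark side.
def match_eq(column: str, value) -> dict:
    return {"$match": {column: {"$eq": value}}}


stage = match_eq("column", "value")
print(stage)  # {'$match': {'column': {'$eq': 'value'}}}

# The stage is then passed to MongoDB.pipeline instead of filtering the DataFrame:
# df = connection.pipeline(collection="collection_name", pipeline=stage)
```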

References
----------

pipeline(collection, pipeline=None, df_schema=None, options=None)

Execute a pipeline for a specific collection and return a DataFrame.

Almost like the `Aggregation pipeline syntax <https://www.mongodb.com/docs/manual/core/aggregation-pipeline/>`_ in MongoDB:

.. code:: js

    db.collection_name.aggregate([{"$match": ...}, {"$group": ...}])

but the pipeline is executed on Spark executors, in a distributed way.

.. note::

    This method does not support :ref:`strategy`,
    use :obj:`DBReader <onetl.db.db_reader.db_reader.DBReader>` instead.

.. versionadded:: 0.7.0

Parameters:

  • collection (str) –

    Collection name.

  • pipeline (dict | list[dict], default: None) –

    Pipeline containing a database query. See `Aggregation pipeline syntax <https://www.mongodb.com/docs/manual/core/aggregation-pipeline/>`_.

  • df_schema (StructType, default: None) –

    Schema describing the resulting DataFrame.

  • options (PipelineOptions | dict, default: None) –

    Additional pipeline options, see :obj:`MongoDB.PipelineOptions <onetl.connection.db_connection.mongodb.options.MongoDBPipelineOptions>`.

Examples:

Get documents with a specific field value:

.. code:: python

    df = connection.pipeline(
        collection="collection_name",
        pipeline={"$match": {"field": {"$eq": 1}}},
    )

Calculate an aggregation and get the result:

.. code:: python

    df = connection.pipeline(
        collection="collection_name",
        pipeline={
            "$group": {
                "_id": 1,
                "min": {"$min": "$column_int"},
                "max": {"$max": "$column_int"},
            }
        },
    )
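Since the ``pipeline`` parameter also accepts a ``list[dict]``, several stages can be chained, mirroring ``db.collection_name.aggregate([...])``. A minimal sketch with illustrative stage contents (column names are placeholders):

```python
# Stages run in order inside MongoDB: filter first, then aggregate,
# so only matching documents reach the "$group" stage.
pipeline = [
    {"$match": {"column_int": {"$gt": 0}}},
    {
        "$group": {
            "_id": 1,
            "min": {"$min": "$column_int"},
            "max": {"$max": "$column_int"},
        }
    },
]

# df = connection.pipeline(collection="collection_name", pipeline=pipeline)
```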

Explicitly pass DataFrame schema:

.. code:: python

    from pyspark.sql.types import (
        DoubleType,
        IntegerType,
        StringType,
        StructField,
        StructType,
        TimestampType,
    )

    df_schema = StructType(
        [
            StructField("_id", StringType()),
            StructField("some_string", StringType()),
            StructField("some_int", IntegerType()),
            StructField("some_datetime", TimestampType()),
            StructField("some_float", DoubleType()),
        ],
    )

    df = connection.pipeline(
        collection="collection_name",
        df_schema=df_schema,
        pipeline={"$match": {"some_int": {"$gt": 999}}},
    )

Pass additional options to pipeline execution:

.. code:: python

    df = connection.pipeline(
        collection="collection_name",
        pipeline={"$match": {"field": {"$eq": 1}}},
        options=MongoDB.PipelineOptions(hint={"field": 1}),
    )

MongoDBPipelineOptions
----------------------

Bases: GenericOptions

Aggregation pipeline options for MongoDB connector.

The only difference from :obj:`MongoDB.ReadOptions <MongoDBReadOptions>` is that the latter does not allow passing the ``hint`` parameter.

.. warning::

    Options ``uri``, ``database``, ``collection``, ``pipeline`` are populated from connection attributes,
    and cannot be overridden by the user in ``PipelineOptions`` to avoid issues.

.. versionadded:: 0.7.0

Examples:

.. note::

    You can pass any value
    `supported by connector <https://www.mongodb.com/docs/spark-connector/current/batch-mode/batch-read-config/>`_,
    even if it is not mentioned in this documentation. **Option names should be in** ``camelCase``!

    The set of supported options depends on the connector version.

.. code:: python

    from onetl.connection import MongoDB

    options = MongoDB.PipelineOptions(
        hint={"some_field": 1},
    )