Spark S3 Connection

Bases: SparkFileDFConnection

Spark connection to S3 filesystem. |support_hooks|

Based on `Hadoop-AWS module <https://hadoop.apache.org/docs/current3/hadoop-aws/tools/hadoop-aws/index.html>`_ and `Spark integration with Cloud Infrastructures <https://spark.apache.org/docs/latest/cloud-integration.html>`_.

.. seealso::

Before using this connector, please take into account :ref:`spark-s3-prerequisites`.

.. note::

Supports only reading files as Spark DataFrame and writing DataFrame to files.

Does NOT support file operations, like create, delete, rename, etc. For these operations,
use :obj:`S3 <onetl.connection.file_connection.s3.S3>` connection.

.. versionadded:: 0.9.0

Parameters:

  • host (str) –

    Host of S3 source. For example: domain.com

  • port (int) –

    Port of S3 source

  • bucket (str) –

    Bucket name in the S3 file source

  • protocol (str, default: ``https``) –

    Connection protocol. Allowed values: https or http

  • access_key (str) –

    Access key (aka user ID) of an account in the S3 service

  • secret_key (str) –

    Secret key (aka password) of an account in the S3 service

  • region (str) –

    Region name of the bucket in the S3 service. Optional for some S3 implementations (MinIO, Ozone), but may be mandatory for others.

  • path_style_access (bool) –

    ``True`` to connect to the bucket as ``protocol://host/bucket``, ``False`` to use ``protocol://bucket.host`` instead. This depends on the S3 implementation.

  • session_token (str) –

    Session token generated by S3 STS service, if used.

  • extra (dict) –

    A dictionary of additional properties to be used when connecting to S3.

    These are Hadoop AWS specific properties, see links below:

    • `Hadoop AWS <https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html#General_S3A_Client_configuration>`_
    • `Hadoop AWS committers options <https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committers.html>`_

    Options are passed without the prefixes ``spark.hadoop.``, ``fs.s3a.`` and ``fs.s3a.bucket.$BUCKET.``, for example:

    .. code:: python

        extra = {
            "committer.magic.enabled": True,
            "committer.name": "magic",
            "connection.timeout": 300000,
        }

    .. warning::

        Options that are populated from connection attributes (like ``endpoint``, ``access.key``)
        cannot be overridden.

        But you may override ``aws.credentials.provider`` and pass custom credential options
        (see the sketch after the parameter list).

  • spark (:class:`pyspark.sql.SparkSession`) –

    Spark session
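
For illustration, here is a minimal sketch of overriding ``aws.credentials.provider`` through ``extra``. The provider class ``org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider`` is a standard Hadoop-AWS class; treat the surrounding call as an assumption rather than the canonical onETL usage:

.. code:: python

    from onetl.connection import SparkS3

    # Sketch: connect to a public bucket with anonymous credentials by
    # overriding the credentials provider. Other keys described in the
    # ``extra`` parameter above can be combined in the same dictionary.
    s3 = SparkS3(
        host="domain.com",
        bucket="my-bucket",
        extra={
            "aws.credentials.provider": "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider",
        },
        spark=spark,
    )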

Examples:

.. tabs::

.. code-tab:: py Create S3 connection with bucket as subdomain (``my-bucket.domain.com``)

    from onetl.connection import SparkS3
    from pyspark.sql import SparkSession

    # Create Spark session with Hadoop AWS libraries loaded
    maven_packages = SparkS3.get_packages(spark_version="3.5.8")
    # Some packages are not used, but downloading them takes a lot of time. Skip them.
    excluded_packages = SparkS3.get_exclude_packages()
    spark = (
        SparkSession.builder.appName("spark-app-name")
        .config("spark.jars.packages", ",".join(maven_packages))
        .config("spark.jars.excludes", ",".join(excluded_packages))
        .config("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
        .config("spark.hadoop.fs.s3a.committer.name", "magic")
        .config(
            "spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
            "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory",
        )
        .config(
            "spark.sql.parquet.output.committer.class",
            "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",
        )
        .config(
            "spark.sql.sources.commitProtocolClass",
            "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol",
        )
        .getOrCreate()
    )

    # Create connection
    s3 = SparkS3(
        host="domain.com",
        protocol="http",
        bucket="my-bucket",
        access_key="ACCESS_KEY",
        secret_key="SECRET_KEY",
        path_style_access=False,
        region="us-east-1",
        spark=spark,
    ).check()

.. code-tab:: py Create S3 connection with bucket as subpath (``domain.com/my-bucket``)

    # Create Spark session with Hadoop AWS libraries loaded
    ...

    # Create connection
    s3 = SparkS3(
        host="domain.com",
        protocol="http",
        bucket="my-bucket",
        access_key="ACCESS_KEY",
        secret_key="SECRET_KEY",
        path_style_access=True,
        region="us-east-1",
        spark=spark,
    ).check()
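
As noted above, this connection only reads files into a Spark DataFrame and writes a DataFrame back to files. A minimal reading sketch (assuming onETL's ``FileDFReader`` and ``CSV`` format classes; the source path and CSV options are placeholders):

.. code:: python

    from onetl.file import FileDFReader
    from onetl.file.format import CSV

    # Sketch: read all CSV files under a source path into a Spark DataFrame
    # using the ``s3`` connection created above.
    reader = FileDFReader(
        connection=s3,
        format=CSV(header=True),
        source_path="/data/source",
    )
    df = reader.run()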

check()

Check source availability. |support_hooks|

If the source is not available, an exception is raised.

Returns:

  • Connection itself

Raises:

  • RuntimeError

    If the connection is not available

Examples:

.. code:: python

    connection.check()

close()

Close all connections created to S3. |support_hooks|

Also resets all ``fs.s3a.bucket.$BUCKET.*`` properties of the Hadoop configuration.

.. note::

Connection can be used again after it is closed.

Returns:

  • Connection itself

Examples:

Close connection automatically:

.. code:: python

    with connection:
        ...

Close connection manually:

.. code:: python

    connection.close()

get_packages(spark_version, scala_version=None) classmethod

Get package names to be downloaded by Spark. |support_hooks|

.. versionadded:: 0.9.0

Parameters:

  • spark_version (str) –

    Spark version in format ``major.minor.patch``.

  • scala_version (str, default: None ) –

    Scala version in format ``major.minor``.

    If ``None``, ``spark_version`` is used to determine the Scala version.

Examples:

.. code:: python

    from onetl.connection import SparkS3

    SparkS3.get_packages(spark_version="3.5.8")
    SparkS3.get_packages(spark_version="3.5.8", scala_version="2.12")

get_exclude_packages() classmethod

Get package names to be excluded by Spark. |support_hooks|

.. versionadded:: 0.13.0

Examples:

.. code:: python

    from onetl.connection import SparkS3

    SparkS3.get_exclude_packages()