Spark S3 Connection
Bases: SparkFileDFConnection
Spark connection to S3 filesystem. |support_hooks|
Based on `Hadoop-AWS module <https://hadoop.apache.org/docs/current3/hadoop-aws/tools/hadoop-aws/index.html>`_
and `Spark integration with Cloud Infrastructures <https://spark.apache.org/docs/latest/cloud-integration.html>`_.
.. seealso::
Before using this connector, please take into account :ref:`spark-s3-prerequisites`.
.. note::
Supports only reading files as Spark DataFrame and writing DataFrame to files.
Does NOT support file operations, like create, delete, rename, etc. For these operations,
use :obj:`S3 <onetl.connection.file_connection.s3.S3>` connection.
.. versionadded:: 0.9.0
Parameters:
- host (str) – Host of S3 source. For example: ``domain.com``
- port (int) – Port of S3 source
- bucket (str) – Bucket name in the S3 file source
- protocol (str, default: ``https``) – Connection protocol. Allowed values: ``https`` or ``http``
- access_key (str) – Access key (aka user ID) of an account in the S3 service
- secret_key (str) – Secret key (aka password) of an account in the S3 service
- region (str) – Region name of bucket in S3 service. Optional for some S3 implementations (MinIO, Ozone), but may be mandatory for others.
- path_style_access (bool) – ``True`` to connect to bucket as ``protocol://host/bucket``, ``False`` to use ``protocol://bucket.host`` instead. This depends on S3 implementation.
- session_token (str) – Session token generated by S3 STS service, if used.
- extra (dict) – A dictionary of additional properties to be used when connecting to S3.
  These are Hadoop AWS specific properties, see links below:
  - `Hadoop AWS <https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html#General_S3A_Client_configuration>`_
  - `Hadoop AWS committers options <https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committers.html>`_
  Options are passed without ``spark.hadoop.``, ``fs.s3a.`` and ``fs.s3a.bucket.$BUCKET.`` prefixes, for example:
  .. code:: python
      extra = {
          "committer.magic.enabled": True,
          "committer.name": "magic",
          "connection.timeout": 300000,
      }
  .. warning::
      Options populated from connection attributes (like ``endpoint``, ``access.key``) cannot be overridden. But you may override ``aws.credentials.provider`` and pass custom credential options, as shown in the sketch after this parameter list.
- spark (:class:`pyspark.sql.SparkSession`) – Spark session
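For illustration, a minimal sketch of overriding ``aws.credentials.provider`` via ``extra``. This assumes anonymous access to a public bucket (so ``access_key``/``secret_key`` are omitted); the host and bucket values are placeholders, and ``spark`` is an existing SparkSession like the one built in the Examples below. ``AnonymousAWSCredentialsProvider`` is a stock hadoop-aws class, used here purely as an example:
.. code:: python
    from onetl.connection import SparkS3

    # A minimal sketch, assuming anonymous access to a public bucket.
    # "aws.credentials.provider" is one of the few options that may be
    # overridden via ``extra``.
    s3 = SparkS3(
        host="domain.com",
        bucket="my-public-bucket",
        extra={
            # applied by the connection with the appropriate fs.s3a.* prefix
            "aws.credentials.provider": "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider",
            # connection timeout, in milliseconds
            "connection.timeout": 300000,
        },
        spark=spark,  # existing SparkSession
    )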
Examples:
.. tabs::
.. code-tab:: py Create S3 connection with bucket as subdomain (``my-bucket.domain.com``)
from onetl.connection import SparkS3
from pyspark.sql import SparkSession
# Create Spark session with Hadoop AWS libraries loaded
maven_packages = SparkS3.get_packages(spark_version="3.5.8")
# Some packages are not used, but downloading takes a lot of time. Skipping them.
excluded_packages = SparkS3.get_exclude_packages()
spark = (
SparkSession.builder.appName("spark-app-name")
.config("spark.jars.packages", ",".join(maven_packages))
.config("spark.jars.excludes", ",".join(excluded_packages))
.config("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
.config("spark.hadoop.fs.s3a.committer.name", "magic")
.config(
"spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
"org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory",
)
.config(
"spark.sql.parquet.output.committer.class",
"org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",
)
.config(
"spark.sql.sources.commitProtocolClass",
"org.apache.spark.internal.io.cloud.PathOutputCommitProtocol",
)
.getOrCreate()
)
# Create connection
s3 = SparkS3(
host="domain.com",
protocol="http",
bucket="my-bucket",
access_key="ACCESS_KEY",
secret_key="SECRET_KEY",
path_style_access=False,
region="us-east-1",
spark=spark,
).check()
.. code-tab:: py Create S3 connection with bucket as subpath (``domain.com/my-bucket``)
# Create Spark session with Hadoop AWS libraries loaded
...
# Create connection
s3 = SparkS3(
host="domain.com",
protocol="http",
bucket="my-bucket",
access_key="ACCESS_KEY",
secret_key="SECRET_KEY",
path_style_access=True,
region="us-east-1",
spark=spark,
).check()
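Once created, the connection is used with onETL's file DataFrame API to read files as a Spark DataFrame and write a DataFrame back to files. A minimal sketch, assuming ``FileDFReader``/``FileDFWriter`` with the ``CSV`` file format; the paths and format options here are illustrative:
.. code:: python
    from onetl.file import FileDFReader, FileDFWriter
    from onetl.file.format import CSV

    # Read files from the bucket into a Spark DataFrame
    reader = FileDFReader(
        connection=s3,
        source_path="/data/input",  # path inside the bucket
        format=CSV(header=True),
    )
    df = reader.run()

    # Write the DataFrame back to the bucket as CSV files
    writer = FileDFWriter(
        connection=s3,
        target_path="/data/output",
        format=CSV(header=True),
    )
    writer.run(df)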
check()
Check source availability. |support_hooks|
If the source is not available, an exception is raised.
Returns:
- Connection itself
Raises:
- RuntimeError – If the connection is not available
Examples:
.. code:: python
connection.check()
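Since ``check()`` raises ``RuntimeError`` when the source is unavailable, a caller may want to handle the failure explicitly. A minimal sketch (the log message is illustrative):
.. code:: python
    # Handle an unavailable S3 source explicitly
    try:
        connection.check()
    except RuntimeError as e:
        print(f"S3 source is not available: {e}")
        raise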
close()
Close all connections created to S3. |support_hooks|
Also resets all ``fs.s3a.bucket.$BUCKET.*`` properties of Hadoop configuration.
.. note::
Connection can be used again after it was closed.
Returns:
- Connection itself
Examples:
Close connection automatically:
.. code:: python
with connection:
...
Close connection manually:
.. code:: python
connection.close()
get_packages(spark_version, scala_version=None)
classmethod
Get package names to be downloaded by Spark. |support_hooks|
.. versionadded:: 0.9.0
Parameters:
- spark_version (str) – Spark version in format ``major.minor.patch``
- scala_version (str, default: None) – Scala version in format ``major.minor``. If None, ``spark_version`` is used to determine the Scala version.
Examples:
.. code:: python
from onetl.connection import SparkS3
SparkS3.get_packages(spark_version="3.5.8")
SparkS3.get_packages(spark_version="3.5.8", scala_version="2.12")
get_exclude_packages()
classmethod
Get package names to be excluded by Spark. |support_hooks|
.. versionadded:: 0.13.0
Examples:
.. code:: python
from onetl.connection import SparkS3
SparkS3.get_exclude_packages()