Reading from Postgres using Postgres.sql
Postgres.sql allows passing a custom SQL query, but does not support incremental strategies.
Warning
Please take Postgres types into account.
Warning
The statement is executed in a read-write connection, so if you call functions or procedures that contain DDL/DML statements, they can change data in your database.
Syntax support
Only queries with the following syntax are supported:
- ✅︎ SELECT ... FROM ...
- ✅︎ WITH alias AS (...) SELECT ...
- ❌ SET ...; SELECT ...; - multiple statements are not supported
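When a query is assembled dynamically, a rough client-side pre-check can catch multi-statement input before it is passed to Postgres.sql. Below is a minimal sketch; is_single_select is a hypothetical helper, not part of onETL, and onETL's own validation may behave differently. It ignores semicolons inside single-quoted literals, but assumes none appear inside comments, dollar-quoted strings or quoted identifiers.

```python
def is_single_select(query: str) -> bool:
    """Rough check that `query` is one SELECT or WITH ... SELECT statement.

    Hypothetical helper (not onETL API). Does NOT handle comments,
    dollar-quoted strings or quoted identifiers containing ';'.
    """
    body = query.strip()
    # A single trailing semicolon is harmless, so drop it before checking.
    if body.endswith(";"):
        body = body[:-1].rstrip()

    in_literal = False
    for char in body:
        if char == "'":
            # Doubled quotes ('') toggle twice, so they cancel out.
            in_literal = not in_literal
        elif char == ";" and not in_literal:
            # Another statement follows: multiple statements are not supported.
            return False

    words = body.split(None, 1)
    return bool(words) and words[0].upper() in ("SELECT", "WITH")
```

The check only looks at the first keyword and at statement separators; it does not parse SQL, so Postgres remains the final authority on whether the query is valid.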
Examples
from onetl.connection import Postgres

postgres = Postgres(...)

df = postgres.sql(
    """
    SELECT
        id,
        key,
        CAST(value AS text) value,
        updated_at
    FROM
        some.mytable
    WHERE
        key = 'something'
    """,
    options=Postgres.SQLOptions(
        partitionColumn="id",
        numPartitions=10,
        lowerBound=0,
        upperBound=1000,
    ),
)
Recommendations
Select only required columns
Instead of passing SELECT * FROM ..., prefer passing exact column names: SELECT col1, col2, ....
This reduces the amount of data passed from Postgres to Spark.
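Building the column list programmatically could look like the sketch below. build_select and quote_ident are hypothetical helpers, not onETL API. Quoting follows the Postgres rule for identifiers (wrap in double quotes, double any embedded double quote), which makes names case-sensitive, so pass them exactly as stored (lowercase for columns created without quotes).

```python
from typing import List


def quote_ident(name: str) -> str:
    """Quote a Postgres identifier: wrap in double quotes, double inner quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_select(table: str, columns: List[str]) -> str:
    """Hypothetical helper: 'SELECT col1, col2 FROM schema.table' with explicit columns.

    `table` is expected in 'schema.table' form; each part is quoted separately.
    """
    if not columns:
        raise ValueError("at least one column is required")
    column_list = ", ".join(quote_ident(column) for column in columns)
    table_ref = ".".join(quote_ident(part) for part in table.split("."))
    return f"SELECT {column_list} FROM {table_ref}"


print(build_select("some.mytable", ["id", "key", "updated_at"]))
```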
Pay attention to the WHERE clause
Instead of filtering data on the Spark side using df.filter(df.column == 'value'), pass a proper WHERE column = 'value' clause.
This both reduces the amount of data sent from Postgres to Spark and may also improve query performance,
especially if there are indexes or partitions on the columns used in the WHERE clause.
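If the filter value comes from user input, it has to be rendered as a safe SQL literal before it goes into the WHERE clause. A minimal sketch, assuming standard_conforming_strings = on (the Postgres default since 9.1), where backslashes are literal and only single quotes need escaping; sql_string_literal and where_equals are hypothetical helpers, not onETL API.

```python
def sql_string_literal(value: str) -> str:
    """Render a Python string as a Postgres string literal.

    Assumes standard_conforming_strings = on: single quotes are escaped
    by doubling them, backslashes are left as-is.
    """
    return "'" + value.replace("'", "''") + "'"


def where_equals(column: str, value: str) -> str:
    """Hypothetical helper: a filter evaluated by Postgres, not by Spark.

    `column` is inserted verbatim, so it must be a trusted identifier.
    """
    return f"WHERE {column} = {sql_string_literal(value)}"


query = f"""
SELECT id, key, updated_at
FROM some.mytable
{where_equals("key", "something")}
"""
print(query)
```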
Options
PostgresSQLOptions
Bases: JDBCSQLOptions
Options specific to SQL queries.
These options allow you to specify configurations for executing SQL queries without relying on Spark's partitioning mechanisms.
.. versionadded:: 0.11.0
    Split Postgres.SQLOptions out of Postgres.ReadOptions
Examples:

.. note::

    You can pass any JDBC configuration
    `supported by Spark <https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html>`_,
    tailored to optimize SQL query execution. **Option names should be in** ``camelCase``!

.. code:: python

    from onetl.connection import Postgres

    options = Postgres.SQLOptions(
        partitionColumn="reg_id",
        numPartitions=10,
        lowerBound=0,
        upperBound=1000,
        # placeholder for any other option supported by Spark's JDBC source
        customSparkOption="value",
    )
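The note above asks for camelCase option names, while the attributes documented below are listed under snake_case Python names with camelCase aliases (for example partition_column with alias partitionColumn). If option names are kept in snake_case elsewhere in your code, a small conversion like the sketch below can produce the camelCase form; to_camel_case is a hypothetical helper, not part of onETL.

```python
def to_camel_case(name: str) -> str:
    """Convert a snake_case option name to camelCase (hypothetical helper)."""
    first, *rest = name.split("_")
    return first + "".join(part.capitalize() for part in rest)


snake_options = {
    "partition_column": "reg_id",
    "num_partitions": 10,
    "lower_bound": 0,
    "upper_bound": 1000,
}
# Keys become partitionColumn, numPartitions, lowerBound, upperBound.
camel_options = {to_camel_case(key): value for key, value in snake_options.items()}
print(camel_options)
```

Names without underscores, such as fetchsize, pass through unchanged.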
fetchsize = 100000
Fetch N rows from an open cursor per read round trip.
Tuning this option can affect reading performance.
.. warning::

    The default value is different from Spark's.

    Spark uses the driver's own default value, which may differ between drivers,
    and even between versions of the same driver. For example, Oracle has
    default ``fetchsize=10``, which is far too low for reading large amounts of data.

    Thus we've overridden the default value with ``100_000``, which should increase reading performance.

.. versionchanged:: 0.2.0
    Set explicit default value to 100_000
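The effect of fetchsize can be estimated with simple arithmetic: reading R rows through one cursor needs about ceil(R / fetchsize) fetch round trips. The sketch below is a simplified model (it ignores network latency, row width and driver internals, and with partitioning it applies per partition); fetch_round_trips is a hypothetical helper.

```python
def fetch_round_trips(total_rows: int, fetchsize: int) -> int:
    """Approximate number of cursor fetches needed to read `total_rows` rows.

    Simplified model: each fetch returns up to `fetchsize` rows.
    """
    if fetchsize <= 0:
        raise ValueError("fetchsize must be positive")
    if total_rows < 0:
        raise ValueError("total_rows must be non-negative")
    # Ceiling division using integer arithmetic only.
    return (total_rows + fetchsize - 1) // fetchsize


for size in (10, 100_000):
    print(f"fetchsize={size}: {fetch_round_trips(1_000_000, size)} fetches")
```

For 1 000 000 rows, fetchsize=10 means 100 000 fetches, while fetchsize=100_000 needs only 10. The trade-off is that each fetch buffers more rows in memory at once.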
lower_bound = Field(default=None, alias='lowerBound')
Defines the lower boundary for partitioning the query's data. Mandatory if :obj:`~partition_column` is set.
num_partitions = Field(default=None, alias='numPartitions')
Number of partitions Spark creates to read the query result in parallel (each partition is read by a separate task).
partition_column = Field(default=None, alias='partitionColumn')
Column used to partition data across multiple executors for parallel query processing.
.. warning:: It is highly recommended to use a primary key or an indexed column to avoid performance issues.
.. dropdown:: Example of using partitionColumn="id" with partitioning_mode="range"

    .. code-block:: sql

        -- If partition_column is 'id', with numPartitions=4, lowerBound=1, and upperBound=100,
        -- then stride = 100/4 - 1/4 = 25 - 0 = 25 (integer division):

        -- Executor 1 processes IDs below 26, and NULL IDs
        SELECT ... FROM table WHERE id < 26 OR id IS NULL

        -- Executor 2 processes IDs from 26 to 50
        SELECT ... FROM table WHERE id >= 26 AND id < 51

        -- Executor 3 processes IDs from 51 to 75
        SELECT ... FROM table WHERE id >= 51 AND id < 76

        -- Executor 4 processes IDs from 76 and above
        SELECT ... FROM table WHERE id >= 76

        -- General case for Executor N (except the first and the last one)
        SELECT ... FROM table
        WHERE partition_column >= (lowerBound + (N-1) * stride)
        AND partition_column < (lowerBound + N * stride)

        -- Where stride is calculated as upperBound / numPartitions - lowerBound / numPartitions.
        -- lowerBound and upperBound only define the stride, they do not filter rows.
query_timeout = Field(default=None, alias='queryTimeout')
The number of seconds the driver will wait for a statement to execute. Zero means there is no limit.
This option depends on the driver implementation: some drivers check the timeout of each query instead of the entire JDBC batch.
session_init_statement = Field(default=None, alias='sessionInitStatement')
After each database session is opened to the remote DB and before starting to read data, this option executes a custom SQL statement (or a PL/SQL block).
Use this to implement session initialization code.
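For Postgres, session initialization is usually a plain SQL statement such as SET. The sketch below uses statement_timeout purely as an illustration (the value is an assumption, pick your own), and writes the options as a dict with camelCase keys, relying on parse accepting a dict; whether you pass a dict or keyword arguments to Postgres.SQLOptions is up to you.

```python
# Hypothetical Postgres session setup: a plain SET statement,
# executed after the session is opened and before data is read.
session_init_statement = "SET statement_timeout = '10min'"

# camelCase option names, as the note on SQLOptions requires.
sql_options = {
    "sessionInitStatement": session_init_statement,
    "fetchsize": 100_000,
}
print(sql_options)
```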
Example (Oracle PL/SQL syntax):

.. code:: python

    sessionInitStatement = """
        BEGIN
            execute immediate
            'alter session set "_serial_direct_read"=true';
        END;
    """
upper_bound = Field(default=None, alias='upperBound')
Sets the upper boundary for data partitioning. Mandatory if :obj:`~partition_column` is set.
parse(options) (classmethod)
If an instance of the options class (or of a subclass) is passed, it is returned unchanged. If a dict is passed, it is converted to an options object.
Otherwise, an exception is raised.
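The three cases described above can be illustrated with a small stand-in class. This is a hypothetical sketch, not onETL's implementation: SQLOptionsSketch uses a plain dataclass, accepts only snake_case keys and does not handle camelCase aliases the way the real options class does.

```python
from dataclasses import dataclass
from typing import Optional, Union


@dataclass
class SQLOptionsSketch:
    """Hypothetical stand-in for an options class (not onETL's implementation)."""

    fetchsize: int = 100_000
    partition_column: Optional[str] = None
    num_partitions: Optional[int] = None

    @classmethod
    def parse(cls, options: Union["SQLOptionsSketch", dict]) -> "SQLOptionsSketch":
        # An options object is returned unchanged.
        if isinstance(options, cls):
            return options
        # A dict is converted into an options object (unknown keys raise TypeError).
        if isinstance(options, dict):
            return cls(**options)
        # Anything else is rejected.
        raise TypeError(f"cannot parse {type(options).__name__} into {cls.__name__}")


existing = SQLOptionsSketch(fetchsize=10)
print(SQLOptionsSketch.parse(existing) is existing)
print(SQLOptionsSketch.parse({"num_partitions": 4}))
```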