Reading from object storage on WarehousePG v1.9

PGAA on WarehousePG supports three ways to access data in object storage. Each option is independent. Use the one that fits your setup.

Querying object storage using storage locations

Storage locations connect directly to object storage and support Parquet, Delta Lake, and Iceberg files. Create a storage location and a PGAA table on the coordinator, then query the data using standard SQL. WarehousePG selects the most efficient execution mode automatically, running the query on the coordinator or distributing it across segment hosts depending on the nature of the query.

Note

PGAA tables on WarehousePG are read-only. The data must already exist in object storage. Writing data to object storage using CREATE TABLE AS SELECT (CTAS) isn't supported.

For full details on configuring a storage location, see Reading tables in object storage.

Example

The following example connects to a public S3 bucket containing TPC-H SF1 data, registers three tables, and runs an analytical query to demonstrate both DirectScan (pure PGAA) and CompatScan (PGAA joined with a native heap table).

  1. Create a storage location on the coordinator:

    SELECT pgfs.create_storage_location(
      'sample-data',
      's3://beacon-analytics-demo-data-us-east-1-prod',
      '{"aws_skip_signature": "true"}'
    );
  2. Create PGAA tables pointing to the data:

    CREATE TABLE customer () USING PGAA
    WITH (pgaa.storage_location = 'sample-data', pgaa.path = 'tpch_sf_1/customer');
    
    CREATE TABLE orders () USING PGAA
    WITH (pgaa.storage_location = 'sample-data', pgaa.path = 'tpch_sf_1/orders');
    
    CREATE TABLE lineitem () USING PGAA
    WITH (pgaa.storage_location = 'sample-data', pgaa.path = 'tpch_sf_1/lineitem');
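
    Optionally, run a quick sanity check to confirm that the tables resolve to the files in the bucket. A simple row count is one way to check; for TPC-H SF1 data, customer should contain 150,000 rows:

    SELECT count(*) FROM customer;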
  3. Run an analytical query:

    SELECT
        l_orderkey,
        sum(l_extendedprice * (1 - l_discount)) AS revenue,
        o_orderdate,
        o_shippriority
    FROM
        customer,
        orders,
        lineitem
    WHERE
        c_mktsegment = 'BUILDING'
        AND c_custkey = o_custkey
        AND l_orderkey = o_orderkey
        AND o_orderdate < DATE '1995-03-15'
        AND l_shipdate > DATE '1995-03-15'
    GROUP BY
        l_orderkey,
        o_orderdate,
        o_shippriority
    ORDER BY
        revenue DESC,
        o_orderdate
    LIMIT 10;
  4. Use EXPLAIN to observe the query plan. PGAA attempts DirectScan first, offloading execution entirely to Seafowl on the coordinator. The plan shows the full Seafowl logical plan, including predicate pushdown, aggregation, and sort. Notice that no Optimizer line appears, because the WarehousePG planner wasn't involved.
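
    To see this, prefix the query from the previous step with EXPLAIN. The statement below is the same query in a compact layout; the plan that follows is the output:

    EXPLAIN
    SELECT
        l_orderkey,
        sum(l_extendedprice * (1 - l_discount)) AS revenue,
        o_orderdate,
        o_shippriority
    FROM customer, orders, lineitem
    WHERE c_mktsegment = 'BUILDING'
      AND c_custkey = o_custkey
      AND l_orderkey = o_orderkey
      AND o_orderdate < DATE '1995-03-15'
      AND l_shipdate > DATE '1995-03-15'
    GROUP BY l_orderkey, o_orderdate, o_shippriority
    ORDER BY revenue DESC, o_orderdate
    LIMIT 10;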

                                          QUERY PLAN
    --------------------------------------------------------------------------------------------
     SeafowlDirectScan: Logical Plan
       Limit:  skip=0, fetch=10
         Sort:  revenue DESC NULLS FIRST, orders.o_orderdate ASC NULLS LAST
           Projection:  lineitem.l_orderkey, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS revenue, orders.o_orderdate, orders.o_shippriority
             Aggregate:  groupBy=[[lineitem.l_orderkey, orders.o_orderdate, orders.o_shippriority]], aggr=[[sum(lineitem.l_extendedprice * (Int64(1) - lineitem.l_discount))]]
               Filter:  customer.c_mktsegment = Utf8("BUILDING") AND customer.c_custkey = orders.o_custkey AND lineitem.l_orderkey = orders.o_orderkey AND orders.o_orderdate < CAST(Utf8("1995-03-15") AS Date32) AND lineitem.l_shipdate > CAST(Utf8("1995-03-15") AS Date32)
                 Cross Join:
                   Cross Join:
                     TableScan:  customer
                     TableScan:  orders
                   TableScan:  lineitem
    (11 rows)

    PGAA falls back to CompatScan when the query involves operations Seafowl cannot handle, such as a join with a native heap table. For example, joining customer with a native segments table:

    CREATE TABLE segments (seg_id INT, seg_name TEXT) DISTRIBUTED BY (seg_id);
    INSERT INTO segments VALUES (1, 'BUILDING'), (2, 'MACHINERY');
    
    EXPLAIN SELECT c_name, seg_name
    FROM customer
    JOIN segments ON c_mktsegment = seg_name;

    PGAA first attempts DirectScan, then falls back to CompatScan with a notice:

    NOTICE:  PGAA DirectScan failed: QueryPlanningError("Error during planning: table 'default.public.segments' not found"). Running the query in compatibility mode.
                                                   QUERY PLAN
    --------------------------------------------------------------------------------------------------------
     Gather Motion 4:1  (slice1; segments: 4)  (cost=1414.00..246348.40 rows=748960 width=64)
       ->  Hash Join  (cost=1414.00..236986.40 rows=187240 width=64)
             Hash Cond: (customer.c_mktsegment = segments.seg_name)
             ->  Custom Scan (SeafowlCompatScan) on customer  (cost=25.00..1131.25 rows=37500 width=64)
                   SeafowlPlan: Logical Plan
                     Projection:  r1.c_name, r1.c_mktsegment
                       SubqueryAlias:  r1
                         TableScan:  public.customer
                   SeafowlQuery: select r1.c_name, r1.c_mktsegment from public.customer as r1
             ->  Hash  (cost=769.00..769.00 rows=49600 width=32)
                   ->  Broadcast Motion 4:4  (slice2; segments: 4)  (cost=0.00..769.00 rows=49600 width=32)
                         ->  Seq Scan on segments  (cost=0.00..149.00 rows=12400 width=32)
     Optimizer: Postgres-based planner
    (13 rows)

    The Gather Motion 4:1 shows a Strewn locus plan. The SeafowlCompatScan runs on each of the four segments in parallel, each reading a slice of the customer data, with the results gathered to the coordinator. The join with the heap table is handled by the WarehousePG planner. The Optimizer: Postgres-based planner line at the bottom confirms that the Postgres planner was in charge of the overall plan, unlike DirectScan, where Seafowl handles everything independently.

    For more on how PGAA chooses between General and Strewn locus, see Query execution.

Connecting to an Iceberg catalog

Attach an external Iceberg REST catalog on the WarehousePG coordinator to make catalog-managed tables queryable across the cluster.

You can use PGAA on WarehousePG to read data that EDB Postgres Distributed (PGD) has replicated to object storage. PGD writes Iceberg-format data and registers it in an Iceberg REST catalog, which WarehousePG can attach and query at MPP scale with no data movement. For the PGD side of this setup, see Replicating with PGD.

Note

Automatic catalog sync using pgaa.attach_catalog() isn't supported on WarehousePG. You must manually run pgaa.import_catalog() whenever the catalog changes.

For full setup instructions, including credential configuration and supported catalog types, see Integrating with Iceberg catalogs.

Example

  1. Register the catalog on the coordinator:

    SELECT pgaa.add_catalog(
      'my_catalog',
      'iceberg-rest',
      '{"uri": "https://iceberg-catalog.example.com"}'
    );
  2. Import the table metadata:

    SELECT pgaa.import_catalog('my_catalog');
  3. Optionally, limit the import to a specific namespace:

    SELECT pgaa.import_catalog('my_catalog', 'my_schema');

    Once imported, tables are queryable from any WarehousePG session. The same query plan patterns described in the storage locations example apply. Use EXPLAIN to observe whether PGAA is using DirectScan or CompatScan and which locus is selected.
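
    For example, assuming the import created a table named trips in the my_schema schema (both names are placeholders for whatever your catalog actually contains), you can check the plan the same way:

    EXPLAIN SELECT count(*) FROM my_schema.trips;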

Accelerating with Spark

Route queries to an external Apache Spark cluster via Spark Connect. In this mode, the coordinator forwards queries to Spark directly, segment hosts are not involved in execution, and Spark handles its own planning and distribution independently of the WarehousePG planner.

For full setup instructions, GPU acceleration options, and performance guidance, see Accelerating with Spark.

Example

  1. Switch the executor engine and set pgaa.spark_connect_url on the coordinator:

    SET pgaa.executor_engine = 'spark_connect';
    SET pgaa.spark_connect_url = 'sc://spark-connect-hostname:15002';
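
    You can read the settings back with SHOW to confirm the session is configured as intended; this relies only on standard PostgreSQL behavior for configuration parameters:

    SHOW pgaa.executor_engine;
    SHOW pgaa.spark_connect_url;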
  2. Create a storage location and PGAA tables using the same public TPC-H sample data as in the storage locations example. Spark Connect supports only Parquet, so you must set pgaa.format explicitly:

    SELECT pgfs.create_storage_location(
      'sample-data',
      's3://beacon-analytics-demo-data-us-east-1-prod',
      '{"aws_skip_signature": "true"}'
    );
    
    CREATE TABLE customer () USING PGAA
    WITH (pgaa.storage_location = 'sample-data', pgaa.path = 'tpch_sf_1/customer', pgaa.format = 'parquet');
    
    CREATE TABLE orders () USING PGAA
    WITH (pgaa.storage_location = 'sample-data', pgaa.path = 'tpch_sf_1/orders', pgaa.format = 'parquet');
    
    CREATE TABLE lineitem () USING PGAA
    WITH (pgaa.storage_location = 'sample-data', pgaa.path = 'tpch_sf_1/lineitem', pgaa.format = 'parquet');
  3. Run the same TPC-H Q3 query. The coordinator forwards execution to Spark:

    SELECT
        l_orderkey,
        sum(l_extendedprice * (1 - l_discount)) AS revenue,
        o_orderdate,
        o_shippriority
    FROM
        customer,
        orders,
        lineitem
    WHERE
        c_mktsegment = 'BUILDING'
        AND c_custkey = o_custkey
        AND l_orderkey = o_orderkey
        AND o_orderdate < DATE '1995-03-15'
        AND l_shipdate > DATE '1995-03-15'
    GROUP BY
        l_orderkey,
        o_orderdate,
        o_shippriority
    ORDER BY
        revenue DESC,
        o_orderdate
    LIMIT 10;
  4. Use EXPLAIN to observe the query plan. When Spark is the active engine, PGAA produces a SparkDirectScan node. The output is Spark's own plan, with predicate pushdown, joins, and aggregations handled entirely within the Spark cluster. No Optimizer line appears, confirming that the WarehousePG planner wasn't involved:

                                              QUERY PLAN
    --------------------------------------------------------------------------------------------
    SparkDirectScan: Logical Plan
      AdaptiveSparkPlan:  isFinalPlan=false
      +- TakeOrderedAndProject: (limit=10, orderBy=[revenue DESC, o_orderdate ASC])
          +- HashAggregate: (keys=[l_orderkey, o_orderdate, o_shippriority], functions=[sum(...)])
            +- HashAggregate: (keys=[l_orderkey, o_orderdate, o_shippriority], functions=[partial_sum(...)])
                +- Project:  [o_orderdate, o_shippriority, l_orderkey, l_extendedprice, l_discount]
                  +- SortMergeJoin:  [o_orderkey], [l_orderkey], Inner
                      :- Sort [o_orderkey ASC NULLS FIRST]
                      :  +- Exchange hashpartitioning(o_orderkey, 200)
                      :     +- BroadcastHashJoin [c_custkey], [o_custkey], Inner, BuildLeft
                      :        :- BroadcastExchange
                      :        :  +- Filter (c_mktsegment = BUILDING)
                      :        :     +- FileScan parquet [c_custkey, c_mktsegment]
                      :        :           Location: s3a://beacon-analytics-demo-data-us-east-1-prod/tpch_sf_1/customer
                      :        +- Filter (o_orderdate < 1995-03-15)
                      :           +- FileScan parquet [o_orderkey, o_custkey, o_orderdate, o_shippriority]
                      :                 Location: s3a://beacon-analytics-demo-data-us-east-1-prod/tpch_sf_1/orders
                      +- Sort [l_orderkey ASC NULLS FIRST]
                        +- Exchange hashpartitioning(l_orderkey, 200)
                            +- Filter (l_shipdate > 1995-03-15)
                              +- FileScan parquet [l_orderkey, l_extendedprice, l_discount, l_shipdate]
                                    Location: s3a://beacon-analytics-demo-data-us-east-1-prod/tpch_sf_1/lineitem
    (22 rows)
  5. To inspect the full Spark execution plan, open the Spark UI in your browser at http://<spark-host>:4040. Navigate to the SQL tab to see a visual breakdown of the query plan, including stage boundaries, shuffle operations, and per-stage metrics.
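  6. Optionally, return the session to the default engine when you're finished. A minimal sketch using standard PostgreSQL RESET, which restores each parameter to its configured default:

    RESET pgaa.executor_engine;
    RESET pgaa.spark_connect_url;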