Examples (Innovation Release)

Pipelines are the primary way to define AI workflows in EDB Postgres® AI. Each pipeline consists of one or more sequential steps where data flows linearly: the output of step 1 becomes the input for step 2, and so on.

Creating a pipeline (table source)

To create a pipeline using a standard Postgres table as your data source, use the aidb.create_pipeline() function. The following examples show how to set up both a single-step and a multi-step pipeline.

Single-step pipeline

This example creates a knowledge base from a Postgres table. After running the pipeline, you can query the resulting embeddings for semantic search.

SELECT aidb.create_pipeline(
  name => 'kb_pipeline_table',
  source => 'source_table',
  source_key_column => 'id',
  source_data_column => 'content',
  step_1 => 'KnowledgeBase',
  step_1_options => aidb.knowledge_base_config(
     model => 'bert',  -- this is a pre-defined locally running model
     data_format => 'Text'
   )
);
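Once the pipeline has run, the embeddings are written to an automatically created destination table. Assuming the default destination naming convention visible in the aidb.pipelines output later on this page (pipeline_<name>), you can inspect the stored results directly:

```sql
-- Inspect the generated embeddings.
-- Note: the table name below is an assumption based on the default
-- pipeline_<name> destination convention; check aidb.pipelines to confirm.
SELECT * FROM pipeline_kb_pipeline_table LIMIT 5;
```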

Multi-step pipeline

You can chain multiple operations. This example first parses HTML and then chunks the resulting text.

SELECT aidb.create_pipeline(
   name => 'html_processing_pipeline',
   source => 'web_data_table',
   source_key_column => 'id',
   source_data_column => 'html_content',
   step_1 => 'ParseHtml',
   step_2 => 'ChunkText'
);
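Because this chain ends with ChunkText rather than a KnowledgeBase step, the final output is the chunked text itself. Assuming the default pipeline_<name> destination naming shown in the aidb.pipelines output later on this page, you can view the chunks with:

```sql
-- View the parsed and chunked output.
-- The table name is an assumption based on the default pipeline_<name>
-- destination convention; check aidb.pipelines to confirm.
SELECT * FROM pipeline_html_processing_pipeline;
```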

Creating a pipeline (volume source)

If your data lives in external storage (like an S3 bucket), you must first define a storage location and a volume.

Step 1: Define storage and volume

Create the PGFS storage location and AIDB volume.

SELECT pgfs.create_storage_location('s3_bucket_location', 's3://my-ai-data',
  options => '{"region": "us-east-1", "skip_signature": "true"}'
);
SELECT aidb.create_volume('source_volume', 's3_bucket_location', '/', 'Text');

Step 2: Create the pipeline

SELECT aidb.create_pipeline(
   name => 'pipeline_from_s3',
   source => 'source_volume',
   step_1 => 'KnowledgeBase',
   step_1_options => aidb.knowledge_base_config(
     model => 'dummy',
     data_format => 'Text'
   )
);
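As with table-based pipelines, a newly created volume pipeline is disabled by default (see Running and updating pipelines below), so you can trigger an initial run manually:

```sql
-- Process all files currently in the volume
SELECT aidb.run_pipeline('pipeline_from_s3');
```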

Running and updating pipelines

Pipelines are disabled by default upon creation. You can trigger them manually or enable automatic processing.

Manual execution

Run the pipeline once on all existing data:

SELECT aidb.run_pipeline('kb_pipeline_table');

Enable auto-processing

Keep the pipeline up to date as source data changes:

SELECT aidb.update_pipeline(
  name => 'kb_pipeline_table',
  auto_processing => 'Live'
);
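To confirm the change took effect, check the auto_processing column in the aidb.pipelines view:

```sql
-- Verify that live auto-processing is now enabled for this pipeline
SELECT name, auto_processing FROM aidb.pipelines
WHERE name = 'kb_pipeline_table';
```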

Monitoring and deletion

To view your existing pipelines and their configurations, query the aidb.pipelines view:

SELECT * FROM aidb.pipelines;
Output
      name      | source_type | source_schema |       source       | source_key_column | source_data_column | destination_type | destination_schema |       destination       | destination_key_column | destination_data_column |                                                 steps                                                  | auto_processing | batch_size | background_sync_interval |              owner_role
----------------+-------------+---------------+--------------------+-------------------+--------------------+------------------+--------------------+-------------------------+------------------------+-------------------------+--------------------------------------------------------------------------------------------------------+-----------------+------------+--------------------------+--------------------------------------
 pipeline__8989 | Table       | public        | source_table__8989 | id                | content            | Table            | public             | pipeline_pipeline__8989 | source_id              | value                   | [{"options": {"max_length": null, "desired_length": 1000}, "operation": "ChunkText", "step_order": 1}] | Live            |         42 | @ 42 mins                | role_pipeline_management_single_step
(1 row)
SELECT name, source, auto_processing FROM aidb.pipelines;
Output
      name      |       source       | auto_processing
----------------+--------------------+-----------------
 pipeline__8989 | source_table__8989 | Live
(1 row)

Deleting a pipeline also drops any destination tables generated by the pipeline steps. To delete a pipeline:

SELECT aidb.delete_pipeline('kb_pipeline_table');

End-to-end example: Multi-step pipeline with intermediate storage

This end-to-end example demonstrates how to configure a multi-step pipeline in EDB Postgres AI that uses intermediate storage.

By defining an intermediate_destination, you can persist the results of individual steps (like text chunking) before they pass to the next stage (like summarization), which is useful for debugging or reusing processed data.

Key features of this example:

  • Multi-step workflow: Chains ChunkText (Step 1) into SummarizeText (Step 2).

  • Auto-processing: The pipeline automatically triggers whenever new data is inserted into the source table.

  • Intermediate storage: Demonstrates how to explicitly name an intermediate table versus allowing the system to auto-generate one.

Example 1: Named intermediate destination

In this scenario, we will manually name the table that stores the output of the chunking step.

  • Create a source table for your raw text:

    CREATE TABLE source_table_demo(
      id INT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
      content TEXT NOT NULL
    );
  • Define the pipeline with a custom intermediate table:

    SELECT * FROM aidb.create_pipeline(
      name => 'complex_pipeline_v1',
      source => 'source_table_demo',
      source_key_column => 'id',
      source_data_column => 'content',
      auto_processing => 'Live',
      step_1 => 'ChunkText',
      step_1_options => jsonb_build_object(
          'desired_length', 20,
          'intermediate_destination', jsonb_build_object(
              'enabled', true,
              'destination', 'my_custom_chunk_storage' -- Explicitly named
          )
      ),
      step_2 => 'SummarizeText'
     );
  • Insert data to trigger auto-processing:

    INSERT INTO source_table_demo (content)
    VALUES ('This is a long text example that will be split into segments for easier processing.');
  • View the chunks and final summaries:

    -- View intermediate chunked output
    SELECT * FROM my_custom_chunk_storage;
    
    -- View the final summarized output
    SELECT * FROM pipeline_complex_pipeline_v1;

Example 2: Automatic intermediate destination

If you enable intermediate storage but do not provide a name, the system generates a table following the pattern pipeline_[name]_step_[n].

  • Create a pipeline with auto-named intermediate storage:

    SELECT * FROM aidb.create_pipeline(
      name => 'auto_named_pipeline',
      source => 'source_table_demo_2',
      source_key_column => 'id',
      source_data_column => 'content',
      auto_processing => 'Live',
      step_1 => 'ChunkText',
      step_1_options => jsonb_build_object(
          'desired_length', 20,
          'intermediate_destination', jsonb_build_object('enabled', true)
      ),
      step_2 => 'SummarizeText'
    );
  • Access the auto-generated intermediate table:

    -- View intermediate chunked output (auto-generated table name)
    SELECT * FROM pipeline_auto_named_pipeline_step_1;