Example 1

The following example syncs data from a Postgres-CDC source to an Iceberg sink (AWS + Glue catalog) using the Zeta engine.

env {
  # Streaming mode keeps the job running so CDC changes are applied continuously.
  job.mode    = "streaming"
  parallelism = 1

  # Checkpoint tuning (values are presumably milliseconds — confirm against
  # the SeaTunnel env documentation for your version).
  checkpoint.interval = 3000
  checkpoint.timeout  = 10000000
}

source {
  Postgres-CDC {
    # Name this source's output is published under; the sink's plugin_input
    # must use the same value.
    plugin_output = "ny_taxi_source"

    # Connection settings (demo credentials — replace for real use).
    base-url = "jdbc:postgresql://localhost:5432/ny_taxi"
    username = "root"
    password = "root"

    # What to capture: database, schema, and the fully qualified table name.
    database-names = ["ny_taxi"]
    schema-names   = ["public"]
    table-names    = ["ny_taxi.public.yellow_taxi_trips"]

    # Logical decoding plugin used for change capture.
    decoding.plugin.name = "pgoutput"

    exactly_once = false
  }
}

sink {
  Iceberg {
    # Must match the plugin_output of the Postgres-CDC source.
    plugin_input = "ny_taxi_source"

    # Target table location inside the catalog.
    catalog_name = "glue_cat"
    namespace    = "analytics_db"
    table        = "ny_taxi_table"

    # Catalog properties for the AWS Glue catalog with S3 file IO.
    # The masked values (****) are placeholders: fill in your own
    # S3 warehouse path and AWS region.
    iceberg.catalog.config = {
      catalog-impl    = "org.apache.iceberg.aws.glue.GlueCatalog"
      io-impl         = "org.apache.iceberg.aws.s3.S3FileIO"
      warehouse       = "s3://****"
      "client.region" = "****"
    }
  }
}

image.png

image.png

./bin/seatunnel.sh --config config/postgre_to_iceberg.conf -m local   

image.png

image.png

image.png

Example 2

Note: to make Postgres CDC work, you will also need the changes from https://github.com/apache/seatunnel/pull/8547, so here is another example that goes from SFTP → Iceberg on AWS (Zeta + Glue catalog).

image.png

Now I will sink the_file.jsonl into Iceberg.

the_file.jsonl is one of the per-category review files from https://amazon-reviews-2023.github.io

env {
  # Batch mode: this job reads a single file over SFTP rather than a stream.
  job.mode = "batch"
}

source {
  SftpFile {
    # Name this source's output is published under, for downstream plugins
    # to reference via plugin_input.
    plugin_output = "sftp_source"

    # SFTP connection (demo credentials — replace for real use).
    host     = "localhost"
    port     = 2222
    user     = "foo"
    password = "Passw0rd"

    # File to read and how to parse it.
    path             = "/the_file.jsonl"
    file_format_type = "json"

    # Columns read from each JSON record.
    schema = {
      fields {
        rating            = float
        title             = string
        text              = string
        images            = "array<map<string,string>>"
        asin              = string
        parent_asin       = string
        user_id           = string
        timestamp         = bigint
        verified_purchase = boolean
        helpful_vote      = int
      }
    }
  }
}

sink {
  Iceberg {
    # Must match the plugin_output of the upstream plugin. This job has no
    # transform step, so the sink reads directly from the SftpFile source
    # ("sftp_source"). The previous value "sql_transformed" referred to a
    # dataset that nothing in this config produces.
    plugin_input = "sftp_source"

    catalog_name = "glue_cat"

    # Catalog properties for the AWS Glue catalog with S3 file IO.
    # The masked values (*****) are placeholders: fill in your own
    # S3 warehouse path and AWS region.
    iceberg.catalog.config = {
      warehouse    = "s3://*****"
      catalog-impl = "org.apache.iceberg.aws.glue.GlueCatalog"
      io-impl      = "org.apache.iceberg.aws.s3.S3FileIO"

      "client.region" = "*****"
    }

    # Target namespace and table inside the catalog.
    namespace = "analytics_db"
    table     = "amazan_reviews_software_rating_helpful_votes"
  }
}

And after running:

 ./bin/seatunnel.sh --config config/sftp_to_iceberg.conf  -m local   

image.png

image.png