ingestr ingest
The ingest command is a core feature of the ingestr tool, allowing users to transfer data from a source to a destination with optional support for incremental updates.
Example
The following example demonstrates how to use the ingest command to transfer data from a source to a destination.
ingestr ingest \
--source-uri '<your-source-uri-here>' \
--source-table '<your-schema>.<your-table>' \
--dest-uri '<your-destination-uri-here>'
Required flags
- --source-uri TEXT: Required. Specifies the URI of the data source.
- --dest-uri TEXT: Required. Specifies the URI of the destination where data will be ingested.
- --source-table TEXT: Required. Defines the source table to fetch data from.
Optional flags
- --dest-table TEXT: Designates the destination table to save the data. If not specified, defaults to the value of --source-table.
- --incremental-key TEXT: Identifies the key used for incremental data strategies. Defaults to None.
- --incremental-strategy TEXT: Defines the strategy for incremental updates. Options include replace, append, delete+insert, or merge. The default strategy is replace.
- --interval-start: Sets the start of the interval for the incremental key. Defaults to None.
- --interval-end: Sets the end of the interval for the incremental key. Defaults to None.
- --primary-key TEXT: Specifies the primary key for the merge operation. Defaults to None.
- --columns <name>:<type>:<source>: Specifies the columns to be ingested. Use name:type to override a column's type, name:type:source to rename source to name with a type, or name::source to rename only. Multiple entries are comma-separated. Defaults to None.
- --no-inference: Skips schema inference for schema-less sources and uses --columns as the source schema. Requires --columns.
- --mask <column_name>:<algorithm>[:param]: Applies data masking to specified columns. Can be used multiple times for different columns. See the Data Masking documentation for available algorithms and usage examples. Defaults to None.
- --trim-whitespace: Trims leading and trailing whitespace from all string column values before writing to the destination. This applies to regular batch ingestions and CDC ingestions, preserves nulls and column types, and leaves non-string columns unchanged. Defaults to false. Can also be set with TRIM_WHITESPACE=true or INGESTR_TRIM_WHITESPACE=true.
- --schema-naming: Specifies what naming convention to use for table and column names on the destination. Can be default or direct. default is snake_case; direct is case sensitive and doesn't contract underscores.
- --stream: Runs continuous (streaming) ingestion instead of a one-shot load. Supported by CDC sources (postgres+cdc, mssql+cdc) and message brokers (kafka, amqp). The process runs until interrupted (SIGINT/SIGTERM), flushing buffered records to the destination on an interval or record-count trigger. See Streaming ingestion below.
- --flush-interval: In streaming mode, flush buffered records to the destination at least this often. Defaults to 30s. Only valid with --stream.
- --flush-records: In streaming mode, flush when this many records have been buffered. Defaults to 50000. Only valid with --stream.
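To make the three --columns forms concrete, here is a small shell sketch that splits a spec and prints how each entry would be read according to the description above. It only illustrates the syntax and is not ingestr's parser; the function name describe_columns and the column names are made up for the example.

```shell
# Illustrative only: explains each entry of a --columns spec.
# describe_columns and the column names below are hypothetical.
describe_columns() {
  # Entries are comma-separated; each is name:type, name:type:source, or name::source.
  printf '%s\n' "$1" | tr ',' '\n' | while IFS=: read -r name type source; do
    if [ -n "$type" ] && [ -n "$source" ]; then
      echo "rename $source to $name with type $type"
    elif [ -n "$source" ]; then
      echo "rename $source to $name"
    else
      echo "override type of $name to $type"
    fi
  done
}

describe_columns 'dt:date,signup_day:date:created_at,full_name::name'
```

The same string is what would be passed to the flag, for example --columns 'dt:date,signup_day:date:created_at,full_name::name'.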
The --interval-start and --interval-end options accept several datetime formats. Some examples:
- %Y-%m-%d: 2023-01-31
- %Y-%m-%dT%H:%M:%S: 2023-01-31T15:00:00
- %Y-%m-%dT%H:%M:%S%z: 2023-01-31T15:00:00+00:00
- %Y-%m-%dT%H:%M:%S.%f: 2023-01-31T15:00:00.000123
- %Y-%m-%dT%H:%M:%S.%f%z: 2023-01-31T15:00:00.000123+00:00
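When the interval is computed in a script rather than typed by hand, the date utility can produce values in these formats. The sketch below is one way to do it, assuming a standard date command; the variable names are illustrative. The fractional-second (%f) forms are left out because date has no portable specifier for them.

```shell
# Illustrative variable names; only standard date format specifiers are used.
interval_start="$(date -u +%Y-%m-%d)"            # %Y-%m-%d form
interval_end="$(date -u +%Y-%m-%dT%H:%M:%S)"     # %Y-%m-%dT%H:%M:%S form

echo "interval-start: $interval_start"
echo "interval-end:   $interval_end"

# The values would then be passed as:
#   --interval-start "$interval_start" --interval-end "$interval_end"
```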
INFO
For details on the incremental key and the available strategies, see the Incremental Loading section.
Streaming ingestion
The --stream flag turns ingest into a long-running process that continuously pulls changes from the source and flushes them to the destination, rather than running once and exiting. It is supported by:
- CDC sources (postgres+cdc, mssql+cdc): captures every insert, update, and delete across all tables in the publication/capture set and applies them with the merge strategy.
- Message brokers (kafka, amqp): consumes messages into a fixed envelope schema (a msg_id primary key, a JSON data column holding the decoded body and metadata, and an _ingestr_order column carrying the source offset / delivery tag) and applies them with merge keyed on msg_id, keeping the latest record per key within each flush window. Schema inference is skipped, since a never-ending stream has no end to infer from.
A flush happens whenever either --flush-interval (default 30s) or --flush-records (default 50000) is reached, whichever comes first. --flush-records is the memory bound: records are buffered until a flush.
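The "whichever comes first" rule can be modelled as a simple either-or check. The function below is a hypothetical model of that rule, not ingestr's actual code; should_flush and its argument names are invented for illustration.

```shell
# Hypothetical model of the flush trigger; not ingestr's actual code.
# Succeeds (exit status 0) when a flush is due.
should_flush() {
  buffered_records="$1"   # records currently buffered
  elapsed_seconds="$2"    # seconds since the last flush
  max_records="$3"        # corresponds to --flush-records
  max_seconds="$4"        # corresponds to --flush-interval, in seconds
  [ "$buffered_records" -ge "$max_records" ] || [ "$elapsed_seconds" -ge "$max_seconds" ]
}

# With the defaults (50000 records, 30 seconds):
should_flush 50000 4 50000 30 && echo "flush: record limit reached"
should_flush 120 30 50000 30 && echo "flush: interval elapsed"
should_flush 120 4 50000 30 || echo "keep buffering"
```

Under this model, lowering --flush-records caps how many records sit in memory, while lowering --flush-interval caps how stale the destination can get on a quiet source.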
Each flush writes the buffered records, merges them into the destination, and only then confirms the source position as durable. This gives at-least-once delivery: a crash before a flush completes re-delivers the un-flushed changes on restart, and the merge (by primary key / msg_id) makes replays idempotent. The stream resumes automatically: CDC sources from the destination's last recorded LSN, brokers from their committed offset or unacknowledged messages.
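A toy model of a keyed merge shows why a replayed batch is harmless. The sketch assumes rows of the form msg_id, order, data separated by tabs and keeps the highest-order row per msg_id; merge_latest and batch are hypothetical helpers, and none of this is ingestr's implementation.

```shell
# Toy model of a merge keyed on msg_id; not ingestr's implementation.
# Input rows: msg_id<TAB>order<TAB>data. Output: the highest-order row per msg_id.
TAB="$(printf '\t')"

merge_latest() {
  # Sort by key, then numerically by order, so the last row seen per key wins.
  sort -t "$TAB" -k1,1 -k2,2n |
    awk -F '\t' '{ latest[$1] = $0 } END { for (k in latest) print latest[k] }' |
    sort
}

# A small batch in which key "a" is updated once.
batch() { printf 'a\t1\tv1\nb\t1\tv1\na\t2\tv2\n'; }

once="$(batch | merge_latest)"
replayed="$( { batch; batch; } | merge_latest)"   # same batch delivered twice

[ "$once" = "$replayed" ] && echo "replay produced the same result"
```

Because the merge is keyed, delivering the same changes a second time overwrites rows with identical values instead of duplicating them.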
Stop a stream with Ctrl+C (SIGINT) or SIGTERM; ingestr performs a final flush of buffered data and exits cleanly.
# Stream all changes from a Postgres publication into BigQuery, flushing
# every 15 seconds or 100k changes, whichever comes first.
ingestr ingest \
--source-uri 'postgres+cdc://user:pass@localhost:5432/mydb?publication=my_pub' \
--dest-uri 'bigquery://my_project?credentials_path=/path/to/sa.json' \
--stream \
--flush-interval 15s \
--flush-records 100000
INFO
Schema changes are picked up at startup. If the source schema changes while a stream is running, restart the stream to apply the new schema. Run streaming ingestion under a supervisor (systemd, Kubernetes, etc.) so it restarts after transient source/destination outages.
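Where a full supervisor is not available, for example during local testing, a restart loop approximates the behaviour. This is only a sketch: run_with_restart and RESTART_DELAY are names invented here, and systemd or Kubernetes remains the better choice for anything long-lived.

```shell
# Hypothetical helper: re-run a command until it exits successfully,
# giving up after a maximum number of failed attempts.
run_with_restart() {
  max_attempts="$1"; shift
  attempt=1
  until "$@"; do
    if [ "$attempt" -ge "$max_attempts" ]; then
      echo "giving up after $attempt attempts" >&2
      return 1
    fi
    attempt=$((attempt + 1))
    sleep "${RESTART_DELAY:-5}"   # pause before restarting
  done
}

# Usage (commented out so the sketch runs without ingestr installed):
# run_with_restart 10 ingestr ingest \
#   --source-uri '<your-source-uri-here>' \
#   --dest-uri '<your-destination-uri-here>' \
#   --stream
```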
General flags
--help: Displays the help message and exits the command.
Examples
Ingesting a CSV file to DuckDB
ingestr ingest \
--source-uri 'csv://input.csv' \
--source-table 'sample' \
--dest-uri 'duckdb://output.duckdb'
Copy a table from Postgres to DuckDB
ingestr ingest \
--source-uri 'postgresql://myuser:mypassword@localhost:5432/mydatabase?sslmode=disable' \
--source-table 'public.input_table' \
--dest-uri 'duckdb://output.duckdb' \
--dest-table 'public.output_table'
Incrementally ingest a table from Postgres to BigQuery
ingestr ingest \
--source-uri 'postgresql://myuser:mypassword@localhost:5432/mydatabase?sslmode=disable' \
--source-table 'public.users' \
--dest-uri 'bigquery://my_project?credentials_path=/path/to/service/account.json&location=EU' \
--dest-table 'raw.users' \
--incremental-key 'updated_at' \
--incremental-strategy 'delete+insert'
Load an interval of data from Postgres to BigQuery using a date column
ingestr ingest \
--source-uri 'postgresql://myuser:mypassword@localhost:5432/mydatabase?sslmode=disable' \
--source-table 'public.users' \
--dest-uri 'bigquery://my_project?credentials_path=/path/to/service/account.json&location=EU' \
--dest-table 'raw.users' \
--incremental-key 'dt' \
--incremental-strategy 'delete+insert' \
--interval-start '2023-01-01' \
--interval-end '2023-01-31' \
--columns 'dt:date'
Load a specific query from Postgres to Snowflake
ingestr ingest \
--source-uri 'postgresql://myuser:mypassword@localhost:5432/mydatabase?sslmode=disable' \
--dest-uri 'snowflake://user:password@account/dbname?warehouse=COMPUTE_WH&role=my_role' \
--source-table 'query:SELECT * FROM public.users as pu JOIN public.orders as o ON pu.id = o.user_id WHERE pu.dt BETWEEN :interval_start AND :interval_end' \
--dest-table 'raw.users' \
--incremental-key 'dt' \
--incremental-strategy 'delete+insert' \
--interval-start '2023-01-01' \
--interval-end '2023-01-31' \
--columns 'dt:date'
Ingesting with Data Masking
ingestr ingest \
--source-uri 'postgresql://user:pass@localhost/customers' \
--source-table 'customer_data' \
--dest-uri 'duckdb:///masked_customers.db' \
--dest-table 'masked_customers' \
--mask 'email:hash' \
--mask 'phone:partial:3' \
--mask 'ssn:redact' \
--mask 'salary:round:5000'
This example demonstrates masking sensitive customer data:
- Email addresses are hashed for consistent anonymization
- Phone numbers show only the first and last 3 digits
- SSNs are completely redacted
- Salaries are rounded to the nearest $5000
Trimming whitespace from string values
ingestr ingest \
--source-uri 'postgresql://user:pass@localhost/app?sslmode=disable' \
--source-table 'public.customers' \
--dest-uri 'duckdb:///warehouse.duckdb' \
--dest-table 'raw.customers' \
--trim-whitespace
This trims leading and trailing whitespace from string values as data streams through ingestr. For example, " Alice " becomes "Alice" and "\tA-123\n" becomes "A-123". Interior whitespace, such as "ACME Inc", is preserved.
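The effect can be reproduced locally to preview what trimmed values will look like. The sketch below mimics the described behaviour with sed; it is not ingestr's code, and trim is a hypothetical helper. Since sed works line by line, the trailing newline in the second example acts as the line terminator.

```shell
# Illustrative only: mimics the described trimming with sed, not ingestr's code.
trim() {
  # Strip leading, then trailing, whitespace on each input line.
  sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//'
}

printf '  Alice  \n' | trim     # prints: Alice
printf '\tA-123\n' | trim       # prints: A-123
printf 'ACME Inc\n' | trim      # prints: ACME Inc (interior space kept)
```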
INFO
For more examples, please refer to the specific platforms' documentation on the sidebar.