class OrchEngine
import { OrchEngine } from ".";

Use OrchEngine to prepare SQL for orchestration steps and execute it with the DuckDB CLI engine. Each method that is not decorated with @ieDescr.disregard() is considered a "step", and steps are executed in the order they are declared. As each step is executed, its error or result is passed to the next method.
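A minimal sketch of that step model, assuming plain prototype reflection rather than SQLa's Notebook infrastructure. `DemoEngine`, `DemoContext`, `runSteps`, and the `disregarded` set (a stand-in for the `@ieDescr.disregard()` decorator) are hypothetical names for illustration, not part of the library.

```typescript
// Hypothetical stand-in for @ieDescr.disregard(): methods named here are not steps.
const disregarded = new Set<string>(["describe"]);

// Hypothetical context object; the real engine passes an OrchStepContext.
interface DemoContext {
  stepName: string;
}

class DemoEngine {
  readonly log: string[] = [];

  async prepareInit(_ctx: DemoContext, _prev: unknown): Promise<string> {
    this.log.push("prepareInit");
    return "path-ready";
  }

  describe(): string {
    // Not a step: runSteps skips it because it is listed in `disregarded`.
    this.log.push("describe");
    return "demo engine";
  }

  async init(_ctx: DemoContext, prev: unknown): Promise<string> {
    this.log.push(`init(${String(prev)})`);
    return "admin-tables-ready";
  }

  async ingest(_ctx: DemoContext, prev: unknown): Promise<number> {
    this.log.push(`ingest(${String(prev)})`);
    return 2; // e.g. number of sources ingested
  }
}

// Run each prototype method in declaration order, passing the previous step's
// result as the second argument. A thrown error rejects the returned promise,
// so later steps never run.
async function runSteps(engine: object): Promise<unknown> {
  const proto = Object.getPrototypeOf(engine);
  let prev: unknown = undefined;
  for (const name of Object.getOwnPropertyNames(proto)) {
    if (name === "constructor" || disregarded.has(name)) continue;
    const step = proto[name];
    if (typeof step !== "function") continue;
    prev = await step.call(engine, { stepName: name }, prev);
  }
  return prev;
}
```

Relying on `Object.getOwnPropertyNames` works here because class methods are defined on the prototype in source order; the real Notebook infrastructure may discover and order steps differently.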

This engine assumes that the Kernel observer will abort on errors. If you want to continue after an error, throw an OrchResumableError and use the second cell argument (result) in the next step to test for it.
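The abort-versus-resume contract can be sketched as follows, assuming a simple synchronous runner. `OrchResumableError` is redeclared locally as a hypothetical stand-in for the library's class, and `runWithResume` and `Step` are illustrative names only.

```typescript
// Hypothetical local stand-in for the library's OrchResumableError.
class OrchResumableError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "OrchResumableError";
    // Keep `instanceof` working even when compiled to older JS targets.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

type Step = (ctx: { step: string }, prev: unknown) => unknown;

// Ordinary errors stop the run; a resumable error is captured and handed
// to the next step as its second argument.
function runWithResume(steps: [string, Step][]): unknown {
  let prev: unknown = undefined;
  for (const [name, step] of steps) {
    try {
      prev = step({ step: name }, prev);
    } catch (err) {
      if (err instanceof OrchResumableError) {
        prev = err; // continue; the next step inspects `prev`
      } else {
        throw err; // abort, as the kernel observer would
      }
    }
  }
  return prev;
}

// Usage: ingestion reports a resumable problem; the next step detects it.
const outcome = runWithResume([
  ["ingest", () => { throw new OrchResumableError("1 of 3 sources had issues"); }],
  ["ensureContent", (_ctx, prev) =>
    prev instanceof OrchResumableError ? `resumed after: ${prev.message}` : "clean run"],
]);
```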

This class is introspected and run using SQLa's Notebook infrastructure. See: https://github.com/netspective-labs/sql-aide/tree/main/lib/notebook

Constructors

new OrchEngine(iss: o.IngestSourcesSupplier<PotentialIngestSource, [string[] | undefined]>, govn: ddbo.DuckDbOrchGovernance, args: OrchEngineArgs)

Properties

readonly duckdb: ddbo.DuckDbShell
protected abstract ingestables: { readonly psIndex: number; readonly source: PotentialIngestSource; readonly workflow: Awaited<ReturnType<PotentialIngestSource["workflow"]>>; readonly sessionEntryID: string; readonly sql: string; readonly issues: { readonly session_entry_id: string; readonly orch_session_issue_id: string; readonly issue_type: string; readonly issue_message: string; readonly invalid_value: string; }[]; }[]
protected abstract potentialSources: PotentialIngestSource[]
readonly sqlPageNB: ReturnType<sp.SQLPageNotebook.create>

Methods

emitDiagnostics(): Promise<void>
emitResources(isc: OrchStepContext, ensureResult: Awaited<ReturnType<OrchEngine.prototype.ensureContent>>): Promise<void>
ensureContent(osc: OrchStepContext, ingestResult: Awaited<ReturnType<OrchEngine.prototype.ingest>>)

For all ingestions from the previous step that did not create any issues (meaning they were successfully ingested), prepare all cleansing, validation, transformation, and other SQL, and then execute the entire SQL in a single DuckDB call.

The content SQL is separated from structural SQL to avoid syntax errors when a table or table column does not exist (which can happen if the format or structure of an ingested CSV, Excel, Parquet, or other source does not match our expectations).
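A minimal sketch of the filter-then-concatenate idea, assuming the element shape documented for `ingestables` above. `buildContentScript` and the `contentSqlFor` callback are hypothetical; the resulting script is what would be handed to DuckDB in one invocation (for example, piped to the CLI's standard input).

```typescript
// Subset of the documented `ingestables` element shape.
interface IngestIssue {
  readonly session_entry_id: string;
  readonly orch_session_issue_id: string;
  readonly issue_type: string;
  readonly issue_message: string;
  readonly invalid_value: string;
}

interface Ingestable {
  readonly psIndex: number;
  readonly sessionEntryID: string;
  readonly sql: string;
  readonly issues: readonly IngestIssue[];
}

// Keep only sources whose ingestion logged no issues, then concatenate their
// content SQL into one script so DuckDB is invoked once for all of them.
// `contentSqlFor` is a hypothetical callback supplied by the caller.
function buildContentScript(
  ingestables: readonly Ingestable[],
  contentSqlFor: (ingestable: Ingestable) => string,
): string {
  return ingestables
    .filter((i) => i.issues.length === 0)
    .map(contentSqlFor)
    .join("\n\n");
}

// Usage: the second source logged an issue (hypothetical values), so it is excluded.
const script = buildContentScript(
  [
    { psIndex: 0, sessionEntryID: "e0", sql: "-- ingestion SQL", issues: [] },
    {
      psIndex: 1,
      sessionEntryID: "e1",
      sql: "-- ingestion SQL",
      issues: [{
        session_entry_id: "e1",
        orch_session_issue_id: "i1",
        issue_type: "Missing Column",
        issue_message: "column x not found",
        invalid_value: "x",
      }],
    },
  ],
  (i) => `-- content SQL for ${i.sessionEntryID}\nSELECT ${i.psIndex} AS ps_index;`,
);
```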

ingest(osc: OrchStepContext)

Walk the root paths, find all types of files we can handle, generate ingestion SQL (the "loading" part of ELT/ETL), and execute the SQL in a single DuckDB call. Then, for each successful ingestion (one that does not create issues in the issue table), prepare the list of subsequent steps for further cleansing, validation, transformation, etc.
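The "generate ingestion SQL" part can be sketched like this, assuming the directory walk has already produced a list of file paths. The extension-to-reader table and the `src_<n>` table names are hypothetical; `read_csv_auto`, `read_parquet`, and `read_json_auto` are DuckDB table functions. Excel is omitted because reading it depends on a DuckDB extension.

```typescript
import { extname } from "node:path";

// Hypothetical mapping from file extension to a DuckDB table function.
const readers: Record<string, string> = {
  ".csv": "read_csv_auto",
  ".parquet": "read_parquet",
  ".json": "read_json_auto",
};

// Build one ingestion script for all recognized files so it can be run
// in a single DuckDB call; unrecognized files are reported as skipped.
function ingestionScript(paths: string[]): { sql: string; skipped: string[] } {
  const statements: string[] = [];
  const skipped: string[] = [];
  paths.forEach((path, n) => {
    const reader = readers[extname(path).toLowerCase()];
    if (!reader) {
      skipped.push(path); // not a file type we can handle
      return;
    }
    const literal = path.replace(/'/g, "''"); // escape for a SQL string literal
    statements.push(`CREATE TABLE src_${n} AS SELECT * FROM ${reader}('${literal}');`);
  });
  return { sql: statements.join("\n"), skipped };
}
```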

We separate initial ingestion and structural validation from content validation, cleansing, and transformation because content SQL refers to table names and column names in CTEs and other SQL; if those tables or columns do not exist, the SQL fails with syntax errors. Once the structure and columns of the ingested data have been validated, the remaining cleansing, validation, or transformation SQL will not cause such errors.
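The two-phase idea can be sketched as a structural check that gates content SQL, assuming the actual column list has already been read back from DuckDB (for example from `information_schema.columns`). `missingColumnIssues`, `contentSqlIfValid`, and the `"Missing Column"` issue type are hypothetical names.

```typescript
interface StructuralIssue {
  issue_type: string;
  issue_message: string;
  invalid_value: string;
}

// Compare the columns downstream content SQL needs with the columns actually
// created; compared case-insensitively, as DuckDB treats identifiers.
function missingColumnIssues(table: string, required: string[], actual: string[]): StructuralIssue[] {
  const present = new Set(actual.map((c) => c.toLowerCase()));
  return required
    .filter((c) => !present.has(c.toLowerCase()))
    .map((c) => ({
      issue_type: "Missing Column", // hypothetical issue type label
      issue_message: `Required column ${c} is missing in ${table}.`,
      invalid_value: c,
    }));
}

// Only emit content SQL (which names columns explicitly) after the structural
// check passes, so it cannot fail on a missing column.
function contentSqlIfValid(table: string, required: string[], actual: string[]): string | undefined {
  if (missingColumnIssues(table, required, actual).length > 0) return undefined;
  return `SELECT ${required.join(", ")} FROM ${table};`;
}
```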

init(osc: OrchStepContext): Promise<void>

Initialize the DuckDB database by ensuring that the admin tables are created: tables tracking orchestration events (states), activities (which files are being loaded), ingest issues (errors, etc.), and related entities. If there are any errors during this process, all other processing should stop and no further steps are executed.
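As an illustration of idempotent admin-table setup, here is a sketch that emits DDL for the issue table only. The column names come from the documented `issues` shape; the table name `orch_session_issue`, the column types, and `initAdminDdl` are assumptions, not the library's actual schema.

```typescript
// Columns taken from the documented issue shape; types are assumed.
const issueColumns = [
  "orch_session_issue_id",
  "session_entry_id",
  "issue_type",
  "issue_message",
  "invalid_value",
] as const;

// Hypothetical helper: IF NOT EXISTS keeps init safe to rerun on an
// existing database.
function initAdminDdl(issueTable = "orch_session_issue"): string {
  const cols = issueColumns.map((c) =>
    c === "orch_session_issue_id" ? `  ${c} VARCHAR PRIMARY KEY` : `  ${c} VARCHAR`
  );
  return `CREATE TABLE IF NOT EXISTS ${issueTable} (\n${cols.join(",\n")}\n);`;
}
```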

prepareInit(_osc: OrchStepContext): Promise<void>

Prepare the DuckDB path/database for initialization. Typically this is the chance to create the path to the database or to remove an existing database when we want to initialize from scratch.
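A sketch of that preparation using Node's `fs` module, assuming a file-backed database. `prepareDuckDbPath` and its `fresh` flag are hypothetical; the `.wal` cleanup assumes DuckDB's write-ahead log file sits next to the database file.

```typescript
import { existsSync, mkdirSync, mkdtempSync, rmSync, writeFileSync } from "node:fs";
import { tmpdir } from "node:os";
import { dirname, join } from "node:path";

// Hypothetical helper: ensure the database's parent directory exists and,
// when `fresh` is true, delete any existing database so init starts clean.
// Returns true if an existing database file was removed.
function prepareDuckDbPath(dbPath: string, fresh: boolean): boolean {
  mkdirSync(dirname(dbPath), { recursive: true });
  if (fresh && existsSync(dbPath)) {
    rmSync(dbPath);
    rmSync(`${dbPath}.wal`, { force: true }); // leftover write-ahead log, if any
    return true;
  }
  return false;
}

// Usage: prepare a path inside a throwaway temp directory.
const dbPath = join(mkdtempSync(join(tmpdir(), "orch-")), "nested", "orch.duckdb");
const removedFirst = prepareDuckDbPath(dbPath, true); // nothing to remove yet
writeFileSync(dbPath, ""); // stand-in for a database left by a previous run
const removedSecond = prepareDuckDbPath(dbPath, true); // removes the old file
```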