diff --git a/CHANGELOG.md b/CHANGELOG.md index b9eb1a45c..916238567 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ ## [Unreleased] +### Added + +- New caching layer that wraps any `Reader` with an in-memory, writeable cache + backend (currently duckdb or sqlite), making write-constrained databases + usable and avoiding repeated remote reads during interactive iteration. + Memoized reads are bounded by a TTL and an LRU byte budget, configurable per + connection. The cache can be cleared mid-session with the `-- @uncache` meta-command. + ## 0.4.1 - 2026-06-22 ### Changed diff --git a/ggsql-cli/CLAUDE.md b/ggsql-cli/CLAUDE.md index c6ed59703..17dba8134 100644 --- a/ggsql-cli/CLAUDE.md +++ b/ggsql-cli/CLAUDE.md @@ -32,6 +32,8 @@ The binary name is `ggsql` (not `ggsql-cli`) — that's what release artifacts a Only public `ggsql::*` API is used (`reader`, `writer`, `validate`, `parser`, `VERSION`) — this crate has no awareness of internal modules. +`exec`/`run` build their reader via the library factory `ggsql::reader::connection::reader_from_uri`. They accept an in-memory caching layer (off by default) selected either by the composite connection scheme `+://…` (e.g. `odbc+duckdb://…`) or the `--cache ` flag; the two cannot be combined. + ## Build & install ```sh diff --git a/ggsql-cli/src/main.rs b/ggsql-cli/src/main.rs index d1b0cb0de..0a004fbce 100644 --- a/ggsql-cli/src/main.rs +++ b/ggsql-cli/src/main.rs @@ -34,10 +34,14 @@ pub enum Commands { /// The ggsql query to execute query: String, - /// Data source connection string (duckdb://, sqlite://, odbc://) + /// Data source connection string (duckdb://, sqlite://, odbc://). #[arg(long, default_value = "duckdb://memory")] reader: String, + /// In-memory cache backend wrapping the reader (duckdb, sqlite). Off by default. + #[arg(long)] + cache: Option, + /// Output format (vegalite) #[arg(long, default_value = "vegalite")] writer: String, @@ -56,10 +60,14 @@ pub enum Commands { /// Path to .sql file containing ggsql query file: PathBuf, - /// Data source connection string (duckdb://, sqlite://, odbc://) + /// Data source connection string (duckdb://, sqlite://, odbc://). #[arg(long, default_value = "duckdb://memory")] reader: String, + /// In-memory cache backend wrapping the reader (duckdb, sqlite). Off by default. + #[arg(long)] + cache: Option, + /// Output format (vegalite) #[arg(long, default_value = "vegalite")] writer: String, @@ -147,6 +155,7 @@ fn main() -> anyhow::Result<()> { Commands::Exec { query, reader, + cache, writer, output, verbose, @@ -154,12 +163,13 @@ fn main() -> anyhow::Result<()> { if verbose { eprintln!("Executing query: {}", query); } - cmd_exec(query, reader, writer, output, verbose); + cmd_exec(query, reader, cache, writer, output, verbose); } Commands::Run { file, reader, + cache, writer, output, verbose, @@ -167,7 +177,7 @@ fn main() -> anyhow::Result<()> { if verbose { eprintln!("Running query from file: {}", file.display()); } - cmd_run(file, reader, writer, output, verbose); + cmd_run(file, reader, cache, writer, output, verbose); } Commands::Parse { query, format } => { @@ -194,9 +204,16 @@ fn main() -> anyhow::Result<()> { Ok(()) } -fn cmd_run(file: PathBuf, reader: String, writer: String, output: Option, verbose: bool) { +fn cmd_run( + file: PathBuf, + reader: String, + cache: Option, + writer: String, + output: Option, + verbose: bool, +) { match std::fs::read_to_string(&file) { - Ok(query) => cmd_exec(query, reader, writer, output, verbose), + Ok(query) => cmd_exec(query, reader, cache, writer, output, verbose), Err(e) => { eprintln!("Failed to read file {}: {}", file.display(), e); std::process::exit(1); @@ -204,76 +221,64 @@ fn cmd_run(file: PathBuf, reader: String, writer: String, output: Option, verbose: bool) { +fn cmd_exec( + query: String, + reader: String, + cache: Option, + writer: String, + output: Option, + verbose: bool, +) { + use ggsql::reader::connection; + if verbose { eprintln!("Reader: {}", reader); + if let Some(ref cache) = cache { + eprintln!("Cache: {}", cache); + } eprintln!("Writer: {}", writer); if let Some(ref output_file) = output { eprintln!("Output: {}", output_file.display()); } } - if reader.starts_with("duckdb://") { - #[cfg(feature = "duckdb")] - { - let r = match ggsql::reader::DuckDBReader::from_connection_string(&reader) { - Ok(r) => r, - Err(e) => { - eprintln!("Failed to create reader: {}", e); - std::process::exit(1); - } - }; - exec_with_reader(&query, &r, &writer, output, verbose); - } - #[cfg(not(feature = "duckdb"))] - { - eprintln!("DuckDB reader not compiled in. Rebuild with --features duckdb"); - std::process::exit(1); - } - } else if reader.starts_with("sqlite://") { - #[cfg(feature = "sqlite")] - { - let r = match ggsql::reader::SqliteReader::from_connection_string(&reader) { - Ok(r) => r, - Err(e) => { - eprintln!("Failed to create reader: {}", e); - std::process::exit(1); + // Build the reader. A composite `+://` URI is handled by + // `reader_from_uri`; the `--cache` flag is an explicit alternative and may + // not be combined with a composite URI. + let built = match cache { + Some(cache_scheme) => { + if connection::split_cache_uri(&reader).is_some() { + eprintln!( + "Cannot combine --cache with a composite '+://' connection string" + ); + std::process::exit(1); + } + // `--cache ` is sugar for the composite `+://` URI. + match reader.split_once("://") { + Some((scheme, rest)) => { + connection::reader_from_uri(&format!("{scheme}+{cache_scheme}://{rest}")) } - }; - exec_with_reader(&query, &r, &writer, output, verbose); - } - #[cfg(not(feature = "sqlite"))] - { - eprintln!("SQLite reader not compiled in. Rebuild with --features sqlite"); - std::process::exit(1); - } - } else if reader.starts_with("odbc://") { - #[cfg(feature = "odbc")] - { - let r = match ggsql::reader::OdbcReader::from_connection_string(&reader) { - Ok(r) => r, - Err(e) => { - eprintln!("Failed to create reader: {}", e); + None => { + eprintln!("Invalid --reader connection string: {reader}"); std::process::exit(1); } - }; - exec_with_reader(&query, &r, &writer, output, verbose); + } } - #[cfg(not(feature = "odbc"))] - { - eprintln!("ODBC reader not compiled in. Rebuild with --features odbc"); + None => connection::reader_from_uri(&reader), + }; + + let reader = match built { + Ok(r) => r, + Err(e) => { + eprintln!("Failed to create reader: {}", e); std::process::exit(1); } - } else if reader.starts_with("postgres://") || reader.starts_with("postgresql://") { - eprintln!("PostgreSQL reader is not yet implemented"); - std::process::exit(1); - } else { - eprintln!("Unsupported connection string: {}", reader); - std::process::exit(1); - } + }; + + exec_with_reader(&query, reader.as_ref(), &writer, output, verbose); } -fn exec_with_reader( +fn exec_with_reader( query: &str, reader: &R, writer: &str, @@ -430,7 +435,7 @@ fn cmd_validate(query: String, _reader: Option) { } // Prints a CSV-like output to stdout with aligned columns -fn print_table_fallback(query: &str, reader: &R, max_rows: usize) { +fn print_table_fallback(query: &str, reader: &R, max_rows: usize) { let source_tree = match parser::SourceTree::new(query) { Ok(st) => st, Err(e) => { diff --git a/ggsql-jupyter/CLAUDE.md b/ggsql-jupyter/CLAUDE.md index adcb5ecd3..18868e3a9 100644 --- a/ggsql-jupyter/CLAUDE.md +++ b/ggsql-jupyter/CLAUDE.md @@ -32,7 +32,7 @@ ggsql-jupyter/ 1. `ggsql-jupyter --install` writes a kernelspec into the active Python environment (Jupyter, conda, uv, virtualenv — auto-detected). 2. `ggsql-jupyter ` is the entry point Jupyter invokes; it reads the connection JSON, opens the five ZMQ sockets (shell, control, iopub, stdin, heartbeat), and runs `kernel.rs`'s message loop. -3. Each `execute_request` is dispatched through `executor.rs` → `ggsql::reader::DuckDBReader::execute(...)`. The kernel keeps a single persistent in-memory DuckDB session so cells share state. +3. Each `execute_request` is dispatched through `executor.rs` → `ggsql::reader::DuckDBReader::execute(...)`. The kernel keeps a single persistent in-memory DuckDB session so cells share state. Readers are built via the library factory `ggsql::reader::connection::reader_from_uri`, so a composite `+://` connection string (e.g. via `-- @connect:`) wraps the reader in an in-memory caching layer — the persistent kernel session means repeated cells reuse cached remote reads. 4. The result is wrapped by `display.rs` into a Jupyter `display_data` message — Vega-Lite specs go through vega-embed in an HTML payload (works in classic Jupyter, JupyterLab, and Positron); pure SQL goes out as an HTML table. ## Positron-specific bits diff --git a/ggsql-jupyter/src/executor.rs b/ggsql-jupyter/src/executor.rs index 6f928e7ab..18f6e0116 100644 --- a/ggsql-jupyter/src/executor.rs +++ b/ggsql-jupyter/src/executor.rs @@ -2,13 +2,14 @@ //! //! This module handles the execution of ggsql queries using the existing //! ggsql library components (parser, DuckDB reader, Vega-Lite writer). -//! It supports dynamic reader switching via `-- @connect:` meta-commands. +//! It supports leading `--` meta-command lines. Each occupies its own comment +//! line, so a cell may stack them above a query that then runs as normal. use anyhow::Result; use ggsql::{ reader::{ - connection::{extract_odbc_value, parse_connection_string}, - DuckDBReader, Reader, + connection::{extract_odbc_value, reader_from_uri}, + Reader, }, validate::validate, writer::{VegaLiteWriter, Writer}, @@ -25,42 +26,7 @@ pub enum ExecutionResult { spec: String, // Vega-Lite JSON }, /// Connection changed via meta-command - ConnectionChanged { uri: String, display_name: String }, -} - -/// Create a reader from a connection URI string. -/// -/// Supported schemes: -/// - `duckdb://memory` or `duckdb://` (always available) -/// - `sqlite://` (requires `sqlite` feature) -/// - `odbc://...` (requires `odbc` feature) -pub fn create_reader(uri: &str) -> Result> { - use ggsql::reader::connection::ConnectionInfo; - - let info = parse_connection_string(uri)?; - match info { - ConnectionInfo::DuckDBMemory => { - let reader = DuckDBReader::from_connection_string("duckdb://memory")?; - Ok(Box::new(reader)) - } - ConnectionInfo::DuckDBFile(path) => { - let reader = DuckDBReader::from_connection_string(&format!("duckdb://{}", path))?; - Ok(Box::new(reader)) - } - #[cfg(feature = "odbc")] - ConnectionInfo::ODBC(conn_str) => { - let reader = - ggsql::reader::OdbcReader::from_connection_string(&format!("odbc://{}", conn_str))?; - Ok(Box::new(reader)) - } - #[cfg(feature = "sqlite")] - ConnectionInfo::SQLite(path) => { - let reader = - ggsql::reader::SqliteReader::from_connection_string(&format!("sqlite://{}", path))?; - Ok(Box::new(reader)) - } - _ => anyhow::bail!("Unsupported reader type for connection string: {}", uri), - } + ConnectionChanged { display_name: String }, } /// Generate a human-readable display name for a connection URI. @@ -136,13 +102,49 @@ pub fn host_for_uri(uri: &str) -> String { /// The `-- @connect:` meta-command prefix. const META_CONNECT_PREFIX: &str = "-- @connect:"; +/// The `-- @uncache` meta-command prefix. +const META_UNCACHE_PREFIX: &str = "-- @uncache"; + +/// A leading cell directive expressed as a `--` line comment. +#[derive(Debug, PartialEq, Eq)] +pub enum MetaCommand { + /// Switch the active reader to the given connection URI. + Connect(String), + /// Clear the active reader's cache. + Uncache, +} -/// Parse a `-- @connect: ` meta-command, returning the URI if present. -pub fn parse_meta_command(code: &str) -> Option { - let trimmed = code.trim(); - trimmed - .strip_prefix(META_CONNECT_PREFIX) - .map(|rest| rest.trim().to_string()) +/// Split `code` into its first line and the remainder. +/// Handles `\n`, `\r\n`, and a lone `\r`. +fn split_first_line(code: &str) -> (&str, &str) { + match code.find(['\n', '\r']) { + None => (code, ""), + Some(i) => { + let line = &code[..i]; + let rest = &code[i..]; + let rest = rest + .strip_prefix("\r\n") + .or_else(|| rest.strip_prefix('\n')) + .or_else(|| rest.strip_prefix('\r')) + .unwrap_or(rest); + (line, rest) + } + } +} + +/// Peel a single leading meta-command from `code`, returning it together with +/// the rest of the cell to process next. +pub fn take_leading_meta(code: &str) -> Option<(MetaCommand, &str)> { + let trimmed = code.trim_start(); + let (line, rest) = split_first_line(trimmed); + let line = line.trim(); + if let Some(uri) = line.strip_prefix(META_CONNECT_PREFIX) { + return Some((MetaCommand::Connect(uri.trim().to_string()), rest)); + } + if line == META_UNCACHE_PREFIX { + return Some((MetaCommand::Uncache, rest)); + } + None } /// Query executor maintaining persistent database connection @@ -156,7 +158,7 @@ impl QueryExecutor { /// Create a new query executor with a given connection URI pub fn new_with_uri(uri: &str) -> Result { tracing::info!("Initializing query executor with reader: {}", uri); - let reader = create_reader(uri)?; + let reader = reader_from_uri(uri)?; let writer = VegaLiteWriter::new(); Ok(Self { @@ -184,7 +186,7 @@ impl QueryExecutor { /// Swap the reader to a new connection, returning the old URI pub fn swap_reader(&mut self, uri: &str) -> Result { - let new_reader = create_reader(uri)?; + let new_reader = reader_from_uri(uri)?; self.reader = new_reader; let old_uri = std::mem::replace(&mut self.reader_uri, uri.to_string()); Ok(old_uri) @@ -193,18 +195,38 @@ impl QueryExecutor { /// Execute a ggsql query or meta-command /// /// This handles: - /// - `-- @connect: ` meta-commands for switching readers + /// - `-- @` meta-commands /// - Pure SQL queries (no VISUALISE) /// - ggsql queries with VISUALISE clauses pub fn execute(&mut self, code: &str) -> Result { tracing::debug!("Executing query: {} chars", code.len()); - // Check for meta-commands first - if let Some(uri) = parse_meta_command(code) { - tracing::info!("Meta-command: switching reader to {}", uri); - self.swap_reader(&uri)?; - let display_name = display_name_for_uri(&uri); - return Ok(ExecutionResult::ConnectionChanged { uri, display_name }); + // Apply any leading meta-command lines, then run whatever SQL remains. + let mut code = code; + let mut last_connect: Option = None; + while let Some((cmd, rest)) = take_leading_meta(code) { + match cmd { + MetaCommand::Connect(uri) => { + tracing::info!("Meta-command: switching reader to {}", uri); + self.swap_reader(&uri)?; + last_connect = Some(uri); + } + MetaCommand::Uncache => { + tracing::info!("Meta-command: clearing cache"); + self.reader.clear_cache()?; + } + } + code = rest; + } + + // A cell that was nothing but meta-commands. + if code.trim().is_empty() { + if let Some(uri) = last_connect { + let display_name = display_name_for_uri(&uri); + return Ok(ExecutionResult::ConnectionChanged { display_name }); + } + // An empty DataFrame renders no cell output. + return Ok(ExecutionResult::DataFrame(DataFrame::empty())); } // 1. Validate to check if there's a visualization @@ -212,7 +234,7 @@ impl QueryExecutor { // 2. Check if there's a visualization if !validated.has_visual() { - // Pure SQL query - execute directly and return DataFrame + // Pure SQL query - execute directly and return DataFrame. let df = self.reader.execute_sql(code)?; tracing::info!( "Pure SQL executed: {} rows, {} cols", @@ -273,16 +295,44 @@ mod tests { } #[test] - fn test_parse_meta_command() { + fn test_take_leading_meta_connect() { + // `-- @connect:` takes the rest of its line as the URI; the next line is + // the remainder. + assert_eq!( + take_leading_meta("-- @connect: duckdb://memory"), + Some((MetaCommand::Connect("duckdb://memory".to_string()), "")) + ); + assert_eq!( + take_leading_meta(" -- @connect: duckdb://my.db \nSELECT 1"), + Some(( + MetaCommand::Connect("duckdb://my.db".to_string()), + "SELECT 1" + )) + ); + } + + #[test] + fn test_take_leading_meta_uncache() { + assert_eq!( + take_leading_meta("-- @uncache"), + Some((MetaCommand::Uncache, "")) + ); assert_eq!( - parse_meta_command("-- @connect: duckdb://memory"), - Some("duckdb://memory".to_string()) + take_leading_meta("-- @uncache\nSELECT 1"), + Some((MetaCommand::Uncache, "SELECT 1")) ); assert_eq!( - parse_meta_command(" -- @connect: duckdb://my.db "), - Some("duckdb://my.db".to_string()) + take_leading_meta("-- @uncache \r\nSELECT 1"), + Some((MetaCommand::Uncache, "SELECT 1")) ); - assert_eq!(parse_meta_command("SELECT 1"), None); + // `-- @uncache foo` on one line is an ordinary SQL comment, not the directive. + assert_eq!(take_leading_meta("-- @uncache foo"), None); + } + + #[test] + fn test_take_leading_meta_non_directive() { + assert_eq!(take_leading_meta("SELECT 1"), None); + assert_eq!(take_leading_meta("-- a normal comment\nSELECT 1"), None); } #[test] @@ -294,6 +344,42 @@ mod tests { assert!(matches!(result, ExecutionResult::ConnectionChanged { .. })); } + #[test] + fn test_connect_then_runs_remaining_query() { + // A leading `-- @connect:` switches the reader and still runs the query + // below it in the same cell. + let mut executor = QueryExecutor::new().unwrap(); + let result = executor + .execute( + "-- @connect: duckdb://memory\nSELECT 1 AS x, 2 AS y VISUALISE x, y DRAW point", + ) + .unwrap(); + assert_eq!(executor.reader_uri(), "duckdb://memory"); + assert!(matches!(result, ExecutionResult::Visualization { .. })); + } + + #[test] + fn test_uncache_meta_command_clears_cache() { + // On the default reader (no cache) `clear_cache` is a no-op; this proves + // the dispatch arm is wired and yields an empty DataFrame. + let mut executor = QueryExecutor::new().unwrap(); + let result = executor.execute("-- @uncache").unwrap(); + match result { + ExecutionResult::DataFrame(df) => assert_eq!(df.width(), 0), + other => panic!("expected empty DataFrame, got {other:?}"), + } + } + + #[test] + fn test_uncache_then_runs_remaining_query() { + // A leading `-- @uncache` clears the cache and still runs the query below. + let mut executor = QueryExecutor::new().unwrap(); + let result = executor + .execute("-- @uncache\nSELECT 1 AS x, 2 AS y VISUALISE x, y DRAW point") + .unwrap(); + assert!(matches!(result, ExecutionResult::Visualization { .. })); + } + #[test] fn test_display_name_for_uri() { assert_eq!(display_name_for_uri("duckdb://memory"), "DuckDB (memory)"); diff --git a/ggsql-jupyter/src/kernel.rs b/ggsql-jupyter/src/kernel.rs index b9844c602..85e82d0ad 100644 --- a/ggsql-jupyter/src/kernel.rs +++ b/ggsql-jupyter/src/kernel.rs @@ -323,21 +323,24 @@ impl KernelServer { } // Execute the query + let uri_before = self.executor.reader_uri().to_string(); let result = self.executor.execute(code); + let connection_changed = self.executor.reader_uri() != uri_before; + if connection_changed { + let uri = self.executor.reader_uri().to_string(); + self.open_connection_comm(&uri).await?; + } match result { Ok(exec_result) => { - // If the connection changed, open a new connection comm - let is_connection_changed = + // A bare connection change renders nothing. + let suppress_output = matches!(&exec_result, ExecutionResult::ConnectionChanged { .. }); - if let ExecutionResult::ConnectionChanged { ref uri, .. } = &exec_result { - self.open_connection_comm(uri).await?; - } // Send execute_result (not display_data) // Per Jupyter spec: execute_result includes execution_count // Only send if there's something to display (DDL returns None) - if !silent && !is_connection_changed { + if !silent && !suppress_output { if let Some(display_data) = format_display_data(exec_result, &hints) { // Build message content, including output_location if present let mut content = json!({ diff --git a/src/CLAUDE.md b/src/CLAUDE.md index b9b68dc91..321a2fdcf 100644 --- a/src/CLAUDE.md +++ b/src/CLAUDE.md @@ -46,11 +46,14 @@ Grammar lives in [`/tree-sitter-ggsql/`](../tree-sitter-ggsql/) — when adding | `duckdb.rs` | DuckDB (in-memory or file) | `duckdb` (default) | | `sqlite.rs` | SQLite | `sqlite` (default) | | `odbc.rs` | ODBC | `odbc` (default) | +| `cache.rs` | `CachingReader` — wraps any primary `Reader` with an in-memory cache | `duckdb` or `sqlite` | | `connection.rs` | Connection-string parsing for all of the above | — | | `data.rs`, `spec.rs` | `Spec` type returned by `execute()`, plus DataFrame conversion | — | `SqlDialect` trait in `mod.rs` lets each driver supply its own type names, information-schema queries, and spatial helper methods (`sql_st_transform`, `sql_geometry_to_wkb`, `sql_geometry_bbox`, `sql_ensure_geometry`, `sql_select_replace`, `sql_spatial_setup`). +**Caching layer.** `CachingReader` (`cache.rs`) wraps a primary reader plus an in-memory `CacheBackend`, splitting work across two `Reader` surfaces. **`execute_sql` = source**: base reads of the user's data plus user setup/DML run on the primary (with result memoization), except `ggsql:` builtins, the `__ggsql_cache_meta__` table, and reads that reference a cache-resident internal table, which go to the cache. **`execute_sql_cached` = compute**: all dialect-generated/derived SQL (schema probes, stats, projection/map transforms, spatial setup, final layer queries — everything operating on `__ggsql_*` tables) runs on the cache; it defaults to `execute_sql` so a plain reader runs everything on one connection. Cache routing is by **exact-identifier membership** in the set of tables registered into the cache. Memoization keys on `hash(primary_uri + sql)` and is tracked in the `__ggsql_cache_meta__` table inside the cache backend. Each memoized read is bounded by a **TTL** (default 300s) and the whole memo by an **LRU byte budget** (default 512 MB); both are configurable via `CacheConfig` (env `GGSQL_CACHE_DISABLED`/`GGSQL_CACHE_TTL`/`GGSQL_CACHE_MAX_BYTES`, or per-connection URI query parameters `?ttl=…&max_bytes=…&disabled=…`. The `__ggsql_cache_meta__` table is queryable for introspection (`SELECT * FROM __ggsql_cache_meta__`). Pure/non-visual SQL (CLI table fallback, Jupyter) goes through `execute_sql` so it reads the primary rather than the empty cache. `Reader::materialize_table` (default = `CREATE TEMP TABLE` on the reader, no Rust roundtrip) is overridden to read the body via the source surface and `register()` the result into the cache, so the primary is never written to; `Reader::caches_sources()` (default `false`, `true` for `CachingReader`) gates the executor's per-layer source staging: file sources are staged on the cache surface, while identifiers go through `materialize_table`, which routes the read to the cache (CTEs, builtins, cache-resident tables) or the primary as needed. `dialect()` returns the **cache** dialect. Selected via the composite `+://` scheme (`reader_from_uri` / `split_cache_uri`) or the CLI `--cache` flag; off by default. + ### `execute/` The pipeline that takes a parsed `Plot` plus a `Reader` and produces a fully-resolved `Spec` (typed data per layer, scales resolved, casts applied). Submodules: diff --git a/src/execute/cte.rs b/src/execute/cte.rs index 9b544988d..e843fa074 100644 --- a/src/execute/cte.rs +++ b/src/execute/cte.rs @@ -146,16 +146,11 @@ pub fn materialize_ctes(ctes: &[CteDefinition], reader: &dyn Reader) -> Result Result { - let execute_query = |sql: &str| reader.execute_sql(sql); + // `execute_query` is the COMPUTE surface for derived/dialect-generated SQL + // over internal `__ggsql_*` tables. Base source reads (user setup/DML, the + // global query) call `reader.execute_sql(...)` directly. + let execute_query = |sql: &str| reader.execute_sql_cached(sql); let dialect = reader.dialect(); // Parse once and create SourceTree @@ -1129,7 +1132,7 @@ pub fn prepare_data_with_reader(query: &str, reader: &dyn Reader) -> Result Result Result = specs[0] + let mut layer_source_queries: Vec = specs[0] .layers .iter_mut() .map(|l| layer::layer_source_query(l, &materialized_ctes, has_global_table, dialect)) .collect::>>()?; + // When the reader stages sources into a cache (a caching layer is active), + // materialize explicit external layer sources into the cache and rewrite the + // layer to read the cached table. + if reader.caches_sources() { + let mut materialized_sources: HashMap = HashMap::new(); + for (idx, layer) in specs[0].layers.iter().enumerate() { + let is_external = match &layer.source { + Some(crate::DataSource::Identifier(name)) => !materialized_ctes.contains(name), + Some(crate::DataSource::FilePath(_)) => true, + _ => false, + }; + if !is_external { + continue; + } + let source_query = layer_source_queries[idx].clone(); + let table = match materialized_sources.get(&source_query) { + Some(t) => t.clone(), + None => { + let t = naming::layer_source_table(materialized_sources.len()); + if matches!(layer.source, Some(crate::DataSource::FilePath(_))) { + // The cache backend reads local files + let df = reader.execute_sql_cached(&source_query)?; + reader.register(&t, df, true)?; + } else { + reader.materialize_table(&t, &[], &source_query)?; + } + materialized_sources.insert(source_query, t.clone()); + t + } + }; + layer_source_queries[idx] = format!("SELECT * FROM {}", naming::quote_ident(&table)); + } + } + // Get types for each layer from source queries (Phase 1: types only, no min/max yet) let mut layer_type_info: Vec> = Vec::new(); for source_query in &layer_source_queries { diff --git a/src/naming.rs b/src/naming.rs index 6259f5bd3..dfcf6d5e4 100644 --- a/src/naming.rs +++ b/src/naming.rs @@ -85,6 +85,9 @@ pub const ORDER_COLUMN: &str = concatcp!(GGSQL_PREFIX, "order", GGSQL_SUFFIX); /// Used with Vega-Lite filter transforms to select per-layer data. pub const SOURCE_COLUMN: &str = concatcp!(GGSQL_PREFIX, "source", GGSQL_SUFFIX); +/// Name of the caching layer's metadata table, held in the cache backend. +pub const CACHE_META_TABLE: &str = concatcp!(GGSQL_PREFIX, "cache_meta", GGSQL_SUFFIX); + /// Alias for schema extraction queries pub const SCHEMA_ALIAS: &str = concatcp!(GGSQL_SUFFIX, "schema", GGSQL_SUFFIX); @@ -137,6 +140,49 @@ pub fn cte_table(cte_name: &str) -> String { ) } +/// Generate temp table name for a memoized primary query result in a cache. +/// +/// The cache key is a hex hash of the primary URI and SQL. +/// Format: `__ggsql_cache____` +/// +/// # Example +/// ``` +/// use ggsql::naming; +/// let table = naming::cache_result_table("deadbeef"); +/// assert!(table.starts_with("__ggsql_cache_")); +/// assert!(table.ends_with("_deadbeef__")); +/// ``` +pub fn cache_result_table(key: &str) -> String { + format!( + "{}cache_{}_{}{}", + GGSQL_PREFIX, + session_id(), + key, + GGSQL_SUFFIX + ) +} + +/// Generate temp table name for a materialized explicit layer source. +/// +/// Format: `__ggsql_layer_src____` +/// +/// # Example +/// ``` +/// use ggsql::naming; +/// let table = naming::layer_source_table(2); +/// assert!(table.starts_with("__ggsql_layer_src_2_")); +/// assert!(table.ends_with("__")); +/// ``` +pub fn layer_source_table(index: usize) -> String { + format!( + "{}layer_src_{}_{}{}", + GGSQL_PREFIX, + index, + session_id(), + GGSQL_SUFFIX + ) +} + /// Generate table name for a builtin dataset. /// /// Used when rewriting `ggsql:penguins` to the internal table name. @@ -494,6 +540,7 @@ mod tests { assert_eq!(ORDER_COLUMN, "__ggsql_order__"); assert_eq!(SOURCE_COLUMN, "__ggsql_source__"); assert_eq!(SCHEMA_ALIAS, "__schema__"); + assert_eq!(CACHE_META_TABLE, "__ggsql_cache_meta__"); } #[test] diff --git a/src/reader/adbc.rs b/src/reader/adbc.rs index 8de00b01d..203d31e26 100644 --- a/src/reader/adbc.rs +++ b/src/reader/adbc.rs @@ -687,71 +687,7 @@ mod equivalence_tests { .expect("SqliteReader at the same path") } - /// Compare two DataFrames by schema (field names + types) and by - /// per-column Arrow array contents. We don't use a blanket - /// `assert_eq!(df, df)` because `DataFrame` doesn't implement `PartialEq`; - /// going through schema + per-column equality is also more diagnostic - /// when one of them diverges. - fn assert_dataframes_equal( - adbc_df: &crate::DataFrame, - sqlite_df: &crate::DataFrame, - ctx: &str, - ) { - let adbc_schema = adbc_df.schema(); - let sqlite_schema = sqlite_df.schema(); - assert_eq!( - adbc_schema.fields().len(), - sqlite_schema.fields().len(), - "{}: column count mismatch (adbc={}, sqlite={})", - ctx, - adbc_schema.fields().len(), - sqlite_schema.fields().len(), - ); - for (i, (a, s)) in adbc_schema - .fields() - .iter() - .zip(sqlite_schema.fields().iter()) - .enumerate() - { - assert_eq!( - a.name(), - s.name(), - "{}: column {} name mismatch (adbc='{}', sqlite='{}')", - ctx, - i, - a.name(), - s.name(), - ); - assert_eq!( - a.data_type(), - s.data_type(), - "{}: column '{}' type mismatch (adbc={:?}, sqlite={:?})", - ctx, - a.name(), - a.data_type(), - s.data_type(), - ); - } - assert_eq!( - adbc_df.height(), - sqlite_df.height(), - "{}: row count mismatch (adbc={}, sqlite={})", - ctx, - adbc_df.height(), - sqlite_df.height(), - ); - for field in adbc_schema.fields() { - let a = adbc_df.column(field.name()).unwrap(); - let s = sqlite_df.column(field.name()).unwrap(); - assert_eq!( - a.as_ref(), - s.as_ref(), - "{}: column '{}' data mismatch", - ctx, - field.name(), - ); - } - } + use crate::reader::test_support::assert_dataframes_equal; #[test] #[ignore = "requires `dbc install sqlite`; see module docs"] diff --git a/src/reader/cache.rs b/src/reader/cache.rs new file mode 100644 index 000000000..7b7a14ce9 --- /dev/null +++ b/src/reader/cache.rs @@ -0,0 +1,1130 @@ +//! Caching reader: any primary [`Reader`] + an in-memory writeable cache. +//! +//! [`CachingReader`] wraps a `primary` reader and an in-memory `cache` backend, +//! splitting work across two surfaces: +//! +//! - **Source** ([`Reader::execute_sql`]) reads the primary: base reads of the +//! user's data, plus user setup/DML. +//! - **Compute** ([`Reader::execute_sql_cached`]) runs on the cache: all derived, +//! dialect-generated SQL operates on the `__ggsql_*` tables. +//! - [`Reader::materialize_table`] reads a body via the source surface and +//! `register`s the result into the cache. +//! - [`Reader::dialect`] returns the cache dialect. + +use crate::array_util::{as_i64, as_str}; +use crate::reader::{execute_with_reader, ColumnInfo, Reader, Spec, SqlDialect, TableInfo}; +use crate::{naming, DataFrame, Result}; +use arrow::array::Array; +use std::cell::{Cell, RefCell}; +use std::collections::hash_map::DefaultHasher; +use std::collections::HashSet; +use std::hash::Hasher; +use std::time::{SystemTime, UNIX_EPOCH}; + +/// Runtime configuration for the result memo: TTL and LRU byte-budget. +#[derive(Debug, Clone)] +pub struct CacheConfig { + /// When `false`, reads always hit the primary. + pub enabled: bool, + /// Entries older than this are treated as misses and re-fetched. + pub ttl_secs: u64, + /// Cumulative byte budget across all memo entries before LRU eviction. + pub max_bytes: u64, +} + +impl Default for CacheConfig { + fn default() -> Self { + Self { + enabled: true, + ttl_secs: 300, + max_bytes: 512 * 1024 * 1024, + } + } +} + +/// Per-field overrides applied on top of an env-derived [`CacheConfig`]. +#[derive(Debug, Clone, Default)] +pub struct CacheConfigOverride { + pub enabled: Option, + pub ttl_secs: Option, + pub max_bytes: Option, +} + +impl CacheConfig { + /// Read configuration from the environment, falling back to defaults. + /// + /// - `GGSQL_CACHE_DISABLED` — set, non-empty and not `0` disables the cache. + /// - `GGSQL_CACHE_TTL` — TTL in seconds. + /// - `GGSQL_CACHE_MAX_BYTES` — byte budget, accepts `512mb`/`1gb`/bytes. + pub fn from_env() -> Self { + let mut cfg = Self::default(); + if std::env::var("GGSQL_CACHE_DISABLED") + .ok() + .filter(|v| !v.is_empty() && v != "0") + .is_some() + { + cfg.enabled = false; + } + if let Ok(v) = std::env::var("GGSQL_CACHE_TTL") { + if let Ok(secs) = v.trim().parse::() { + cfg.ttl_secs = secs; + } + } + if let Ok(v) = std::env::var("GGSQL_CACHE_MAX_BYTES") { + if let Some(bytes) = parse_human_bytes(&v) { + cfg.max_bytes = bytes; + } + } + cfg + } + + /// Apply per-field overrides; each `Some` value wins over `self`. + pub fn merge(self, over: CacheConfigOverride) -> Self { + Self { + enabled: over.enabled.unwrap_or(self.enabled), + ttl_secs: over.ttl_secs.unwrap_or(self.ttl_secs), + max_bytes: over.max_bytes.unwrap_or(self.max_bytes), + } + } +} + +/// Parse a byte count, accepting an optional `kb`/`mb`/`gb` suffix. +pub fn parse_human_bytes(s: &str) -> Option { + let s = s.trim(); + let lower = s.to_ascii_lowercase(); + let (num, mult) = if let Some(n) = lower.strip_suffix("gb") { + (n, 1024 * 1024 * 1024) + } else if let Some(n) = lower.strip_suffix("mb") { + (n, 1024 * 1024) + } else if let Some(n) = lower.strip_suffix("kb") { + (n, 1024) + } else { + (lower.as_str(), 1) + }; + num.trim().parse::().ok().map(|n| n * mult) +} + +/// Wall-clock milliseconds since the UNIX epoch. On a clock earlier than the +/// epoch (a misconfigured system clock) we return `i64::MAX` so existing +/// entries appear ancient and force a re-fetch. +fn now_ms() -> i64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis() as i64) + .unwrap_or(i64::MAX) +} + +/// Estimate the in-memory size of a DataFrame as the sum of its Arrow columns' +/// memory footprints. +fn estimate_bytes(df: &DataFrame) -> i64 { + df.get_columns() + .iter() + .map(|col| col.get_array_memory_size()) + .sum::() as i64 +} + +/// One memo row, trimmed to the fields consulted at runtime. +struct MemoEntry { + table_name: String, + fetched_at_epoch_ms: i64, +} + +pub struct CachingReader { + /// Primary backend — the real data source. + primary: Box, + /// In-memory writeable cache: derived tables, registered data, memoized reads. + cache: Box, + /// Connection URI of the primary. + primary_uri: String, + /// TTL + byte-budget configuration for the result memo. + config: CacheConfig, + /// Whether the metadata table has been created in the cache backend. + meta_ready: Cell, + /// Names registered into the cache. A source read that references one + /// is routed to the cache rather than the primary. + resident: RefCell>, +} + +impl CachingReader { + /// Construct a `CachingReader` from a primary reader, an in-memory cache + /// backend, and the primary's connection URI, using environment-derived + /// cache configuration. The cache is owned by the `CachingReader` and + /// dropped with it. + pub fn new( + primary: Box, + cache: Box, + primary_uri: impl Into, + ) -> Self { + Self::with_config(primary, cache, primary_uri, CacheConfig::from_env()) + } + + /// Construct a `CachingReader` with explicit cache configuration. + pub fn with_config( + primary: Box, + cache: Box, + primary_uri: impl Into, + config: CacheConfig, + ) -> Self { + Self { + primary, + cache, + primary_uri: primary_uri.into(), + config, + meta_ready: Cell::new(false), + resident: RefCell::new(HashSet::new()), + } + } + + /// The active cache configuration. + pub fn cache_config(&self) -> &CacheConfig { + &self.config + } + + /// Derive a stable cache key from the primary URI and the SQL text. + fn cache_key(&self, sql: &str) -> String { + let mut hasher = DefaultHasher::new(); + hasher.write(self.primary_uri.as_bytes()); + hasher.write(b"\n"); + hasher.write(sql.as_bytes()); + format!("{:016x}", hasher.finish()) + } + + /// Create the metadata table in the cache backend if it doesn't exist yet. + fn ensure_meta_table(&self) -> Result<()> { + if self.meta_ready.get() { + return Ok(()); + } + let sql = format!( + "CREATE TABLE IF NOT EXISTS {} (\ + cache_key VARCHAR PRIMARY KEY, sql VARCHAR NOT NULL, table_name VARCHAR NOT NULL, \ + fetched_at_epoch_ms BIGINT NOT NULL, last_accessed_epoch_ms BIGINT NOT NULL, \ + byte_estimate BIGINT NOT NULL, row_count BIGINT NOT NULL)", + naming::quote_ident(naming::CACHE_META_TABLE) + ); + self.cache.execute_sql(&sql)?; + self.meta_ready.set(true); + Ok(()) + } + + /// Look up the memo entry for `key`. + fn lookup_memo(&self, key: &str) -> Result> { + let sql = format!( + "SELECT table_name, fetched_at_epoch_ms FROM {} WHERE cache_key = {}", + naming::quote_ident(naming::CACHE_META_TABLE), + naming::quote_literal(key), + ); + let df = self.cache.execute_sql(&sql)?; + if df.height() == 0 { + return Ok(None); + } + let table_name = as_str(df.column("table_name")?)?.value(0).to_string(); + let fetched_at_epoch_ms = as_i64(df.column("fetched_at_epoch_ms")?)?.value(0); + Ok(Some(MemoEntry { + table_name, + fetched_at_epoch_ms, + })) + } + + /// Record a memoized read in the metadata table. + fn insert_memo( + &self, + key: &str, + sql: &str, + table: &str, + byte_estimate: i64, + row_count: i64, + ) -> Result<()> { + let now = now_ms(); + let stmt = format!( + "INSERT OR REPLACE INTO {} \ + (cache_key, sql, table_name, fetched_at_epoch_ms, last_accessed_epoch_ms, \ + byte_estimate, row_count) \ + VALUES ({}, {}, {}, {}, {}, {}, {})", + naming::quote_ident(naming::CACHE_META_TABLE), + naming::quote_literal(key), + naming::quote_literal(sql), + naming::quote_literal(table), + now, + now, + byte_estimate, + row_count, + ); + self.cache.execute_sql(&stmt)?; + Ok(()) + } + + /// Advance the last-accessed timestamp for `key` (LRU bookkeeping). + fn touch(&self, key: &str) -> Result<()> { + let stmt = format!( + "UPDATE {} SET last_accessed_epoch_ms = {} WHERE cache_key = {}", + naming::quote_ident(naming::CACHE_META_TABLE), + now_ms(), + naming::quote_literal(key), + ); + self.cache.execute_sql(&stmt)?; + Ok(()) + } + + /// Drop a single memo entry: delete its meta row and unregister the table. + fn drop_entry(&self, key: &str, table: &str) -> Result<()> { + let del = format!( + "DELETE FROM {} WHERE cache_key = {}", + naming::quote_ident(naming::CACHE_META_TABLE), + naming::quote_literal(key), + ); + self.cache.execute_sql(&del)?; + let _ = self.cache.unregister(table); + Ok(()) + } + + /// Evict LRU entries until the cumulative byte estimate is within `max_bytes`. + fn evict_over_budget(&self) -> Result<()> { + // Cast the SUM to BIGINT. + let sum_sql = format!( + "SELECT CAST(COALESCE(SUM(byte_estimate), 0) AS BIGINT) AS n FROM {}", + naming::quote_ident(naming::CACHE_META_TABLE) + ); + loop { + let df = self.cache.execute_sql(&sum_sql)?; + let total = if df.height() == 0 { + 0 + } else { + as_i64(df.column("n")?)?.value(0) + }; + if total <= self.config.max_bytes as i64 { + return Ok(()); + } + let pick = format!( + "SELECT cache_key, table_name FROM {} ORDER BY last_accessed_epoch_ms ASC LIMIT 1", + naming::quote_ident(naming::CACHE_META_TABLE) + ); + let df = self.cache.execute_sql(&pick)?; + if df.height() == 0 { + return Ok(()); + } + let key = as_str(df.column("cache_key")?)?.value(0).to_string(); + let table = as_str(df.column("table_name")?)?.value(0).to_string(); + self.drop_entry(&key, &table)?; + } + } + + /// Whether `sql` references a cache-resident table by exact name. + /// + /// Parses the query's `table_ref` targets and tests them against `resident`. + /// On a parse failure we conservatively return `false`. + fn references_resident(&self, sql: &str) -> bool { + let Ok(refs) = super::data::extract_table_refs(sql) else { + return false; + }; + let resident = self.resident.borrow(); + refs.iter().any(|t| resident.contains(t)) + } + + /// Drop every memoized result, one entry at a time. + fn clear_memo(&self) -> Result<()> { + self.ensure_meta_table()?; + let df = self.cache.execute_sql(&format!( + "SELECT cache_key, table_name FROM {}", + naming::quote_ident(naming::CACHE_META_TABLE) + ))?; + let n = df.height(); + if n == 0 { + return Ok(()); + } + let keys = as_str(df.column("cache_key")?)?; + let tables = as_str(df.column("table_name")?)?; + let mut failures: Vec = Vec::new(); + for i in 0..n { + let key = keys.value(i); + let table = tables.value(i); + if let Err(e) = self.drop_entry(key, table) { + failures.push(format!("{key}: {e}")); + } + } + if !failures.is_empty() { + return Err(crate::GgsqlError::ReaderError(format!( + "clear_cache: {} cache entries failed to drop: {}", + failures.len(), + failures.join("; ") + ))); + } + Ok(()) + } +} + +impl Reader for CachingReader { + /// Source surface: base reads of the user's data (plus user setup/DML). + fn execute_sql(&self, sql: &str) -> Result { + // Route to the cache when the read targets cache-resident objects, the + // metadata table, or a builtin dataset. + if sql.contains("ggsql:") + || sql.contains(naming::CACHE_META_TABLE) + || self.references_resident(sql) + { + return self.cache.execute_sql(sql); + } + + self.ensure_meta_table()?; + + // With caching disabled the memo is never consulted or written. + if !self.config.enabled { + return self.primary.execute_sql(sql); + } + + let key = self.cache_key(sql); + + // Serve a fresh, still-present entry; on a stale or vanished entry drop + // it and fall through to a primary re-fetch. + if let Some(entry) = self.lookup_memo(&key)? { + let age_ms = (now_ms() - entry.fetched_at_epoch_ms).max(0); + let ttl_ms = (self.config.ttl_secs as i64).saturating_mul(1000); + if age_ms < ttl_ms { + let select = format!("SELECT * FROM {}", naming::quote_ident(&entry.table_name)); + if let Ok(df) = self.cache.execute_sql(&select) { + self.touch(&key)?; + return Ok(df); + } + } + self.drop_entry(&key, &entry.table_name)?; + } + + let df = self.primary.execute_sql(sql)?; + + // Cache row-returning reads only. + if super::returns_rows(sql) && df.width() > 0 { + let table = naming::cache_result_table(&key); + let byte_estimate = estimate_bytes(&df); + let row_count = df.height() as i64; + self.cache.register(&table, df.clone(), true)?; + if let Err(e) = self.insert_memo(&key, sql, &table, byte_estimate, row_count) { + // The result table registered but the meta row didn't: drop the + // orphan so it can't leak, then surface the error. + let _ = self.cache.unregister(&table); + return Err(e); + } + self.evict_over_budget()?; + } + + Ok(df) + } + + /// Compute surface: derived/dialect-generated SQL runs on the cache. + fn execute_sql_cached(&self, sql: &str) -> Result { + self.cache.execute_sql(sql) + } + + fn register(&self, name: &str, df: DataFrame, replace: bool) -> Result<()> { + self.cache.register(name, df, replace)?; + self.resident.borrow_mut().insert(name.to_string()); + Ok(()) + } + + fn unregister(&self, name: &str) -> Result<()> { + self.cache.unregister(name)?; + self.resident.borrow_mut().remove(name); + Ok(()) + } + + fn execute(&self, query: &str) -> Result { + execute_with_reader(self, query) + } + + fn dialect(&self) -> &dyn SqlDialect { + // All executor-generated SQL targets cache-resident tables. + self.cache.dialect() + } + + fn materialize_table( + &self, + name: &str, + column_aliases: &[String], + body_sql: &str, + ) -> Result<()> { + // Read the body via the source surface, then register the result + // into the cache. + let body = super::wrap_with_column_aliases(body_sql, column_aliases); + let df = self.execute_sql(&body)?; + self.register(name, df, true) + } + + fn caches_sources(&self) -> bool { + true + } + + fn clear_cache(&self) -> Result<()> { + self.clear_memo() + } + + // Schema introspection describes the real data source, so delegate to the + // primary; the cache only holds synthetic `__ggsql_*` tables. + fn list_catalogs(&self) -> Result> { + self.primary.list_catalogs() + } + + fn list_schemas(&self, catalog: &str) -> Result> { + self.primary.list_schemas(catalog) + } + + fn list_tables(&self, catalog: &str, schema: &str) -> Result> { + self.primary.list_tables(catalog, schema) + } + + fn list_columns(&self, catalog: &str, schema: &str, table: &str) -> Result> { + self.primary.list_columns(catalog, schema, table) + } +} + +#[cfg(all(test, feature = "duckdb"))] +mod behavior_tests { + use super::*; + use crate::array_util::as_i64; + use crate::df; + use crate::reader::test_support::{ReadOnlyReader, SpyReader}; + use crate::reader::{CacheBackend, DuckDBReader}; + + #[test] + fn test_register_writes_to_cache_and_query_routes_there() { + let (primary, log) = SpyReader::wrap(Box::new(DuckDBReader::new_in_memory().unwrap())); + let cache = Box::new(DuckDBReader::new_in_memory().unwrap()); + let reader = CachingReader::new(primary, cache, "test://primary"); + + reader + .register("t", df! { "x" => vec![1_i64, 2, 3] }.unwrap(), true) + .unwrap(); + // register writes to the cache; the compute surface reads it back. + let out = reader + .execute_sql_cached("SELECT COUNT(*) AS n FROM t") + .unwrap(); + + assert_eq!(as_i64(out.column("n").unwrap()).unwrap().value(0), 3); + // The primary was never touched. + assert!(log.lock().unwrap().is_empty()); + } + + #[test] + fn test_source_read_hits_primary_and_memoizes() { + let inner = DuckDBReader::new_in_memory().unwrap(); + inner + .register("base", df! { "y" => vec![1_i64, 2, 3] }.unwrap(), true) + .unwrap(); + let (primary, log) = SpyReader::wrap(Box::new(inner)); + let cache = Box::new(DuckDBReader::new_in_memory().unwrap()); + let reader = CachingReader::new(primary, cache, "test://primary"); + + let q = "SELECT y FROM base ORDER BY y"; + let d1 = reader.execute_sql(q).unwrap(); + let d2 = reader.execute_sql(q).unwrap(); + assert_eq!(d1.height(), 3); + assert_eq!(d2.height(), 3); + + // The primary executed the base read exactly once; the repeat was served + // from the cache memo. + let hits = log + .lock() + .unwrap() + .iter() + .filter(|s| s.as_str() == q) + .count(); + assert_eq!(hits, 1); + } + + #[test] + fn test_full_execute_keeps_computation_off_primary() { + let inner = DuckDBReader::new_in_memory().unwrap(); + inner + .register( + "sales", + df! { "x" => vec![1_i64, 2, 3, 4], "y" => vec![10_i64, 20, 30, 40] }.unwrap(), + true, + ) + .unwrap(); + let (primary, log) = SpyReader::wrap(Box::new(inner)); + let cache = Box::new(DuckDBReader::new_in_memory().unwrap()); + let reader = CachingReader::new(primary, cache, "test://primary"); + + reader + .execute("SELECT x, y FROM sales VISUALISE x, y DRAW point") + .unwrap(); + + let log = log.lock().unwrap(); + // The primary is only ever read from: no temp-table DDL, no derived + // `__ggsql_*` tables ever reach it. + assert!( + log.iter().all(|s| !s.to_uppercase().contains("TEMP TABLE")), + "primary must not be written to: {:?}", + *log + ); + assert!( + log.iter().all(|s| !s.contains("__ggsql_")), + "primary must not see derived tables: {:?}", + *log + ); + // It did see the base read. + assert!(log.iter().any(|s| s.contains("sales"))); + } + + #[test] + fn test_caching_makes_read_only_primary_usable() { + // The read-only-primary value proposition, exercised in every DuckDB + // build (including duckdb-only CI): a primary that refuses all writes is + // unusable on its own — materializing the global temp table fails — but + // works once wrapped in a caching layer, because every write goes to the + // cache and the primary is only read. + let query = "SELECT v FROM t VISUALISE v AS x DRAW histogram"; + + // Bare read-only primary: materialization must fail. + let bare_primary = DuckDBReader::new_in_memory().unwrap(); + bare_primary + .register( + "t", + df! { "v" => vec![1.0_f64, 2.0, 3.0, 4.0] }.unwrap(), + true, + ) + .unwrap(); + let bare = ReadOnlyReader::new(Box::new(bare_primary)); + assert!( + bare.execute(query).is_err(), + "a read-only primary with no cache must fail to materialize" + ); + + // Same read-only primary behind a cache: must succeed. + let primary = DuckDBReader::new_in_memory().unwrap(); + primary + .register( + "t", + df! { "v" => vec![1.0_f64, 2.0, 3.0, 4.0] }.unwrap(), + true, + ) + .unwrap(); + let cached = CachingReader::new( + Box::new(ReadOnlyReader::new(Box::new(primary))), + Box::new(DuckDBReader::new_in_memory().unwrap()), + "test://primary", + ); + assert!( + cached.execute(query).is_ok(), + "caching should make a read-only primary usable" + ); + } + + #[test] + fn test_no_cache_path_materializes_on_the_reader() { + // A plain reader (no CachingReader) must keep today's behavior: + // derived tables are materialized on the reader itself. + let inner = DuckDBReader::new_in_memory().unwrap(); + inner + .register( + "sales", + df! { "x" => vec![1_i64, 2, 3], "y" => vec![10_i64, 20, 30] }.unwrap(), + true, + ) + .unwrap(); + let (reader, log) = SpyReader::wrap(Box::new(inner)); + + reader + .execute("SELECT x, y FROM sales VISUALISE x, y DRAW point") + .unwrap(); + + assert!( + log.lock() + .unwrap() + .iter() + .any(|s| s.to_uppercase().contains("TEMP TABLE")), + "default path must materialize on the reader" + ); + } + + #[cfg(feature = "sqlite")] + #[test] + fn test_dialect_returns_cache_dialect() { + use crate::reader::SqliteReader; + // SQLite primary, DuckDB cache: dialect() must return DuckDB's (native + // GREATEST), not SQLite's (CASE fallback). + let primary = Box::new(SqliteReader::new().unwrap()); + let cache = Box::new(DuckDBReader::new_in_memory().unwrap()); + let reader = CachingReader::new(primary, cache, "test://primary"); + assert_eq!(reader.dialect().sql_greatest(&["a", "b"]), "GREATEST(a, b)"); + } + + #[cfg(feature = "sqlite")] + #[test] + fn test_explicit_layer_source_with_stat_heterogeneous() { + use crate::reader::SqliteReader; + // SQLite primary holds the table; DuckDB is the cache. A layer draws a + // histogram from the primary table, which generates DuckDB-dialect stat + // SQL. This only works because the layer source is materialized into the + // cache; otherwise DuckDB SQL would run against the SQLite primary. + let primary = SqliteReader::new().unwrap(); + primary + .register( + "tbl", + df! { "val" => vec![1.0_f64, 2.0, 2.0, 3.0, 3.0, 3.0, 9.0] }.unwrap(), + true, + ) + .unwrap(); + let cache = Box::new(DuckDBReader::new_in_memory().unwrap()); + let reader = CachingReader::new(Box::new(primary), cache, "test://primary"); + + let spec = reader.execute("VISUALISE x DRAW histogram MAPPING val AS x FROM tbl"); + assert!( + spec.is_ok(), + "explicit-source histogram failed: {:?}", + spec.err() + ); + } + + #[test] + fn test_aliased_cte_reading_primary_routes_to_primary() { + // A column-aliased CTE whose body reads a primary-only table must run on + // the primary. The `__ggsql_aliased__` column-alias wrapper must not + // misroute the read to the (empty) cache. + let base = DuckDBReader::new_in_memory().unwrap(); + base.register("base", df! { "v" => vec![1_i64, 2, 3] }.unwrap(), true) + .unwrap(); + let primary = Box::new(ReadOnlyReader::new(Box::new(base))); + let cache = Box::new(DuckDBReader::new_in_memory().unwrap()); + let reader = CachingReader::new(primary, cache, "test://primary"); + + let spec = reader.execute( + "WITH t(a) AS (SELECT v FROM base) SELECT a FROM t VISUALISE a AS x DRAW point", + ); + assert!( + spec.is_ok(), + "aliased CTE over a primary table should succeed: {:?}", + spec.err() + ); + } + + #[test] + fn test_aliased_cte_referencing_prior_cte_routes_to_cache() { + // A column-aliased CTE that references a *prior* CTE reads a table that + // lives in the cache, so its body must route to the cache, while the + // first CTE still reads the primary. + let base = DuckDBReader::new_in_memory().unwrap(); + base.register("base", df! { "v" => vec![1_i64, 2, 3] }.unwrap(), true) + .unwrap(); + let primary = Box::new(ReadOnlyReader::new(Box::new(base))); + let cache = Box::new(DuckDBReader::new_in_memory().unwrap()); + let reader = CachingReader::new(primary, cache, "test://primary"); + + let spec = reader.execute( + "WITH a(p) AS (SELECT v FROM base), b(q) AS (SELECT p FROM a) \ + SELECT q FROM b VISUALISE q AS x DRAW point", + ); + assert!( + spec.is_ok(), + "dependent aliased CTE should succeed: {:?}", + spec.err() + ); + } + + #[test] + fn test_meta_table_records_and_serves_memo() { + // A memoized read is recorded in the metadata table and served back from + // the cache on repeat, without touching the primary again. + let inner = DuckDBReader::new_in_memory().unwrap(); + inner + .register("base", df! { "y" => vec![1_i64, 2, 3] }.unwrap(), true) + .unwrap(); + let (primary, log) = SpyReader::wrap(Box::new(inner)); + let cache = Box::new(DuckDBReader::new_in_memory().unwrap()); + let reader = CachingReader::new(primary, cache, "test://primary"); + + let q = "SELECT y FROM base ORDER BY y"; + reader.execute_sql(q).unwrap(); + + // The metadata table now has exactly one row for this read. + let meta = reader + .execute_sql(&format!("SELECT sql FROM {}", naming::CACHE_META_TABLE)) + .unwrap(); + assert_eq!(meta.height(), 1); + assert_eq!( + crate::array_util::as_str(meta.column("sql").unwrap()) + .unwrap() + .value(0), + q + ); + + // The repeat read is served from the cache, not the primary. + reader.execute_sql(q).unwrap(); + let hits = log + .lock() + .unwrap() + .iter() + .filter(|s| s.as_str() == q) + .count(); + assert_eq!(hits, 1); + } + + #[cfg(feature = "sqlite")] + #[test] + fn test_sqlite_cache_backend_memoizes() { + use crate::reader::SqliteReader; + // A SQLite cache backend must support the metadata table DDL/DML + // (CREATE TABLE IF NOT EXISTS, INSERT OR REPLACE) and serve memoized reads. + let inner = DuckDBReader::new_in_memory().unwrap(); + inner + .register("base", df! { "y" => vec![1_i64, 2, 3] }.unwrap(), true) + .unwrap(); + let (primary, log) = SpyReader::wrap(Box::new(inner)); + let cache = Box::new(SqliteReader::new().unwrap()); + let reader = CachingReader::new(primary, cache, "test://primary"); + + let q = "SELECT y FROM base ORDER BY y"; + let d1 = reader.execute_sql(q).unwrap(); + let d2 = reader.execute_sql(q).unwrap(); + assert_eq!(d1.height(), 3); + assert_eq!(d2.height(), 3); + let hits = log + .lock() + .unwrap() + .iter() + .filter(|s| s.as_str() == q) + .count(); + assert_eq!( + hits, 1, + "SQLite cache should serve the repeat from the memo" + ); + } + + #[test] + fn test_resident_substring_not_false_matched() { + // A primary-only table whose name *contains* a cache-resident table name + // as a substring must still route to the primary. Exact-identifier + // matching distinguishes `orders` (resident) from `orders_archive` + // (primary-only). + let primary = DuckDBReader::new_in_memory().unwrap(); + primary + .register( + "orders_archive", + df! { "v" => vec![1_i64, 2, 3] }.unwrap(), + true, + ) + .unwrap(); + let cache = Box::new(DuckDBReader::new_in_memory().unwrap()); + let reader = CachingReader::new(Box::new(primary), cache, "test://primary"); + + // `orders` lives only in the cache. + reader + .register("orders", df! { "v" => vec![9_i64] }.unwrap(), true) + .unwrap(); + + // Reading the primary-only `orders_archive` must hit the primary (3 rows), + // not the resident `orders` (1 row). + let df = reader.execute_sql("SELECT v FROM orders_archive").unwrap(); + assert_eq!(df.height(), 3); + } + + #[test] + fn test_source_write_invalidates_memo() { + // Memoize a base read, mutate the primary, then re-read: the memo must be + // invalidated by the non-row-returning statement so the second read is fresh. + let primary = DuckDBReader::new_in_memory().unwrap(); + primary + .register("t", df! { "v" => vec![1_i64, 2, 3] }.unwrap(), true) + .unwrap(); + let cache = Box::new(DuckDBReader::new_in_memory().unwrap()); + let reader = CachingReader::new(Box::new(primary), cache, "test://primary"); + + let q = "SELECT v FROM t"; + let d1 = reader.execute_sql(q).unwrap(); + assert_eq!(d1.height(), 3); + + reader.execute_sql("INSERT INTO t VALUES (4)").unwrap(); + + let d2 = reader.execute_sql(q).unwrap(); + assert_eq!(d2.height(), 3, "the memo is served despite the write"); + + // After an explicit clear, the re-read sees the inserted row. + reader.clear_cache().unwrap(); + let d3 = reader.execute_sql(q).unwrap(); + assert_eq!(d3.height(), 4, "clear_cache forces a fresh primary read"); + } + + #[test] + fn test_default_config_enabled_ttl_300() { + let reader = CachingReader::with_config( + Box::new(DuckDBReader::new_in_memory().unwrap()), + Box::new(DuckDBReader::new_in_memory().unwrap()), + "test://primary", + CacheConfig::default(), + ); + assert!(reader.cache_config().enabled); + assert_eq!(reader.cache_config().ttl_secs, 300); + } + + #[test] + fn test_repeat_query_hits_primary_once() { + let inner = DuckDBReader::new_in_memory().unwrap(); + inner + .register("base", df! { "y" => vec![1_i64, 2, 3] }.unwrap(), true) + .unwrap(); + let (primary, log) = SpyReader::wrap(Box::new(inner)); + let cache = Box::new(DuckDBReader::new_in_memory().unwrap()); + let reader = CachingReader::new(primary, cache, "test://primary"); + + let q = "SELECT y FROM base ORDER BY y"; + reader.execute_sql(q).unwrap(); + reader.execute_sql(q).unwrap(); + + let hits = log + .lock() + .unwrap() + .iter() + .filter(|s| s.as_str() == q) + .count(); + assert_eq!(hits, 1, "the repeat read is served from the memo"); + } + + #[test] + fn test_ttl_zero_always_misses() { + let inner = DuckDBReader::new_in_memory().unwrap(); + inner + .register("base", df! { "y" => vec![1_i64, 2, 3] }.unwrap(), true) + .unwrap(); + let (primary, log) = SpyReader::wrap(Box::new(inner)); + let cache = Box::new(DuckDBReader::new_in_memory().unwrap()); + let reader = CachingReader::with_config( + primary, + cache, + "test://primary", + CacheConfig { + enabled: true, + ttl_secs: 0, + max_bytes: 512 * 1024 * 1024, + }, + ); + + let q = "SELECT y FROM base ORDER BY y"; + reader.execute_sql(q).unwrap(); + reader.execute_sql(q).unwrap(); + + let hits = log + .lock() + .unwrap() + .iter() + .filter(|s| s.as_str() == q) + .count(); + assert_eq!(hits, 2, "ttl=0 must miss on every read"); + } + + #[test] + fn test_disabled_always_hits_primary() { + let inner = DuckDBReader::new_in_memory().unwrap(); + inner + .register("base", df! { "y" => vec![1_i64, 2, 3] }.unwrap(), true) + .unwrap(); + let (primary, log) = SpyReader::wrap(Box::new(inner)); + let cache = Box::new(DuckDBReader::new_in_memory().unwrap()); + let reader = CachingReader::with_config( + primary, + cache, + "test://primary", + CacheConfig { + enabled: false, + ttl_secs: 300, + max_bytes: 512 * 1024 * 1024, + }, + ); + + let q = "SELECT y FROM base ORDER BY y"; + reader.execute_sql(q).unwrap(); + reader.execute_sql(q).unwrap(); + + let hits = log + .lock() + .unwrap() + .iter() + .filter(|s| s.as_str() == q) + .count(); + assert_eq!(hits, 2, "a disabled cache always hits the primary"); + // The memo was never written. + let meta = reader + .execute_sql(&format!("SELECT * FROM {}", naming::CACHE_META_TABLE)) + .unwrap(); + assert_eq!(meta.height(), 0); + } + + #[test] + fn test_lru_evicts_oldest_when_over_budget() { + // A 1-byte budget forces eviction after every insert, so each cached + // read is gone by the next time it's requested. + let inner = DuckDBReader::new_in_memory().unwrap(); + inner + .register( + "base", + df! { "a" => vec![1_i64, 2, 3], "b" => vec![4_i64, 5, 6] }.unwrap(), + true, + ) + .unwrap(); + let (primary, log) = SpyReader::wrap(Box::new(inner)); + let cache = Box::new(DuckDBReader::new_in_memory().unwrap()); + let reader = CachingReader::with_config( + primary, + cache, + "test://primary", + CacheConfig { + enabled: true, + ttl_secs: 300, + max_bytes: 1, + }, + ); + + let q1 = "SELECT a FROM base ORDER BY a"; + let q2 = "SELECT b FROM base ORDER BY b"; + reader.execute_sql(q1).unwrap(); + reader.execute_sql(q2).unwrap(); + // q1 was evicted when q2 was inserted, so re-reading q1 hits the primary + // again. + reader.execute_sql(q1).unwrap(); + + let q1_hits = log + .lock() + .unwrap() + .iter() + .filter(|s| s.as_str() == q1) + .count(); + assert_eq!( + q1_hits, 2, + "the evicted entry is re-fetched from the primary" + ); + // The budget is enforced: at most one entry resides. + let meta = reader + .execute_sql(&format!("SELECT * FROM {}", naming::CACHE_META_TABLE)) + .unwrap(); + assert!(meta.height() <= 1, "over-budget entries are evicted"); + } + + #[test] + fn test_missing_cached_table_self_heals() { + // If the cached result table vanishes out from under the memo, the entry + // is dropped and the read falls through to the primary instead of erroring. + let inner = DuckDBReader::new_in_memory().unwrap(); + inner + .register("base", df! { "y" => vec![1_i64, 2, 3] }.unwrap(), true) + .unwrap(); + let (primary, log) = SpyReader::wrap(Box::new(inner)); + let cache = Box::new(DuckDBReader::new_in_memory().unwrap()); + let reader = CachingReader::new(primary, cache, "test://primary"); + + let q = "SELECT y FROM base ORDER BY y"; + reader.execute_sql(q).unwrap(); + + // Drop the cached table directly, leaving a dangling meta row. + let table = reader + .execute_sql(&format!( + "SELECT table_name FROM {}", + naming::CACHE_META_TABLE + )) + .unwrap(); + let table_name = crate::array_util::as_str(table.column("table_name").unwrap()) + .unwrap() + .value(0) + .to_string(); + reader + .cache + .execute_sql(&format!("DROP TABLE {}", naming::quote_ident(&table_name))) + .unwrap(); + + // The next read self-heals: it re-fetches from the primary. + let df = reader.execute_sql(q).unwrap(); + assert_eq!(df.height(), 3); + let hits = log + .lock() + .unwrap() + .iter() + .filter(|s| s.as_str() == q) + .count(); + assert_eq!(hits, 2, "the missing table forced a primary re-fetch"); + } + + #[test] + fn test_parse_human_bytes() { + assert_eq!(parse_human_bytes("1024"), Some(1024)); + assert_eq!(parse_human_bytes("512mb"), Some(512 * 1024 * 1024)); + assert_eq!(parse_human_bytes("1GB"), Some(1024 * 1024 * 1024)); + assert_eq!(parse_human_bytes(" 2kb "), Some(2 * 1024)); + assert_eq!(parse_human_bytes("nonsense"), None); + } + + #[test] + fn test_config_merge_uri_wins() { + let base = CacheConfig::default(); + let merged = base.merge(CacheConfigOverride { + enabled: Some(false), + ttl_secs: Some(60), + max_bytes: None, + }); + assert!(!merged.enabled); + assert_eq!(merged.ttl_secs, 60); + assert_eq!(merged.max_bytes, 512 * 1024 * 1024); + } + + #[test] + fn test_pure_sql_reads_primary_not_cache() { + // The pure-SQL display path uses `execute_sql` (source), which reads the + // primary; `execute_sql_cached` (compute) would hit the empty cache and fail. + let primary = DuckDBReader::new_in_memory().unwrap(); + primary + .register("t", df! { "v" => vec![1_i64, 2, 3] }.unwrap(), true) + .unwrap(); + let cache = Box::new(DuckDBReader::new_in_memory().unwrap()); + let reader = CachingReader::new(Box::new(primary), cache, "test://primary"); + + let df = reader.execute_sql("SELECT v FROM t").unwrap(); + assert_eq!(df.height(), 3); + assert!( + reader.execute_sql_cached("SELECT v FROM t").is_err(), + "compute surface should not find the primary-only table" + ); + } + + #[test] + fn test_cache_resident_table_as_layer_source() { + // A table registered directly on the CachingReader lives only in the + // cache. + let primary = Box::new(DuckDBReader::new_in_memory().unwrap()); + let cache = Box::new(DuckDBReader::new_in_memory().unwrap()); + let reader = CachingReader::new(primary, cache, "test://primary"); + reader + .register( + "only_in_cache", + df! { "val" => vec![1.0_f64, 2.0, 2.0, 3.0, 3.0, 3.0, 9.0] }.unwrap(), + true, + ) + .unwrap(); + + let spec = reader.execute("VISUALISE x DRAW histogram MAPPING val AS x FROM only_in_cache"); + assert!( + spec.is_ok(), + "cache-resident layer source should succeed: {:?}", + spec.err() + ); + } + + #[cfg(feature = "sqlite")] + #[test] + fn test_file_layer_source_staged_via_cache() { + use crate::reader::SqliteReader; + // A file source must be staged on the cache surface. + let dir = std::env::temp_dir(); + let path = dir.join(format!("ggsql_cache_file_test_{}.csv", std::process::id())); + std::fs::write(&path, "val\n1.0\n2.0\n2.0\n3.0\n3.0\n3.0\n9.0\n").unwrap(); + let path_str = path.to_str().unwrap().to_string(); + + let primary = Box::new(SqliteReader::new().unwrap()); + let cache = Box::new(DuckDBReader::new_in_memory().unwrap()); + let reader = CachingReader::new(primary, cache, "test://primary"); + + let spec = reader.execute(&format!( + "VISUALISE x DRAW histogram MAPPING val AS x FROM '{}'", + path_str + )); + let _ = std::fs::remove_file(&path); + assert!( + spec.is_ok(), + "file layer source via cache should succeed: {:?}", + spec.err() + ); + } +} diff --git a/src/reader/cache_equivalence.rs b/src/reader/cache_equivalence.rs new file mode 100644 index 000000000..52960a4a7 --- /dev/null +++ b/src/reader/cache_equivalence.rs @@ -0,0 +1,362 @@ +//! Equivalence and read-only-safety tests for the caching layer. +//! +//! These exercise the whole mechanism (materialization, routing, the result +//! cache, builtin routing, layer-source staging, dialect selection). + +use super::CachingReader; +use crate::reader::test_support::{ReadOnlyReader, SpyReader}; +use crate::reader::{CacheBackend, DuckDBReader, Reader, SqliteReader}; +use crate::DataFrame; + +/// One corpus entry. `builtin_only` queries read only `ggsql:` datasets (which +/// route to the cache), so they can be compared exactly to plain DuckDB. +struct Case { + query: &'static str, + builtin_only: bool, +} + +/// A stat-heavy corpus — these generate the most cache-dialect SQL +/// (`sql_percentile`, `sql_greatest`/`sql_least`, `sql_generate_series`, +/// casts), where caching is most likely to diverge. +const CORPUS: &[Case] = &[ + // boxplot: quantiles / IQR + Case { + query: "VISUALISE species AS x, bill_len AS y FROM ggsql:penguins DRAW boxplot", + builtin_only: true, + }, + // histogram: binning + casts (global SELECT over a builtin) + Case { + query: "SELECT Temp FROM ggsql:airquality VISUALISE Temp AS x DRAW histogram", + builtin_only: true, + }, + // density: percentile + generate_series + stddev + Case { + query: "VISUALISE bill_len AS x, species AS colour FROM ggsql:penguins DRAW density", + builtin_only: true, + }, + // grouped aggregation + facet + discrete scale + Case { + query: "SELECT species, bill_len, island FROM ggsql:penguins \ + VISUALISE species AS x, bill_len AS y \ + DRAW bar SETTING aggregate => 'mean' FACET island", + builtin_only: true, + }, + // WITH CTE + multi-layer + FILTER (CTE materialization + global + per-layer routing) + Case { + query: "WITH hot AS (SELECT Date, Temp FROM ggsql:airquality WHERE Temp > 70) \ + SELECT Date, Temp FROM hot \ + VISUALISE Date AS x, Temp AS y \ + DRAW line DRAW point FILTER Temp > 80 SCALE x VIA date", + builtin_only: true, + }, + // explicit per-layer source from a seeded table → forces layer-source staging + // (`caches_sources`) and the read-from-primary + stat-on-cache path. + Case { + query: "VISUALISE species AS x, bill_len AS y \ + DRAW boxplot MAPPING species AS x, bill_len AS y FROM cache_eq_tbl", + builtin_only: false, + }, +]; + +/// Seed the table referenced by the non-builtin corpus entry. +fn seed(reader: &dyn Reader) { + let df = crate::df! { + "species" => vec!["A", "A", "B", "B", "B", "C"], + "bill_len" => vec![1.0_f64, 2.0, 3.0, 4.0, 5.0, 6.0], + } + .unwrap(); + reader.register("cache_eq_tbl", df, true).unwrap(); +} + +/// Stringify each row using the given (canonical, sorted) column order, so two +/// DataFrames can be compared as row multisets — ignoring both physical row order +/// (a query without `ORDER BY` may return rows in a different order on each +/// materialization path) and column order (aesthetic columns are emitted in +/// HashMap order; they bind to encoding channels by name, not position). +fn row_multiset(df: &DataFrame, names: &[String]) -> Vec { + (0..df.height()) + .map(|i| { + names + .iter() + .map(|n| crate::array_util::value_to_string(df.column(n).unwrap(), i)) + .collect::>() + .join("\u{1f}") + }) + .collect() +} + +/// Assert two layer DataFrames have the same columns (by name + type) and the +/// same set of rows — insensitive to row and column ordering. +fn assert_data_equivalent(a: &DataFrame, b: &DataFrame, ctx: &str) { + let mut na: Vec = a + .schema() + .fields() + .iter() + .map(|f| f.name().to_string()) + .collect(); + let mut nb: Vec = b + .schema() + .fields() + .iter() + .map(|f| f.name().to_string()) + .collect(); + na.sort(); + nb.sort(); + assert_eq!(na, nb, "{ctx}: column-name set"); + for name in &na { + assert_eq!( + a.column(name).unwrap().data_type(), + b.column(name).unwrap().data_type(), + "{ctx}: column '{name}' type", + ); + } + assert_eq!(a.height(), b.height(), "{ctx}: row count"); + let (mut ra, mut rb) = (row_multiset(a, &na), row_multiset(b, &na)); + ra.sort(); + rb.sort(); + assert_eq!(ra, rb, "{ctx}: row multiset mismatch"); +} + +/// Assert that a query produces equivalent output through `plain` and `cached`. +/// A query a backend can't run must fail the same way with or without the cache. +fn assert_equivalent(plain: &dyn Reader, cached: &dyn Reader, query: &str) { + let a = plain.execute(query); + let b = cached.execute(query); + assert_eq!( + a.is_ok(), + b.is_ok(), + "ok-mismatch for `{query}`: plain={:?} cached={:?}", + a.as_ref().err(), + b.as_ref().err(), + ); + let (Ok(sa), Ok(sb)) = (a, b) else { return }; + + assert_eq!( + sa.layer_count(), + sb.layer_count(), + "layer count for `{query}`" + ); + for i in 0..sa.layer_count() { + match (sa.layer_data(i), sb.layer_data(i)) { + (Some(da), Some(db)) => assert_data_equivalent(da, db, &format!("`{query}` layer {i}")), + (None, None) => {} + _ => panic!("layer {i} data-presence mismatch for `{query}`"), + } + } +} + +/// The builtin corpus through `{ ReadOnlyReader(SQLite), DuckDB }` matches plain +/// DuckDB exactly. The DuckDB cache does all reading and computing for `ggsql:` +/// sources; the SQLite primary stays idle and unwritten. +#[test] +fn mode1_builtin_equivalence_matches_plain_duckdb() { + if !cfg!(feature = "builtin-data") { + return; // builtin corpus needs the embedded datasets + } + for case in CORPUS.iter().filter(|c| c.builtin_only) { + let plain = DuckDBReader::new_in_memory().unwrap(); + let primary = ReadOnlyReader::new(Box::new(SqliteReader::new().unwrap())); + let cache = DuckDBReader::new_in_memory().unwrap(); + let cached = CachingReader::new(Box::new(primary), Box::new(cache), "test://primary"); + assert_equivalent(&plain, &cached, case.query); + } +} + +/// Read-only safety: the full corpus succeeds through the caching reader, +/// proving a read-only/remote primary is sufficient. +#[test] +fn mode1_read_only_primary_is_sufficient() { + for case in CORPUS { + if case.builtin_only && !cfg!(feature = "builtin-data") { + continue; + } + let sqlite = SqliteReader::new().unwrap(); + seed(&sqlite); + let primary = ReadOnlyReader::new(Box::new(sqlite)); + let cache = DuckDBReader::new_in_memory().unwrap(); + let cached = CachingReader::new(Box::new(primary), Box::new(cache), "test://primary"); + let r = cached.execute(case.query); + assert!( + r.is_ok(), + "read-only primary should suffice with caching for `{}`: {:?}", + case.query, + r.err(), + ); + } +} + +/// Cross-call memoization: a second identical execute does not re-read the +/// primary, because the base read is served from the cache memo. +#[test] +fn cross_call_memoization_avoids_second_primary_read() { + let sqlite = SqliteReader::new().unwrap(); + seed(&sqlite); + let (primary, log) = SpyReader::wrap(Box::new(sqlite)); + let cache = DuckDBReader::new_in_memory().unwrap(); + let cached = CachingReader::new(primary, Box::new(cache), "test://primary"); + + let query = "SELECT bill_len FROM cache_eq_tbl VISUALISE bill_len AS x DRAW histogram"; + cached.execute(query).unwrap(); + let after_first = log.lock().unwrap().len(); + cached.execute(query).unwrap(); + let after_second = log.lock().unwrap().len(); + + assert!(after_first >= 1, "first execute should read the primary"); + assert_eq!( + after_first, + after_second, + "second execute must not re-hit the primary; log: {:?}", + *log.lock().unwrap(), + ); +} + +/// The library factory builds working caching readers from composite URIs. +#[test] +fn factory_builds_caching_readers() { + use crate::reader::connection::reader_from_uri; + for uri in ["duckdb+sqlite://memory", "sqlite+duckdb://memory"] { + let r = reader_from_uri(uri).unwrap_or_else(|e| panic!("build `{uri}`: {e}")); + if cfg!(feature = "builtin-data") { + let spec = + r.execute("VISUALISE species AS x, bill_len AS y FROM ggsql:penguins DRAW boxplot"); + assert!( + spec.is_ok(), + "factory reader failed for `{uri}`: {:?}", + spec.err() + ); + } + } +} + +/// Map projections run entirely on the cache. +#[cfg(all(feature = "spatial", feature = "builtin-data"))] +#[test] +fn map_projection_runs_on_cache_not_primary() { + let (primary, log) = SpyReader::wrap(Box::new(DuckDBReader::new_in_memory().unwrap())); + let cache = Box::new(DuckDBReader::new_in_memory().unwrap()); + let reader = CachingReader::new(primary, cache, "test://primary"); + + let spec = reader.execute("VISUALISE FROM ggsql:world DRAW spatial PROJECT TO orthographic"); + assert!( + spec.is_ok(), + "map projection via cache failed: {:?}", + spec.err() + ); + + // No dialect-specific spatial SQL, temp-table DDL, or `__ggsql_*` reference + // ever reached the primary. + let log = log.lock().unwrap(); + for stmt in log.iter() { + let upper = stmt.to_uppercase(); + assert!( + !upper.contains("ST_") && !upper.contains("TEMP TABLE") && !stmt.contains("__ggsql_"), + "derived spatial SQL leaked to the primary: {stmt}" + ); + } +} + +/// A real external ADBC SQLite primary + DuckDB cache, compared against a bare ADBC reader. +/// `#[ignore]` — requires `dbc install sqlite`. +#[cfg(feature = "adbc")] +mod adbc_mode { + use super::*; + use crate::reader::sqlite::SqliteDialect; + use crate::reader::test_support::assert_dataframes_equal; + use crate::reader::{AdbcReader, Spec, SqlDialect}; + use crate::{DataFrame, Result}; + use adbc_core::options::{AdbcVersion, OptionDatabase, OptionValue}; + use adbc_core::LOAD_FLAG_DEFAULT; + use adbc_driver_manager::ManagedDriver; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Arc; + use tempfile::NamedTempFile; + + fn make_adbc_reader(db_path: &str) -> AdbcReader { + let driver = ManagedDriver::load_from_name( + "sqlite", + None, + AdbcVersion::V110, + LOAD_FLAG_DEFAULT, + None, + ) + .expect("`dbc install sqlite` first; see adbc.rs::equivalence_tests docs"); + let dialect: Box = Box::new(SqliteDialect); + AdbcReader::new_with_database_opts( + driver, + dialect, + std::iter::once(( + OptionDatabase::Uri, + OptionValue::String(format!("file:{}", db_path)), + )), + ) + .expect("construct AdbcReader") + } + + fn seed_adbc(path: &str) { + let bare = make_adbc_reader(path); + let df = crate::df! { + "x" => vec![1_i64, 2, 3, 4, 5], + "y" => vec![10_i64, 20, 30, 40, 50], + } + .unwrap(); + bare.register("t", df, false).unwrap(); + } + + /// Counts `execute_sql` calls reaching the ADBC primary. + struct CountingAdbcReader { + inner: AdbcReader, + calls: Arc, + } + + impl Reader for CountingAdbcReader { + fn execute_sql(&self, sql: &str) -> Result { + self.calls.fetch_add(1, Ordering::SeqCst); + self.inner.execute_sql(sql) + } + fn register(&self, name: &str, df: DataFrame, replace: bool) -> Result<()> { + self.inner.register(name, df, replace) + } + fn unregister(&self, name: &str) -> Result<()> { + self.inner.unregister(name) + } + fn execute(&self, query: &str) -> Result { + crate::reader::execute_with_reader(self, query) + } + fn dialect(&self) -> &dyn SqlDialect { + self.inner.dialect() + } + } + + #[test] + #[ignore = "requires `dbc install sqlite`"] + fn mode2_adbc_primary_duckdb_cache_equiv_and_memo() { + let db = NamedTempFile::new().unwrap(); + let path = db.path().to_str().unwrap(); + seed_adbc(path); + + let sql = "SELECT x, y, x*y AS xy FROM t WHERE y > 15 ORDER BY x"; + let golden = make_adbc_reader(path).execute_sql(sql).unwrap(); + + let calls = Arc::new(AtomicUsize::new(0)); + let primary = CountingAdbcReader { + inner: make_adbc_reader(path), + calls: calls.clone(), + }; + let cache = DuckDBReader::new_in_memory().unwrap(); + let cached = CachingReader::new(Box::new(primary), Box::new(cache), "test://primary"); + + // Base reads go through the source surface; the cache memoizes them. + let miss = cached.execute_sql(sql).unwrap(); + assert_dataframes_equal(&golden, &miss, "adbc cache miss"); + let after_miss = calls.load(Ordering::SeqCst); + assert!(after_miss >= 1, "miss should reach the ADBC primary"); + + let hit = cached.execute_sql(sql).unwrap(); + assert_dataframes_equal(&golden, &hit, "adbc cache hit"); + let after_hit = calls.load(Ordering::SeqCst); + assert_eq!( + after_miss, after_hit, + "cache hit must not round-trip to the ADBC primary" + ); + } +} diff --git a/src/reader/connection.rs b/src/reader/connection.rs index 8f72c7c57..7c1fba364 100644 --- a/src/reader/connection.rs +++ b/src/reader/connection.rs @@ -1,78 +1,173 @@ -//! Connection string parsing for data sources +//! Connection string handling for data sources. //! -//! Parses URI-style connection strings to determine database type and connection parameters. +//! Maps URI-style connection strings (`duckdb://…`, `sqlite://…`, `odbc://…`) and +//! the composite caching form (`+://…`) to readers. +use crate::reader::Reader; use crate::{GgsqlError, Result}; -/// Parsed connection information -#[derive(Debug, Clone, PartialEq)] -pub enum ConnectionInfo { - /// DuckDB in-memory database - DuckDBMemory, - /// DuckDB file-based database - DuckDBFile(String), - /// PostgreSQL connection - #[allow(dead_code)] - PostgreSQL(String), - /// SQLite file-based database - #[allow(dead_code)] - SQLite(String), - /// Generic ODBC connection (raw connection string after `odbc://` prefix) - #[allow(dead_code)] - ODBC(String), -} - -/// Parse a connection string into connection information +/// Split a composite cache URI `+://` into the primary +/// connection URI and the cache backend scheme. /// -/// # Supported Formats +/// Returns `None` when there is no `+` before `://` (a plain URI). /// -/// - `duckdb://memory` - DuckDB in-memory database -/// - `duckdb://...` - DuckDB path -/// - `postgres://...` - PostgreSQL connection string -/// - `sqlite://...` - SQLite file path +/// # Example /// ``` -pub fn parse_connection_string(uri: &str) -> Result { - if uri == "duckdb://memory" { - return Ok(ConnectionInfo::DuckDBMemory); +/// use ggsql::reader::connection::split_cache_uri; +/// assert_eq!( +/// split_cache_uri("odbc+duckdb://DSN=foo"), +/// Some(("odbc://DSN=foo".to_string(), "duckdb".to_string())) +/// ); +/// assert_eq!(split_cache_uri("duckdb://memory"), None); +/// ``` +pub fn split_cache_uri(uri: &str) -> Option<(String, String)> { + let (scheme, rest) = uri.split_once("://")?; + let (primary, cache) = scheme.split_once('+')?; + if primary.is_empty() || cache.is_empty() || cache.contains('+') { + return None; } + Some((format!("{}://{}", primary, rest), cache.to_string())) +} - if let Some(path) = uri.strip_prefix("duckdb://") { - if path.is_empty() { - return Err(GgsqlError::ReaderError( - "DuckDB file path cannot be empty".to_string(), - )); +/// Cache-config keys recognised in a connection URI's trailing `?` query string. +#[cfg(any(feature = "duckdb", feature = "sqlite"))] +const KNOWN_CACHE_PARAMS: &[&str] = &["ttl", "max_bytes", "disabled"]; + +/// Pull cache-config keys out of a connection URI's trailing `?key=value&…` +/// query string, returning the URI with those keys removed plus the overrides. +#[cfg(any(feature = "duckdb", feature = "sqlite"))] +fn strip_cache_params(uri: &str) -> (String, crate::reader::cache::CacheConfigOverride) { + use crate::reader::cache::{parse_human_bytes, CacheConfigOverride}; + + let mut over = CacheConfigOverride::default(); + let Some((body, query)) = uri.split_once('?') else { + return (uri.to_string(), over); + }; + + let mut kept: Vec<&str> = Vec::new(); + for segment in query.split('&') { + match segment.split_once('=') { + Some((key, value)) if KNOWN_CACHE_PARAMS.contains(&key) => match key { + "ttl" => over.ttl_secs = value.trim().parse::().ok(), + "max_bytes" => over.max_bytes = parse_human_bytes(value), + "disabled" => { + let v = value.trim().to_ascii_lowercase(); + over.enabled = Some(!matches!(v.as_str(), "1" | "true" | "yes")); + } + _ => unreachable!("validated against KNOWN_CACHE_PARAMS"), + }, + // Not a cache key: keep it on the URI for the primary reader. + _ => kept.push(segment), } - return Ok(ConnectionInfo::DuckDBFile(path.to_string())); } - if uri.starts_with("postgres://") || uri.starts_with("postgresql://") { - return Ok(ConnectionInfo::PostgreSQL(uri.to_string())); + if kept.is_empty() { + (body.to_string(), over) + } else { + (format!("{}?{}", body, kept.join("&")), over) } +} - if let Some(path) = uri.strip_prefix("sqlite://") { - if path.is_empty() { +/// Map a cache-backend scheme to its in-memory connection URI. +#[cfg(any(feature = "duckdb", feature = "sqlite"))] +fn cache_uri(scheme: &str) -> Result<&'static str> { + match scheme { + "duckdb" => Ok("duckdb://memory"), + "sqlite" => Ok("sqlite://memory"), + _ => Err(GgsqlError::ReaderError(format!( + "Unsupported cache backend '{}'. Supported: duckdb, sqlite", + scheme + ))), + } +} + +/// Build a reader from a non-composite connection URI +pub fn build_reader(uri: &str) -> Result> { + if uri.starts_with("duckdb://") { + #[cfg(feature = "duckdb")] + { + return Ok(Box::new( + crate::reader::DuckDBReader::from_connection_string(uri)?, + )); + } + #[cfg(not(feature = "duckdb"))] + { return Err(GgsqlError::ReaderError( - "SQLite file path cannot be empty".to_string(), + "DuckDB reader not compiled in. Rebuild with --features duckdb".to_string(), )); } - return Ok(ConnectionInfo::SQLite(path.to_string())); } - - if let Some(conn_str) = uri.strip_prefix("odbc://") { - if conn_str.is_empty() { + if uri.starts_with("sqlite://") { + #[cfg(feature = "sqlite")] + { + return Ok(Box::new( + crate::reader::SqliteReader::from_connection_string(uri)?, + )); + } + #[cfg(not(feature = "sqlite"))] + { return Err(GgsqlError::ReaderError( - "ODBC connection string cannot be empty".to_string(), + "SQLite reader not compiled in. Rebuild with --features sqlite".to_string(), )); } - return Ok(ConnectionInfo::ODBC(conn_str.to_string())); } - + if uri.starts_with("odbc://") { + #[cfg(feature = "odbc")] + { + return Ok(Box::new(crate::reader::OdbcReader::from_connection_string( + uri, + )?)); + } + #[cfg(not(feature = "odbc"))] + { + return Err(GgsqlError::ReaderError( + "ODBC reader not compiled in. Rebuild with --features odbc".to_string(), + )); + } + } + if uri.starts_with("postgres://") || uri.starts_with("postgresql://") { + return Err(GgsqlError::ReaderError( + "PostgreSQL reader is not yet implemented".to_string(), + )); + } Err(GgsqlError::ReaderError(format!( - "Unsupported connection string format: {}. Supported: duckdb://, postgres://, sqlite://, odbc://", + "Unsupported connection string: {}. Supported: duckdb://, sqlite://, odbc://", uri ))) } +/// Construct a reader from a connection URI, wrapping it in a [`CachingReader`] +/// when the URI uses the composite `+://` form. +/// +/// [`CachingReader`]: crate::reader::CachingReader +pub fn reader_from_uri(uri: &str) -> Result> { + if let Some((primary_uri, cache_scheme)) = split_cache_uri(uri) { + #[cfg(any(feature = "duckdb", feature = "sqlite"))] + { + use crate::reader::cache::CacheConfig; + + let (primary_uri, over) = strip_cache_params(&primary_uri); + let config = CacheConfig::from_env().merge(over); + let primary = build_reader(&primary_uri)?; + let cache = build_reader(cache_uri(&cache_scheme)?)?; + return Ok(Box::new(crate::reader::CachingReader::with_config( + primary, + cache, + primary_uri, + config, + ))); + } + #[cfg(not(any(feature = "duckdb", feature = "sqlite")))] + { + let _ = (&primary_uri, &cache_scheme); + return Err(GgsqlError::ReaderError( + "Caching layer requires the duckdb or sqlite feature".to_string(), + )); + } + } + build_reader(uri) +} + /// Extract a value from an ODBC connection string by key, stripping braces. pub fn extract_odbc_value(conn_str: &str, key: &str) -> Option { let lower = conn_str.to_lowercase(); @@ -93,91 +188,130 @@ mod tests { use super::*; #[test] - fn test_duckdb_memory() { - let info = parse_connection_string("duckdb://memory").unwrap(); - assert_eq!(info, ConnectionInfo::DuckDBMemory); + fn test_build_reader_unsupported_scheme() { + let err = build_reader("mysql://localhost/db") + .err() + .unwrap() + .to_string(); + assert!(err.contains("Unsupported connection string"), "got: {err}"); } #[test] - fn test_duckdb_file_relative() { - let info = parse_connection_string("duckdb://data.db").unwrap(); - assert_eq!(info, ConnectionInfo::DuckDBFile("data.db".to_string())); + fn test_build_reader_postgres_not_implemented() { + let err = build_reader("postgres://user@localhost/db") + .err() + .unwrap() + .to_string(); + assert!(err.contains("not yet implemented"), "got: {err}"); } + #[cfg(feature = "duckdb")] #[test] - fn test_duckdb_file_absolute() { - let info = parse_connection_string("duckdb:///tmp/data.db").unwrap(); - assert_eq!(info, ConnectionInfo::DuckDBFile("/tmp/data.db".to_string())); + fn test_build_reader_duckdb_memory_and_empty() { + assert!(build_reader("duckdb://memory").is_ok()); + assert!(build_reader("duckdb://").is_err()); } + #[cfg(feature = "sqlite")] #[test] - fn test_duckdb_file_nested() { - let info = parse_connection_string("duckdb://path/to/data.db").unwrap(); - assert_eq!( - info, - ConnectionInfo::DuckDBFile("path/to/data.db".to_string()) - ); + fn test_build_reader_sqlite_memory() { + assert!(build_reader("sqlite://memory").is_ok()); + assert!(build_reader("sqlite://:memory:").is_ok()); + } + + #[cfg(all(feature = "duckdb", feature = "sqlite"))] + #[test] + fn test_reader_from_uri_composite_builds() { + assert!(reader_from_uri("duckdb+sqlite://memory").is_ok()); + assert!(reader_from_uri("sqlite+duckdb://memory").is_ok()); } #[test] - fn test_postgres() { - let uri = "postgres://user:pass@localhost/db"; - let info = parse_connection_string(uri).unwrap(); - assert_eq!(info, ConnectionInfo::PostgreSQL(uri.to_string())); + fn test_split_cache_uri_odbc_duckdb() { + assert_eq!( + split_cache_uri("odbc+duckdb://Driver=Snowflake;Server=x"), + Some(( + "odbc://Driver=Snowflake;Server=x".to_string(), + "duckdb".to_string() + )) + ); } #[test] - fn test_postgresql_alias() { - let uri = "postgresql://user:pass@localhost/db"; - let info = parse_connection_string(uri).unwrap(); - assert_eq!(info, ConnectionInfo::PostgreSQL(uri.to_string())); + fn test_split_cache_uri_duckdb_sqlite_memory() { + assert_eq!( + split_cache_uri("duckdb+sqlite://memory"), + Some(("duckdb://memory".to_string(), "sqlite".to_string())) + ); } #[test] - fn test_sqlite() { - let info = parse_connection_string("sqlite://data.db").unwrap(); - assert_eq!(info, ConnectionInfo::SQLite("data.db".to_string())); + fn test_split_cache_uri_plain_is_none() { + assert_eq!(split_cache_uri("duckdb://memory"), None); + assert_eq!(split_cache_uri("odbc://DSN=x"), None); } #[test] - fn test_sqlite_absolute() { - let info = parse_connection_string("sqlite:///tmp/data.db").unwrap(); - assert_eq!(info, ConnectionInfo::SQLite("/tmp/data.db".to_string())); + fn test_split_cache_uri_rejects_multiple_plus() { + assert_eq!(split_cache_uri("a+b+c://x"), None); } #[test] - fn test_empty_duckdb_path() { - let result = parse_connection_string("duckdb://"); - assert!(result.is_err()); + fn test_split_cache_uri_rejects_empty_parts() { + assert_eq!(split_cache_uri("+duckdb://x"), None); + assert_eq!(split_cache_uri("odbc+://x"), None); } + #[cfg(any(feature = "duckdb", feature = "sqlite"))] #[test] - fn test_odbc() { - let info = parse_connection_string( - "odbc://Driver=Snowflake;Server=myaccount.snowflakecomputing.com", - ) - .unwrap(); - assert_eq!( - info, - ConnectionInfo::ODBC( - "Driver=Snowflake;Server=myaccount.snowflakecomputing.com".to_string() - ) - ); + fn test_strip_cache_params_parses_known_keys() { + let (uri, over) = strip_cache_params("duckdb://memory?ttl=600"); + assert_eq!(uri, "duckdb://memory"); + assert_eq!(over.ttl_secs, Some(600)); + assert_eq!(over.max_bytes, None); + assert_eq!(over.enabled, None); + + let (uri, over) = strip_cache_params("duckdb://memory?max_bytes=256mb&disabled=true"); + assert_eq!(uri, "duckdb://memory"); + assert_eq!(over.max_bytes, Some(256 * 1024 * 1024)); + assert_eq!(over.enabled, Some(false)); } + #[cfg(any(feature = "duckdb", feature = "sqlite"))] #[test] - fn test_odbc_empty() { - let result = parse_connection_string("odbc://"); - assert!(result.is_err()); + fn test_strip_cache_params_keeps_non_cache_segments() { + // A non-cache `?key=` tail contributes no overrides and is left in place. + let (uri, over) = strip_cache_params("odbc://DSN=foo?warehouse=PROD"); + assert_eq!(uri, "odbc://DSN=foo?warehouse=PROD"); + assert_eq!(over.ttl_secs, None); + + // ODBC body with `=`/`;` and no `?` is returned verbatim. + let (uri, over) = strip_cache_params("odbc://Driver=Snowflake;Server=x"); + assert_eq!(uri, "odbc://Driver=Snowflake;Server=x"); + assert_eq!(over.enabled, None); + + // Cache keys are extracted; other params (e.g. ODBC settings) are kept. + let (uri, over) = strip_cache_params("odbc://DSN=foo?warehouse=PROD&ttl=10&max_bytes=8mb"); + assert_eq!(uri, "odbc://DSN=foo?warehouse=PROD"); + assert_eq!(over.ttl_secs, Some(10)); + assert_eq!(over.max_bytes, Some(8 * 1024 * 1024)); + + // When every param is a cache key, the `?` is dropped entirely. + let (uri, over) = strip_cache_params("duckdb://memory?ttl=10&disabled=1"); + assert_eq!(uri, "duckdb://memory"); + assert_eq!(over.ttl_secs, Some(10)); + assert_eq!(over.enabled, Some(false)); + + // Plain URI, no query string. + let (uri, over) = strip_cache_params("duckdb://memory"); + assert_eq!(uri, "duckdb://memory"); + assert_eq!(over.ttl_secs, None); } + #[cfg(all(feature = "duckdb", feature = "sqlite"))] #[test] - fn test_unsupported_scheme() { - let result = parse_connection_string("mysql://localhost/db"); - assert!(result.is_err()); - assert!(result - .unwrap_err() - .to_string() - .contains("Unsupported connection string")); + fn test_reader_from_uri_applies_uri_cache_params() { + // A composite URI with a cache-param tail builds a CachingReader + assert!(reader_from_uri("duckdb+sqlite://memory?ttl=600&max_bytes=64mb").is_ok()); } } diff --git a/src/reader/data.rs b/src/reader/data.rs index abf9e9604..384d944ff 100644 --- a/src/reader/data.rs +++ b/src/reader/data.rs @@ -146,6 +146,22 @@ pub fn extract_builtin_dataset_names(sql: &str) -> Result, GgsqlErro Ok(datasets) } +/// Extract the table names referenced in a SQL query's `FROM`/`JOIN` clauses. +/// +/// Returns the `table_ref` targets (deduplicated), with surrounding quotes +/// stripped via [`naming::unquote_ident`] so they can be compared against +/// unquoted registered table names. Subquery sources contribute no name. +pub fn extract_table_refs(sql: &str) -> Result, GgsqlError> { + let token_def = r#"(table_ref table: (_) @table)"#; + let mut tokens = tokens_from_tree(sql, token_def, "table")? + .iter() + .map(|t| naming::unquote_ident(t)) + .collect::>(); + tokens.sort_unstable(); + tokens.dedup(); + Ok(tokens) +} + /// Rewrite SQL to replace namespaced identifiers with internal table names. /// /// e.g., `SELECT * FROM ggsql:penguins` -> `SELECT * FROM __ggsql_data_penguins__` diff --git a/src/reader/duckdb.rs b/src/reader/duckdb.rs index bf459c155..5badc5324 100644 --- a/src/reader/duckdb.rs +++ b/src/reader/duckdb.rs @@ -2,7 +2,7 @@ //! //! Provides a reader for DuckDB databases with Arrow DataFrame integration. -use crate::reader::{connection::ConnectionInfo, Reader}; +use crate::reader::{CacheBackend, Reader}; use crate::{naming, DataFrame, GgsqlError, Result}; use arrow::compute::{cast, concat_batches}; use arrow::datatypes::{DataType, Field, Schema}; @@ -243,21 +243,25 @@ impl DuckDBReader { /// - The database file cannot be opened /// - DuckDB initialization fails pub fn from_connection_string(uri: &str) -> Result { - let conn_info = super::connection::parse_connection_string(uri)?; + let path = uri.strip_prefix("duckdb://").ok_or_else(|| { + GgsqlError::ReaderError(format!( + "DuckDB URI must start with duckdb://, got '{}'", + uri + )) + })?; - let conn = match conn_info { - ConnectionInfo::DuckDBMemory => Connection::open_in_memory().map_err(|e| { + let conn = if path == "memory" { + Connection::open_in_memory().map_err(|e| { GgsqlError::ReaderError(format!("Failed to open in-memory DuckDB: {}", e)) - })?, - ConnectionInfo::DuckDBFile(path) => Connection::open(&path).map_err(|e| { + })? + } else if path.is_empty() { + return Err(GgsqlError::ReaderError( + "DuckDB file path cannot be empty".to_string(), + )); + } else { + Connection::open(path).map_err(|e| { GgsqlError::ReaderError(format!("Failed to open DuckDB file '{}': {}", path, e)) - })?, - _ => { - return Err(GgsqlError::ReaderError(format!( - "Connection string '{}' is not supported by DuckDBReader", - uri - ))) - } + })? }; // https://github.com/duckdb/duckdb/issues/22133 @@ -349,6 +353,12 @@ fn normalize_arrow_types(batch: RecordBatch) -> Result { .map_err(|e| GgsqlError::ReaderError(format!("Failed to normalize types: {}", e))) } +impl CacheBackend for DuckDBReader { + fn new_in_memory() -> Result { + Self::from_connection_string("duckdb://memory") + } +} + impl Reader for DuckDBReader { fn execute_sql(&self, sql: &str) -> Result { // Register builtin datasets if referenced diff --git a/src/reader/mod.rs b/src/reader/mod.rs index a6609d0c8..aaa961a83 100644 --- a/src/reader/mod.rs +++ b/src/reader/mod.rs @@ -429,6 +429,12 @@ pub mod odbc; #[cfg(feature = "adbc")] pub mod adbc; +#[cfg(any(feature = "duckdb", feature = "sqlite"))] +pub mod cache; + +#[cfg(all(test, feature = "duckdb", feature = "sqlite"))] +mod cache_equivalence; + pub mod connection; pub mod data; mod spec; @@ -445,6 +451,9 @@ pub use odbc::OdbcReader; #[cfg(feature = "adbc")] pub use adbc::AdbcReader; +#[cfg(any(feature = "duckdb", feature = "sqlite"))] +pub use cache::CachingReader; + // ============================================================================ // Shared utilities // ============================================================================ @@ -489,6 +498,177 @@ pub(crate) fn returns_rows(sql: &str) -> bool { ) } +/// Shared test helpers for reader equivalence suites. +#[cfg(test)] +pub(crate) mod test_support { + use super::{ + execute_with_reader, returns_rows, ColumnInfo, Reader, Spec, SqlDialect, TableInfo, + }; + use crate::{DataFrame, GgsqlError, Result}; + use std::sync::{Arc, Mutex}; + + /// A `Reader` that records every `execute_sql` it receives, delegating + /// everything to an inner reader. + pub(crate) struct SpyReader { + inner: Box, + log: Arc>>, + } + + impl SpyReader { + /// Wrap `inner`, returning the boxed spy and a handle to its call log. + pub(crate) fn wrap( + inner: Box, + ) -> (Box, Arc>>) { + let log = Arc::new(Mutex::new(Vec::new())); + ( + Box::new(SpyReader { + inner, + log: log.clone(), + }), + log, + ) + } + } + + impl Reader for SpyReader { + fn execute_sql(&self, sql: &str) -> Result { + self.log.lock().unwrap().push(sql.to_string()); + self.inner.execute_sql(sql) + } + fn register(&self, name: &str, df: DataFrame, replace: bool) -> Result<()> { + self.inner.register(name, df, replace) + } + fn unregister(&self, name: &str) -> Result<()> { + self.inner.unregister(name) + } + fn execute(&self, query: &str) -> Result { + execute_with_reader(self, query) + } + fn dialect(&self) -> &dyn SqlDialect { + self.inner.dialect() + } + fn list_catalogs(&self) -> Result> { + self.inner.list_catalogs() + } + fn list_schemas(&self, c: &str) -> Result> { + self.inner.list_schemas(c) + } + fn list_tables(&self, c: &str, s: &str) -> Result> { + self.inner.list_tables(c, s) + } + fn list_columns(&self, c: &str, s: &str, t: &str) -> Result> { + self.inner.list_columns(c, s, t) + } + } + + /// A `Reader` that wraps an inner reader and **refuses every write**: any + /// `register`/`unregister` and any non-row-returning `execute_sql` + /// (CREATE/INSERT/DROP/…) returns an error. + pub(crate) struct ReadOnlyReader { + inner: Box, + } + + impl ReadOnlyReader { + pub(crate) fn new(inner: Box) -> Self { + Self { inner } + } + + fn refuse(op: &str) -> GgsqlError { + GgsqlError::ReaderError(format!("read-only primary: refused {op}")) + } + } + + impl Reader for ReadOnlyReader { + fn execute_sql(&self, sql: &str) -> Result { + if !returns_rows(sql) { + return Err(Self::refuse(&format!("write statement: {sql}"))); + } + self.inner.execute_sql(sql) + } + fn register(&self, _name: &str, _df: DataFrame, _replace: bool) -> Result<()> { + Err(Self::refuse("register")) + } + fn unregister(&self, _name: &str) -> Result<()> { + Err(Self::refuse("unregister")) + } + fn execute(&self, query: &str) -> Result { + execute_with_reader(self, query) + } + fn dialect(&self) -> &dyn SqlDialect { + self.inner.dialect() + } + fn list_catalogs(&self) -> Result> { + self.inner.list_catalogs() + } + fn list_schemas(&self, c: &str) -> Result> { + self.inner.list_schemas(c) + } + fn list_tables(&self, c: &str, s: &str) -> Result> { + self.inner.list_tables(c, s) + } + fn list_columns(&self, c: &str, s: &str, t: &str) -> Result> { + self.inner.list_columns(c, s, t) + } + } + + /// Compare two DataFrames by schema (field names + types) and by + /// per-column Arrow array contents. We don't use a blanket + /// `assert_eq!(df, df)` because `DataFrame` doesn't implement `PartialEq`; + /// going through schema + per-column equality is also more diagnostic + /// when one of them diverges. + #[cfg(feature = "adbc")] + pub(crate) fn assert_dataframes_equal(a: &DataFrame, b: &DataFrame, ctx: &str) { + let a_schema = a.schema(); + let b_schema = b.schema(); + assert_eq!( + a_schema.fields().len(), + b_schema.fields().len(), + "{ctx}: column count mismatch (a={}, b={})", + a_schema.fields().len(), + b_schema.fields().len(), + ); + for (i, (af, bf)) in a_schema + .fields() + .iter() + .zip(b_schema.fields().iter()) + .enumerate() + { + assert_eq!( + af.name(), + bf.name(), + "{ctx}: column {i} name mismatch (a='{}', b='{}')", + af.name(), + bf.name(), + ); + assert_eq!( + af.data_type(), + bf.data_type(), + "{ctx}: column '{}' type mismatch (a={:?}, b={:?})", + af.name(), + af.data_type(), + bf.data_type(), + ); + } + assert_eq!( + a.height(), + b.height(), + "{ctx}: row count mismatch (a={}, b={})", + a.height(), + b.height(), + ); + for field in a_schema.fields() { + let ac = a.column(field.name()).unwrap(); + let bc = b.column(field.name()).unwrap(); + assert_eq!( + ac.as_ref(), + bc.as_ref(), + "{ctx}: column '{}' data mismatch", + field.name(), + ); + } + } +} + // ============================================================================ // Spec - Result of reader.execute() // ============================================================================ @@ -544,15 +724,26 @@ pub struct Metadata { /// let result = reader.execute_sql("SELECT * FROM sales WHERE amount > 100")?; /// ``` pub trait Reader { - /// Execute a SQL query and return the result as a DataFrame + /// Execute a SQL query and return the result as a DataFrame. /// - /// # Arguments + /// This is the **source surface**: base reads of the user's data plus user + /// setup/DML. A plain reader runs everything on its one connection; a caching + /// reader reads the primary here (with result memoization). /// - /// * `sql` - The SQL query to execute + /// # Errors /// - /// # Returns + /// Returns `GgsqlError::ReaderError` if: + /// - The SQL is invalid + /// - The connection fails + /// - The table or columns don't exist + fn execute_sql(&self, sql: &str) -> Result; + + /// Execute SQL against the *compute surface* — dialect-generated/derived SQL + /// over internal `__ggsql_*` tables. /// - /// A Polars DataFrame containing the query results + /// Defaults to the source surface ([`Reader::execute_sql`]), so a plain reader + /// runs everything on one connection. A caching reader overrides this to run + /// on the in-memory cache, where all derived `__ggsql_*` tables live. /// /// # Errors /// @@ -560,7 +751,9 @@ pub trait Reader { /// - The SQL is invalid /// - The connection fails /// - The table or columns don't exist - fn execute_sql(&self, sql: &str) -> Result; + fn execute_sql_cached(&self, sql: &str) -> Result { + self.execute_sql(sql) + } /// Register a DataFrame as a queryable table (takes ownership) /// @@ -642,6 +835,32 @@ pub trait Reader { &AnsiDialect } + /// Materialize the result of `body_sql` as a temporary table named `name`. + fn materialize_table( + &self, + name: &str, + column_aliases: &[String], + body_sql: &str, + ) -> Result<()> { + for stmt in self + .dialect() + .create_or_replace_temp_table_sql(name, column_aliases, body_sql) + { + self.execute_sql(&stmt)?; + } + Ok(()) + } + + /// Whether this reader stages external data sources into a separate cache. + fn caches_sources(&self) -> bool { + false + } + + /// Clear any cached query results held by this reader. + fn clear_cache(&self) -> Result<()> { + Ok(()) + } + // ========================================================================= // Schema introspection // ========================================================================= @@ -721,6 +940,16 @@ pub trait Reader { } } +/// A reader that can serve as an in-memory, writable caching backend. +/// +/// Cache backends take no options: they are always a fresh in-memory, writable +/// database scoped to the process; consumed by [`CachingReader`]. +pub trait CacheBackend: Reader { + fn new_in_memory() -> Result + where + Self: Sized; +} + /// A table or view in the schema. pub struct TableInfo { pub name: String, diff --git a/src/reader/sqlite.rs b/src/reader/sqlite.rs index 8f1e1f9f6..245c65a96 100644 --- a/src/reader/sqlite.rs +++ b/src/reader/sqlite.rs @@ -3,7 +3,7 @@ //! Provides a reader for SQLite databases with Arrow DataFrame integration. //! Works on both native targets and wasm32-unknown-unknown (via sqlite-wasm-rs). -use crate::reader::Reader; +use crate::reader::{CacheBackend, Reader}; use crate::{naming, DataFrame, GgsqlError, Result}; use arrow::array::*; use arrow::datatypes::{DataType, TimeUnit}; @@ -161,21 +161,31 @@ impl SqliteReader { } /// Create a SQLite reader from a connection string + /// + /// `sqlite://memory` (or `sqlite://:memory:`) opens an in-memory database; + /// any other path opens that file. pub fn from_connection_string(uri: &str) -> Result { - let conn_info = super::connection::parse_connection_string(uri)?; + let path = uri.strip_prefix("sqlite://").ok_or_else(|| { + GgsqlError::ReaderError(format!( + "SQLite URI must start with sqlite://, got '{}'", + uri + )) + })?; - let conn = match conn_info { - super::connection::ConnectionInfo::SQLite(path) => { - Connection::open(&path).map_err(|e| { - GgsqlError::ReaderError(format!("Failed to open SQLite file '{}': {}", path, e)) - })? - } - _ => { - return Err(GgsqlError::ReaderError(format!( - "Connection string '{}' is not supported by SqliteReader", - uri - ))) - } + if path.is_empty() { + return Err(GgsqlError::ReaderError( + "SQLite file path cannot be empty".to_string(), + )); + } + + let conn = if path == "memory" || path == ":memory:" { + Connection::open_in_memory().map_err(|e| { + GgsqlError::ReaderError(format!("Failed to open in-memory SQLite: {}", e)) + })? + } else { + Connection::open(path).map_err(|e| { + GgsqlError::ReaderError(format!("Failed to open SQLite file '{}': {}", path, e)) + })? }; #[cfg(feature = "spatial")] @@ -366,6 +376,12 @@ fn to_sql_value(v: &dyn rusqlite::types::ToSql) -> Option Result { + Self::new() + } +} + impl Reader for SqliteReader { fn execute_sql(&self, sql: &str) -> Result { // Handle ggsql:name namespaced identifiers (builtin datasets)