Skip to content
6 changes: 2 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ jobs:
- uses: actions/checkout@v4

- name: Install Rust toolchain
uses: dtolnay/rust-toolchain@stable
with:
components: clippy, rustfmt
run: rustup show

- name: Install protoc
# Issue #28: prost-build needs protoc to compile our internal
Expand Down Expand Up @@ -60,7 +58,7 @@ jobs:
- uses: actions/checkout@v4

- name: Install Rust toolchain
uses: dtolnay/rust-toolchain@stable
run: rustup show

- name: Install protoc
# Issue #28: prost-build needs protoc; ubuntu-latest does not
Expand Down
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[workspace]
resolver = "2"
resolver = "3"
# Issue #38: collapsed all internal merutable-* crates into the
# single `merutable` crate. Only `merutable` (the published library)
# and `merutable-python` (cdylib for PyO3 bindings; can't merge into
Expand All @@ -17,6 +17,10 @@ default-members = [
"crates/merutable",
]

[workspace.package]
edition = "2024"
rust-version = "1.88.0"

[workspace.dependencies]
# Parquet / Arrow (arrow-rs monorepo)
parquet = { version = "53", features = ["async"] }
Expand Down
32 changes: 31 additions & 1 deletion DEVELOPER.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

## Prerequisites

- **Rust stable** (1.80+): install via [rustup](https://rustup.rs/)
- **Rust**: install via [rustup](https://rustup.rs/)
```
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
```
The exact toolchain version is pinned in `rust-toolchain.toml` at the workspace root.
`rustup` reads this file automatically — no manual version selection needed.
- **Git**

## Python bindings (`merutable-python`)
Expand Down Expand Up @@ -131,3 +133,31 @@ modules with `pub` visibility (a follow-up sweep tightens to
## Adding a dependency

All dependency versions are pinned in the workspace root `Cargo.toml` under `[workspace.dependencies]`. Individual crates reference them with `{ workspace = true }`. Never add version specs in crate-level `Cargo.toml` files.

## MSRV (Minimum Supported Rust Version)

The MSRV is set via `rust-version` in `[workspace.package]` in the root `Cargo.toml`.

### Finding the MSRV

```bash
cargo install cargo-msrv
cargo msrv find --min <version>
```

### Verifying after a dependency upgrade

After bumping a dependency version, check that the workspace still builds
with the MSRV. The `+<msrv>` syntax tells cargo to use that specific
toolchain instead of the default:

```bash
# Install the MSRV toolchain (one-time)
rustup install <msrv>

# Build against it
cargo +<msrv> check
```

If the build fails, either pin the dependency to its last MSRV-compatible
version or bump `rust-version` in the workspace root `Cargo.toml`.
3 changes: 2 additions & 1 deletion crates/merutable-python/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
[package]
name = "merutable-python"
version = "0.1.0"
edition = "2021"
edition = { workspace = true }
rust-version = { workspace = true }
description = "Python bindings for merutable via PyO3"
license = "Apache-2.0"
publish = false
Expand Down
5 changes: 4 additions & 1 deletion crates/merutable-python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@
// PyO3's #[pymethods] proc macro generates internal `Into<PyErr>` conversions
// that clippy flags as useless. These are not in our code.
#![allow(clippy::useless_conversion)]
// PyO3 0.22 macros emit bare unsafe ops inside unsafe fns, which edition 2024
// flags as `unsafe_op_in_unsafe_fn`. Suppress until PyO3 upgrade (issue #77).
#![allow(unsafe_op_in_unsafe_fn)]

mod convert;

use std::sync::Arc;

use ::merutable::types::schema::{ColumnDef, TableSchema};
use ::merutable::MeruDB as RustMeruDB;
use ::merutable::types::schema::{ColumnDef, TableSchema};
use pyo3::{prelude::*, types::PyDict};

/// Python-visible MeruDB wrapper.
Expand Down
3 changes: 2 additions & 1 deletion crates/merutable/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
[package]
name = "merutable"
version = "0.0.1"
edition = "2021"
edition = { workspace = true }
rust-version = { workspace = true }
description = "Embeddable single-table engine: row + columnar Parquet with Iceberg-compatible metadata"
keywords = ["embedded", "lsm", "iceberg", "parquet", "table"]
license = "Apache-2.0"
Expand Down
2 changes: 1 addition & 1 deletion crates/merutable/benches/engine_compaction.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use criterion::{Criterion, black_box, criterion_group, criterion_main};

fn bench_compaction_placeholder(c: &mut Criterion) {
c.bench_function("compaction_noop", |b| {
Expand Down
2 changes: 1 addition & 1 deletion crates/merutable/benches/memtable_memtable.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bytes::Bytes;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use criterion::{Criterion, black_box, criterion_group, criterion_main};
use merutable::memtable::memtable::Memtable;
use merutable::types::sequence::SeqNum;
use merutable::wal::batch::WriteBatch;
Expand Down
2 changes: 1 addition & 1 deletion crates/merutable/benches/parquet_bloom.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use criterion::{Criterion, black_box, criterion_group, criterion_main};
use merutable::parquet::bloom::FastLocalBloom;

fn bench_bloom_add_and_probe(c: &mut Criterion) {
Expand Down
2 changes: 1 addition & 1 deletion crates/merutable/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ use std::sync::Arc;

use crate::engine::{background::BackgroundWorkers, config::EngineConfig, engine::MeruEngine};
use crate::types::{
Result,
key::InternalKey,
schema::TableSchema,
sequence::SeqNum,
value::{FieldValue, Row},
Result,
};

use crate::options::OpenOptions;
Expand Down
2 changes: 1 addition & 1 deletion crates/merutable/src/engine/background.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
//! The `AtomicBool` is checked at the top of each loop iteration and
//! is set by `shutdown()` BEFORE notifying; a worker cannot miss both.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

use tokio::sync::Notify;
use tracing::{debug, info, warn};
Expand Down
14 changes: 7 additions & 7 deletions crates/merutable/src/engine/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
//! held only for the duration of a hash-map get/put — no I/O under the lock.

use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};

use crate::types::{sequence::OpType, value::Row};
use lru::LruCache;
Expand Down Expand Up @@ -228,12 +228,12 @@ mod tests {
cache.insert(b"k".to_vec(), make_entry(1));

// Reader captures generation, then the cache is invalidated.
let gen = cache.snapshot_generation();
let generation = cache.snapshot_generation();
cache.invalidate(b"k");
assert!(cache.get(b"k").is_none());

// Reader tries to reinstall the (now-stale) value. Must be dropped.
cache.insert_if_fresh(b"k".to_vec(), make_entry(42), gen);
cache.insert_if_fresh(b"k".to_vec(), make_entry(42), generation);
assert!(
cache.get(b"k").is_none(),
"insert_if_fresh must drop when generation advanced"
Expand All @@ -243,17 +243,17 @@ mod tests {
#[test]
fn insert_if_fresh_rejects_after_clear() {
let cache = RowCache::new(10);
let gen = cache.snapshot_generation();
let generation = cache.snapshot_generation();
cache.clear();
cache.insert_if_fresh(b"k".to_vec(), make_entry(42), gen);
cache.insert_if_fresh(b"k".to_vec(), make_entry(42), generation);
assert!(cache.get(b"k").is_none());
}

#[test]
fn insert_if_fresh_succeeds_without_races() {
let cache = RowCache::new(10);
let gen = cache.snapshot_generation();
cache.insert_if_fresh(b"k".to_vec(), make_entry(42), gen);
let generation = cache.snapshot_generation();
cache.insert_if_fresh(b"k".to_vec(), make_entry(42), generation);
assert!(cache.get(b"k").is_some());
}

Expand Down
2 changes: 1 addition & 1 deletion crates/merutable/src/engine/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
//! unambiguous regardless of column count, we prepend a single `0x01` marker
//! byte to postcard output.

use crate::types::{value::Row, MeruError, Result};
use crate::types::{MeruError, Result, value::Row};

/// Marker byte prepended to postcard-encoded rows.
/// JSON never starts with 0x01, so this is a reliable discriminator.
Expand Down
18 changes: 9 additions & 9 deletions crates/merutable/src/engine/compaction/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,15 @@ impl CompactionIterator {

for entry in all {
let uk = entry.ikey.user_key_bytes().to_vec();
if let Some(ref last) = last_uk {
if *last == uk {
// Older version of the same key — only keep if an
// active reader might need it.
if entry.ikey.seq >= oldest_snapshot_seq {
deduped.push(entry);
}
continue;
if let Some(ref last) = last_uk
&& *last == uk
{
// Older version of the same key — only keep if an
// active reader might need it.
if entry.ikey.seq >= oldest_snapshot_seq {
deduped.push(entry);
}
continue;
}
// New user key — this is the latest version (always kept).
last_uk = Some(uk);
Expand Down Expand Up @@ -267,7 +267,7 @@ mod tests {
let iter = CompactionIterator::new(fe, SeqNum(100), false);
let results: Vec<_> = iter.collect();
assert_eq!(results.len(), 3); // keys 1, 2, 3
// Key 1 should come from file 0 (seq=10, newer).
// Key 1 should come from file 0 (seq=10, newer).
assert_eq!(results[0].ikey.seq, SeqNum(10));
assert_eq!(results[0].source_file_idx, 0);
}
Expand Down
12 changes: 6 additions & 6 deletions crates/merutable/src/engine/compaction/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@

use std::{path::Path, sync::Arc};

use crate::iceberg::{version::DataFileMeta, DeletionVector, IcebergDataFile, SnapshotTransaction};
use crate::iceberg::{DeletionVector, IcebergDataFile, SnapshotTransaction, version::DataFileMeta};
use crate::parquet::reader::ParquetReader;
use crate::types::{
MeruError, Result,
level::Level,
level::ParquetFileMeta,
schema::TableSchema,
value::{FieldValue, Row},
MeruError, Result,
};
use bytes::Bytes;
use roaring::RoaringBitmap;
Expand Down Expand Up @@ -561,10 +561,10 @@ async fn run_one_compaction_job(engine: &Arc<MeruEngine>) -> Result<bool> {

// IMP-19: fsync the data directory so the new file's directory
// entry is durable before the manifest commit.
if let Some(parent) = full_path.parent() {
if let Ok(dir) = tokio::fs::File::open(parent).await {
let _ = dir.sync_all().await;
}
if let Some(parent) = full_path.parent()
&& let Ok(dir) = tokio::fs::File::open(parent).await
{
let _ = dir.sync_all().await;
}

let meta = ParquetFileMeta {
Expand Down
46 changes: 22 additions & 24 deletions crates/merutable/src/engine/engine.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
//! `MeruEngine`: central orchestrator. Owns WAL, memtable, version set, catalog,
//! and background workers. All public operations go through this struct.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

use crate::iceberg::{IcebergCatalog, VersionSet};
use crate::memtable::manager::MemtableManager;
use crate::types::{
MeruError, Result,
key::InternalKey,
schema::TableSchema,
sequence::{GlobalSeq, OpType, SeqNum},
value::{FieldValue, Row},
MeruError, Result,
};
use crate::wal::{batch::WriteBatch, manager::WalManager};
use tokio::sync::Mutex;
Expand Down Expand Up @@ -558,27 +558,25 @@ impl MeruEngine {
// see the same stale `should_flush=true` during a burst — serialize
// rotation through `rotation_lock` and re-check under the lock so
// only one task actually seals and spawns a flush.
if should_flush {
if let Ok(_guard) = self.rotation_lock.try_lock() {
// Stale should_flush from another task's apply_batch? If the
// active memtable was already rotated out from under us, the
// new active is small and we have nothing to do.
if self.memtable.active_should_flush() {
let next_seq = self.global_seq.current().next();
self.memtable.rotate(next_seq);
// Rotate the WAL as well so the sealed memtable's
// writes live in a closed log that GC can reclaim.
{
let mut wal = self.wal.lock().await;
wal.rotate()?;
}
let engine = Arc::clone(self);
tokio::spawn(async move {
if let Err(e) = crate::engine::flush::run_flush(&engine).await {
tracing::error!(error = %e, "auto-flush failed");
}
});
if should_flush && let Ok(_guard) = self.rotation_lock.try_lock() {
// Stale should_flush from another task's apply_batch? If the
// active memtable was already rotated out from under us, the
// new active is small and we have nothing to do.
if self.memtable.active_should_flush() {
let next_seq = self.global_seq.current().next();
self.memtable.rotate(next_seq);
// Rotate the WAL as well so the sealed memtable's
// writes live in a closed log that GC can reclaim.
{
let mut wal = self.wal.lock().await;
wal.rotate()?;
}
let engine = Arc::clone(self);
tokio::spawn(async move {
if let Err(e) = crate::engine::flush::run_flush(&engine).await {
tracing::error!(error = %e, "auto-flush failed");
}
});
}
}

Expand Down Expand Up @@ -649,8 +647,8 @@ impl MeruEngine {
wal.rotate()?;
}
} // rotation_lock dropped
// Flush all immutables. `run_flush` calls `mark_flushed_seq` which
// GCs the matching closed WAL file as a side effect.
// Flush all immutables. `run_flush` calls `mark_flushed_seq` which
// GCs the matching closed WAL file as a side effect.
while self.memtable.oldest_immutable().is_some() {
crate::engine::flush::run_flush(self).await?;
}
Expand Down
10 changes: 5 additions & 5 deletions crates/merutable/src/engine/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ use std::sync::Arc;
use crate::iceberg::{IcebergDataFile, SnapshotTransaction};
use crate::memtable::iterator::MemEntry;
use crate::types::{
MeruError, Result,
key::InternalKey,
level::{Level, ParquetFileMeta},
sequence::SeqNum,
value::Row,
MeruError, Result,
};
use tracing::{debug, info, instrument};

Expand Down Expand Up @@ -197,10 +197,10 @@ pub async fn run_flush(engine: &Arc<MeruEngine>) -> Result<()> {
// IMP-19: fsync the data directory so the directory entry for the
// new file is durable. POSIX: fsync on a file syncs data+metadata
// of the file itself but NOT the directory containing the link.
if let Some(parent) = full_path.parent() {
if let Ok(dir) = tokio::fs::File::open(parent).await {
let _ = dir.sync_all().await;
}
if let Some(parent) = full_path.parent()
&& let Ok(dir) = tokio::fs::File::open(parent).await
{
let _ = dir.sync_all().await;
}

parquet_bytes.len() as u64
Expand Down
Loading
Loading