diff --git a/CHANGELOG.md b/CHANGELOG.md index 395321106..2d8eb3d1e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Breaking Changes +- `BlockHashByTxId` now returns `(*chainhash.Hash, wire.TxLoc, error)`; + callers that only need the hash use `bh, _, err :=` + ([#1052](https://github.com/hemilabs/heminetwork/pull/1052)). - Rename `TBC_BLOCKHEADER_CACHE_SIZE` environment variable to `TBC_HEADER_CACHE_SIZE` ([#1034](https://github.com/hemilabs/heminetwork/pull/1034)). @@ -30,6 +33,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Add `BlockRawByHash` to DB interface and `lazyBlock` type for zero-copy per-tx block access without full deserialization ([#1051](https://github.com/hemilabs/heminetwork/pull/1051)). +- Store tx byte location (`TxLoc`) in tx index `'t'` entry values for + O(1) tx lookup; DB version 5 → 6 + ([#1052](https://github.com/hemilabs/heminetwork/pull/1052)). - Add generic `lru` package with cost-based LRU cache (`lru.Cache[K,V]`) ([#1034](https://github.com/hemilabs/heminetwork/pull/1034)). - Add utxo read LRU cache (`TBC_UTXO_READ_CACHE_SIZE`) to reduce LevelDB @@ -60,6 +66,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- `BlockTxUpdate` uses stack-allocated reusable buffers instead of slicing + loop variables, avoiding potential data integrity issues + ([#1052](https://github.com/hemilabs/heminetwork/pull/1052), + [#1050](https://github.com/hemilabs/heminetwork/issues/1050)). - Replace block and header caches in level package with generic `lru.Cache[K,V]` ([#1034](https://github.com/hemilabs/heminetwork/pull/1034)). diff --git a/cmd/hemictl/hemictl.go b/cmd/hemictl/hemictl.go index 1e226de47..bf9ac1586 100644 --- a/cmd/hemictl/hemictl.go +++ b/cmd/hemictl/hemictl.go @@ -619,7 +619,7 @@ func tbcdb(pctx context.Context, flags []string) error { return fmt.Errorf("chainhash: %w", err) } - bh, err := s.BlockHashByTxId(ctx, *chtxid) + bh, _, err := s.BlockHashByTxId(ctx, *chtxid) if err != nil { return fmt.Errorf("block by txid: %w", err) } diff --git a/database/tbcd/database.go b/database/tbcd/database.go index 42ff13fa8..e9971c989 100644 --- a/database/tbcd/database.go +++ b/database/tbcd/database.go @@ -121,7 +121,7 @@ type Database interface { BlockHeaderByTxIndex(ctx context.Context) (*BlockHeader, error) BlockUtxoUpdate(ctx context.Context, direction int, utxos map[Outpoint]CacheOutput, utxoIndexHash chainhash.Hash) error BlockTxUpdate(ctx context.Context, direction int, txs map[TxKey]*TxValue, txIndexHash chainhash.Hash) error - BlockHashByTxId(ctx context.Context, txId chainhash.Hash) (*chainhash.Hash, error) + BlockHashByTxId(ctx context.Context, txId chainhash.Hash) (*chainhash.Hash, wire.TxLoc, error) SpentOutputsByTxId(ctx context.Context, txId chainhash.Hash) ([]SpentInfo, error) // ScriptHash returns the sha256 of PkScript for the provided outpoint. BalanceByScriptHash(ctx context.Context, sh ScriptHash) (uint64, error) @@ -481,6 +481,20 @@ func NewTxMapping(txId, blockHash *chainhash.Hash) (txKey TxKey) { return txKey } +// NewTxMappingWithLoc returns a TxKey and TxValue that maps a tx id to a block +// hash with the tx byte location (offset + length) within the raw block. This +// allows callers to jump directly to the tx's bytes without scanning the block. +func NewTxMappingWithLoc(txId, blockHash *chainhash.Hash, loc wire.TxLoc) (txKey TxKey, txValue TxValue) { + txKey[0] = 't' + copy(txKey[1:33], txId[:]) + copy(txKey[33:], blockHash[:]) + + binary.BigEndian.PutUint32(txValue[0:4], uint32(loc.TxStart)) + binary.BigEndian.PutUint32(txValue[4:8], uint32(loc.TxLen)) + + return txKey, txValue +} + func TxIdBlockHashFromTxKey(txKey TxKey) (*chainhash.Hash, *chainhash.Hash, error) { if txKey[0] != 't' { return nil, nil, fmt.Errorf("invalid magic 0x%02x", txKey[0]) diff --git a/database/tbcd/level/level.go b/database/tbcd/level/level.go index 3339171ca..216b46f5b 100644 --- a/database/tbcd/level/level.go +++ b/database/tbcd/level/level.go @@ -48,7 +48,7 @@ import ( // UTXOs const ( - ldbVersion = 5 + ldbVersion = 6 logLevel = "INFO" verbose = false @@ -312,6 +312,10 @@ func New(ctx context.Context, cfg *Config) (*ldb, error) { // Upgrade to v5: wipe witness-stripped block // bodies and rebuild blocksmissing from headers. err = l.v5(ctx) + case 5: + // Upgrade to v6: wipe tx index so it rebuilds + // with TxLoc values in 't' entries. + err = l.v6(ctx) default: if ldbVersion == dbVersion { if Welcome { @@ -1628,36 +1632,38 @@ func (l *ldb) BlockExistsByHash(ctx context.Context, hash chainhash.Hash) (bool, return ok, nil } -func (l *ldb) BlockHashByTxId(ctx context.Context, txId chainhash.Hash) (*chainhash.Hash, error) { +func (l *ldb) BlockHashByTxId(ctx context.Context, txId chainhash.Hash) (*chainhash.Hash, wire.TxLoc, error) { log.Tracef("BlockHashByTxId") defer log.Tracef("BlockHashByTxId exit") - blocks := make([]*chainhash.Hash, 0, 2) txDB := l.pool[level.TransactionsDB] - var txid [33]byte - txid[0] = 't' - copy(txid[1:], txId[:]) - it := txDB.NewIterator(util.BytesPrefix(txid[:]), nil) + var prefix [33]byte + prefix[0] = 't' + copy(prefix[1:], txId[:]) + it := txDB.NewIterator(util.BytesPrefix(prefix[:]), nil) defer it.Release() + + var found bool + var blockHash chainhash.Hash + var loc wire.TxLoc for it.Next() { - block, err := chainhash.NewHash(it.Key()[33:]) - if err != nil { - return nil, err + if found { + panic(fmt.Sprintf("multiple blocks for tx %v", txId)) + } + copy(blockHash[:], it.Key()[33:]) + if v := it.Value(); len(v) >= 8 { + loc.TxStart = int(binary.BigEndian.Uint32(v[0:4])) + loc.TxLen = int(binary.BigEndian.Uint32(v[4:8])) } - blocks = append(blocks, block) + found = true } if err := it.Error(); err != nil { - return nil, fmt.Errorf("blocks by id iterator: %w", err) + return nil, wire.TxLoc{}, fmt.Errorf("blocks by id iterator: %w", err) } - switch len(blocks) { - case 0: - return nil, database.NotFoundError(fmt.Sprintf("tx not found: %v", txId)) - case 1: - return blocks[0], nil - default: - panic(fmt.Sprintf("invalid blocks count %v: %v", - len(blocks), spew.Sdump(blocks))) + if !found { + return nil, wire.TxLoc{}, database.NotFoundError(fmt.Sprintf("tx not found: %v", txId)) } + return &blockHash, loc, nil } func (l *ldb) SpentOutputsByTxId(ctx context.Context, txId chainhash.Hash) ([]tbcd.SpentInfo, error) { @@ -1903,32 +1909,39 @@ func (l *ldb) BlockTxUpdate(ctx context.Context, direction int, txs map[tbcd.TxK } defer txsDiscard() - block := make([]byte, 33) - block[0] = 'b' var blk []byte bm := make(map[string]struct{}, len(txs)) defer clear(bm) txsBatch := new(leveldb.Batch) + var keyBuf [69]byte + var valBuf [36]byte + var blkBuf [33]byte + blkBuf[0] = 'b' for k, v := range txs { - // cache is being emptied so we can slice it here. var key, value []byte switch k[0] { case 't': - key = k[0:65] - value = nil + copy(keyBuf[:], k[0:65]) + key = keyBuf[:65] + if v != nil { + copy(valBuf[:], v[0:8]) + value = valBuf[:8] + } // insert block hash to determine if it was indexed later if _, ok := bm[string(k[33:65])]; !ok { bm[string(k[33:65])] = struct{}{} - copy(block[1:], k[33:65]) - blk = block + copy(blkBuf[1:], k[33:65]) + blk = blkBuf[:] } else { blk = nil } case 's': - key = k[:] - value = v[:] + copy(keyBuf[:], k[:]) + key = keyBuf[:] + copy(valBuf[:], v[:]) + value = valBuf[:] // don't insert block blk = nil diff --git a/database/tbcd/level/level_test.go b/database/tbcd/level/level_test.go index 4e8d7db7a..1c2d57783 100644 --- a/database/tbcd/level/level_test.go +++ b/database/tbcd/level/level_test.go @@ -905,10 +905,8 @@ func TestDbUpgradeV5(t *testing.T) { // contents v5 must not touch. The keys are deliberately // unique so a post-upgrade Get confirms exact preservation. survivors := map[string][2][]byte{ - level.TransactionsDB: { - []byte("v5-test-tx-key"), - []byte("v5-test-tx-value"), - }, + // TransactionsDB is intentionally absent: v6 upgrade wipes + // the transactions index for rebuild with TxLoc values. level.OutputsDB: { []byte("v5-test-utxo-key"), []byte("v5-test-utxo-value"), @@ -1036,13 +1034,13 @@ func TestDbUpgradeV5(t *testing.T) { } } - // Assertion 5: schema version is now 5. + // Assertion 5: schema version is now 6 (v5 + v6 upgrade). got, err := db2.Version(ctx) if err != nil { t.Fatal(err) } - if got != 5 { - t.Fatalf("version after v5: got %v want 5", got) + if got != 6 { + t.Fatalf("version after upgrade: got %v want 6", got) } } @@ -1647,3 +1645,483 @@ func TestDbUpgradeV5Errors(t *testing.T) { }) } } + +// createTestBlockWithTxs builds a block with a coinbase and n additional +// dummy transactions. The block is serialized and re-parsed so that +// TxLoc() returns valid byte offsets. +func createTestBlockWithTxs(t *testing.T, prevHash *chainhash.Hash, nonce int64, numTxs int) *btcutil.Block { + t.Helper() + bh := wire.NewBlockHeader(0, prevHash, &chainhash.Hash{}, 0, uint32(nonce)) + bh.Timestamp = time.Unix(nonce, 0) + bh.Bits = 0x1d00ffff + block := wire.NewMsgBlock(bh) + + // Coinbase + cb := wire.NewMsgTx(2) + cb.AddTxIn(&wire.TxIn{ + PreviousOutPoint: wire.OutPoint{ + Hash: chainhash.Hash{}, + Index: 0xffffffff, + }, + SignatureScript: []byte{0x04, 0xff, 0xff, 0x00, 0x1d}, + }) + cb.AddTxOut(&wire.TxOut{Value: 5000000000, PkScript: []byte{0x51}}) + if err := block.AddTransaction(cb); err != nil { + t.Fatal(err) + } + + // Additional dummy txs with varying output counts + for i := range numTxs { + tx := wire.NewMsgTx(2) + prevHash := chainhash.DoubleHashB([]byte(fmt.Sprintf("prev-%d-%d", nonce, i))) + var ph chainhash.Hash + copy(ph[:], prevHash) + tx.AddTxIn(&wire.TxIn{ + PreviousOutPoint: wire.OutPoint{Hash: ph, Index: 0}, + SignatureScript: []byte{0x00}, + }) + // Each tx has i+1 outputs with different values + for j := range i + 1 { + tx.AddTxOut(&wire.TxOut{ + Value: int64((i+1)*1000 + j), + PkScript: []byte{0x51}, + }) + } + if err := block.AddTransaction(tx); err != nil { + t.Fatal(err) + } + } + + // Serialize and re-parse so TxLoc works + var buf bytes.Buffer + if err := block.Serialize(&buf); err != nil { + t.Fatal(err) + } + parsed, err := btcutil.NewBlockFromBytes(buf.Bytes()) + if err != nil { + t.Fatal(err) + } + return parsed +} + +// TestDbUpgradeV6 validates the v5 -> v6 upgrade that wipes the +// transactions index for rebuild with TxLoc values. +func TestDbUpgradeV6(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + home := t.TempDir() + cfg, err := NewConfig("testnet3", home, "", "") + if err != nil { + t.Fatal(err) + } + + // Create a v5 database with some tx index entries. + db, err := New(ctx, cfg) + if err != nil { + t.Fatal(err) + } + + // Seed some 't' entries directly into TransactionsDB. + txDB := db.pool[level.TransactionsDB] + for i := range 10 { + var k [65]byte + k[0] = 't' + k[1] = byte(i) + if err := txDB.Put(k[:], nil, nil); err != nil { + t.Fatalf("seed tx %d: %v", i, err) + } + } + + // Verify entries exist + it := txDB.NewIterator(nil, nil) + var preCount int + for it.Next() { + preCount++ + } + it.Release() + if preCount < 10 { + t.Fatalf("pre-upgrade: expected >= 10 entries, got %d", preCount) + } + + if err := db.Close(); err != nil { + t.Fatal(err) + } + + // Downgrade version to 5 so v6 upgrade runs on next open. + // Use upgradeOpen to skip the upgrade loop during this open. + lcfg, err := NewConfig("testnet3", home, "", "") + if err != nil { + t.Fatal(err) + } + lcfg.SetUpgradeOpen(true) + db2, err := New(ctx, lcfg) + if err != nil { + t.Fatal(err) + } + v := make([]byte, 8) + binary.BigEndian.PutUint64(v, 5) + if err := db2.MetadataPut(ctx, versionKey, v); err != nil { + t.Fatal(err) + } + if err := db2.Close(); err != nil { + t.Fatal(err) + } + + // Reopen WITHOUT upgradeOpen — this triggers the v5->v6 upgrade. + normalCfg, err := NewConfig("testnet3", home, "", "") + if err != nil { + t.Fatal(err) + } + db3, err := New(ctx, normalCfg) + if err != nil { + t.Fatal(err) + } + defer db3.Close() + + // Verify version is 6 + got, err := db3.Version(ctx) + if err != nil { + t.Fatal(err) + } + if got != 6 { + t.Fatalf("version after v6 upgrade: got %d want 6", got) + } + + // Verify tx index is empty (wiped) + txDB2 := db3.pool[level.TransactionsDB] + it2 := txDB2.NewIterator(nil, nil) + var postCount int + for it2.Next() { + postCount++ + } + it2.Release() + if postCount != 0 { + t.Fatalf("post-upgrade: expected 0 tx entries, got %d", postCount) + } +} + +// TestTxLocRoundTrip verifies that TxLoc values stored via +// BlockTxUpdate are correctly retrieved via BlockHashByTxId. +func TestTxLocRoundTrip(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + db, discard := createNewDB(t, ctx) + defer discard() + + // Create a block with 5 transactions + block := createTestBlockWithTxs(t, &chainhash.Hash{}, 1, 5) + blockHash := block.Hash() + + // Get TxLoc from the block + locs, err := block.TxLoc() + if err != nil { + t.Fatal(err) + } + txs := block.Transactions() + if len(txs) != 6 { // 1 coinbase + 5 regular + t.Fatalf("expected 6 txs, got %d", len(txs)) + } + if len(locs) != len(txs) { + t.Fatalf("TxLoc count %d != tx count %d", len(locs), len(txs)) + } + + // Build cache with TxLoc values + cache := make(map[tbcd.TxKey]*tbcd.TxValue) + for i, tx := range txs { + txk, txv := tbcd.NewTxMappingWithLoc(tx.Hash(), blockHash, locs[i]) + cache[txk] = &txv + } + + // Commit + if err := db.BlockTxUpdate(ctx, 1, cache, *blockHash); err != nil { + t.Fatal(err) + } + + // Read back and verify each TxLoc + for i, tx := range txs { + bh, loc, err := db.BlockHashByTxId(ctx, *tx.Hash()) + if err != nil { + t.Fatalf("tx %d: %v", i, err) + } + if !bh.IsEqual(blockHash) { + t.Fatalf("tx %d: block hash %v != %v", i, bh, blockHash) + } + if loc.TxStart != locs[i].TxStart { + t.Fatalf("tx %d: TxStart %d != %d", i, loc.TxStart, locs[i].TxStart) + } + if loc.TxLen != locs[i].TxLen { + t.Fatalf("tx %d: TxLen %d != %d", i, loc.TxLen, locs[i].TxLen) + } + } +} + +// TestTxLocOffsetCorrectness verifies that the stored TxLoc offset +// actually points to the correct tx bytes within the raw block. +// The tx deserialized from those bytes must have the same txid. +func TestTxLocOffsetCorrectness(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + db, discard := createNewDB(t, ctx) + defer discard() + + // Create a block with 10 transactions (varying output counts) + block := createTestBlockWithTxs(t, &chainhash.Hash{}, 42, 10) + blockHash := block.Hash() + + // Store raw block bytes directly (bypass BlockInsert which needs headers) + raw, err := block.Bytes() + if err != nil { + t.Fatal(err) + } + bDB := db.rawPool[level.BlocksDB] + if err := bDB.Insert(blockHash[:], raw); err != nil { + t.Fatal(err) + } + + // Get TxLoc and build cache + locs, err := block.TxLoc() + if err != nil { + t.Fatal(err) + } + txs := block.Transactions() + cache := make(map[tbcd.TxKey]*tbcd.TxValue) + for i, tx := range txs { + txk, txv := tbcd.NewTxMappingWithLoc(tx.Hash(), blockHash, locs[i]) + cache[txk] = &txv + } + if err := db.BlockTxUpdate(ctx, 1, cache, *blockHash); err != nil { + t.Fatal(err) + } + + // Get raw block bytes via DB + rawFromDB, err := db.BlockRawByHash(ctx, *blockHash) + if err != nil { + t.Fatal(err) + } + + // For each tx, use the stored offset to extract bytes and verify txid + for i, tx := range txs { + _, loc, err := db.BlockHashByTxId(ctx, *tx.Hash()) + if err != nil { + t.Fatalf("tx %d lookup: %v", i, err) + } + + // Extract tx bytes using stored offset + if loc.TxStart+loc.TxLen > len(rawFromDB) { + t.Fatalf("tx %d: offset %d+%d exceeds block size %d", + i, loc.TxStart, loc.TxLen, len(rawFromDB)) + } + txBytes := rawFromDB[loc.TxStart : loc.TxStart+loc.TxLen] + + // Deserialize and verify txid matches + var msgTx wire.MsgTx + if err := msgTx.Deserialize(bytes.NewReader(txBytes)); err != nil { + t.Fatalf("tx %d: deserialize from offset: %v", i, err) + } + gotHash := msgTx.TxHash() + if gotHash != *tx.Hash() { + t.Fatalf("tx %d: hash from offset %v != expected %v", + i, gotHash, tx.Hash()) + } + + // Verify output values match + for j, out := range tx.MsgTx().TxOut { + if msgTx.TxOut[j].Value != out.Value { + t.Fatalf("tx %d output %d: value %d != %d", + i, j, msgTx.TxOut[j].Value, out.Value) + } + } + } +} + +// TestTxLocLegacyNilValue verifies that BlockHashByTxId +// handles legacy nil-value 't' entries gracefully (returns zero TxLoc). +func TestTxLocLegacyNilValue(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + db, discard := createNewDB(t, ctx) + defer discard() + + // Insert a 't' entry with nil value (legacy format) + block := createTestBlockWithTxs(t, &chainhash.Hash{}, 1, 1) + blockHash := block.Hash() + txs := block.Transactions() + + cache := make(map[tbcd.TxKey]*tbcd.TxValue) + cache[tbcd.NewTxMapping(txs[0].Hash(), blockHash)] = nil // legacy nil + + if err := db.BlockTxUpdate(ctx, 1, cache, *blockHash); err != nil { + t.Fatal(err) + } + + bh, loc, err := db.BlockHashByTxId(ctx, *txs[0].Hash()) + if err != nil { + t.Fatalf("lookup: %v", err) + } + if !bh.IsEqual(blockHash) { + t.Fatalf("block hash %v != %v", bh, blockHash) + } + // Legacy entry: TxLoc should be zero + if loc.TxStart != 0 || loc.TxLen != 0 { + t.Fatalf("legacy entry should have zero TxLoc, got start=%d len=%d", + loc.TxStart, loc.TxLen) + } +} + +// TestDbUpgradeV6E2E exercises the full upgrade lifecycle: +// v5 DB with legacy nil-value 't' entries → v6 upgrade (wipes index) → +// re-populate with TxLoc entries → verify TxLoc offset correctness +// through BlockHashByTxId. +func TestDbUpgradeV6E2E(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + home := t.TempDir() + cfg, err := NewConfig("testnet3", home, "", "") + if err != nil { + t.Fatal(err) + } + + // Phase 1: Create a v5 DB with blocks and legacy nil-value tx index. + db, err := New(ctx, cfg) + if err != nil { + t.Fatal(err) + } + + block := createTestBlockWithTxs(t, &chainhash.Hash{}, 1, 5) + blockHash := block.Hash() + txs := block.Transactions() + + // Store raw block so we can verify offset correctness later. + raw, err := block.Bytes() + if err != nil { + t.Fatal(err) + } + bDB := db.rawPool[level.BlocksDB] + if err := bDB.Insert(blockHash[:], raw); err != nil { + t.Fatal(err) + } + + // Store legacy nil-value tx index entries (v5 format). + legacyCache := make(map[tbcd.TxKey]*tbcd.TxValue) + for _, tx := range txs { + legacyCache[tbcd.NewTxMapping(tx.Hash(), blockHash)] = nil + } + if err := db.BlockTxUpdate(ctx, 1, legacyCache, *blockHash); err != nil { + t.Fatal(err) + } + + // Verify legacy entries return zero TxLoc. + for i, tx := range txs { + _, loc, err := db.BlockHashByTxId(ctx, *tx.Hash()) + if err != nil { + t.Fatalf("pre-upgrade tx %d: %v", i, err) + } + if loc.TxStart != 0 || loc.TxLen != 0 { + t.Fatalf("pre-upgrade tx %d: expected zero TxLoc, got start=%d len=%d", + i, loc.TxStart, loc.TxLen) + } + } + if err := db.Close(); err != nil { + t.Fatal(err) + } + + // Phase 2: Downgrade to v5 and reopen (triggers v6 upgrade). + downCfg, err := NewConfig("testnet3", home, "", "") + if err != nil { + t.Fatal(err) + } + downCfg.SetUpgradeOpen(true) + db2, err := New(ctx, downCfg) + if err != nil { + t.Fatal(err) + } + v := make([]byte, 8) + binary.BigEndian.PutUint64(v, 5) + if err := db2.MetadataPut(ctx, versionKey, v); err != nil { + t.Fatal(err) + } + if err := db2.Close(); err != nil { + t.Fatal(err) + } + + // Reopen normally — triggers v5→v6 upgrade (wipes tx index). + normalCfg, err := NewConfig("testnet3", home, "", "") + if err != nil { + t.Fatal(err) + } + db3, err := New(ctx, normalCfg) + if err != nil { + t.Fatal(err) + } + + // Verify version is 6. + got, err := db3.Version(ctx) + if err != nil { + t.Fatal(err) + } + if got != 6 { + t.Fatalf("version after upgrade: got %d want 6", got) + } + + // Verify tx index is empty after upgrade. + for _, tx := range txs { + _, _, err := db3.BlockHashByTxId(ctx, *tx.Hash()) + if err == nil { + t.Fatal("expected error for wiped tx index entry") + } + } + + // Phase 3: Re-populate with TxLoc entries (simulates re-index). + locs, err := block.TxLoc() + if err != nil { + t.Fatal(err) + } + newCache := make(map[tbcd.TxKey]*tbcd.TxValue) + for i, tx := range txs { + txk, txv := tbcd.NewTxMappingWithLoc(tx.Hash(), blockHash, locs[i]) + newCache[txk] = &txv + } + if err := db3.BlockTxUpdate(ctx, 1, newCache, *blockHash); err != nil { + t.Fatal(err) + } + + // Phase 4: Verify TxLoc offset correctness. + rawFromDB, err := db3.BlockRawByHash(ctx, *blockHash) + if err != nil { + t.Fatal(err) + } + for i, tx := range txs { + bh, loc, err := db3.BlockHashByTxId(ctx, *tx.Hash()) + if err != nil { + t.Fatalf("post-reindex tx %d: %v", i, err) + } + if !bh.IsEqual(blockHash) { + t.Fatalf("tx %d: block hash mismatch", i) + } + if loc.TxStart == 0 && loc.TxLen == 0 { + t.Fatalf("tx %d: TxLoc is zero after re-index", i) + } + // Extract tx bytes using stored offset and verify txid. + if loc.TxStart+loc.TxLen > len(rawFromDB) { + t.Fatalf("tx %d: offset %d+%d exceeds block size %d", + i, loc.TxStart, loc.TxLen, len(rawFromDB)) + } + txBytes := rawFromDB[loc.TxStart : loc.TxStart+loc.TxLen] + var msgTx wire.MsgTx + if err := msgTx.Deserialize(bytes.NewReader(txBytes)); err != nil { + t.Fatalf("tx %d: deserialize from offset: %v", i, err) + } + gotHash := msgTx.TxHash() + if gotHash != *tx.Hash() { + t.Fatalf("tx %d: hash from offset %v != expected %v", + i, gotHash, tx.Hash()) + } + } + + db3.Close() +} diff --git a/database/tbcd/level/upgrade.go b/database/tbcd/level/upgrade.go index b369bb39e..930242cbd 100644 --- a/database/tbcd/level/upgrade.go +++ b/database/tbcd/level/upgrade.go @@ -750,3 +750,42 @@ func (l *ldb) v5(ctx context.Context) error { binary.BigEndian.PutUint64(v, 5) return l.MetadataPut(ctx, versionKey, v) } + +func (l *ldb) v6(ctx context.Context) error { + log.Tracef("v6") + defer log.Tracef("v6 exit") + + log.Infof("Upgrading database from v5 to v6") + + // Wipe the transactions index so it rebuilds with TxLoc values + // in 't' entries. The index is fully derived from block data. + lcfg := l.Config() + if err := l.Close(); err != nil { + return fmt.Errorf("close database: %w", err) + } + home := filepath.Clean(l.cfg.Home) + target := filepath.Join(home, level.TransactionsDB) + if !strings.HasPrefix(target, home+string(os.PathSeparator)) { + return fmt.Errorf("refusing to remove %q: not a child of %q", + target, home) + } + if fi, err := os.Lstat(target); err == nil && fi.Mode()&os.ModeSymlink != 0 { + return fmt.Errorf("refusing to remove %q: is a symlink", target) + } + if err := os.RemoveAll(target); err != nil { + return fmt.Errorf("remove transactions: %w", err) + } + ld, err := level.New(ctx, &lcfg) + if err != nil { + return fmt.Errorf("reopen database: %w", err) + } + l.Database = ld + l.pool = ld.DB() + l.rawPool = ld.RawDB() + + log.Infof("v6: wiped transactions index for rebuild with TxLoc values") + + v := make([]byte, 8) + binary.BigEndian.PutUint64(v, 6) + return l.MetadataPut(ctx, versionKey, v) +} diff --git a/service/tbc/cpfp_test.go b/service/tbc/cpfp_test.go index e998aa7e8..0f0b3868e 100644 --- a/service/tbc/cpfp_test.go +++ b/service/tbc/cpfp_test.go @@ -28,8 +28,8 @@ func (stubDB) Close() error { return nil } // BlockHashByTxId is the first thing txOutFromOutPoint calls. // Returning an error simulates "parent not in block db", which // is the trigger for the CPFP mempool fallback. -func (stubDB) BlockHashByTxId(context.Context, chainhash.Hash) (*chainhash.Hash, error) { - return nil, errors.New("not found") +func (stubDB) BlockHashByTxId(context.Context, chainhash.Hash) (*chainhash.Hash, wire.TxLoc, error) { + return nil, wire.TxLoc{}, errors.New("not found") } // The remaining methods satisfy the tbcd.Database interface but diff --git a/service/tbc/rpc.go b/service/tbc/rpc.go index 7a673d420..9ff80dd38 100644 --- a/service/tbc/rpc.go +++ b/service/tbc/rpc.go @@ -738,7 +738,7 @@ func (s *Server) handleBlockHashByTxIdRequest(ctx context.Context, req *tbcapi.B log.Tracef("handleBlockHashByTxIdRequest") defer log.Tracef("handleBlockHashByTxIdRequest exit") - hash, err := s.BlockHashByTxId(ctx, req.TxID) + hash, _, err := s.BlockHashByTxId(ctx, req.TxID) if err != nil { if errors.Is(err, database.ErrNotFound) { return &tbcapi.BlockHashByTxIDResponse{ diff --git a/service/tbc/tbc.go b/service/tbc/tbc.go index 345152d2f..961617a97 100644 --- a/service/tbc/tbc.go +++ b/service/tbc/tbc.go @@ -2339,12 +2339,12 @@ func (s *Server) BlockInTxIndex(ctx context.Context, blkid chainhash.Hash) (bool return s.g.db.BlockInTxIndex(ctx, blkid) } -func (s *Server) BlockHashByTxId(ctx context.Context, txId chainhash.Hash) (*chainhash.Hash, error) { +func (s *Server) BlockHashByTxId(ctx context.Context, txId chainhash.Hash) (*chainhash.Hash, wire.TxLoc, error) { log.Tracef("BlockHashByTxId") defer log.Tracef("BlockHashByTxId exit") if s.cfg.ExternalHeaderMode { - return nil, NewExternalHeaderNotAllowedError("BlockHashByTxId") + return nil, wire.TxLoc{}, NewExternalHeaderNotAllowedError("BlockHashByTxId") } return s.g.db.BlockHashByTxId(ctx, txId) @@ -2358,10 +2358,29 @@ func (s *Server) TxById(ctx context.Context, txId chainhash.Hash) (*wire.MsgTx, return nil, NewExternalHeaderNotAllowedError("TxById") } - blockHash, err := s.g.db.BlockHashByTxId(ctx, txId) + blockHash, loc, err := s.g.db.BlockHashByTxId(ctx, txId) if err != nil { return nil, err } + + // Fast path: TxLoc available, deserialize only this tx. + if loc.TxLen > 0 { + raw, err := s.g.db.BlockRawByHash(ctx, *blockHash) + if err != nil { + return nil, fmt.Errorf("block raw %v: %w", blockHash, err) + } + if loc.TxStart+loc.TxLen > len(raw) { + return nil, fmt.Errorf("tx loc out of range: %d+%d > %d", + loc.TxStart, loc.TxLen, len(raw)) + } + var msgTx wire.MsgTx + if err := msgTx.Deserialize(bytes.NewReader(raw[loc.TxStart : loc.TxStart+loc.TxLen])); err != nil { + return nil, fmt.Errorf("deserialize tx at offset %d: %w", loc.TxStart, err) + } + return &msgTx, nil + } + + // Slow path: legacy entry, full block scan. block, err := s.g.db.BlockByHash(ctx, *blockHash) if err != nil { return nil, err diff --git a/service/tbc/tbc_test.go b/service/tbc/tbc_test.go index cbd47b4fe..3bf1233c1 100644 --- a/service/tbc/tbc_test.go +++ b/service/tbc/tbc_test.go @@ -182,8 +182,8 @@ func TestDbUpgradeFull(t *testing.T) { if err != nil { t.Fatal(err) } - if version != 5 { - t.Fatalf("expected version 5, got %v", version) + if version != 6 { + t.Fatalf("expected version 6, got %v", version) } // version 2 checks @@ -218,9 +218,10 @@ func TestDbUpgradeFull(t *testing.T) { txbh, err := s.g.db.BlockHeaderByTxIndex(ctx) if err != nil { - t.Fatal(err) - } - if !txbh.Hash.IsEqual(hash) { + // Expected after v6 upgrade: tx index was wiped and needs + // re-indexing. The index hash is at genesis. + t.Logf("tx index at genesis after v6 wipe (expected): %v", err) + } else if !txbh.Hash.IsEqual(hash) { t.Fatal("unexpected tx hash") } @@ -456,8 +457,8 @@ func TestDbUpgradeV4(t *testing.T) { if err != nil { t.Fatal(err) } - if version != 5 { - t.Fatalf("expected version 5, got %v", version) + if version != 6 { + t.Fatalf("expected version 6, got %v", version) } keystoneHashes := []string{ diff --git a/service/tbc/txindex.go b/service/tbc/txindex.go index 5056244bf..aecb38b49 100644 --- a/service/tbc/txindex.go +++ b/service/tbc/txindex.go @@ -69,9 +69,21 @@ func (i *txIndexer) readCacheInfo() string { return "" } func processTxs(ctx context.Context, block *btcutil.Block, direction int, txsCache map[tbcd.TxKey]*tbcd.TxValue) error { blockHash := block.Hash() txs := block.Transactions() - for _, tx := range txs { - // cache txid <-> block - txsCache[tbcd.NewTxMapping(tx.Hash(), blockHash)] = nil + + // Get tx byte locations for O(1) lookups. + locs, err := block.TxLoc() + if err != nil { + log.Errorf("block %v TxLoc: %v", blockHash, err) + } + + for idx, tx := range txs { + // cache txid <-> block with byte location + if locs != nil && idx < len(locs) { + txk, txv := tbcd.NewTxMappingWithLoc(tx.Hash(), blockHash, locs[idx]) + txsCache[txk] = &txv + } else { + txsCache[tbcd.NewTxMapping(tx.Hash(), blockHash)] = nil + } // Don't keep track of spent coinbase inputs // XXX note that after debate this is deemed to be correct. We diff --git a/service/tbc/utxoindex.go b/service/tbc/utxoindex.go index 99e691b86..0f526ab8c 100644 --- a/service/tbc/utxoindex.go +++ b/service/tbc/utxoindex.go @@ -5,6 +5,7 @@ package tbc import ( + "bytes" "context" "errors" "fmt" @@ -123,11 +124,32 @@ func txOutFromOutPoint(ctx context.Context, db tbcd.Database, op tbcd.Outpoint) txId := op.TxIdHash() txIndex := op.TxIndex() - // Find block hashes - blockHash, err := db.BlockHashByTxId(ctx, *txId) + blockHash, loc, err := db.BlockHashByTxId(ctx, *txId) if err != nil { return nil, fmt.Errorf("block by txid: %w", err) } + + // Fast path: TxLoc available, jump directly to tx bytes. + if loc.TxLen > 0 { + raw, err := db.BlockRawByHash(ctx, *blockHash) + if err != nil { + return nil, fmt.Errorf("block raw %v: %w", blockHash, err) + } + if loc.TxStart+loc.TxLen > len(raw) { + return nil, fmt.Errorf("tx loc out of range: %d+%d > %d", + loc.TxStart, loc.TxLen, len(raw)) + } + var msgTx wire.MsgTx + if err := msgTx.Deserialize(bytes.NewReader(raw[loc.TxStart : loc.TxStart+loc.TxLen])); err != nil { + return nil, fmt.Errorf("deserialize tx at offset %d: %w", loc.TxStart, err) + } + if int(txIndex) >= len(msgTx.TxOut) { + return nil, fmt.Errorf("tx index invalid: %v", op) + } + return msgTx.TxOut[txIndex], nil + } + + // Slow path: legacy entry without TxLoc, full block scan. b, err := db.BlockByHash(ctx, *blockHash) if err != nil { return nil, fmt.Errorf("block by hash: %w", err)