diff --git a/include/mori/io/backend.hpp b/include/mori/io/backend.hpp index 196da4e7f..a67122d52 100644 --- a/include/mori/io/backend.hpp +++ b/include/mori/io/backend.hpp @@ -166,6 +166,10 @@ class Backend { TransferStatus* status) = 0; virtual bool CanHandle(const MemoryDesc& local, const MemoryDesc& remote) const { return true; } + + // Returns the maximum memory region size the backend can register in a + // single ibv_reg_mr call. SIZE_MAX means no known limit. + virtual size_t GetMaxMemoryRegionSize() const { return SIZE_MAX; } }; } // namespace io diff --git a/include/mori/io/engine.hpp b/include/mori/io/engine.hpp index 18718c450..3e9c15197 100644 --- a/include/mori/io/engine.hpp +++ b/include/mori/io/engine.hpp @@ -108,6 +108,10 @@ class IOEngine { std::optional CreateSession(const MemoryDesc& local, const MemoryDesc& remote); void LoadScatterGatherModule(const std::string& hsacoPath); + // Returns the minimum max_mr_size across all RDMA backends/devices. + // SIZE_MAX means no known limit. + size_t GetMaxMemoryRegionSize() const; + private: struct RouteCacheKey { EngineKey remoteEngineKey; diff --git a/src/io/engine.cpp b/src/io/engine.cpp index 84d954f3d..7ae1f923f 100644 --- a/src/io/engine.cpp +++ b/src/io/engine.cpp @@ -473,5 +473,13 @@ bool IOEngine::PopInboundTransferStatus(EngineKey remote, TransferUniqueId id, return false; } +size_t IOEngine::GetMaxMemoryRegionSize() const { + size_t min_size = SIZE_MAX; + for (const auto& [type, be] : backends) { + min_size = std::min(min_size, be->GetMaxMemoryRegionSize()); + } + return min_size; +} + } // namespace io } // namespace mori diff --git a/src/io/rdma/backend_impl.cpp b/src/io/rdma/backend_impl.cpp index 056195027..59ca0306e 100644 --- a/src/io/rdma/backend_impl.cpp +++ b/src/io/rdma/backend_impl.cpp @@ -989,5 +989,27 @@ void RdmaBackend::InvalidateSessionsForMemory(MemoryUniqueId id) { } } +size_t RdmaBackend::GetMaxMemoryRegionSize() const { + // IONIC (Pensando/AINIC) NICs report an incorrect max_mr_size via + // ibv_query_device. Cap to 2 GB for these devices. + static constexpr size_t kIonicMaxMrSize = 2ULL * 1024 * 1024 * 1024; + + size_t min_size = SIZE_MAX; + for (const auto& [dev, port] : rdma->GetAvailDevices()) { + const auto* attr = dev->GetDeviceAttr(); + if (!attr) continue; + + size_t dev_max = static_cast(attr->orig_attr.max_mr_size); + if (attr->orig_attr.vendor_id == + static_cast(application::RdmaDeviceVendorId::Pensando)) { + dev_max = kIonicMaxMrSize; + } + if (dev_max > 0) { + min_size = std::min(min_size, dev_max); + } + } + return min_size; +} + } // namespace io } // namespace mori diff --git a/src/io/rdma/backend_impl.hpp b/src/io/rdma/backend_impl.hpp index 6cbdb14fc..3d67cf773 100644 --- a/src/io/rdma/backend_impl.hpp +++ b/src/io/rdma/backend_impl.hpp @@ -78,6 +78,7 @@ class RdmaManager { std::vector> SnapshotEndpointRuntimes(); application::RdmaDeviceContext* GetRdmaDeviceContext(int devId); + const application::ActiveDevicePortList& GetAvailDevices() const { return availDevices; } private: application::RdmaDeviceContext* GetOrCreateDeviceContext(int devId); @@ -250,6 +251,7 @@ class RdmaBackend : public Backend { bool isRead); BackendSession* CreateSession(const MemoryDesc& local, const MemoryDesc& remote); bool PopInboundTransferStatus(EngineKey remote, TransferUniqueId id, TransferStatus* status); + size_t GetMaxMemoryRegionSize() const override; private: void CreateSession(const MemoryDesc& local, const MemoryDesc& remote, RdmaBackendSession& sess); diff --git a/src/pybind/pybind_umbp.cpp b/src/pybind/pybind_umbp.cpp index 77bc2d2cc..0f32e06be 100644 --- a/src/pybind/pybind_umbp.cpp +++ b/src/pybind/pybind_umbp.cpp @@ -102,7 +102,8 @@ void RegisterMoriUmbp(py::module_& m) { .def_readwrite("io_engine_port", &UMBPDistributedConfig::io_engine_port) .def_readwrite("staging_buffer_size", &UMBPDistributedConfig::staging_buffer_size) .def_readwrite("peer_service_port", &UMBPDistributedConfig::peer_service_port) - .def_readwrite("cache_remote_fetches", &UMBPDistributedConfig::cache_remote_fetches); + .def_readwrite("cache_remote_fetches", &UMBPDistributedConfig::cache_remote_fetches) + .def_readwrite("max_mr_chunk_size", &UMBPDistributedConfig::max_mr_chunk_size); py::class_(m, "UMBPConfig") .def(py::init<>()) diff --git a/src/umbp/distributed/master/client_registry.cpp b/src/umbp/distributed/master/client_registry.cpp index 81ea2837e..b45800f5d 100644 --- a/src/umbp/distributed/master/client_registry.cpp +++ b/src/umbp/distributed/master/client_registry.cpp @@ -228,7 +228,6 @@ size_t ClientRegistry::UnregisterClient(const std::string& node_id) { // PA-3 fix: exclusive lock because we mutate last_heartbeat and tier_capacities ClientStatus ClientRegistry::Heartbeat(const std::string& node_id, const std::map& tier_capacities) { - (void)tier_capacities; std::unique_lock lock(mutex_); auto it = clients_.find(node_id); if (it == clients_.end()) { @@ -239,6 +238,11 @@ ClientStatus ClientRegistry::Heartbeat(const std::string& node_id, it->second.last_heartbeat = std::chrono::steady_clock::now(); it->second.status = ClientStatus::ALIVE; + // Update tier capacities reported by the client. + for (const auto& [tier, cap] : tier_capacities) { + it->second.tier_capacities[tier] = cap; + } + return ClientStatus::ALIVE; } diff --git a/src/umbp/distributed/pool_client.cpp b/src/umbp/distributed/pool_client.cpp index c89fcb8b3..26f2f55be 100644 --- a/src/umbp/distributed/pool_client.cpp +++ b/src/umbp/distributed/pool_client.cpp @@ -87,15 +87,44 @@ bool PoolClient::Init() { staging_mem_ = io_engine_->RegisterMemory(staging_buffer_.get(), config_.staging_buffer_size, -1, mori::io::MemoryLocationType::CPU); + // Determine effective chunk size for DRAM MR registration. + size_t device_max_mr = io_engine_->GetMaxMemoryRegionSize(); + size_t effective_chunk = config_.max_mr_chunk_size > 0 + ? std::min(config_.max_mr_chunk_size, device_max_mr) + : device_max_mr; + // Align to system page size. + size_t page_size = static_cast(sysconf(_SC_PAGE_SIZE)); + if (effective_chunk != SIZE_MAX && effective_chunk > page_size) { + effective_chunk = (effective_chunk / page_size) * page_size; + } + // If effective_chunk covers all buffers, normalize to SIZE_MAX (no chunking). + size_t max_buffer_size = 0; for (const auto& dram : config_.dram_buffers) { - if (dram.buffer && dram.size > 0) { - auto mem = io_engine_->RegisterMemory(dram.buffer, dram.size, -1, + max_buffer_size = std::max(max_buffer_size, dram.size); + } + if (effective_chunk >= max_buffer_size) { + effective_chunk = SIZE_MAX; + } + dram_chunk_size_ = effective_chunk; + + if (dram_chunk_size_ != SIZE_MAX) { + MORI_UMBP_INFO("[PoolClient] DRAM MR chunk size: {} bytes (device_max={}, config={})", + dram_chunk_size_, device_max_mr, config_.max_mr_chunk_size); + } + + // Register DRAM buffers, splitting into chunks if needed. + for (const auto& dram : config_.dram_buffers) { + if (!dram.buffer || dram.size == 0) continue; + size_t chunk = (dram_chunk_size_ != SIZE_MAX) ? dram_chunk_size_ : dram.size; + for (size_t off = 0; off < dram.size; off += chunk) { + size_t sz = std::min(chunk, dram.size - off); + auto mem = io_engine_->RegisterMemory(static_cast(dram.buffer) + off, sz, -1, mori::io::MemoryLocationType::CPU); export_dram_mems_.push_back(mem); } } - MORI_UMBP_INFO("[PoolClient] IOEngine initialized on {}:{} ({} DRAM buffers)", + MORI_UMBP_INFO("[PoolClient] IOEngine initialized on {}:{} ({} DRAM MR chunks)", config_.io_engine_host, config_.io_engine_port, export_dram_mems_.size()); } @@ -112,7 +141,7 @@ bool PoolClient::Init() { msgpack::sbuffer mbuf; msgpack::pack(mbuf, export_dram_mems_[i]); dram_memory_desc_bytes_list.emplace_back(mbuf.data(), mbuf.data() + mbuf.size()); - dram_buffer_sizes.push_back(config_.dram_buffers[i].size); + dram_buffer_sizes.push_back(export_dram_mems_[i].size); } } @@ -214,20 +243,32 @@ bool PoolClient::RegisterMemory(void* ptr, size_t size) { MORI_UMBP_ERROR("[PoolClient] RegisterMemory: IOEngine not available"); return false; } - auto mem_desc = io_engine_->RegisterMemory(ptr, size, -1, mori::io::MemoryLocationType::CPU); + // Split into chunks matching dram_chunk_size_ to stay within MR limits. + size_t chunk = (dram_chunk_size_ != 0 && dram_chunk_size_ != SIZE_MAX) ? dram_chunk_size_ : size; std::lock_guard lock(registered_mem_mutex_); - registered_regions_.push_back({ptr, size, mem_desc}); - MORI_UMBP_INFO("[PoolClient] RegisterMemory: ptr={}, size={}", ptr, size); + size_t num_chunks = 0; + for (size_t off = 0; off < size; off += chunk) { + size_t sz = std::min(chunk, size - off); + auto mem_desc = io_engine_->RegisterMemory(static_cast(ptr) + off, sz, -1, + mori::io::MemoryLocationType::CPU); + registered_regions_.push_back({static_cast(ptr) + off, sz, mem_desc, ptr}); + ++num_chunks; + } + MORI_UMBP_INFO("[PoolClient] RegisterMemory: ptr={}, size={}, chunks={}", ptr, size, num_chunks); return true; } void PoolClient::DeregisterMemory(void* ptr) { std::lock_guard lock(registered_mem_mutex_); - auto it = std::find_if(registered_regions_.begin(), registered_regions_.end(), - [ptr](const RegisteredRegion& r) { return r.base == ptr; }); - if (it != registered_regions_.end()) { - if (io_engine_) io_engine_->DeregisterMemory(it->mem_desc); - registered_regions_.erase(it); + // Remove all chunk entries belonging to the same original RegisterMemory() call. + auto it = registered_regions_.begin(); + while (it != registered_regions_.end()) { + if (it->group_base == ptr) { + if (io_engine_) io_engine_->DeregisterMemory(it->mem_desc); + it = registered_regions_.erase(it); + } else { + ++it; + } } } @@ -491,13 +532,19 @@ PoolClient::PeerConnection& PoolClient::GetOrConnectPeer( std::lock_guard lock(peers_mutex_); auto it = peers_.find(node_id); if (it != peers_.end()) { - // Ensure dram_memories vector has the requested index populated + // Ensure dram_memories vector has the requested index populated. + // Always fill the slot when we have desc bytes, even if the vector + // was previously resized past this index by an out-of-order arrival. auto& peer = *it->second; - if (buffer_index >= peer.dram_memories.size() && !dram_memory_desc_bytes.empty()) { - peer.dram_memories.resize(buffer_index + 1); - auto handle = msgpack::unpack(reinterpret_cast(dram_memory_desc_bytes.data()), - dram_memory_desc_bytes.size()); - peer.dram_memories[buffer_index] = handle.get().as(); + if (!dram_memory_desc_bytes.empty()) { + if (buffer_index >= peer.dram_memories.size()) { + peer.dram_memories.resize(buffer_index + 1); + } + if (!IsValidMemoryDesc(peer.dram_memories[buffer_index])) { + auto handle = msgpack::unpack(reinterpret_cast(dram_memory_desc_bytes.data()), + dram_memory_desc_bytes.size()); + peer.dram_memories[buffer_index] = handle.get().as(); + } } return peer; } diff --git a/src/umbp/include/umbp/common/config.h b/src/umbp/include/umbp/common/config.h index 0e30071ce..d8ed84b48 100644 --- a/src/umbp/include/umbp/common/config.h +++ b/src/umbp/include/umbp/common/config.h @@ -115,6 +115,12 @@ struct UMBPDistributedConfig { uint16_t peer_service_port = 0; // gRPC peer service port bool cache_remote_fetches = true; // cache remotely-fetched blocks locally + + // Maximum single MR size for RDMA memory registration (bytes). + // 0 (default) = auto-detect from ibv_device_attr.max_mr_size. + // Set explicitly when auto-detection is unavailable or for testing. + // Env: UMBP_MAX_MR_CHUNK_SIZE + size_t max_mr_chunk_size = 0; }; struct UMBPConfig { @@ -365,6 +371,9 @@ struct PoolClientConfig { std::map tier_capacities; uint16_t peer_service_port = 0; + + // Passed from UMBPDistributedConfig::max_mr_chunk_size. + size_t max_mr_chunk_size = 0; }; } // namespace mori::umbp diff --git a/src/umbp/include/umbp/distributed/pool_client.h b/src/umbp/include/umbp/distributed/pool_client.h index 0a0400a6c..942374cf4 100644 --- a/src/umbp/include/umbp/distributed/pool_client.h +++ b/src/umbp/include/umbp/distributed/pool_client.h @@ -52,6 +52,10 @@ class PoolClient { const std::string& NodeId() const { return config_.master_config.node_id; } + // Returns the effective chunk size used for DRAM MR registration. + // SIZE_MAX means no chunking was applied. + size_t DramChunkSize() const { return dram_chunk_size_; } + bool RegisterMemory(void* ptr, size_t size); void DeregisterMemory(void* ptr); @@ -150,9 +154,10 @@ class PoolClient { // Zero-copy registered memory regions struct RegisteredRegion { - void* base; - size_t size; + void* base; // this chunk's actual start address + size_t size; // this chunk's size mori::io::MemoryDesc mem_desc; + void* group_base; // original RegisterMemory() caller's base pointer }; std::mutex registered_mem_mutex_; std::vector registered_regions_; @@ -162,6 +167,9 @@ class PoolClient { mutable std::mutex cache_mutex_; std::unordered_map cluster_locations_; + + // Effective MR chunk size for DRAM registration. SIZE_MAX = no chunking. + size_t dram_chunk_size_ = SIZE_MAX; }; } // namespace mori::umbp diff --git a/src/umbp/include/umbp/local/storage/dram_tier.h b/src/umbp/include/umbp/local/storage/dram_tier.h index c95ca2152..80b438791 100644 --- a/src/umbp/include/umbp/local/storage/dram_tier.h +++ b/src/umbp/include/umbp/local/storage/dram_tier.h @@ -31,13 +31,23 @@ #include #include +#include "umbp/common/config.h" #include "umbp/local/storage/tier_backend.h" namespace mori::umbp { -// DRAM Tier: mmap pre-allocated large memory block with offset allocator +// DRAM Tier: mmap pre-allocated large memory block with chunk-aware offset allocator. +// +// The arena is divided into fixed-size logical chunks, each of which maps 1:1 +// to one RDMA memory region, one master-side allocator, and one published +// location_id. Every block is placed entirely within one chunk. class DRAMTier : public TierBackend { public: + struct ChunkLocation { + uint32_t chunk_index = 0; + size_t offset = 0; + }; + DRAMTier(size_t capacity, bool use_shm = false, const std::string& shm_name = "/umbp_dram"); ~DRAMTier() override; @@ -68,12 +78,38 @@ class DRAMTier : public TierBackend { // the returned pointer across Evict/Write calls. const void* ReadPtr(const std::string& key, size_t* out_size) override; - // Accessors for distributed integration (Phase 2). // Returns the mmap'd base address for RDMA registration. void* GetBasePtr() const { return base_ptr_; } - // Returns the byte offset of a key's slot, or nullopt if not found. + + // Returns the byte offset of a key's slot (global, across all chunks). std::optional GetSlotOffset(const std::string& key) const; + // Re-slice the DRAM arena into fixed-size chunks for RDMA MR registration. + // The constructor already initializes a single full-arena chunk, so this + // method only needs to be called when the deployment requires smaller + // chunks (e.g. AINIC/Pensando 2 GB MR limit). When chunk_size >= capacity + // (including SIZE_MAX or 0) the result is a single chunk. + // + // Chunk layout is a startup-time decision: ConfigureChunks() may only be + // called before the layout is sealed. The layout is sealed either by + // SealChunkLayout() (called after distributed initialization registers + // MRs with the master) or by the first successful Write(). Once sealed + // the layout is permanently locked — Clear() does not re-enable + // reconfiguration. + void ConfigureChunks(size_t chunk_size); + + // Permanently lock the current chunk layout. Called by UMBPClient after + // distributed initialization so that the local layout cannot diverge + // from the already-registered MRs and master's buffer_index mapping. + // Also called implicitly by the first successful Write(). + void SealChunkLayout(); + + // Return one ExportableDram per configured chunk. + std::vector GetExportableChunks() const; + + // Return the chunk-local location of a key's slot. + std::optional GetSlotChunkLocation(const std::string& key) const; + private: void* base_ptr_; // mmap base address size_t capacity_; @@ -82,10 +118,11 @@ class DRAMTier : public TierBackend { bool use_shm_; std::string shm_name_; - // Simple offset allocator: key -> (offset, size) + // Chunk-aware slot metadata: key -> (chunk_index, offset_within_chunk, size) struct SlotInfo { - size_t offset; - size_t size; + uint32_t chunk_index = 0; + size_t offset = 0; + size_t size = 0; }; std::unordered_map slots_; @@ -98,14 +135,27 @@ class DRAMTier : public TierBackend { size_t offset; size_t size; }; - std::list free_list_; + + // Chunk layout + struct DramChunk { + void* base = nullptr; + size_t size = 0; + std::list free_list; + }; + std::vector chunks_; + bool chunks_configured_ = false; + bool chunks_sealed_ = false; // set by SealChunkLayout() or first Write; never cleared mutable std::mutex mu_; - size_t Allocate(size_t size); // Allocate from free_list_ - void Deallocate(size_t offset, size_t size); // Return to free_list_ - void EvictLRU(); // Evict least recently used - void TouchLRU(const std::string& key); // Update LRU position + // Allocate from chunk free lists. Returns {chunk_index, offset} or nullopt. + std::optional> Allocate(size_t size); + + // Return space to the chunk-local free list with coalescing. + void Deallocate(uint32_t chunk_index, size_t offset, size_t size); + + void EvictLRU(); + void TouchLRU(const std::string& key); }; } // namespace mori::umbp diff --git a/src/umbp/include/umbp/local/storage/local_storage_manager.h b/src/umbp/include/umbp/local/storage/local_storage_manager.h index 3a7be2d4b..1b5f918e3 100644 --- a/src/umbp/include/umbp/local/storage/local_storage_manager.h +++ b/src/umbp/include/umbp/local/storage/local_storage_manager.h @@ -107,6 +107,12 @@ class LocalStorageManager { return dynamic_cast(GetTier(tier)); } + // Serialize a key's location for publication to the Master. + // DRAM: "chunk_index:offset" SSD: "0:" + // This is the single serialization point for published location strings. + std::optional BuildTierLocationInfo(TierBackend* tier, const std::string& key, + size_t size); + private: UMBPConfig config_; UMBPRole role_; @@ -160,8 +166,6 @@ class LocalStorageManager { bool DemoteLRUForSpace(TierBackend* tier); bool InsertReadCacheNoWriteback(const std::string& key); void UpsertIndexTier(const std::string& key, StorageTier tier, size_t size_hint); - static std::optional BuildTierLocationInfo(TierBackend* tier, - const std::string& key, size_t size); void MaybeAutoPromote(const std::string& key); diff --git a/src/umbp/include/umbp/local/storage/tier_backend.h b/src/umbp/include/umbp/local/storage/tier_backend.h index 3ecca87bf..cca2a5c82 100644 --- a/src/umbp/include/umbp/local/storage/tier_backend.h +++ b/src/umbp/include/umbp/local/storage/tier_backend.h @@ -114,8 +114,10 @@ class TierBackend { // SSDTier: O_DIRECT; SpdkProxyTier: skip ring buffer cache. virtual void SetColdRead(bool /*enable*/) {} - // Return an opaque location identifier for a previously written key. - // Callers prepend the store index before publishing to the Master. + // Return a tier-internal location identifier for a previously written key. + // This is NOT directly publishable to the Master — use + // LocalStorageManager::BuildTierLocationInfo() for that, which handles + // tier-specific formatting (e.g. DRAM chunk_index:offset). virtual std::optional GetLocationId(const std::string& key) const; // Which StorageTier does this backend represent? diff --git a/src/umbp/local/storage/dram_tier.cpp b/src/umbp/local/storage/dram_tier.cpp index 2aa57f5f9..c88c545e7 100644 --- a/src/umbp/local/storage/dram_tier.cpp +++ b/src/umbp/local/storage/dram_tier.cpp @@ -63,8 +63,14 @@ DRAMTier::DRAMTier(size_t capacity, bool use_shm, const std::string& shm_name) throw std::runtime_error("mmap failed: " + std::string(strerror(errno))); } - // Initialize free list with entire capacity - free_list_.push_back({0, capacity_}); + // Default to a single full-arena chunk. Distributed mode may call + // ConfigureChunks() later to re-slice for RDMA MR size limits. + DramChunk chunk; + chunk.base = base_ptr_; + chunk.size = capacity_; + chunk.free_list.push_back({0, capacity_}); + chunks_.push_back(std::move(chunk)); + chunks_configured_ = true; } DRAMTier::~DRAMTier() { @@ -77,45 +83,112 @@ DRAMTier::~DRAMTier() { } } -size_t DRAMTier::Allocate(size_t size) { - // First-fit allocation - for (auto it = free_list_.begin(); it != free_list_.end(); ++it) { - if (it->size >= size) { +void DRAMTier::ConfigureChunks(size_t chunk_size) { + std::lock_guard lock(mu_); + if (chunks_sealed_) { + throw std::runtime_error( + "DRAMTier::ConfigureChunks: chunk layout has been sealed and cannot be reconfigured"); + } + + chunks_.clear(); + + // chunk_size == 0, SIZE_MAX, or >= capacity_ → single full-arena chunk. + if (chunk_size == 0 || chunk_size >= capacity_) { + DramChunk chunk; + chunk.base = base_ptr_; + chunk.size = capacity_; + chunk.free_list.push_back({0, capacity_}); + chunks_.push_back(std::move(chunk)); + } else { + for (size_t off = 0; off < capacity_; off += chunk_size) { + size_t sz = std::min(chunk_size, capacity_ - off); + DramChunk chunk; + chunk.base = static_cast(base_ptr_) + off; + chunk.size = sz; + chunk.free_list.push_back({0, sz}); + chunks_.push_back(std::move(chunk)); + } + } + + chunks_configured_ = true; +} + +void DRAMTier::SealChunkLayout() { + std::lock_guard lock(mu_); + chunks_sealed_ = true; +} + +std::vector DRAMTier::GetExportableChunks() const { + std::lock_guard lock(mu_); + std::vector result; + result.reserve(chunks_.size()); + for (const auto& chunk : chunks_) { + result.push_back({chunk.base, chunk.size}); + } + return result; +} + +std::optional DRAMTier::GetSlotChunkLocation( + const std::string& key) const { + std::lock_guard lock(mu_); + auto it = slots_.find(key); + if (it == slots_.end()) return std::nullopt; + return ChunkLocation{it->second.chunk_index, it->second.offset}; +} + +std::optional> DRAMTier::Allocate(size_t size) { + if (!chunks_configured_) { + return std::nullopt; + } + + // Iterate chunks in ascending index order (first-fit across chunks). + for (uint32_t ci = 0; ci < static_cast(chunks_.size()); ++ci) { + auto& chunk = chunks_[ci]; + + // Fast reject: block cannot exceed this chunk's size. + if (size > chunk.size) continue; + + // First-fit within this chunk's free list. + for (auto it = chunk.free_list.begin(); it != chunk.free_list.end(); ++it) { + if (it->size < size) continue; + size_t offset = it->offset; if (it->size == size) { - free_list_.erase(it); + chunk.free_list.erase(it); } else { it->offset += size; it->size -= size; } - return offset; + return std::make_pair(ci, offset); } } - return static_cast(-1); // Allocation failed + return std::nullopt; } -void DRAMTier::Deallocate(size_t offset, size_t size) { - // Insert into sorted position and coalesce adjacent blocks - auto it = free_list_.begin(); - while (it != free_list_.end() && it->offset < offset) { +void DRAMTier::Deallocate(uint32_t chunk_index, size_t offset, size_t size) { + auto& free_list = chunks_[chunk_index].free_list; + + // Insert into sorted position and coalesce adjacent blocks. + auto it = free_list.begin(); + while (it != free_list.end() && it->offset < offset) { ++it; } - auto new_it = free_list_.insert(it, {offset, size}); + auto new_it = free_list.insert(it, {offset, size}); // Coalesce with next block auto next = std::next(new_it); - if (next != free_list_.end() && new_it->offset + new_it->size == next->offset) { + if (next != free_list.end() && new_it->offset + new_it->size == next->offset) { new_it->size += next->size; - free_list_.erase(next); + free_list.erase(next); } // Coalesce with previous block - if (new_it != free_list_.begin()) { + if (new_it != free_list.begin()) { auto prev = std::prev(new_it); if (prev->offset + prev->size == new_it->offset) { prev->size += new_it->size; - free_list_.erase(new_it); + free_list.erase(new_it); } } } @@ -135,7 +208,7 @@ void DRAMTier::EvictLRU() { const std::string& victim = lru_list_.back(); auto slot_it = slots_.find(victim); if (slot_it != slots_.end()) { - Deallocate(slot_it->second.offset, slot_it->second.size); + Deallocate(slot_it->second.chunk_index, slot_it->second.offset, slot_it->second.size); used_ -= slot_it->second.size; slots_.erase(slot_it); } @@ -149,7 +222,7 @@ bool DRAMTier::Write(const std::string& key, const void* data, size_t size) { // If key already exists, free its old slot first auto existing = slots_.find(key); if (existing != slots_.end()) { - Deallocate(existing->second.offset, existing->second.size); + Deallocate(existing->second.chunk_index, existing->second.offset, existing->second.size); used_ -= existing->second.size; slots_.erase(existing); auto lru_it = lru_map_.find(key); @@ -161,13 +234,15 @@ bool DRAMTier::Write(const std::string& key, const void* data, size_t size) { // Try to allocate — do NOT self-evict. // If no space, return false so upper layer can demote keys to SSD. - size_t offset = Allocate(size); - if (offset == static_cast(-1)) { + auto alloc = Allocate(size); + if (!alloc) { return false; } - std::memcpy(static_cast(base_ptr_) + offset, data, size); - slots_[key] = {offset, size}; + chunks_sealed_ = true; + auto [chunk_index, offset] = *alloc; + std::memcpy(static_cast(chunks_[chunk_index].base) + offset, data, size); + slots_[key] = {chunk_index, offset, size}; used_ += size; TouchLRU(key); return true; @@ -184,8 +259,9 @@ bool DRAMTier::ReadIntoPtr(const std::string& key, uintptr_t dst_ptr, size_t siz // would produce a partially-filled KV block with no error signal. if (size != it->second.size) return false; - std::memcpy(reinterpret_cast(dst_ptr), static_cast(base_ptr_) + it->second.offset, - size); + const auto& slot = it->second; + std::memcpy(reinterpret_cast(dst_ptr), + static_cast(chunks_[slot.chunk_index].base) + slot.offset, size); TouchLRU(key); return true; } @@ -196,9 +272,10 @@ const void* DRAMTier::ReadPtr(const std::string& key, size_t* out_size) { auto it = slots_.find(key); if (it == slots_.end()) return nullptr; - if (out_size) *out_size = it->second.size; + const auto& slot = it->second; + if (out_size) *out_size = slot.size; TouchLRU(key); - return static_cast(base_ptr_) + it->second.offset; + return static_cast(chunks_[slot.chunk_index].base) + slot.offset; } std::vector DRAMTier::Read(const std::string& key) { @@ -207,9 +284,10 @@ std::vector DRAMTier::Read(const std::string& key) { auto it = slots_.find(key); if (it == slots_.end()) return {}; - size_t sz = it->second.size; - std::vector buf(sz); - std::memcpy(buf.data(), static_cast(base_ptr_) + it->second.offset, sz); + const auto& slot = it->second; + std::vector buf(slot.size); + std::memcpy(buf.data(), static_cast(chunks_[slot.chunk_index].base) + slot.offset, + slot.size); TouchLRU(key); return buf; } @@ -231,7 +309,7 @@ bool DRAMTier::Evict(const std::string& key) { auto it = slots_.find(key); if (it == slots_.end()) return false; - Deallocate(it->second.offset, it->second.size); + Deallocate(it->second.chunk_index, it->second.offset, it->second.size); used_ -= it->second.size; slots_.erase(it); @@ -253,9 +331,16 @@ void DRAMTier::Clear() { slots_.clear(); lru_list_.clear(); lru_map_.clear(); - free_list_.clear(); - free_list_.push_back({0, capacity_}); used_ = 0; + + if (chunks_configured_) { + // Re-initialize each chunk's free list to cover its full size. + for (auto& chunk : chunks_) { + chunk.free_list.clear(); + chunk.free_list.push_back({0, chunk.size}); + } + } + // If chunks were never configured, the next Write() will auto-configure. } std::vector DRAMTier::GetLRUCandidates(size_t max_candidates) const { @@ -263,7 +348,6 @@ std::vector DRAMTier::GetLRUCandidates(size_t max_candidates) const std::lock_guard lock(mu_); std::vector result; result.reserve(std::min(max_candidates, lru_list_.size())); - // Walk from the back (LRU end) up to max_candidates entries. auto it = lru_list_.rbegin(); for (size_t i = 0; i < max_candidates && it != lru_list_.rend(); ++i, ++it) { result.push_back(*it); @@ -281,7 +365,11 @@ std::optional DRAMTier::GetSlotOffset(const std::string& key) const { std::lock_guard lock(mu_); auto it = slots_.find(key); if (it == slots_.end()) return std::nullopt; - return it->second.offset; + const auto& slot = it->second; + // Reconstruct global offset from chunk base pointer. + return static_cast(static_cast(chunks_[slot.chunk_index].base) - + static_cast(base_ptr_)) + + slot.offset; } std::optional DRAMTier::GetLocationId(const std::string& key) const { diff --git a/src/umbp/local/storage/local_storage_manager.cpp b/src/umbp/local/storage/local_storage_manager.cpp index cf2b18671..a778214ff 100644 --- a/src/umbp/local/storage/local_storage_manager.cpp +++ b/src/umbp/local/storage/local_storage_manager.cpp @@ -561,14 +561,26 @@ std::optional LocalStorageManager::BuildT return std::nullopt; } + TierLocationInfo info; + info.size = size; + + // DRAM: read structured chunk location directly from DRAMTier. + if (tier->tier_id() == StorageTier::CPU_DRAM) { + auto* dram = dynamic_cast(tier); + if (dram) { + auto loc = dram->GetSlotChunkLocation(key); + if (!loc) return std::nullopt; + info.location_id = std::to_string(loc->chunk_index) + ":" + std::to_string(loc->offset); + return info; + } + } + + // Non-DRAM tiers: single-buffer "0:" format. auto raw_id = tier->GetLocationId(key); if (!raw_id.has_value()) { return std::nullopt; } - - TierLocationInfo info; info.location_id = "0:" + *raw_id; - info.size = size; return info; } diff --git a/src/umbp/local/umbp_client.cpp b/src/umbp/local/umbp_client.cpp index e99917007..e386e103e 100644 --- a/src/umbp/local/umbp_client.cpp +++ b/src/umbp/local/umbp_client.cpp @@ -21,6 +21,7 @@ // SOFTWARE. #include "umbp/local/umbp_client.h" +#include #include #include @@ -61,6 +62,13 @@ UMBPClient::UMBPClient(const UMBPConfig& config) pc_config.io_engine_port = dist.io_engine_port; pc_config.staging_buffer_size = dist.staging_buffer_size; pc_config.peer_service_port = dist.peer_service_port; + pc_config.max_mr_chunk_size = dist.max_mr_chunk_size; + // Allow env var override (distributed config is set programmatically, + // not via FromEnvironment(), so we read the env var here). + const char* env_chunk = std::getenv("UMBP_MAX_MR_CHUNK_SIZE"); + if (env_chunk) { + pc_config.max_mr_chunk_size = static_cast(std::stoull(env_chunk)); + } // Export DramTier buffer so PoolClient registers it with the master at // Init() time, enabling remote RDMA reads into this node's DRAM. @@ -92,10 +100,23 @@ UMBPClient::UMBPClient(const UMBPConfig& config) // Register DRAM for local zero-copy RDMA and install tier-change callback. if (pool_client_) { + // Configure DRAMTier chunk layout from the effective MR chunk size + // computed by PoolClient. This makes the allocator chunk-aware so + // every block fits entirely within one chunk / one RDMA MR. + auto* dram_for_chunks = storage_.GetTierAs(StorageTier::CPU_DRAM); + if (dram_for_chunks) { + dram_for_chunks->ConfigureChunks(pool_client_->DramChunkSize()); + } + auto* dram = storage_.GetTierAs(StorageTier::CPU_DRAM); if (dram) { auto [used, total] = storage_.Capacity(StorageTier::CPU_DRAM); pool_client_->RegisterMemory(dram->GetBasePtr(), total); + // Seal the chunk layout now that MRs are registered and the master + // has recorded the buffer_index mapping. Any later ConfigureChunks() + // call (e.g. via the public Storage()/GetTierAs() accessors) would + // desync local placement from the RDMA/master view. + dram->SealChunkLayout(); } if (config_.distributed->peer_service_port > 0 && config_.ssd.enabled && @@ -148,12 +169,12 @@ UMBPClient::~UMBPClient() { void UMBPClient::MaybePublishLocal(const std::string& key, size_t size) { if (!pool_client_) return; - auto* dram = storage_.GetTierAs(StorageTier::CPU_DRAM); + auto* dram = storage_.GetTier(StorageTier::CPU_DRAM); if (!dram) return; - auto offset = dram->GetSlotOffset(key); - if (!offset) return; - std::string location_id = "0:" + std::to_string(*offset); - pool_client_->PublishLocalBlock(key, size, location_id, TierType::DRAM); + auto info = storage_.BuildTierLocationInfo(dram, key, size); + if (!info) return; + + pool_client_->PublishLocalBlock(key, size, info->location_id, TierType::DRAM); } bool UMBPClient::Put(const std::string& key, const void* data, size_t size) { @@ -431,6 +452,11 @@ size_t UMBPClient::BatchExistsConsecutive(const std::vector& keys) } void UMBPClient::Clear() { + if (pool_client_) { + throw std::runtime_error( + "UMBPClient::Clear: not supported in distributed mode — " + "published locations cannot be bulk-unregistered from the master"); + } index_.Clear(); storage_.Clear(); } diff --git a/tests/cpp/umbp/local/CMakeLists.txt b/tests/cpp/umbp/local/CMakeLists.txt index d8b6acb1a..0cce647dc 100644 --- a/tests/cpp/umbp/local/CMakeLists.txt +++ b/tests/cpp/umbp/local/CMakeLists.txt @@ -1,4 +1,8 @@ # C++ unit tests +add_executable(test_dram_tier test_dram_tier.cpp) +target_link_libraries(test_dram_tier PRIVATE umbp_core) +add_test(NAME test_dram_tier COMMAND test_dram_tier) + add_executable(test_umbp_local_block_index test_local_block_index.cpp) target_link_libraries(test_umbp_local_block_index PRIVATE umbp_core) add_test(NAME umbp_local_block_index COMMAND test_umbp_local_block_index) diff --git a/tests/cpp/umbp/local/test_dram_tier.cpp b/tests/cpp/umbp/local/test_dram_tier.cpp new file mode 100644 index 000000000..f398b58bd --- /dev/null +++ b/tests/cpp/umbp/local/test_dram_tier.cpp @@ -0,0 +1,574 @@ +// Copyright © Advanced Micro Devices, Inc. All rights reserved. +// +// MIT License +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. +#include +#include +#include +#include +#include +#include + +#include "umbp/local/storage/dram_tier.h" +#include "umbp/local/storage/local_storage_manager.h" + +using namespace mori::umbp; + +void test_default_single_chunk() { + std::cout << "test_default_single_chunk... "; + + // Constructor initializes a single full-arena chunk — no ConfigureChunks needed. + DRAMTier tier(4096); + + std::vector data(1024, 'A'); + assert(tier.Write("k1", data.data(), data.size())); + assert(tier.Exists("k1")); + + auto loc = tier.GetSlotChunkLocation("k1"); + assert(loc.has_value()); + assert(loc->chunk_index == 0); + assert(loc->offset == 0); + + auto chunks = tier.GetExportableChunks(); + assert(chunks.size() == 1); + assert(chunks[0].size == 4096); + + std::cout << "PASSED" << std::endl; +} + +void test_configure_single_chunk() { + std::cout << "test_configure_single_chunk... "; + + DRAMTier tier(4096); + tier.ConfigureChunks(SIZE_MAX); + + std::vector data(1024, 'B'); + assert(tier.Write("k1", data.data(), data.size())); + + auto loc = tier.GetSlotChunkLocation("k1"); + assert(loc.has_value()); + assert(loc->chunk_index == 0); + assert(loc->offset == 0); + + auto chunks = tier.GetExportableChunks(); + assert(chunks.size() == 1); + assert(chunks[0].size == 4096); + + // Also test with chunk_size == 0 (should behave the same). + DRAMTier tier2(4096); + tier2.ConfigureChunks(0); + auto chunks2 = tier2.GetExportableChunks(); + assert(chunks2.size() == 1); + assert(chunks2[0].size == 4096); + + std::cout << "PASSED" << std::endl; +} + +void test_configure_multi_chunk() { + std::cout << "test_configure_multi_chunk... "; + + // 10 KB arena, 4 KB chunks → 3 chunks (4KB + 4KB + 2KB) + const size_t arena = 10 * 1024; + const size_t chunk_size = 4 * 1024; + DRAMTier tier(arena); + tier.ConfigureChunks(chunk_size); + + auto chunks = tier.GetExportableChunks(); + assert(chunks.size() == 3); + assert(chunks[0].size == 4096); + assert(chunks[1].size == 4096); + assert(chunks[2].size == 2048); + + std::cout << "PASSED" << std::endl; +} + +void test_first_fit_across_chunks() { + std::cout << "test_first_fit_across_chunks... "; + + // 8 KB arena, 4 KB chunks → 2 chunks + const size_t arena = 8 * 1024; + const size_t chunk_size = 4 * 1024; + DRAMTier tier(arena); + tier.ConfigureChunks(chunk_size); + + // Fill chunk 0 completely + std::vector d1(4096, 'A'); + assert(tier.Write("k1", d1.data(), d1.size())); + auto loc1 = tier.GetSlotChunkLocation("k1"); + assert(loc1->chunk_index == 0); + + // Next allocation should go to chunk 1 + std::vector d2(1024, 'B'); + assert(tier.Write("k2", d2.data(), d2.size())); + auto loc2 = tier.GetSlotChunkLocation("k2"); + assert(loc2->chunk_index == 1); + assert(loc2->offset == 0); + + std::cout << "PASSED" << std::endl; +} + +void test_no_cross_chunk_block() { + std::cout << "test_no_cross_chunk_block... "; + + // 8 KB arena, 4 KB chunks + const size_t arena = 8 * 1024; + const size_t chunk_size = 4 * 1024; + DRAMTier tier(arena); + tier.ConfigureChunks(chunk_size); + + // Write a 3 KB block into chunk 0 + std::vector d1(3072, 'A'); + assert(tier.Write("k1", d1.data(), d1.size())); + auto loc1 = tier.GetSlotChunkLocation("k1"); + assert(loc1->chunk_index == 0); + assert(loc1->offset == 0); + + // 1 KB remaining in chunk 0. Write a 2 KB block — must go to chunk 1. + std::vector d2(2048, 'B'); + assert(tier.Write("k2", d2.data(), d2.size())); + auto loc2 = tier.GetSlotChunkLocation("k2"); + assert(loc2->chunk_index == 1); + + // Verify data integrity + std::vector buf(2048, 0); + assert(tier.ReadIntoPtr("k2", reinterpret_cast(buf.data()), buf.size())); + assert(buf == d2); + + std::cout << "PASSED" << std::endl; +} + +void test_reject_oversized_block() { + std::cout << "test_reject_oversized_block... "; + + // 8 KB arena, 2 KB chunks + const size_t arena = 8 * 1024; + const size_t chunk_size = 2 * 1024; + DRAMTier tier(arena); + tier.ConfigureChunks(chunk_size); + + // A 3 KB block exceeds the 2 KB chunk size — should fail. + std::vector data(3072, 'X'); + assert(!tier.Write("big", data.data(), data.size())); + + std::cout << "PASSED" << std::endl; +} + +void test_tail_chunk_smaller() { + std::cout << "test_tail_chunk_smaller... "; + + // 10 KB arena, 4 KB chunks → chunk2 = 2 KB + const size_t arena = 10 * 1024; + const size_t chunk_size = 4 * 1024; + DRAMTier tier(arena); + tier.ConfigureChunks(chunk_size); + + // Fill chunks 0 and 1 + std::vector d1(4096, 'A'); + assert(tier.Write("k1", d1.data(), d1.size())); + assert(tier.Write("k2", d1.data(), d1.size())); + + // Write 1 KB into the tail chunk (2 KB available) + std::vector d3(1024, 'C'); + assert(tier.Write("k3", d3.data(), d3.size())); + auto loc3 = tier.GetSlotChunkLocation("k3"); + assert(loc3->chunk_index == 2); + + // 3 KB block should not fit in the 2 KB tail chunk (chunks 0,1 are full) + std::vector d4(3072, 'D'); + assert(!tier.Write("k4", d4.data(), d4.size())); + + // But 1 KB more should fit in chunk 2's remaining 1 KB + std::vector d5(1024, 'E'); + assert(tier.Write("k5", d5.data(), d5.size())); + auto loc5 = tier.GetSlotChunkLocation("k5"); + assert(loc5->chunk_index == 2); + assert(loc5->offset == 1024); + + std::cout << "PASSED" << std::endl; +} + +void test_per_chunk_coalescing() { + std::cout << "test_per_chunk_coalescing... "; + + // 8 KB arena, 4 KB chunks + DRAMTier tier(8 * 1024); + tier.ConfigureChunks(4 * 1024); + + // Write two 2 KB blocks into chunk 0 + std::vector d1(2048, 'A'), d2(2048, 'B'); + assert(tier.Write("k1", d1.data(), d1.size())); + assert(tier.Write("k2", d2.data(), d2.size())); + + auto loc1 = tier.GetSlotChunkLocation("k1"); + auto loc2 = tier.GetSlotChunkLocation("k2"); + assert(loc1->chunk_index == 0); + assert(loc2->chunk_index == 0); + + // Evict both — free space should coalesce back to 4 KB + assert(tier.Evict("k1")); + assert(tier.Evict("k2")); + + // Should be able to write a full 4 KB block into chunk 0 (coalesced) + std::vector d3(4096, 'C'); + assert(tier.Write("k3", d3.data(), d3.size())); + auto loc3 = tier.GetSlotChunkLocation("k3"); + assert(loc3->chunk_index == 0); + assert(loc3->offset == 0); + + std::cout << "PASSED" << std::endl; +} + +void test_get_slot_chunk_location() { + std::cout << "test_get_slot_chunk_location... "; + + // 8 KB arena, 4 KB chunks + DRAMTier tier(8 * 1024); + tier.ConfigureChunks(4 * 1024); + + std::vector d1(1024, 'A'), d2(2048, 'B'), d3(1024, 'C'); + assert(tier.Write("k1", d1.data(), d1.size())); + assert(tier.Write("k2", d2.data(), d2.size())); + // k1: chunk 0, offset 0 + // k2: chunk 0, offset 1024 + + auto loc1 = tier.GetSlotChunkLocation("k1"); + assert(loc1.has_value()); + assert(loc1->chunk_index == 0); + assert(loc1->offset == 0); + + auto loc2 = tier.GetSlotChunkLocation("k2"); + assert(loc2.has_value()); + assert(loc2->chunk_index == 0); + assert(loc2->offset == 1024); + + // k3 spills to chunk 1 (only 1 KB left in chunk 0, k3 is 1 KB so it fits) + assert(tier.Write("k3", d3.data(), d3.size())); + auto loc3 = tier.GetSlotChunkLocation("k3"); + assert(loc3.has_value()); + assert(loc3->chunk_index == 0); + assert(loc3->offset == 3072); + + // Non-existent key + assert(!tier.GetSlotChunkLocation("nope").has_value()); + + std::cout << "PASSED" << std::endl; +} + +void test_get_exportable_chunks() { + std::cout << "test_get_exportable_chunks... "; + + DRAMTier tier(12 * 1024); + tier.ConfigureChunks(4 * 1024); + + auto chunks = tier.GetExportableChunks(); + assert(chunks.size() == 3); + assert(chunks[0].size == 4096); + assert(chunks[1].size == 4096); + assert(chunks[2].size == 4096); + + // Verify base pointers are sequential. + char* base0 = static_cast(chunks[0].buffer); + assert(static_cast(chunks[1].buffer) == base0 + 4096); + assert(static_cast(chunks[2].buffer) == base0 + 8192); + + std::cout << "PASSED" << std::endl; +} + +void test_reconfigure_before_allocation_ok() { + std::cout << "test_reconfigure_before_allocation_ok... "; + + DRAMTier tier(8 * 1024); + tier.ConfigureChunks(0); // single chunk + + // No allocations yet — reconfiguring should succeed. + tier.ConfigureChunks(4 * 1024); // split into 2 chunks + + auto chunks = tier.GetExportableChunks(); + assert(chunks.size() == 2); + assert(chunks[0].size == 4096); + assert(chunks[1].size == 4096); + + // Write should work with the new layout. + std::vector data(4096, 'A'); + assert(tier.Write("k1", data.data(), data.size())); + auto loc = tier.GetSlotChunkLocation("k1"); + assert(loc->chunk_index == 0); + + std::cout << "PASSED" << std::endl; +} + +void test_reject_reconfigure_after_allocation() { + std::cout << "test_reject_reconfigure_after_allocation... "; + + DRAMTier tier(8 * 1024); + tier.ConfigureChunks(4 * 1024); + + std::vector data(1024, 'A'); + assert(tier.Write("k1", data.data(), data.size())); + + // Reconfiguring after allocation should throw. + bool threw = false; + try { + tier.ConfigureChunks(2 * 1024); + } catch (const std::runtime_error&) { + threw = true; + } + assert(threw); + + std::cout << "PASSED" << std::endl; +} + +void test_get_location_id_returns_global_offset() { + std::cout << "test_get_location_id_returns_global_offset... "; + + // 8 KB arena, 4 KB chunks + DRAMTier tier(8 * 1024); + tier.ConfigureChunks(4 * 1024); + + // Fill chunk 0 + std::vector d1(4096, 'A'); + assert(tier.Write("k1", d1.data(), d1.size())); + + // Write into chunk 1 + std::vector d2(512, 'B'); + assert(tier.Write("k2", d2.data(), d2.size())); + + // GetLocationId returns the global byte offset (base class contract). + auto lid1 = tier.GetLocationId("k1"); + assert(lid1.has_value()); + assert(*lid1 == "0"); + + auto lid2 = tier.GetLocationId("k2"); + assert(lid2.has_value()); + assert(*lid2 == "4096"); + + // Non-existent key + assert(!tier.GetLocationId("nope").has_value()); + + std::cout << "PASSED" << std::endl; +} + +void test_clear_resets_chunks() { + std::cout << "test_clear_resets_chunks... "; + + DRAMTier tier(8 * 1024); + tier.ConfigureChunks(4 * 1024); + + // Fill both chunks completely + std::vector d1(4096, 'A'); + assert(tier.Write("k1", d1.data(), d1.size())); + assert(tier.Write("k2", d1.data(), d1.size())); + assert(!tier.Write("k3", d1.data(), d1.size())); // should fail — full + + // Clear resets all chunk free lists + tier.Clear(); + + // Should be able to write again + assert(tier.Write("k4", d1.data(), d1.size())); + auto loc4 = tier.GetSlotChunkLocation("k4"); + assert(loc4->chunk_index == 0); + assert(loc4->offset == 0); + + std::cout << "PASSED" << std::endl; +} + +void test_get_slot_offset_global() { + std::cout << "test_get_slot_offset_global... "; + + // 8 KB arena, 4 KB chunks + DRAMTier tier(8 * 1024); + tier.ConfigureChunks(4 * 1024); + + // Fill chunk 0 + std::vector d1(4096, 'A'); + assert(tier.Write("k1", d1.data(), d1.size())); + + // Write 1 KB into chunk 1 + std::vector d2(1024, 'B'); + assert(tier.Write("k2", d2.data(), d2.size())); + + // k1: global offset should be 0 + auto off1 = tier.GetSlotOffset("k1"); + assert(off1.has_value()); + assert(*off1 == 0); + + // k2: global offset should be 4096 (chunk 1, offset 0 within chunk) + auto off2 = tier.GetSlotOffset("k2"); + assert(off2.has_value()); + assert(*off2 == 4096); + + std::cout << "PASSED" << std::endl; +} + +void test_data_integrity_multi_chunk() { + std::cout << "test_data_integrity_multi_chunk... "; + + // 8 KB arena, 2 KB chunks → 4 chunks + DRAMTier tier(8 * 1024); + tier.ConfigureChunks(2 * 1024); + + // Write distinct data to each chunk + for (int i = 0; i < 4; ++i) { + std::string key = "k" + std::to_string(i); + std::vector data(2048, 'A' + i); + assert(tier.Write(key, data.data(), data.size())); + auto loc = tier.GetSlotChunkLocation(key); + assert(loc->chunk_index == static_cast(i)); + } + + // Read back and verify + for (int i = 0; i < 4; ++i) { + std::string key = "k" + std::to_string(i); + std::vector expected(2048, 'A' + i); + std::vector buf(2048, 0); + assert(tier.ReadIntoPtr(key, reinterpret_cast(buf.data()), buf.size())); + assert(buf == expected); + } + + std::cout << "PASSED" << std::endl; +} + +void test_clear_does_not_unlock_reconfigure() { + std::cout << "test_clear_does_not_unlock_reconfigure... "; + + DRAMTier tier(8 * 1024); + tier.ConfigureChunks(4 * 1024); // 2 chunks + + // Allocate a block — permanently locks the layout. + std::vector data(1024, 'A'); + assert(tier.Write("k1", data.data(), data.size())); + + // Clear() empties slots but does NOT unlock reconfiguration, + // because registered MRs depend on the layout staying fixed. + tier.Clear(); + + bool threw = false; + try { + tier.ConfigureChunks(2 * 1024); + } catch (const std::runtime_error&) { + threw = true; + } + assert(threw); + + // Writing with the original layout still works after Clear(). + assert(tier.Write("k2", data.data(), data.size())); + auto loc = tier.GetSlotChunkLocation("k2"); + assert(loc.has_value()); + assert(loc->chunk_index == 0); + + std::cout << "PASSED" << std::endl; +} + +void test_seal_locks_layout_before_write() { + std::cout << "test_seal_locks_layout_before_write... "; + + DRAMTier tier(8 * 1024); + tier.ConfigureChunks(4 * 1024); // 2 chunks + + // Seal without any writes — simulates distributed init completing. + tier.SealChunkLayout(); + + // Reconfiguration must be rejected even though no writes happened. + bool threw = false; + try { + tier.ConfigureChunks(2 * 1024); + } catch (const std::runtime_error&) { + threw = true; + } + assert(threw); + + // Writing with the sealed layout still works. + std::vector data(4096, 'A'); + assert(tier.Write("k1", data.data(), data.size())); + auto loc = tier.GetSlotChunkLocation("k1"); + assert(loc->chunk_index == 0); + + std::cout << "PASSED" << std::endl; +} + +void test_build_tier_location_info_consistency() { + std::cout << "test_build_tier_location_info_consistency... "; + + // Create a LocalStorageManager with a chunked DRAMTier (via reconfiguration). + UMBPConfig config; + config.dram.capacity_bytes = 8 * 1024; + config.ssd.enabled = false; + LocalStorageManager mgr(config); + + // Reconfigure DRAM to use 4 KB chunks (no allocations yet, so this is safe). + auto* dram = mgr.GetTierAs(StorageTier::CPU_DRAM); + assert(dram != nullptr); + dram->ConfigureChunks(4 * 1024); + + // Fill chunk 0 + std::vector d1(4096, 'A'); + assert(mgr.Write("k1", d1.data(), d1.size())); + + // Write into chunk 1 + std::vector d2(512, 'B'); + assert(mgr.Write("k2", d2.data(), d2.size())); + + // Verify BuildTierLocationInfo produces chunk-aware location_ids. + auto info1 = mgr.BuildTierLocationInfo(dram, "k1", d1.size()); + assert(info1.has_value()); + assert(info1->location_id == "0:0"); + + auto info2 = mgr.BuildTierLocationInfo(dram, "k2", d2.size()); + assert(info2.has_value()); + assert(info2->location_id == "1:0"); + + // Verify GetSlotChunkLocation agrees. + auto loc1 = dram->GetSlotChunkLocation("k1"); + assert(loc1.has_value()); + std::string expected1 = std::to_string(loc1->chunk_index) + ":" + std::to_string(loc1->offset); + assert(info1->location_id == expected1); + + auto loc2 = dram->GetSlotChunkLocation("k2"); + assert(loc2.has_value()); + std::string expected2 = std::to_string(loc2->chunk_index) + ":" + std::to_string(loc2->offset); + assert(info2->location_id == expected2); + + std::cout << "PASSED" << std::endl; +} + +int main() { + std::cout << "=== DRAMTier Chunk Tests ===" << std::endl; + test_default_single_chunk(); + test_configure_single_chunk(); + test_configure_multi_chunk(); + test_first_fit_across_chunks(); + test_no_cross_chunk_block(); + test_reject_oversized_block(); + test_tail_chunk_smaller(); + test_per_chunk_coalescing(); + test_get_slot_chunk_location(); + test_get_exportable_chunks(); + test_reconfigure_before_allocation_ok(); + test_reject_reconfigure_after_allocation(); + test_get_location_id_returns_global_offset(); + test_clear_resets_chunks(); + test_get_slot_offset_global(); + test_data_integrity_multi_chunk(); + test_clear_does_not_unlock_reconfigure(); + test_seal_locks_layout_before_write(); + test_build_tier_location_info_consistency(); + std::cout << "All DRAMTier chunk tests passed!" << std::endl; + return 0; +}