Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions include/miniocpp/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,34 @@ class ListObjectsResult {
Client* client_ = nullptr;
ListObjectsArgs args_;
bool failed_ = false;
ListObjectsResponse resp_;
std::shared_ptr<ListObjectsResponse> resp_;
std::list<Item>::iterator itr_;
std::shared_ptr<std::future<ListObjectsResponse>> prefetch_future_;
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

void Populate();
void StartPrefetch();
void UpdatePaginationArgs();

public:
explicit ListObjectsResult(error::Error err);
ListObjectsResult(Client* const client, const ListObjectsArgs& args);
ListObjectsResult(Client* const client, ListObjectsArgs&& args);
~ListObjectsResult() = default;
ListObjectsResult(const ListObjectsResult&) = default;
ListObjectsResult& operator=(const ListObjectsResult&) = default;
ListObjectsResult(ListObjectsResult&&) = default;
ListObjectsResult& operator=(ListObjectsResult&&) = default;

Item& operator*() const { return *itr_; }
explicit operator bool() const { return itr_ != resp_.contents.end(); }
explicit operator bool() {
if (prefetch_future_ && (!resp_ || resp_->contents.empty())) Populate();
return itr_ != resp_->contents.end();
}
explicit operator bool() const { return itr_ != resp_->contents.end(); }

ListObjectsResult& operator++() {
itr_++;
if (!failed_ && itr_ == resp_.contents.end() && resp_.is_truncated) {
if (!failed_ && itr_ == resp_->contents.end() && resp_->is_truncated) {
Populate();
}
return *this;
Expand Down
4 changes: 4 additions & 0 deletions include/miniocpp/response.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,10 @@ struct ListObjectsResponse : public Response {
std::string_view next_version_id_marker;

ListObjectsResponse() = default;
ListObjectsResponse(const ListObjectsResponse&) = delete;
ListObjectsResponse& operator=(const ListObjectsResponse&) = delete;
ListObjectsResponse(ListObjectsResponse&&) = default;
ListObjectsResponse& operator=(ListObjectsResponse&&) = default;

explicit ListObjectsResponse(error::Error err) : Response(std::move(err)) {}

Expand Down
107 changes: 74 additions & 33 deletions src/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -134,60 +134,101 @@ struct ScopedRDMARegistration {
} // namespace

ListObjectsResult::ListObjectsResult(error::Error err) : failed_(true) {
this->resp_.contents.push_back(Item(std::move(err)));
this->itr_ = resp_.contents.begin();
resp_ = std::make_shared<ListObjectsResponse>();
resp_->contents.push_back(Item(std::move(err)));
itr_ = resp_->contents.begin();
}

ListObjectsResult::ListObjectsResult(Client* const client,
const ListObjectsArgs& args)
: client_(client), args_(args) {
Populate();
resp_ = std::make_shared<ListObjectsResponse>();
StartPrefetch();
}

ListObjectsResult::ListObjectsResult(Client* const client,
ListObjectsArgs&& args)
: client_(client), args_(std::move(args)) {
Populate();
resp_ = std::make_shared<ListObjectsResponse>();
StartPrefetch();
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

void ListObjectsResult::Populate() {
void ListObjectsResult::UpdatePaginationArgs() {
if (args_.include_versions) {
args_.key_marker = resp_.next_key_marker;
args_.version_id_marker = resp_.next_version_id_marker;
args_.key_marker = resp_->next_key_marker;
args_.version_id_marker = resp_->next_version_id_marker;
} else if (args_.use_api_v1) {
args_.marker = resp_.next_marker;
args_.marker = resp_->next_marker;
} else {
args_.start_after = resp_.start_after;
args_.continuation_token = resp_.next_continuation_token;
args_.start_after = resp_->start_after;
args_.continuation_token = resp_->next_continuation_token;
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
}

std::string region;
if (GetRegionResponse resp = client_->GetRegion(args_.bucket, args_.region)) {
region = resp.region;
if (args_.recursive) {
args_.delimiter = "";
} else if (args_.delimiter.empty()) {
args_.delimiter = "/";
}

if (args_.include_versions || !args_.version_id_marker.empty()) {
resp_ = client_->ListObjectVersions(ListObjectVersionsArgs(args_));
} else if (args_.use_api_v1) {
resp_ = client_->ListObjectsV1(ListObjectsV1Args(args_));
} else {
resp_ = client_->ListObjectsV2(ListObjectsV2Args(args_));
}
void ListObjectsResult::StartPrefetch() {
ListObjectsArgs next_args = args_;
try {
prefetch_future_ =
std::make_shared<std::future<ListObjectsResponse>>(std::async(
std::launch::async,
[client = client_, next_args = std::move(next_args)]() mutable {
try {
GetRegionResponse resp =
client->GetRegion(next_args.bucket, next_args.region);
if (resp) {
next_args.region = resp.region;
if (next_args.recursive) {
next_args.delimiter = "";
} else if (next_args.delimiter.empty()) {
next_args.delimiter = "/";
}

if (next_args.include_versions ||
!next_args.version_id_marker.empty()) {
return client->ListObjectVersions(
ListObjectVersionsArgs(next_args));
} else if (next_args.use_api_v1) {
return client->ListObjectsV1(ListObjectsV1Args(next_args));
} else {
return client->ListObjectsV2(ListObjectsV2Args(next_args));
}
}
return ListObjectsResponse(resp);
} catch (const std::exception& e) {
return ListObjectsResponse(
error::Error(std::string("prefetch failed: ") + e.what()));
}
}));
} catch (const std::exception& e) {
std::promise<ListObjectsResponse> p;
p.set_value(ListObjectsResponse(
error::Error(std::string("failed to launch prefetch: ") + e.what())));
prefetch_future_ =
std::make_shared<std::future<ListObjectsResponse>>(p.get_future());
}
}

if (!resp_) {
failed_ = true;
resp_.contents.push_back(Item(resp_));
}
} else {
void ListObjectsResult::Populate() {
if (!prefetch_future_ || !prefetch_future_->valid()) {
return;
}
try {
resp_ = std::make_shared<ListObjectsResponse>(prefetch_future_->get());
} catch (const std::exception& e) {
resp_ = std::make_shared<ListObjectsResponse>(
error::Error(std::string("prefetch result failed: ") + e.what()));
}
prefetch_future_.reset();
if (!*resp_) {
failed_ = true;
resp_.contents.push_back(Item(resp));
resp_->contents.push_back(Item(*resp_));
}
itr_ = resp_->contents.begin();

itr_ = resp_.contents.begin();
if (*resp_ && resp_->is_truncated) {
UpdatePaginationArgs();
StartPrefetch();
}
}

RemoveObjectsResult::RemoveObjectsResult(error::Error err) {
Expand Down
Loading