183 changes: 115 additions & 68 deletions cpp/src/arrow/io/caching.cc
@@ -151,11 +151,19 @@ struct ReadRangeCache::Impl {
IOContext ctx;
CacheOptions options;

// Ordered by offset (so as to find a matching region by binary search)
// Ordered by offset (so as to find a matching region by binary search).
// Mutation of `entries` and of individual entries' futures must be
// serialized via `entry_mutex`. Every public method that touches either
// acquires the mutex before invoking the polymorphic hooks below, so both
// the eager and lazy variants are safe to call concurrently from multiple
// threads.
std::vector<RangeCacheEntry> entries;
std::mutex entry_mutex;

virtual ~Impl() = default;

// -- Polymorphic hooks. Always called with entry_mutex held. --

// Get the future corresponding to a range
virtual Future<std::shared_ptr<Buffer>> MaybeRead(RangeCacheEntry* entry) {
return entry->future;
@@ -172,46 +180,60 @@ struct ReadRangeCache::Impl {
return new_entries;
}

// Add the given ranges to the cache, coalescing them where possible
virtual Status Cache(std::vector<ReadRange> ranges) {
// -- Public entry points (acquire entry_mutex, then delegate). --

// Add the given ranges to the cache, coalescing them where possible.
Status Cache(std::vector<ReadRange> ranges) {
ARROW_ASSIGN_OR_RAISE(
ranges, internal::CoalesceReadRanges(std::move(ranges), options.hole_size_limit,
options.range_size_limit));
std::vector<RangeCacheEntry> new_entries = MakeCacheEntries(ranges);
// Add new entries, themselves ordered by offset
if (entries.size() > 0) {
std::vector<RangeCacheEntry> merged(entries.size() + new_entries.size());
std::merge(entries.begin(), entries.end(), new_entries.begin(), new_entries.end(),
merged.begin());
entries = std::move(merged);
} else {
entries = std::move(new_entries);
Status st;
{
std::unique_lock<std::mutex> guard(entry_mutex);
std::vector<RangeCacheEntry> new_entries = MakeCacheEntries(ranges);
// Add new entries, themselves ordered by offset
if (entries.size() > 0) {
std::vector<RangeCacheEntry> merged(entries.size() + new_entries.size());
std::merge(entries.begin(), entries.end(), new_entries.begin(),
new_entries.end(), merged.begin());
entries = std::move(merged);
} else {
entries = std::move(new_entries);
}
}
// Prefetch immediately, regardless of executor availability, if possible
auto st = file->WillNeed(ranges);
// Prefetch immediately, regardless of executor availability, if possible.
// Do this outside the lock: WillNeed() may block on an mmap advise / I/O
// hint and we don't want to serialize concurrent Reads on it.
st = file->WillNeed(ranges);
// As this is an optimisation only, I/O failures should not be treated as fatal
if (st.IsIOError()) {
return Status::OK();
}
return st;
}

// Read the given range from the cache, blocking if needed. Cannot read a range
// that spans cache entries.
virtual Result<std::shared_ptr<Buffer>> Read(ReadRange range) {
// Read the given range from the cache, blocking if needed. Cannot read a
// range that spans cache entries.
Result<std::shared_ptr<Buffer>> Read(ReadRange range) {
if (range.length == 0) {
static const uint8_t byte = 0;
return std::make_shared<Buffer>(&byte, 0);
}

const auto it = std::lower_bound(
entries.begin(), entries.end(), range,
[](const RangeCacheEntry& entry, const ReadRange& range) {
return entry.range.offset + entry.range.length < range.offset + range.length;
});
if (it != entries.end() && it->range.Contains(range)) {
auto fut = MaybeRead(&*it);
ARROW_ASSIGN_OR_RAISE(auto buf, fut.result());
Future<std::shared_ptr<Buffer>> fut;
int64_t slice_offset = 0;
{
std::unique_lock<std::mutex> guard(entry_mutex);
const auto it = std::lower_bound(
entries.begin(), entries.end(), range,
[](const RangeCacheEntry& entry, const ReadRange& range) {
return entry.range.offset + entry.range.length < range.offset + range.length;
});
if (it == entries.end() || !it->range.Contains(range)) {
return Status::Invalid("ReadRangeCache did not find matching cache entry");
}
fut = MaybeRead(&*it);
slice_offset = range.offset - it->range.offset;
if (options.lazy && options.prefetch_limit > 0) {
int64_t num_prefetched = 0;
for (auto next_it = it + 1;
@@ -224,53 +246,94 @@ struct ReadRangeCache::Impl {
++num_prefetched;
}
}
return SliceBuffer(std::move(buf), range.offset - it->range.offset, range.length);
}
return Status::Invalid("ReadRangeCache did not find matching cache entry");
// Drop the lock before blocking on the I/O future so other threads can
// still do lookups while a previously queued read is in flight.
ARROW_ASSIGN_OR_RAISE(auto buf, fut.result());
return SliceBuffer(std::move(buf), slice_offset, range.length);
}

virtual Future<> Wait() {
Future<> Wait() {
std::vector<Future<>> futures;
for (auto& entry : entries) {
futures.emplace_back(MaybeRead(&entry));
{
std::unique_lock<std::mutex> guard(entry_mutex);
futures.reserve(entries.size());
for (auto& entry : entries) {
futures.emplace_back(MaybeRead(&entry));
}
}
return AllComplete(futures);
}

// Evict cache entries whose byte range is fully contained within
// [start_offset, start_offset + length).
Result<int64_t> EvictEntriesInRange(int64_t start_offset, int64_t length) {
if (length <= 0) {
return 0;
}
const int64_t end_offset = start_offset + length;
int64_t n_evicted = 0;
std::unique_lock<std::mutex> guard(entry_mutex);
// entries is sorted by range.offset, so we can fast-forward to the first
// entry whose offset is >= start_offset and stop once we pass end_offset.
auto it = std::lower_bound(
entries.begin(), entries.end(), start_offset,
[](const RangeCacheEntry& entry, int64_t offset) {
return entry.range.offset < offset;
});
while (it != entries.end() && it->range.offset < end_offset) {
if (it->range.offset + it->range.length <= end_offset) {
it = entries.erase(it);
++n_evicted;
} else {
// Entry extends beyond the requested window (e.g. coalesced with a
// neighboring range). Leave it alone and keep scanning in case
// smaller siblings follow.
++it;
}
}
return n_evicted;
}

// Return a Future that completes when the given ranges have been read.
virtual Future<> WaitFor(std::vector<ReadRange> ranges) {
Future<> WaitFor(std::vector<ReadRange> ranges) {
auto end = std::remove_if(ranges.begin(), ranges.end(),
[](const ReadRange& range) { return range.length == 0; });
ranges.resize(end - ranges.begin());
std::vector<Future<>> futures;
futures.reserve(ranges.size());
for (auto& range : ranges) {
const auto it = std::lower_bound(
entries.begin(), entries.end(), range,
[](const RangeCacheEntry& entry, const ReadRange& range) {
return entry.range.offset + entry.range.length < range.offset + range.length;
});
if (it != entries.end() && it->range.Contains(range)) {
futures.push_back(Future<>(MaybeRead(&*it)));
} else {
return Status::Invalid("Range was not requested for caching: offset=",
range.offset, " length=", range.length);
{
std::unique_lock<std::mutex> guard(entry_mutex);
for (auto& range : ranges) {
const auto it = std::lower_bound(
entries.begin(), entries.end(), range,
[](const RangeCacheEntry& entry, const ReadRange& range) {
return entry.range.offset + entry.range.length <
range.offset + range.length;
});
if (it != entries.end() && it->range.Contains(range)) {
futures.push_back(Future<>(MaybeRead(&*it)));
} else {
return Status::Invalid("Range was not requested for caching: offset=",
range.offset, " length=", range.length);
}
}
}
return AllComplete(futures);
}
};

// Don't read ranges when they're first added. Instead, wait until they're requested
// (either through Read or WaitFor).
// Don't read ranges when they're first added. Instead, wait until they're
// requested (either through Read or WaitFor). Thread safety is inherited from
// the base Impl: both MakeCacheEntries and MaybeRead are only ever invoked
// from the public entry points, and those hold the base class's entry_mutex
// before delegating.
struct ReadRangeCache::LazyImpl : public ReadRangeCache::Impl {
// Protect against concurrent modification of entries[i]->future
std::mutex entry_mutex;

virtual ~LazyImpl() = default;

Future<std::shared_ptr<Buffer>> MaybeRead(RangeCacheEntry* entry) override {
// Called by superclass Read()/WaitFor() so we have the lock
// Called by the superclass under entry_mutex, so it is safe to mutate
// entry->future here.
if (!entry->future.is_valid()) {
entry->future = file->ReadAsync(ctx, entry->range.offset, entry->range.length);
}
@@ -288,26 +351,6 @@ struct ReadRangeCache::LazyImpl : public ReadRangeCache::Impl {
}
return new_entries;
}

Status Cache(std::vector<ReadRange> ranges) override {
std::unique_lock<std::mutex> guard(entry_mutex);
return ReadRangeCache::Impl::Cache(std::move(ranges));
}

Result<std::shared_ptr<Buffer>> Read(ReadRange range) override {
std::unique_lock<std::mutex> guard(entry_mutex);
return ReadRangeCache::Impl::Read(range);
}

Future<> Wait() override {
std::unique_lock<std::mutex> guard(entry_mutex);
return ReadRangeCache::Impl::Wait();
}

Future<> WaitFor(std::vector<ReadRange> ranges) override {
std::unique_lock<std::mutex> guard(entry_mutex);
return ReadRangeCache::Impl::WaitFor(std::move(ranges));
}
};

ReadRangeCache::ReadRangeCache(std::shared_ptr<RandomAccessFile> owned_file,
@@ -336,6 +379,10 @@ Future<> ReadRangeCache::WaitFor(std::vector<ReadRange> ranges) {
return impl_->WaitFor(std::move(ranges));
}

Result<int64_t> ReadRangeCache::EvictEntriesInRange(int64_t start_offset, int64_t length) {
return impl_->EvictEntriesInRange(start_offset, length);
}

} // namespace internal
} // namespace io
} // namespace arrow
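
The same containment lookup drives Read(), WaitFor(), and the eviction fast-forward above, and it is only correct because coalescing leaves `entries` sorted by offset and pairwise non-overlapping. A minimal standalone model of that invariant follows; `Range` and `FindContaining` are simplified stand-ins for illustration, not the Arrow classes:

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

struct Range {
  int64_t offset;
  int64_t length;

  bool Contains(const Range& other) const {
    return other.offset >= offset &&
           other.offset + other.length <= offset + length;
  }
};

// Returns the index of the entry fully containing `want`, or -1 if none does.
// Precondition: `entries` is sorted by offset and pairwise non-overlapping,
// which coalescing guarantees for the real cache.
int64_t FindContaining(const std::vector<Range>& entries, const Range& want) {
  auto it = std::lower_bound(
      entries.begin(), entries.end(), want,
      [](const Range& entry, const Range& w) {
        // Order by end offset: the first entry whose end reaches the
        // requested end is the only one that can contain the request.
        return entry.offset + entry.length < w.offset + w.length;
      });
  if (it != entries.end() && it->Contains(want)) {
    return it - entries.begin();
  }
  return -1;
}
```

Because entries never overlap, the first entry whose end reaches past the requested end is the unique candidate, so a single `lower_bound` plus one `Contains` check suffices.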
15 changes: 15 additions & 0 deletions cpp/src/arrow/io/caching.h
@@ -142,6 +142,21 @@ class ARROW_EXPORT ReadRangeCache {
/// \brief Wait until all given ranges have been cached.
Future<> WaitFor(std::vector<ReadRange> ranges);

/// \brief Evict cache entries whose byte range is fully contained within
/// [start_offset, start_offset + length).
///
/// This releases the memory held by those entries, allowing buffers to be
/// freed as soon as no other owner retains a reference. Entries that are not
/// fully contained in the given window (for example, because I/O range
/// coalescing merged them with an adjacent region the caller still needs)
/// are not evicted, so this method is safe to call even if some cached
/// ranges have been merged across unrelated consumers.
///
/// \param[in] start_offset Start of the window (inclusive).
/// \param[in] length Length of the window.
/// \return Number of cache entries that were evicted.
Result<int64_t> EvictEntriesInRange(int64_t start_offset, int64_t length);

protected:
struct Impl;
struct LazyImpl;
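
For context, a hedged usage sketch of the new eviction API. The constructor signature, `CacheOptions::LazyDefaults()`, and `default_io_context()` match the existing headers; `ConsumeWithEviction` and the concrete offsets are illustrative only:

```cpp
#include <memory>

#include "arrow/io/caching.h"
#include "arrow/io/interfaces.h"
#include "arrow/result.h"
#include "arrow/status.h"

// Illustrative only: cache two ranges, read the first back, then evict the
// consumed window so its buffer can be freed while the second stays cached.
arrow::Status ConsumeWithEviction(
    std::shared_ptr<arrow::io::RandomAccessFile> file) {
  using arrow::io::ReadRange;
  arrow::io::internal::ReadRangeCache cache(
      file, arrow::io::default_io_context(),
      arrow::io::CacheOptions::LazyDefaults());

  // Register ranges up front so the cache can coalesce them.
  ARROW_RETURN_NOT_OK(
      cache.Cache({ReadRange{0, 4096}, ReadRange{8192, 4096}}));

  // With lazy options, the actual I/O for this entry is issued here.
  ARROW_ASSIGN_OR_RAISE(auto buf, cache.Read(ReadRange{0, 4096}));
  // ... consume `buf` ...

  // Release the consumed window. Entries coalesced past the window boundary
  // are kept, so other consumers of the cache are unaffected.
  ARROW_ASSIGN_OR_RAISE(int64_t n_evicted, cache.EvictEntriesInRange(0, 4096));
  (void)n_evicted;
  return arrow::Status::OK();
}
```

Since both `Read` and `EvictEntriesInRange` take `entry_mutex` internally, a sequence like this is safe to run from several threads over disjoint windows.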