Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions c++/include/orc/Reader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>

namespace orc {
Expand Down Expand Up @@ -696,6 +697,16 @@ namespace orc {
virtual void preBuffer(const std::vector<uint32_t>& stripes,
const std::list<uint64_t>& includeTypes) = 0;

/**
* Calculate the prefetch ranges for the selected stripes and columns and
* return them to the caller instead of caching them.
* It is thread safe and does not cache data.
* @param stripes the stripes to prefetch
* @param includeTypes the types to prefetch
* @return prefetch ranges as offset/length pairs: `first` is the offset and
*         `second` is the length of each range. The result may be empty; the
*         ReaderImpl implementation drops stripe indices that are not present
*         in the file footer and returns an empty vector when no stripe or
*         no type remains selected.
*/
virtual std::vector<std::pair<uint64_t, uint64_t>> preBufferRange(
const std::vector<uint32_t>& stripes, const std::list<uint64_t>& includeTypes) = 0;
Comment thread
lucasfang marked this conversation as resolved.

/**
* Release cached entries whose right boundary is less than or equal to the given boundary.
* @param boundary the boundary value to release cache entries
Expand Down
31 changes: 26 additions & 5 deletions c++/src/Reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1770,8 +1770,8 @@ namespace orc {
contents_->evictCache(boundary);
}

void ReaderImpl::preBuffer(const std::vector<uint32_t>& stripes,
const std::list<uint64_t>& includeTypes) {
std::vector<std::pair<uint64_t, uint64_t>> ReaderImpl::preBufferRange(
const std::vector<uint32_t>& stripes, const std::list<uint64_t>& includeTypes) {
std::vector<uint32_t> newStripes;
for (auto stripe : stripes) {
if (stripe < static_cast<uint32_t>(footer_->stripes_size())) newStripes.push_back(stripe);
Expand All @@ -1783,7 +1783,7 @@ namespace orc {
}

if (newStripes.empty() || newIncludeTypes.empty()) {
return;
return {};
}

orc::RowReaderOptions rowReaderOptions;
Expand All @@ -1792,12 +1792,33 @@ namespace orc {
std::vector<bool> selectedColumns;
columnSelector.updateSelected(selectedColumns, rowReaderOptions);

std::vector<std::pair<uint64_t, uint64_t>> ranges;

for (auto stripe : newStripes) {
const auto& stripeInfo = footer_->stripes(stripe);
proto::StripeFooter stripeFooter = getStripeFooter(stripeInfo, *contents_);
auto ranges = extractReadRangesForStripe(stripe, stripeInfo, stripeFooter, selectedColumns);
contents_->cacheRanges(std::move(ranges));
auto stripeRanges =
extractReadRangesForStripe(stripe, stripeInfo, stripeFooter, selectedColumns);
for (const auto& range : stripeRanges) {
ranges.emplace_back(range.offset, range.length);
}
}
return ranges;
Comment thread
lucasfang marked this conversation as resolved.
}

// Computes the ranges for the given stripes and types via preBufferRange()
// and hands them to contents_->cacheRanges(). Does nothing when
// preBufferRange() selects no ranges.
void ReaderImpl::preBuffer(const std::vector<uint32_t>& stripes,
const std::list<uint64_t>& includeTypes) {
auto ranges = preBufferRange(stripes, includeTypes);
// Nothing was selected, so there is nothing to cache.
if (ranges.empty()) {
return;
}

// preBufferRange() returns (offset, length) pairs; rebuild the ReadRange
// objects that cacheRanges() consumes from them.
std::vector<ReadRange> readRanges;
readRanges.reserve(ranges.size());
for (const auto& range : ranges) {
readRanges.emplace_back(range.first, range.second);
}
contents_->cacheRanges(std::move(readRanges));
Comment thread
lucasfang marked this conversation as resolved.
}

RowReader::~RowReader() {
Expand Down
3 changes: 3 additions & 0 deletions c++/src/Reader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,9 @@ namespace orc {
std::map<uint32_t, BloomFilterIndex> getBloomFilters(
uint32_t stripeIndex, const std::set<uint32_t>& included) const override;

// Returns the (offset, length) ranges selected by the given stripes and types.
std::vector<std::pair<uint64_t, uint64_t>> preBufferRange(
const std::vector<uint32_t>& stripes, const std::list<uint64_t>& includeTypes) override;

// Passes the ranges that preBufferRange() computes for the given stripes and
// types to the cache (contents_->cacheRanges()).
void preBuffer(const std::vector<uint32_t>& stripes,
const std::list<uint64_t>& includeTypes) override;
void releaseBuffer(uint64_t boundary) override;
Expand Down
Loading