Unify Parquet Metadata cache invalidation logic with Cached File System cache invalidation by Mytherin · Pull Request #17334 · duckdb/duckdb · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Unify Parquet Metadata cache invalidation logic with Cached File System cache invalidation #17334

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions extension/parquet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ set(PARQUET_EXTENSION_FILES
column_writer.cpp
parquet_crypto.cpp
parquet_extension.cpp
parquet_file_metadata_cache.cpp
parquet_float16.cpp
parquet_multi_file_info.cpp
parquet_metadata.cpp
Expand Down
32 changes: 13 additions & 19 deletions extension/parquet/include/parquet_file_metadata_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,41 +8,35 @@
#pragma once

#include "duckdb.hpp"
#ifndef DUCKDB_AMALGAMATION
#include "duckdb/storage/object_cache.hpp"
#include "geo_parquet.hpp"
#endif
#include "parquet_types.h"

namespace duckdb {
struct CachingFileHandle;

//! ParquetFileMetadataCache
//! Object cache entry that owns the parsed metadata of a Parquet file (plus optional GeoParquet metadata)
//! together with the state needed to decide whether the entry may be reused for a given file handle.
//! NOTE(review): this listing appears to interleave the previous and the updated members of the class
//! (two metadata constructors, two ObjectType and two GetObjectType declarations) - confirm against the
//! repository which of them are current.
class ParquetFileMetadataCache : public ObjectCacheEntry {
public:
//! Creates an entry that holds no metadata (metadata is null)
ParquetFileMetadataCache() : metadata(nullptr) {
}
//! Creates an entry from parsed metadata, storing r_time as read_time
ParquetFileMetadataCache(unique_ptr<duckdb_parquet::FileMetaData> file_metadata, time_t r_time,
unique_ptr<GeoParquetFileMetadata> geo_metadata)
: metadata(std::move(file_metadata)), read_time(r_time), geo_metadata(std::move(geo_metadata)) {
}

//! Creates an entry from parsed metadata, capturing the validate flag, last modified time and version tag
//! reported by the handle
ParquetFileMetadataCache(unique_ptr<duckdb_parquet::FileMetaData> file_metadata, CachingFileHandle &handle,
unique_ptr<GeoParquetFileMetadata> geo_metadata);
~ParquetFileMetadataCache() override = default;

//! Parquet file metadata
unique_ptr<const duckdb_parquet::FileMetaData> metadata;

//! Read time, as passed to the constructor through r_time
time_t read_time;

//! GeoParquet metadata
unique_ptr<GeoParquetFileMetadata> geo_metadata;

public:
//! Type name of this cache entry ("parquet_metadata")
static string ObjectType() {
return "parquet_metadata";
}
static string ObjectType();
//! Type name of this cache entry; returns the same value as ObjectType()
string GetObjectType() override;

string GetObjectType() override {
return ObjectType();
}
//! Whether this entry can still be used for the file behind new_handle; compares the state captured at
//! construction with the handle's current version tag and last modified time
bool IsValid(CachingFileHandle &new_handle) const;

private:
//! Value of handle.Validate() when the entry was created
bool validate;
//! Value of handle.GetLastModifiedTime() when the entry was created
time_t last_modified;
//! Value of handle.GetVersionTag() when the entry was created
string version_tag;
};

} // namespace duckdb
27 changes: 27 additions & 0 deletions extension/parquet/parquet_file_metadata_cache.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#include "parquet_file_metadata_cache.hpp"
#include "duckdb/storage/external_file_cache.hpp"
#include "duckdb/storage/caching_file_system.hpp"

namespace duckdb {

//! Builds a cache entry from freshly parsed metadata.
//! Besides taking ownership of the Parquet and GeoParquet metadata, the entry records the handle's
//! validate flag, last modified time and version tag so that IsValid() can later compare them against
//! another handle.
ParquetFileMetadataCache::ParquetFileMetadataCache(unique_ptr<duckdb_parquet::FileMetaData> file_metadata,
                                                   CachingFileHandle &handle,
                                                   unique_ptr<GeoParquetFileMetadata> geo_metadata)
    : metadata(std::move(file_metadata)),
      geo_metadata(std::move(geo_metadata)),
      validate(handle.Validate()),
      last_modified(handle.GetLastModifiedTime()),
      version_tag(handle.GetVersionTag()) {
}

//! Returns the type name shared by all Parquet metadata cache entries.
string ParquetFileMetadataCache::ObjectType() {
	static constexpr const char *OBJECT_TYPE_NAME = "parquet_metadata";
	return OBJECT_TYPE_NAME;
}

//! Returns the type name of this entry; always the same value as the static ObjectType().
string ParquetFileMetadataCache::GetObjectType() {
	return ParquetFileMetadataCache::ObjectType();
}

//! Checks whether this entry may still be used for the file behind new_handle.
//! The validate flag, version tag and last modified time captured at construction are compared with the
//! handle's current version tag and last modified time by ExternalFileCache::IsValid.
bool ParquetFileMetadataCache::IsValid(CachingFileHandle &new_handle) const {
	const auto current_version_tag = new_handle.GetVersionTag();
	const auto current_last_modified = new_handle.GetLastModifiedTime();
	return ExternalFileCache::IsValid(validate, version_tag, last_modified, current_version_tag,
	                                  current_last_modified);
}

} // namespace duckdb
9 changes: 2 additions & 7 deletions extension/parquet/parquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ static shared_ptr<ParquetFileMetadataCache>
LoadMetadata(ClientContext &context, Allocator &allocator, CachingFileHandle &file_handle,
const shared_ptr<const ParquetEncryptionConfig> &encryption_config, const EncryptionUtil &encryption_util,
optional_idx footer_size) {
auto current_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());

auto file_proto = CreateThriftFileProtocol(file_handle, false);
auto &transport = reinterpret_cast<ThriftFileTransport &>(*file_proto->getTransport());
auto file_size = transport.GetSize();
Expand Down Expand Up @@ -175,8 +173,7 @@ LoadMetadata(ClientContext &context, Allocator &allocator, CachingFileHandle &fi

// Try to read the GeoParquet metadata (if present)
auto geo_metadata = GeoParquetFileMetadata::TryRead(*metadata, context);

return make_shared_ptr<ParquetFileMetadataCache>(std::move(metadata), current_time, std::move(geo_metadata));
return make_shared_ptr<ParquetFileMetadataCache>(std::move(metadata), file_handle, std::move(geo_metadata));
}

LogicalType ParquetReader::DeriveLogicalType(const SchemaElement &s_ele, ParquetColumnSchema &schema) const {
Expand Down Expand Up @@ -781,7 +778,6 @@ ParquetReader::ParquetReader(ClientContext &context_p, OpenFileInfo file_p, Parq
} else {
encryption_util = make_shared_ptr<duckdb_mbedtls::MbedTlsWrapper::AESStateMBEDTLSFactory>();
}

// If metadata caching is disabled
// or if this file has cached metadata
// or if the cached version already expired
Expand All @@ -792,9 +788,8 @@ ParquetReader::ParquetReader(ClientContext &context_p, OpenFileInfo file_p, Parq
metadata = LoadMetadata(context_p, allocator, *file_handle, parquet_options.encryption_config,
*encryption_util, footer_size);
} else {
auto last_modify_time = file_handle->GetLastModifiedTime();
metadata = ObjectCache::GetObjectCache(context_p).Get<ParquetFileMetadataCache>(file.path);
if (!metadata || (last_modify_time + 10 >= metadata->read_time)) {
if (!metadata || !metadata->IsValid(*file_handle)) {
metadata = LoadMetadata(context_p, allocator, *file_handle, parquet_options.encryption_config,
*encryption_util, footer_size);
ObjectCache::GetObjectCache(context_p).Put(file.path, metadata);
Expand Down
2 changes: 2 additions & 0 deletions src/include/duckdb/storage/caching_file_system.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ struct CachingFileHandle {
DUCKDB_API string GetPath() const;
DUCKDB_API idx_t GetFileSize();
DUCKDB_API time_t GetLastModifiedTime();
DUCKDB_API string GetVersionTag();
DUCKDB_API bool Validate() const;
DUCKDB_API bool CanSeek();
DUCKDB_API bool IsRemoteFile() const;
DUCKDB_API bool OnDiskFile();
Expand Down
5 changes: 4 additions & 1 deletion src/include/duckdb/storage/external_file_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class ExternalFileCache {
void Verify(const unique_ptr<StorageLockKey> &guard) const;
//! Whether the CachedFile is still valid given the current modified/version tag
bool IsValid(const unique_ptr<StorageLockKey> &guard, bool validate, const string &current_version_tag,
time_t current_last_modified, int64_t access_time);
time_t current_last_modified);

//! Get reference to properties (must hold the lock)
idx_t &FileSize(const unique_ptr<StorageLockKey> &guard);
Expand Down Expand Up @@ -101,6 +101,9 @@ class ExternalFileCache {
//! Gets the cached file, or creates it if is not yet present
CachedFile &GetOrCreateCachedFile(const string &path);

DUCKDB_API static bool IsValid(bool validate, const string &cached_version_tag, time_t cached_last_modified,
const string &current_version_tag, time_t current_last_modified);

private:
//! The BufferManager used to cache files
BufferManager &buffer_manager;
Expand Down
16 changes: 14 additions & 2 deletions src/storage/caching_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,12 @@ CachingFileHandle::~CachingFileHandle() {

FileHandle &CachingFileHandle::GetFileHandle() {
if (!file_handle) {
const auto current_time = duration_cast<std::chrono::seconds>(system_clock::now().time_since_epoch()).count();
file_handle = caching_file_system.file_system.OpenFile(path, flags);
last_modified = caching_file_system.file_system.GetLastModifiedTime(*file_handle);
version_tag = caching_file_system.file_system.GetVersionTag(*file_handle);

auto guard = cached_file.lock.GetExclusiveLock();
if (!cached_file.IsValid(guard, validate, version_tag, last_modified, current_time)) {
if (!cached_file.IsValid(guard, validate, version_tag, last_modified)) {
cached_file.Ranges(guard).clear(); // Invalidate entire cache
}
cached_file.FileSize(guard) = file_handle->GetFileSize();
Expand Down Expand Up @@ -165,6 +164,19 @@ time_t CachingFileHandle::GetLastModifiedTime() {
return cached_file.LastModified(guard);
}

//! Returns the version tag of the file.
//! If no underlying file handle exists and validation is off, the tag stored in the cached file is
//! returned under a shared lock. Otherwise GetFileHandle() is called first and this handle's own
//! version_tag is returned.
string CachingFileHandle::GetVersionTag() {
	const bool use_cached_tag = !file_handle && !validate;
	if (use_cached_tag) {
		auto shared_guard = cached_file.lock.GetSharedLock();
		return cached_file.VersionTag(shared_guard);
	}
	GetFileHandle();
	return version_tag;
}

bool CachingFileHandle::Validate() const {
return validate;
}

bool CachingFileHandle::CanSeek() {
if (file_handle || validate) {
return GetFileHandle().CanSeek();
Expand Down
19 changes: 14 additions & 5 deletions src/storage/external_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,28 +70,37 @@ void ExternalFileCache::CachedFile::Verify(const unique_ptr<StorageLockKey> &gua
#endif
}

//! Decides whether cached state recorded for a file can still be used, given the file's current
//! version tag and last modified time:
//!  - when validate is false, the cached state is always treated as valid;
//!  - when the current version tag is non-empty, validity is decided by comparing version tags only;
//!  - otherwise the last modified times must match AND the modification must lie more than
//!    LAST_MODIFIED_THRESHOLD seconds before the moment of the check.
//! NOTE(review): this listing appears to interleave the previous and the updated form of the function
//! (two signatures, paired comparison lines) - confirm against the repository before relying on it.
bool ExternalFileCache::CachedFile::IsValid(const unique_ptr<StorageLockKey> &guard, bool validate,
const string &current_version_tag, time_t current_last_modified,
int64_t access_time) {
bool ExternalFileCache::IsValid(bool validate, const string &cached_version_tag, time_t cached_last_modified,
const string &current_version_tag, time_t current_last_modified) {
if (!validate) {
return true; // Assume valid
}
if (!current_version_tag.empty()) {
return VersionTag(guard) == current_version_tag; // Validity checked by version tag (httpfs)
return cached_version_tag == current_version_tag; // Validity checked by version tag (httpfs)
}
if (LastModified(guard) != current_last_modified) {
if (cached_last_modified != current_last_modified) {
return false; // The file has certainly been modified
}
// The last modified time matches. However, we cannot blindly trust this,
// because some file systems use a low resolution clock to set the last modified time.
// So, we will require that the last modified time is more than 10 seconds ago.
static constexpr int64_t LAST_MODIFIED_THRESHOLD = 10;
// Seconds since the epoch at the moment of this check
const auto access_time = duration_cast<std::chrono::seconds>(system_clock::now().time_since_epoch()).count();
if (access_time < current_last_modified) {
return false; // Last modified in the future?
}
return access_time - current_last_modified > LAST_MODIFIED_THRESHOLD;
}

//! Checks whether this cached file is still valid for the given current version tag / last modified time.
//! Without validation the cached file is assumed valid; otherwise the stored version tag and last modified
//! time (read using the supplied lock guard) are handed to ExternalFileCache::IsValid.
bool ExternalFileCache::CachedFile::IsValid(const unique_ptr<StorageLockKey> &guard, bool validate,
                                            const string &current_version_tag, time_t current_last_modified) {
	if (validate) {
		const auto &cached_version_tag = VersionTag(guard);
		const auto cached_last_modified = LastModified(guard);
		return ExternalFileCache::IsValid(validate, cached_version_tag, cached_last_modified, current_version_tag,
		                                  current_last_modified);
	}
	return true; // Assume valid
}

idx_t &ExternalFileCache::CachedFile::FileSize(const unique_ptr<StorageLockKey> &guard) {
return file_size;
}
Expand Down
Loading
0