讀取和寫入 Parquet 檔案#
另請參閱
Parquet 格式是一種空間效率高的欄狀儲存格式,適用於複雜資料。Parquet C++ 實作是 Apache Arrow 專案的一部分,並受益於與 Arrow C++ 類別和功能的緊密整合。
讀取 Parquet 檔案#
arrow::FileReader
類別將資料讀取到 Arrow 表格和 Record Batch 中。
StreamReader
類別允許使用 C++ 輸入流方法讀取資料,以逐欄逐列方式讀取欄位。提供此方法是為了易於使用和類型安全。當然,當資料必須在檔案以增量方式讀取和寫入時進行串流傳輸時,此方法也很有用。
請注意,由於類型檢查以及欄位值一次處理一個,StreamReader
的效能將不如 FileReader。
FileReader#
若要將 Parquet 資料讀取到 Arrow 結構中,請使用 arrow::FileReader
。要建構,它需要一個代表輸入檔案的 ::arrow::io::RandomAccessFile
實例。若要一次讀取整個檔案,請使用 arrow::FileReader::ReadTable()
// #include "arrow/io/api.h"
// #include "parquet/arrow/reader.h"
arrow::MemoryPool* pool = arrow::default_memory_pool();
std::shared_ptr<arrow::io::RandomAccessFile> input;
ARROW_ASSIGN_OR_RAISE(input, arrow::io::ReadableFile::Open(path_to_file));
// Open Parquet file reader
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
ARROW_ASSIGN_OR_RAISE(arrow_reader, parquet::arrow::OpenFile(input, pool));
// Read entire file as a single Arrow table
std::shared_ptr<arrow::Table> table;
ARROW_RETURN_NOT_OK(arrow_reader->ReadTable(&table));
更細緻的選項可通過 arrow::FileReaderBuilder
輔助類別取得,該類別接受 ReaderProperties
和 ArrowReaderProperties
類別。
若要以批次串流方式讀取,請使用 arrow::FileReader::GetRecordBatchReader()
方法來檢索 arrow::RecordBatchReader
。它將使用在 ArrowReaderProperties
中設定的批次大小。
// #include "arrow/io/api.h"
// #include "parquet/arrow/reader.h"
arrow::MemoryPool* pool = arrow::default_memory_pool();
// Configure general Parquet reader settings
auto reader_properties = parquet::ReaderProperties(pool);
reader_properties.set_buffer_size(4096 * 4);
reader_properties.enable_buffered_stream();
// Configure Arrow-specific Parquet reader settings
auto arrow_reader_props = parquet::ArrowReaderProperties();
arrow_reader_props.set_batch_size(128 * 1024); // default 64 * 1024
parquet::arrow::FileReaderBuilder reader_builder;
ARROW_RETURN_NOT_OK(
reader_builder.OpenFile(path_to_file, /*memory_map=*/false, reader_properties));
reader_builder.memory_pool(pool);
reader_builder.properties(arrow_reader_props);
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
ARROW_ASSIGN_OR_RAISE(arrow_reader, reader_builder.Build());
std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
ARROW_RETURN_NOT_OK(arrow_reader->GetRecordBatchReader(&rb_reader));
for (arrow::Result<std::shared_ptr<arrow::RecordBatch>> maybe_batch : *rb_reader) {
// Operate on each batch...
}
另請參閱
若要讀取多檔案資料集或向下推送篩選器以修剪列組,請參閱 表格資料集。
效能和記憶體效率#
對於遠端檔案系統,請使用讀取合併(預先緩衝)以減少 API 呼叫次數
auto arrow_reader_props = parquet::ArrowReaderProperties();
reader_properties.set_prebuffer(true);
預設值通常針對良好的效能進行調整,但預設情況下並行欄解碼是關閉的。在 ArrowReaderProperties
的建構函式中啟用它
auto arrow_reader_props = parquet::ArrowReaderProperties(/*use_threads=*/true);
如果記憶體效率比效能更重要,那麼
請勿在
parquet::ArrowReaderProperties
中開啟讀取合併(預先緩衝)。使用
arrow::FileReader::GetRecordBatchReader()
以批次方式讀取資料。在
parquet::ReaderProperties
中開啟enable_buffered_stream
。
此外,如果您知道某些欄位包含許多重複值,您可以將它們讀取為字典編碼欄位。這可以使用 ArrowReaderProperties
上的 set_read_dictionary
設定啟用。如果檔案是使用 Arrow C++ 寫入且 store_schema
已啟用,則將自動讀取原始 Arrow 結構描述並覆蓋此設定。
StreamReader#
StreamReader
允許使用標準 C++ 輸入運算子讀取 Parquet 檔案,從而確保類型安全。
請注意,類型必須與結構描述完全匹配,即如果結構描述欄位是無號 16 位元整數,則您必須提供 uint16_t
類型。
例外狀況用於指示錯誤。在以下情況下會拋出 ParquetException
嘗試透過提供不正確的類型來讀取欄位。
嘗試讀取超出列尾。
嘗試讀取超出檔案尾。
#include "arrow/io/file.h"
#include "parquet/stream_reader.h"
{
std::shared_ptr<arrow::io::ReadableFile> infile;
PARQUET_ASSIGN_OR_THROW(
infile,
arrow::io::ReadableFile::Open("test.parquet"));
parquet::StreamReader stream{parquet::ParquetFileReader::Open(infile)};
std::string article;
float price;
uint32_t quantity;
while ( !stream.eof() )
{
stream >> article >> price >> quantity >> parquet::EndRow;
// ...
}
}
寫入 Parquet 檔案#
WriteTable#
arrow::WriteTable()
函式將整個 ::arrow::Table
寫入到輸出檔案。
// #include "parquet/arrow/writer.h"
// #include "arrow/util/type_fwd.h"
using parquet::ArrowWriterProperties;
using parquet::WriterProperties;
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table, GetTable());
// Choose compression
std::shared_ptr<WriterProperties> props =
WriterProperties::Builder().compression(arrow::Compression::SNAPPY)->build();
// Opt to store Arrow schema for easier reads back into Arrow
std::shared_ptr<ArrowWriterProperties> arrow_props =
ArrowWriterProperties::Builder().store_schema()->build();
std::shared_ptr<arrow::io::FileOutputStream> outfile;
ARROW_ASSIGN_OR_RAISE(outfile, arrow::io::FileOutputStream::Open(path_to_file));
ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(*table.get(),
arrow::default_memory_pool(), outfile,
/*chunk_size=*/3, props, arrow_props));
注意
在 C++ 中,欄壓縮預設為關閉。請參閱下方,了解如何在寫入器屬性中選擇壓縮編解碼器。
若要逐批寫出資料,請使用 arrow::FileWriter
。
// #include "parquet/arrow/writer.h"
// #include "arrow/util/type_fwd.h"
using parquet::ArrowWriterProperties;
using parquet::WriterProperties;
// Data is in RBR
std::shared_ptr<arrow::RecordBatchReader> batch_stream;
ARROW_ASSIGN_OR_RAISE(batch_stream, GetRBR());
// Choose compression
std::shared_ptr<WriterProperties> props =
WriterProperties::Builder().compression(arrow::Compression::SNAPPY)->build();
// Opt to store Arrow schema for easier reads back into Arrow
std::shared_ptr<ArrowWriterProperties> arrow_props =
ArrowWriterProperties::Builder().store_schema()->build();
// Create a writer
std::shared_ptr<arrow::io::FileOutputStream> outfile;
ARROW_ASSIGN_OR_RAISE(outfile, arrow::io::FileOutputStream::Open(path_to_file));
std::unique_ptr<parquet::arrow::FileWriter> writer;
ARROW_ASSIGN_OR_RAISE(
writer, parquet::arrow::FileWriter::Open(*batch_stream->schema().get(),
arrow::default_memory_pool(), outfile,
props, arrow_props));
// Write each batch as a row_group
for (arrow::Result<std::shared_ptr<arrow::RecordBatch>> maybe_batch : *batch_stream) {
ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
ARROW_ASSIGN_OR_RAISE(auto table,
arrow::Table::FromRecordBatches(batch->schema(), {batch}));
ARROW_RETURN_NOT_OK(writer->WriteTable(*table.get(), batch->num_rows()));
}
// Write file footer and close
ARROW_RETURN_NOT_OK(writer->Close());
StreamWriter#
StreamWriter
允許使用標準 C++ 輸出運算子寫入 Parquet 檔案,類似於使用 StreamReader
類別讀取。這種類型安全的方法還可確保在寫入列時不會遺漏欄位,並允許自動(在達到一定資料量後)或透過使用 EndRowGroup
串流修飾符來明確建立新的列組。
例外狀況用於指示錯誤。在以下情況下會拋出 ParquetException
嘗試使用不正確的類型寫入欄位。
嘗試在一列中寫入太多欄位。
嘗試略過必要欄位。
#include "arrow/io/file.h"
#include "parquet/stream_writer.h"
{
std::shared_ptr<arrow::io::FileOutputStream> outfile;
PARQUET_ASSIGN_OR_THROW(
outfile,
arrow::io::FileOutputStream::Open("test.parquet"));
parquet::WriterProperties::Builder builder;
std::shared_ptr<parquet::schema::GroupNode> schema;
// Set up builder with required compression type etc.
// Define schema.
// ...
parquet::StreamWriter os{
parquet::ParquetFileWriter::Open(outfile, schema, builder.build())};
// Loop over some data structure which provides the required
// fields to be written and write each row.
for (const auto& a : getArticles())
{
os << a.name() << a.price() << a.quantity() << parquet::EndRow;
}
}
寫入器屬性#
若要設定 Parquet 檔案的寫入方式,請使用 WriterProperties::Builder
#include "parquet/arrow/writer.h"
#include "arrow/util/type_fwd.h"
using parquet::WriterProperties;
using parquet::ParquetVersion;
using parquet::ParquetDataPageVersion;
using arrow::Compression;
std::shared_ptr<WriterProperties> props = WriterProperties::Builder()
.max_row_group_length(64 * 1024)
.created_by("My Application")
.version(ParquetVersion::PARQUET_2_6)
.data_page_version(ParquetDataPageVersion::V2)
.compression(Compression::SNAPPY)
.build();
max_row_group_length
設定每個列組的列數上限,該上限優先於寫入方法中傳遞的 chunk_size
。
您可以使用 version
設定要寫入的 Parquet 版本,這決定了哪些邏輯類型可用。此外,您可以使用 data_page_version
設定資料頁面版本。預設為 V1;設定為 V2 將允許更佳的壓縮(跳過壓縮沒有空間效益的頁面),但並非所有讀取器都支援此資料頁面版本。
壓縮預設為關閉,但為了充分利用 Parquet,您也應該選擇壓縮編解碼器。您可以為整個檔案選擇一個,或為個別欄位選擇一個。如果您選擇混合使用,檔案層級選項將適用於沒有特定壓縮編解碼器的欄位。請參閱 ::arrow::Compression
以取得選項。
欄位資料編碼同樣可以在檔案層級或欄位層級應用。預設情況下,寫入器將嘗試對所有支援的欄位進行字典編碼,除非字典變得太大。可以使用 disable_dictionary()
在檔案層級或欄位層級更改此行為。當不使用字典編碼時,它將回退到為欄位或整個檔案設定的編碼;預設為 Encoding::PLAIN
,但可以使用 encoding()
更改。
#include "parquet/arrow/writer.h"
#include "arrow/util/type_fwd.h"
using parquet::WriterProperties;
using arrow::Compression;
using parquet::Encoding;
std::shared_ptr<WriterProperties> props = WriterProperties::Builder()
.compression(Compression::SNAPPY) // Fallback
->compression("colA", Compression::ZSTD) // Only applies to column "colA"
->encoding(Encoding::BIT_PACKED) // Fallback
->encoding("colB", Encoding::RLE) // Only applies to column "colB"
->disable_dictionary("colB") // Never dictionary-encode column "colB"
->build();
預設情況下,所有欄位都啟用統計資訊。您可以使用建構器上的 disable_statistics
停用所有欄位或特定欄位的統計資訊。有一個 max_statistics_size
限制可用於最小值和最大值的最大位元組數,這對於字串或二進位 blob 等類型很有用。如果欄位使用 enable_write_page_index
啟用了頁面索引,則它不會將統計資訊寫入頁面標頭,因為它在 ColumnIndex 中重複。
還有一些 Arrow 特定的設定可以使用 parquet::ArrowWriterProperties
進行配置
#include "parquet/arrow/writer.h"
using parquet::ArrowWriterProperties;
std::shared_ptr<ArrowWriterProperties> arrow_props = ArrowWriterProperties::Builder()
.enable_deprecated_int96_timestamps() // default False
->store_schema() // default False
->build();
這些選項主要決定了 Arrow 類型如何轉換為 Parquet 類型。開啟 store_schema
將導致寫入器將序列化的 Arrow 結構描述儲存在檔案中繼資料中。由於 Parquet 結構描述和 Arrow 結構描述之間沒有雙射關係,因此儲存 Arrow 結構描述允許 Arrow 讀取器更忠實地重新建立原始資料。從 Parquet 類型到原始 Arrow 類型的此對應包括
讀取具有原始時區資訊的時間戳記(Parquet 不支援時區);
從其儲存類型讀取 Arrow 類型(例如從 int64 欄位的 Duration);
將字串和二進位欄位讀取回具有 64 位元偏移量的大型變體;
將欄位讀取回字典編碼(Arrow 欄位和序列化的 Parquet 版本是否為字典編碼是獨立的)。
支援的 Parquet 功能#
Parquet 格式具有許多功能,而 Parquet C++ 支援其中一部分。
頁面類型#
頁面類型 |
註解 |
---|---|
DATA_PAGE |
|
DATA_PAGE_V2 |
|
DICTIONARY_PAGE |
不支援的頁面類型: INDEX_PAGE。讀取 Parquet 檔案時,將忽略此類型的頁面。
壓縮#
壓縮編解碼器 |
註解 |
---|---|
SNAPPY |
|
GZIP |
|
BROTLI |
|
LZ4 |
(1) |
ZSTD |
(1)在讀取端,Parquet C++ 能夠解壓縮常規 LZ4 區塊格式和參考 Parquet 實作使用的 ad-hoc Hadoop LZ4 格式。在寫入端,Parquet C++ 始終產生 ad-hoc Hadoop LZ4 格式。
不支援的壓縮編解碼器: LZO。
編碼#
編碼 |
讀取 |
寫入 |
註解 |
---|---|---|---|
PLAIN |
✓ |
✓ |
|
PLAIN_DICTIONARY |
✓ |
✓ |
|
BIT_PACKED |
✓ |
✓ |
(1) |
RLE |
✓ |
✓ |
(1) |
RLE_DICTIONARY |
✓ |
✓ |
(2) |
BYTE_STREAM_SPLIT |
✓ |
✓ |
|
DELTA_BINARY_PACKED |
✓ |
✓ |
|
DELTA_BYTE_ARRAY |
✓ |
✓ |
|
DELTA_LENGTH_BYTE_ARRAY |
✓ |
✓ |
(1)僅支援用於編碼定義和重複層級以及布林值。
(2)在寫入路徑上,只有在
WriterProperties::version()
中選擇了 Parquet 格式版本 2.4 或更高版本時,才會啟用 RLE_DICTIONARY。
類型#
物理類型#
物理類型 |
對應的 Arrow 類型 |
註解 |
---|---|---|
BOOLEAN |
布林值 |
|
INT32 |
Int32 / 其他 |
(1) |
INT64 |
Int64 / 其他 |
(1) |
INT96 |
時間戳記(奈秒) |
(2) |
FLOAT |
Float32 |
|
DOUBLE |
Float64 |
|
BYTE_ARRAY |
二進位 / 其他 |
(1) (3) |
FIXED_LENGTH_BYTE_ARRAY |
FixedSizeBinary / 其他 |
(1) |
(1)可以根據邏輯類型(請參閱下文)對應到其他 Arrow 類型。
(2)在寫入端,必須啟用
ArrowWriterProperties::support_deprecated_int96_timestamps()
。(3)在寫入端,Arrow LargeBinary 也可以對應到 BYTE_ARRAY。
邏輯類型#
特定的邏輯類型可以覆蓋給定物理類型的預設 Arrow 類型對應。
邏輯類型 |
物理類型 |
對應的 Arrow 類型 |
註解 |
---|---|---|---|
NULL |
任何 |
Null |
(1) |
INT |
INT32 |
Int8 / UInt8 / Int16 / UInt16 / Int32 / UInt32 |
|
INT |
INT64 |
Int64 / UInt64 |
|
DECIMAL |
INT32 / INT64 / BYTE_ARRAY / FIXED_LENGTH_BYTE_ARRAY |
Decimal128 / Decimal256 |
(2) |
DATE |
INT32 |
Date32 |
(3) |
TIME |
INT32 |
Time32(毫秒) |
|
TIME |
INT64 |
Time64(微秒或奈秒) |
|
TIMESTAMP |
INT64 |
時間戳記(毫秒、微秒或奈秒) |
|
STRING |
BYTE_ARRAY |
Utf8 |
(4) |
LIST |
任何 |
List |
(5) |
MAP |
任何 |
Map |
(6) |
FLOAT16 |
FIXED_LENGTH_BYTE_ARRAY |
HalfFloat |
(1)在寫入端,產生 Parquet 物理類型 INT32。
(2)在寫入端,始終發出 FIXED_LENGTH_BYTE_ARRAY。
(3)在寫入端,Arrow Date64 也會對應到 Parquet DATE INT32。
(4)在寫入端,Arrow LargeUtf8 也會對應到 Parquet STRING。
(5)在寫入端,Arrow LargeList 或 FixedSizedList 也會對應到 Parquet LIST。
(6)在讀取端,具有多個值的索引鍵不會被去重複,這與 Parquet 規格相矛盾。
不支援的邏輯類型: JSON、BSON、UUID。如果在讀取 Parquet 檔案時遇到此類類型,則使用預設物理類型對應(例如,Parquet JSON 欄位可以讀取為 Arrow Binary 或 FixedSizeBinary)。
轉換類型#
雖然轉換類型在 Parquet 格式中已棄用(它們已被邏輯類型取代),但 Parquet C++ 實作會識別和發出它們,以便最大限度地提高與其他 Parquet 實作的相容性。
特殊情況#
Arrow Extension 類型以其儲存類型寫出。它仍然可以使用 Parquet 中繼資料在讀取時重新建立(請參閱下方的「Arrow 類型往返」)。
Arrow Dictionary 類型以其值類型寫出。它仍然可以使用 Parquet 中繼資料在讀取時重新建立(請參閱下方的「Arrow 類型往返」)。
Arrow 類型和結構描述往返#
雖然 Arrow 類型和 Parquet 類型之間沒有雙射關係,但可以將 Arrow 結構描述序列化為 Parquet 檔案中繼資料的一部分。這可以使用 ArrowWriterProperties::store_schema()
啟用。
在讀取路徑上,序列化的結構描述將被自動識別,並將重新建立原始 Arrow 資料,並根據需要轉換 Parquet 資料。
舉例來說,當將 Arrow LargeList 序列化為 Parquet 時
資料以 Parquet LIST 寫出
讀回時,如果在寫入檔案時啟用了
ArrowWriterProperties::store_schema()
,則 Parquet LIST 資料會解碼為 Arrow LargeList;否則,它會解碼為 Arrow List。
Parquet 欄位 ID#
Parquet 格式支援可選的整數欄位 ID,可以將其指派給給定的欄位。例如,這在 Apache Iceberg 規格中使用。
在寫入器端,如果 PARQUET:field_id
作為 Arrow 欄位的中繼資料索引鍵存在,則其值會被剖析為非負整數,並用作對應 Parquet 欄位的欄位 ID。
在讀取器端,Arrow 會將此類欄位 ID 轉換為名為 PARQUET:field_id
的中繼資料索引鍵,位於對應的 Arrow 欄位上。
序列化詳細資訊#
Arrow 結構描述被序列化為 Arrow IPC 結構描述訊息,然後進行 base64 編碼並儲存在 Parquet 檔案中繼資料中的 ARROW:schema
中繼資料索引鍵下。
限制#
不支援寫入或讀取回具有空項目的 FixedSizedList 資料。
加密#
Parquet C++ 實作了 加密規格中指定的所有功能,但欄索引和 bloom 篩選器模組的加密除外。
更具體地說,Parquet C++ 支援
AES_GCM_V1 和 AES_GCM_CTR_V1 加密演算法。
Footer、ColumnMetaData、Data Page、Dictionary Page、Data PageHeader、Dictionary PageHeader 模組類型的 AAD 後綴。不支援其他模組類型(ColumnIndex、OffsetIndex、BloomFilter Header、BloomFilter Bitset)。
EncryptionWithFooterKey 和 EncryptionWithColumnKey 模式。
加密 Footer 和純文字 Footer 模式。
雜項#
功能 |
讀取 |
寫入 |
註解 |
---|---|---|---|
欄索引 |
✓ |
✓ |
(1) |
偏移索引 |
✓ |
✓ |
(1) |
Bloom 篩選器 |
✓ |
✓ |
(2) |
CRC 檢查總和 |
✓ |
✓ |
(1)提供對欄索引和偏移索引結構的存取,但資料讀取 API 目前不使用它們。
(2)提供用於建立、序列化和還原序列化 Bloom 篩選器的 API,但它們未整合到資料讀取 API 中。