Dataset#
警告
實驗性功能:Java 模組 dataset
目前處於早期開發階段。在 Apache Arrow 達到成熟階段之前,API 可能會在每個版本中變更。
Dataset 是 Apache Arrow 中的通用層,用於查詢不同格式或不同分割策略的資料。通常要查詢的資料應該位於傳統檔案系統中,但是 Arrow Dataset 的設計不僅限於查詢檔案,還可以擴展到服務所有可能的資料來源,例如來自跨程序通訊或其他網路位置等等。
開始使用#
目前支援的檔案格式為
Apache Arrow (
.arrow
)Apache ORC (
.orc
)Apache Parquet (
.parquet
)逗號分隔值 (
.csv
)換行符號分隔的 JSON 值 (
.json
)
下面展示了在 Java 中使用 Dataset 查詢 Parquet 檔案的最簡單範例
// read data from file /opt/example.parquet
String uri = "file:/opt/example.parquet";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
BufferAllocator allocator = new RootAllocator();
DatasetFactory datasetFactory = new FileSystemDatasetFactory(
allocator, NativeMemoryPool.getDefault(),
FileFormat.PARQUET, uri);
Dataset dataset = datasetFactory.finish();
Scanner scanner = dataset.newScan(options);
ArrowReader reader = scanner.scanBatches()
) {
List<ArrowRecordBatch> batches = new ArrayList<>();
while (reader.loadNextBatch()) {
try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
final VectorUnloader unloader = new VectorUnloader(root);
batches.add(unloader.getRecordBatch());
}
}
// do something with read record batches, for example:
analyzeArrowData(batches);
// finished the analysis of the data, close all resources:
AutoCloseables.close(batches);
} catch (Exception e) {
e.printStackTrace();
}
注意
ArrowRecordBatch
是一種低階複合 Arrow 資料交換格式,不提供直接從中讀取類型化資料的 API。建議使用工具程式 VectorLoader
將其載入到感知 schema 的容器 VectorSchemaRoot
中,使用者可以藉此在 Java 中方便地存取解碼後的資料。
ScanOptions batchSize
引數只有在設定為小於 recordbatch 中的列數時才會生效。
另請參閱
使用 VectorSchemaRoot 載入 record batch。
Schema#
要查詢的資料的 Schema 可以透過方法 DatasetFactory#inspect()
在實際讀取之前進行檢查。例如
// read data from local file /opt/example.parquet
String uri = "file:/opt/example.parquet";
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
DatasetFactory factory = new FileSystemDatasetFactory(allocator,
NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
// inspect schema
Schema schema = factory.inspect();
對於某些與使用者定義的 schema 相容的資料格式,使用者可以使用方法 DatasetFactory#inspect(Schema schema)
來建立 dataset
Schema schema = createUserSchema()
Dataset dataset = factory.finish(schema);
否則,當呼叫非參數方法 DatasetFactory#inspect()
時,schema 將會從資料來源自動推斷。與 DatasetFactory#inspect()
的結果相同。
此外,如果在掃描期間指定了投影器 (請參閱下一節 投影 (欄位的子集)),則可以使用方法 Scanner::schema()
取得輸出資料的實際 schema
Scanner scanner = dataset.newScan(
new ScanOptions(32768, Optional.of(new String[] {"id", "name"})));
Schema projectedSchema = scanner.schema();
投影 (欄位的子集)#
使用者可以在 ScanOptions 中指定投影。例如
String[] projection = new String[] {"id", "name"};
ScanOptions options = new ScanOptions(32768, Optional.of(projection));
如果不需要投影,則在 ScanOptions 中省略可選的投影引數
ScanOptions options = new ScanOptions(32768, Optional.empty());
或使用快捷建構子
ScanOptions options = new ScanOptions(32768);
然後,在掃描期間將會發出所有欄位。
投影 (產生新欄位) 和篩選器#
使用者可以使用 Substrait 在 ScanOptions 中指定投影 (新欄位) 或篩選器。例如
ByteBuffer substraitExpressionFilter = getSubstraitExpressionFilter();
ByteBuffer substraitExpressionProject = getSubstraitExpressionProjection();
// Use Substrait APIs to create an Expression and serialize to a ByteBuffer
ScanOptions options = new ScanOptions.Builder(/*batchSize*/ 32768)
.columns(Optional.empty())
.substraitExpressionFilter(substraitExpressionFilter)
.substraitExpressionProjection(getSubstraitExpressionProjection())
.build();
另請參閱
- 使用擴充表示式執行投影和篩選器
使用 Substrait 的投影和篩選器。
從 HDFS 讀取資料#
FileSystemDataset
支援從非本機檔案系統讀取資料。HDFS 支援包含在官方 Apache Arrow Java 套件發行版中,可以直接使用,無需重新建置原始碼。
若要使用 Dataset API 存取 HDFS 資料,請將一般 HDFS URI 傳遞給 FilesSystemDatasetFactory
String uri = "hdfs://{hdfs_host}:{port}/data/example.parquet";
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
DatasetFactory factory = new FileSystemDatasetFactory(allocator,
NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
原生記憶體管理#
為了獲得更好的效能並降低程式碼複雜度,Java FileSystemDataset
在內部依賴 C++ arrow::dataset::FileSystemDataset
(透過 JNI)。因此,從 FileSystemDataset
讀取的所有 Arrow 資料都應該在 JVM 堆積之外配置。為了管理這部分記憶體,向使用者提供了工具程式類別 NativeMemoryPool
。
作為一個基本範例,透過使用可監聽的 NativeMemoryPool
,使用者可以傳遞一個監聽器,以掛鉤 C++ 緩衝區的配置/釋放
AtomicLong reserved = new AtomicLong(0L);
ReservationListener listener = new ReservationListener() {
@Override
public void reserve(long size) {
reserved.getAndAdd(size);
}
@Override
public void unreserve(long size) {
reserved.getAndAdd(-size);
}
};
NativeMemoryPool pool = NativeMemoryPool.createListenable(listener);
FileSystemDatasetFactory factory = new FileSystemDatasetFactory(allocator,
pool, FileFormat.PARQUET, uri);
此外,為從 dataset 讀取的資料保留相同數量的 JVM 直接記憶體是一種非常常見的情況。為此,提供了內建工具程式類別 DirectReservationListener
NativeMemoryPool pool = NativeMemoryPool.createListenable(
DirectReservationListener.instance());
這樣一來,一旦 Arrow 緩衝區的已配置位元組計數達到 JVM 直接記憶體的限制,掃描期間就會拋出 OutOfMemoryError: Direct buffer memory
。
注意
預設執行個體 NativeMemoryPool.getDefaultMemoryPool()
在緩衝區配置/釋放時不執行任何操作。在 POC 或測試的情況下可以使用它,但對於複雜環境中的生產用途,建議使用可監聽的記憶體池來管理記憶體。
注意
傳遞給 FileSystemDatasetFactory
建構子的 BufferAllocator
執行個體也知道產生的 dataset 執行個體的整體記憶體用量。一旦建立 Java 緩衝區,傳遞的分配器將成為它們的父分配器。
使用注意事項#
原生物件資源管理#
作為依賴 JNI 的另一個結果,所有與 FileSystemDataset
相關的元件都應該手動關閉,或者在使用後使用 try-with-resources 來釋放對應的原生物件。例如
String uri = "file:/opt/example.parquet";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
BufferAllocator allocator = new RootAllocator();
DatasetFactory factory = new FileSystemDatasetFactory(
allocator, NativeMemoryPool.getDefault(),
FileFormat.PARQUET, uri);
Dataset dataset = factory.finish();
Scanner scanner = dataset.newScan(options)
) {
// do something
} catch (Exception e) {
e.printStackTrace();
}
如果使用者忘記關閉它們,則可能會導致原生物件洩漏。
BatchSize#
ScanOptions
的 batchSize
引數是單個批次大小的限制。
例如,讓我們嘗試讀取一個使用 gzip 壓縮和 3 個列組的 Parquet 檔案
# Let configure ScanOptions as:
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
$ parquet-tools meta data4_3rg_gzip.parquet
file schema: schema
age: OPTIONAL INT64 R:0 D:1
name: OPTIONAL BINARY L:STRING R:0 D:1
row group 1: RC:4 TS:182 OFFSET:4
row group 2: RC:4 TS:190 OFFSET:420
row group 3: RC:3 TS:179 OFFSET:838
在這裡,我們將 ScanOptions 中的 batchSize 設定為 32768。因為它大於下一個批次中的列數 (由於第一個列組只有 4 列,因此為 4 列),所以程式只獲得 4 列。掃描器不會合併較小的批次以達到限制,但它會分割較大的批次以保持在限制之下。因此,如果列組的列數超過 32768 列,它將被分割成 32768 列或更少的區塊。