Dataset#

警告

實驗性功能:Java 模組 dataset 目前處於早期開發階段。在 Apache Arrow 達到成熟階段之前,API 可能會在每個版本中變更。

Dataset 是 Apache Arrow 中的通用層,用於查詢不同格式或不同分割策略的資料。通常要查詢的資料應該位於傳統檔案系統中,但是 Arrow Dataset 的設計不僅限於查詢檔案,還可以擴展到服務所有可能的資料來源,例如來自跨程序通訊或其他網路位置等等。

開始使用#

目前支援的檔案格式為

  • Apache Arrow (.arrow)

  • Apache ORC (.orc)

  • Apache Parquet (.parquet)

  • 逗號分隔值 (.csv)

  • 換行符號分隔的 JSON 值 (.json)

下面展示了在 Java 中使用 Dataset 查詢 Parquet 檔案的最簡單範例

// read data from file /opt/example.parquet
String uri = "file:/opt/example.parquet";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
    BufferAllocator allocator = new RootAllocator();
    DatasetFactory datasetFactory = new FileSystemDatasetFactory(
            allocator, NativeMemoryPool.getDefault(),
            FileFormat.PARQUET, uri);
    Dataset dataset = datasetFactory.finish();
    Scanner scanner = dataset.newScan(options);
    ArrowReader reader = scanner.scanBatches()
) {
    List<ArrowRecordBatch> batches = new ArrayList<>();
    while (reader.loadNextBatch()) {
        try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
            final VectorUnloader unloader = new VectorUnloader(root);
            batches.add(unloader.getRecordBatch());
        }
    }

    // do something with read record batches, for example:
    analyzeArrowData(batches);

    // finished the analysis of the data, close all resources:
    AutoCloseables.close(batches);
} catch (Exception e) {
    e.printStackTrace();
}

注意

ArrowRecordBatch 是一種低階複合 Arrow 資料交換格式,不提供直接從中讀取類型化資料的 API。建議使用工具程式 VectorLoader 將其載入到感知 schema 的容器 VectorSchemaRoot 中,使用者可以藉此在 Java 中方便地存取解碼後的資料。

ScanOptions batchSize 引數只有在設定為小於 recordbatch 中的列數時才會生效。

另請參閱

使用 VectorSchemaRoot 載入 record batch。

Schema#

要查詢的資料的 Schema 可以透過方法 DatasetFactory#inspect() 在實際讀取之前進行檢查。例如

// read data from local file /opt/example.parquet
String uri = "file:/opt/example.parquet";
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
DatasetFactory factory = new FileSystemDatasetFactory(allocator,
    NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);

// inspect schema
Schema schema = factory.inspect();

對於某些與使用者定義的 schema 相容的資料格式,使用者可以使用方法 DatasetFactory#inspect(Schema schema) 來建立 dataset

Schema schema = createUserSchema()
Dataset dataset = factory.finish(schema);

否則,當呼叫非參數方法 DatasetFactory#inspect() 時,schema 將會從資料來源自動推斷。與 DatasetFactory#inspect() 的結果相同。

此外,如果在掃描期間指定了投影器 (請參閱下一節 投影 (欄位的子集)),則可以使用方法 Scanner::schema() 取得輸出資料的實際 schema

Scanner scanner = dataset.newScan(
    new ScanOptions(32768, Optional.of(new String[] {"id", "name"})));
Schema projectedSchema = scanner.schema();

投影 (欄位的子集)#

使用者可以在 ScanOptions 中指定投影。例如

String[] projection = new String[] {"id", "name"};
ScanOptions options = new ScanOptions(32768, Optional.of(projection));

如果不需要投影,則在 ScanOptions 中省略可選的投影引數

ScanOptions options = new ScanOptions(32768, Optional.empty());

或使用快捷建構子

ScanOptions options = new ScanOptions(32768);

然後,在掃描期間將會發出所有欄位。

投影 (產生新欄位) 和篩選器#

使用者可以使用 Substrait 在 ScanOptions 中指定投影 (新欄位) 或篩選器。例如

ByteBuffer substraitExpressionFilter = getSubstraitExpressionFilter();
ByteBuffer substraitExpressionProject = getSubstraitExpressionProjection();
// Use Substrait APIs to create an Expression and serialize to a ByteBuffer
ScanOptions options = new ScanOptions.Builder(/*batchSize*/ 32768)
             .columns(Optional.empty())
             .substraitExpressionFilter(substraitExpressionFilter)
             .substraitExpressionProjection(getSubstraitExpressionProjection())
             .build();

另請參閱

使用擴充表示式執行投影和篩選器

使用 Substrait 的投影和篩選器。

從 HDFS 讀取資料#

FileSystemDataset 支援從非本機檔案系統讀取資料。HDFS 支援包含在官方 Apache Arrow Java 套件發行版中,可以直接使用,無需重新建置原始碼。

若要使用 Dataset API 存取 HDFS 資料,請將一般 HDFS URI 傳遞給 FilesSystemDatasetFactory

String uri = "hdfs://{hdfs_host}:{port}/data/example.parquet";
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
DatasetFactory factory = new FileSystemDatasetFactory(allocator,
    NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);

原生記憶體管理#

為了獲得更好的效能並降低程式碼複雜度,Java FileSystemDataset 在內部依賴 C++ arrow::dataset::FileSystemDataset (透過 JNI)。因此,從 FileSystemDataset 讀取的所有 Arrow 資料都應該在 JVM 堆積之外配置。為了管理這部分記憶體,向使用者提供了工具程式類別 NativeMemoryPool

作為一個基本範例,透過使用可監聽的 NativeMemoryPool,使用者可以傳遞一個監聽器,以掛鉤 C++ 緩衝區的配置/釋放

AtomicLong reserved = new AtomicLong(0L);
ReservationListener listener = new ReservationListener() {
  @Override
  public void reserve(long size) {
    reserved.getAndAdd(size);
  }

  @Override
  public void unreserve(long size) {
    reserved.getAndAdd(-size);
  }
};
NativeMemoryPool pool = NativeMemoryPool.createListenable(listener);
FileSystemDatasetFactory factory = new FileSystemDatasetFactory(allocator,
    pool, FileFormat.PARQUET, uri);

此外,為從 dataset 讀取的資料保留相同數量的 JVM 直接記憶體是一種非常常見的情況。為此,提供了內建工具程式類別 DirectReservationListener

NativeMemoryPool pool = NativeMemoryPool.createListenable(
    DirectReservationListener.instance());

這樣一來,一旦 Arrow 緩衝區的已配置位元組計數達到 JVM 直接記憶體的限制,掃描期間就會拋出 OutOfMemoryError: Direct buffer memory

注意

預設執行個體 NativeMemoryPool.getDefaultMemoryPool() 在緩衝區配置/釋放時不執行任何操作。在 POC 或測試的情況下可以使用它,但對於複雜環境中的生產用途,建議使用可監聽的記憶體池來管理記憶體。

注意

傳遞給 FileSystemDatasetFactory 建構子的 BufferAllocator 執行個體也知道產生的 dataset 執行個體的整體記憶體用量。一旦建立 Java 緩衝區,傳遞的分配器將成為它們的父分配器。

使用注意事項#

原生物件資源管理#

作為依賴 JNI 的另一個結果,所有與 FileSystemDataset 相關的元件都應該手動關閉,或者在使用後使用 try-with-resources 來釋放對應的原生物件。例如

String uri = "file:/opt/example.parquet";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
    BufferAllocator allocator = new RootAllocator();
    DatasetFactory factory = new FileSystemDatasetFactory(
            allocator, NativeMemoryPool.getDefault(),
            FileFormat.PARQUET, uri);
    Dataset dataset = factory.finish();
    Scanner scanner = dataset.newScan(options)
) {

    // do something

} catch (Exception e) {
    e.printStackTrace();
}

如果使用者忘記關閉它們,則可能會導致原生物件洩漏。

BatchSize#

ScanOptionsbatchSize 引數是單個批次大小的限制。

例如,讓我們嘗試讀取一個使用 gzip 壓縮和 3 個列組的 Parquet 檔案

# Let configure ScanOptions as:
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);

$ parquet-tools meta data4_3rg_gzip.parquet
file schema: schema
age:         OPTIONAL INT64 R:0 D:1
name:        OPTIONAL BINARY L:STRING R:0 D:1
row group 1: RC:4 TS:182 OFFSET:4
row group 2: RC:4 TS:190 OFFSET:420
row group 3: RC:3 TS:179 OFFSET:838

在這裡,我們將 ScanOptions 中的 batchSize 設定為 32768。因為它大於下一個批次中的列數 (由於第一個列組只有 4 列,因此為 4 列),所以程式只獲得 4 列。掃描器不會合併較小的批次以達到限制,但它會分割較大的批次以保持在限制之下。因此,如果列組的列數超過 32768 列,它將被分割成 32768 列或更少的區塊。