Dataset

  • Arrow Java Dataset: the Java implementation of the Arrow Datasets library. The Dataset Java API is implemented via JNI bindings to the C++ Datasets library.

Constructing Datasets

We can construct a Dataset with an automatically inferred schema.

import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import java.util.stream.StreamSupport;

String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/parquetfiles/data1.parquet";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
    BufferAllocator allocator = new RootAllocator();
    DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
    Dataset dataset = datasetFactory.finish();
    Scanner scanner = dataset.newScan(options)
) {
    System.out.println(StreamSupport.stream(scanner.scan().spliterator(), false).count());
} catch (Exception e) {
    e.printStackTrace();
}
1
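
The printed 1 is the number of elements produced by scanner.scan(): scanning this single Parquet file yields just one.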

Let us construct a Dataset with a predefined schema.

import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import java.util.stream.StreamSupport;

String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/parquetfiles/data1.parquet";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
    BufferAllocator allocator = new RootAllocator();
    DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
    Dataset dataset = datasetFactory.finish(datasetFactory.inspect());
    Scanner scanner = dataset.newScan(options)
) {
    System.out.println(StreamSupport.stream(scanner.scan().spliterator(), false).count());
} catch (Exception e) {
    e.printStackTrace();
}
1
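
If the schema is known up front, it can also be built by hand instead of being reused from inspect(). The snippet below is a minimal sketch under that assumption: the id (int32, nullable) and name (utf8, nullable) fields mirror the schema printed in the Getting the Schema recipes further down, and the hand-built Schema is passed to finish().

import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import java.util.Arrays;

String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/parquetfiles/data1.parquet";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
// Hand-written schema; the field names and types are assumed to match data1.parquet.
Schema predefinedSchema = new Schema(Arrays.asList(
    Field.nullable("id", new ArrowType.Int(32, /*isSigned*/ true)),
    Field.nullable("name", new ArrowType.Utf8())));
try (
    BufferAllocator allocator = new RootAllocator();
    DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
    Dataset dataset = datasetFactory.finish(predefinedSchema);
    Scanner scanner = dataset.newScan(options)
) {
    // The scanner now reports the schema we supplied rather than an inferred one.
    System.out.println(scanner.schema());
} catch (Exception e) {
    e.printStackTrace();
}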

Getting the Schema

During Dataset Construction

import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.types.pojo.Schema;

String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/parquetfiles/data1.parquet";
try (
    BufferAllocator allocator = new RootAllocator();
    DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri)
) {
    Schema schema = datasetFactory.inspect();

    System.out.println(schema);
} catch (Exception e) {
    e.printStackTrace();
}
Schema<id: Int(32, true), name: Utf8>(metadata: {parquet.avro.schema={"type":"record","name":"User","namespace":"org.apache.arrow.dataset","fields":[{"name":"id","type":["int","null"]},{"name":"name","type":["string","null"]}]}, writer.model.name=avro})

From a Dataset

import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.types.pojo.Schema;

String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/parquetfiles/data1.parquet";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
    BufferAllocator allocator = new RootAllocator();
    DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
    Dataset dataset = datasetFactory.finish();
    Scanner scanner = dataset.newScan(options)
) {
    Schema schema = scanner.schema();

    System.out.println(schema);
} catch (Exception e) {
    e.printStackTrace();
}
Schema<id: Int(32, true), name: Utf8>(metadata: {parquet.avro.schema={"type":"record","name":"User","namespace":"org.apache.arrow.dataset","fields":[{"name":"id","type":["int","null"]},{"name":"name","type":["string","null"]}]}, writer.model.name=avro})

Query Parquet File

Let us query information from a Parquet file.

Query Data Content For File

import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;

String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/parquetfiles/data1.parquet";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
    BufferAllocator allocator = new RootAllocator();
    DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
    Dataset dataset = datasetFactory.finish();
    Scanner scanner = dataset.newScan(options);
    ArrowReader reader = scanner.scanBatches()
) {
    while (reader.loadNextBatch()) {
        try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
            System.out.print(root.contentToTSVString());
        }
    }
} catch (Exception e) {
    e.printStackTrace();
}
id    name
1    David
2    Gladis
3    Juan

Let us try to read a Parquet file with gzip compression and 3 row groups.

$ parquet-tools meta data4_3rg_gzip.parquet

file schema: schema
age:         OPTIONAL INT64 R:0 D:1
name:        OPTIONAL BINARY L:STRING R:0 D:1
row group 1: RC:4 TS:182 OFFSET:4
row group 2: RC:4 TS:190 OFFSET:420
row group 3: RC:3 TS:179 OFFSET:838
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;

String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/parquetfiles/data4_3rg_gzip.parquet";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
    BufferAllocator allocator = new RootAllocator();
    DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
    Dataset dataset = datasetFactory.finish();
    Scanner scanner = dataset.newScan(options);
    ArrowReader reader = scanner.scanBatches()
) {
    int totalBatchSize = 0;
    int count = 1;
    while (reader.loadNextBatch()) {
        try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
            totalBatchSize += root.getRowCount();
            System.out.println("Number of rows per batch["+ count++ +"]: " + root.getRowCount());
            System.out.print(root.contentToTSVString());
        }
    }
    System.out.println("Total batch size: " + totalBatchSize);
} catch (Exception e) {
    e.printStackTrace();
}
Number of rows per batch[1]: 4
age    name
10    Jean
10    Lu
10    Kei
10    Sophia
Number of rows per batch[2]: 4
age    name
10    Mara
20    Arit
20    Neil
20    Jason
Number of rows per batch[3]: 3
age    name
20    John
20    Peter
20    Ismael
Total batch size: 11
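
Each batch in the output lines up with one of the three row groups reported by parquet-tools (4, 4, and 3 rows): since the requested batchSize of 32768 is far larger than any row group, the scanner simply emits one batch per row group rather than merging them.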

Query Data Content For Directory

Consider that we have these files: data1 with 3 rows, data2 with 3 rows, and data3 with 250 rows.

import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;

String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/parquetfiles/";
ScanOptions options = new ScanOptions(/*batchSize*/ 100);
try (
    BufferAllocator allocator = new RootAllocator();
    DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
    Dataset dataset = datasetFactory.finish();
    Scanner scanner = dataset.newScan(options);
    ArrowReader reader = scanner.scanBatches()
) {
    int count = 1;
    while (reader.loadNextBatch()) {
        try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
            System.out.println("Batch: " + count++ + ", RowCount: " + root.getRowCount());
        }
    }
} catch (Exception e) {
    e.printStackTrace();
}
Batch: 1, RowCount: 3
Batch: 2, RowCount: 3
Batch: 3, RowCount: 100
Batch: 4, RowCount: 100
Batch: 5, RowCount: 50
Batch: 6, RowCount: 4
Batch: 7, RowCount: 4
Batch: 8, RowCount: 3
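
Batches 1 and 2 are data1 and data2 (3 rows each). Batches 3 to 5 are data3's 250 rows split according to the batchSize of 100 (100 + 100 + 50). The remaining batches of 4, 4, and 3 rows come from data4_3rg_gzip.parquet, which sits in the same directory and was queried in the previous recipe.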

Query Data Content With Projection

In case we need to project only certain columns, we can configure ScanOptions with the desired projection.

import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;

import java.util.Optional;

String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/parquetfiles/data1.parquet";
String[] projection = new String[] {"name"};
ScanOptions options = new ScanOptions(/*batchSize*/ 32768, Optional.of(projection));
try (
    BufferAllocator allocator = new RootAllocator();
    DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
    Dataset dataset = datasetFactory.finish();
    Scanner scanner = dataset.newScan(options);
    ArrowReader reader = scanner.scanBatches()
) {
    while (reader.loadNextBatch()) {
        try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
            System.out.print(root.contentToTSVString());
        }
    }
} catch (Exception e) {
    e.printStackTrace();
}
name
David
Gladis
Juan
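
Only the projected name column is materialized. The single-argument ScanOptions constructor used in the earlier recipes selects every column; passing Optional.of(projection) restricts the scan to the listed columns.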

Query Arrow Files

Query Data Content For File

Let us read an Arrow file with 3 record batches, each with 3 rows.

import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;

import java.io.IOException;

String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/arrowfiles/random_access.arrow";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
    BufferAllocator allocator = new RootAllocator();
    DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.ARROW_IPC, uri);
    Dataset dataset = datasetFactory.finish();
    Scanner scanner = dataset.newScan(options);
    ArrowReader reader = scanner.scanBatches()
) {
    int count = 1;
    while (reader.loadNextBatch()) {
        try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
            System.out.println("Number of rows per batch["+ count++ +"]: " + root.getRowCount());
        }
    }
} catch (Exception e) {
    e.printStackTrace();
}
Number of rows per batch[1]: 3
Number of rows per batch[2]: 3
Number of rows per batch[3]: 3

Query ORC File

Query Data Content For File

Let us read an ORC file with zlib compression that contains 385 stripes, each with 5000 rows.

$ orc-metadata demo-11-zlib.orc | more

{ "name": "demo-11-zlib.orc",
  "type": "struct<_col0:int,_col1:string,_col2:string,_col3:string,_col4:int,_col5:string,_col6:int,_col7:int,_col8:int>",
  "stripe count": 385,
  "compression": "zlib", "compression block": 262144,
  "stripes": [
    { "stripe": 0, "rows": 5000,
      "offset": 3, "length": 1031,
      "index": 266, "data": 636, "footer": 129
    },
...
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;

String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/orc/data1-zlib.orc";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
    BufferAllocator allocator = new RootAllocator();
    DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.ORC, uri);
    Dataset dataset = datasetFactory.finish();
    Scanner scanner = dataset.newScan(options);
    ArrowReader reader = scanner.scanBatches()
) {
    int totalBatchSize = 0;
    while (reader.loadNextBatch()) {
        try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
            totalBatchSize += root.getRowCount();
        }
    }
    System.out.println("Total batch size: " + totalBatchSize);
} catch (Exception e) {
    e.printStackTrace();
}
Total batch size: 1920800

Query CSV File

Query Data Content For File

Let us read a CSV file.

import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;

String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/csv/tech_acquisitions.csv";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
    BufferAllocator allocator = new RootAllocator();
    DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.CSV, uri);
    Dataset dataset = datasetFactory.finish();
    Scanner scanner = dataset.newScan(options);
    ArrowReader reader = scanner.scanBatches()
) {
    int totalBatchSize = 0;
    while (reader.loadNextBatch()) {
        try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
            totalBatchSize += root.getRowCount();
            System.out.print(root.contentToTSVString());
        }
    }
    System.out.println("Total batch size: " + totalBatchSize);
} catch (Exception e) {
    e.printStackTrace();
}
Acquirer    Acquiree    Amount in billions (USD)    Date of acquisition
NVIDIA    Mellanox    6.9    04/05/2020
AMD    Xilinx    35.0    27/10/2020
Salesforce    Slack    27.7    01/12/2020
Total batch size: 3