Substrait

Arrow 能夠使用 Substrait 與其他語言進行整合。

查詢資料集

Arrow 的 Substrait 支援結合了 Dataset 與 substrait-java，並以 Acero 作為後端來查詢資料集。

Acero 目前支援：

  • 讀取 Arrow、CSV、ORC 及 Parquet 檔案

  • 篩選

  • 投影

  • 聯結

  • 彙總（Aggregation）

以下是使用 Java 查詢 Parquet 檔案的範例：

import com.google.common.collect.ImmutableList;
import io.substrait.isthmus.SqlToSubstrait;
import io.substrait.proto.Plan;
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.dataset.substrait.AceroSubstraitConsumer;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.calcite.sql.parser.SqlParseException;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/**
 * Translates a SQL query over the TPC-H NATION table into a Substrait plan.
 *
 * <p>The table is described to the SQL-to-Substrait converter via a {@code CREATE TABLE}
 * statement so that column names and types can be resolved.
 *
 * @return the Substrait plan selecting every column of the row whose N_NATIONKEY is 17
 * @throws SqlParseException if the SQL or the table definition cannot be parsed
 */
static Plan queryTableNation() throws SqlParseException {
   // Schema of the NATION table, expressed as DDL for the converter.
   final String nationDdl = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, N_NAME CHAR(25), "
           + "N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
   final String query = "SELECT * FROM NATION WHERE N_NATIONKEY = 17";
   return new SqlToSubstrait().execute(query, ImmutableList.of(nationDdl));
}

/**
 * Scans {@code thirdpartydeps/tpch/nation.parquet} (relative to the working directory), runs
 * the Substrait plan produced by {@code queryTableNation()} over it with Acero, and prints
 * every result batch to standard output as TSV.
 *
 * <p>Any exception is caught and its stack trace printed, so this method never throws.
 */
static void queryDatasetThruSubstraitPlanDefinition() {
   // Build a properly encoded file URI from a Path instead of concatenating "file:" with a raw
   // filesystem path, which breaks on spaces, '#', or Windows drive letters/backslashes.
   String uri = java.nio.file.Paths.get(
           System.getProperty("user.dir"), "thirdpartydeps", "tpch", "nation.parquet")
           .toUri().toString();
   ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
   try (
       BufferAllocator allocator = new RootAllocator();
       DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(),
               FileFormat.PARQUET, uri);
       Dataset dataset = datasetFactory.finish();
       Scanner scanner = dataset.newScan(options);
       ArrowReader reader = scanner.scanBatches()
   ) {
       // The key must match the table name referenced by the SQL / Substrait plan.
       Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
       mapTableToArrowReader.put("NATION", reader);
       // Serialize the plan once (the original serialized it twice) and copy it into a
       // direct buffer.
       byte[] planBytes = queryTableNation().toByteArray();
       ByteBuffer substraitPlan = ByteBuffer.allocateDirect(planBytes.length);
       substraitPlan.put(planBytes);
       // Rewind so position/limit describe exactly the serialized plan.
       substraitPlan.flip();
       // Run the query and stream the result batches.
       try (ArrowReader arrowReader = new AceroSubstraitConsumer(allocator).runQuery(
           substraitPlan,
           mapTableToArrowReader
       )) {
           while (arrowReader.loadNextBatch()) {
               System.out.print(arrowReader.getVectorSchemaRoot().contentToTSVString());
           }
       }
   } catch (Exception e) {
       // Example code: report the failure and return instead of propagating.
       e.printStackTrace();
   }
}

// Run the single-table example; matching rows are printed to standard output as TSV.
queryDatasetThruSubstraitPlanDefinition();
N_NATIONKEY    N_NAME    N_REGIONKEY    N_COMMENT
17    PERU    1    platelets. blithely pending dependencies use fluffily across the even pinto beans. carefully silent accoun

您也可以一次查詢多個資料集，並根據特定條件將它們聯結起來。舉例來說，我們可以聯結 TPC-H 基準測試中的 NATION（國家）與 CUSTOMER（客戶）資料表：

import com.google.common.collect.ImmutableList;
import io.substrait.isthmus.SqlToSubstrait;
import io.substrait.proto.Plan;
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.dataset.substrait.AceroSubstraitConsumer;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.calcite.sql.parser.SqlParseException;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/**
 * Translates a SQL join between the TPC-H NATION and CUSTOMER tables into a Substrait plan.
 *
 * <p>The query counts the customers of nation 17, grouped by nation name. Both tables are
 * described to the converter via {@code CREATE TABLE} statements.
 *
 * @return the Substrait plan for the join-and-count query
 * @throws SqlParseException if the SQL or either table definition cannot be parsed
 */
static Plan queryTableNationJoinCustomer() throws SqlParseException {
    // Table schemas, expressed as DDL for the converter.
    final String nationDdl = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, "
        + "N_NAME CHAR(25), N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
    final String customerDdl = "CREATE TABLE CUSTOMER (C_CUSTKEY BIGINT NOT NULL, "
        + "C_NAME VARCHAR(25), C_ADDRESS VARCHAR(40), C_NATIONKEY BIGINT NOT NULL, "
        + "C_PHONE CHAR(15), C_ACCTBAL DECIMAL, C_MKTSEGMENT CHAR(10), "
        + "C_COMMENT VARCHAR(117) )";
    final ImmutableList<String> tableDefinitions = ImmutableList.of(nationDdl, customerDdl);

    final String query = "SELECT n.n_name, COUNT(*) AS NUMBER_CUSTOMER FROM NATION n JOIN CUSTOMER c "
        + "ON n.n_nationkey = c.c_nationkey WHERE n.n_nationkey = 17 "
        + "GROUP BY n.n_name";
    return new SqlToSubstrait().execute(query, tableDefinitions);
}

/**
 * Joins the TPC-H NATION and CUSTOMER Parquet files under {@code thirdpartydeps/tpch}
 * (relative to the working directory). It runs the Substrait plan from
 * {@code queryTableNationJoinCustomer()} with Acero and prints each result batch to standard
 * output as TSV.
 *
 * <p>Any exception is caught and its stack trace printed, so this method never throws.
 */
static void queryTwoDatasetsThruSubstraitPlanDefinition() {
    // Derive properly encoded file URIs from a Path instead of concatenating "file:" with a
    // raw filesystem path, which breaks on spaces, '#', or Windows drive letters/backslashes.
    java.nio.file.Path tpchDir = java.nio.file.Paths.get(
        System.getProperty("user.dir"), "thirdpartydeps", "tpch");
    String uriNation = tpchDir.resolve("nation.parquet").toUri().toString();
    String uriCustomer = tpchDir.resolve("customer.parquet").toUri().toString();
    ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
    try (
        BufferAllocator allocator = new RootAllocator();
        DatasetFactory datasetFactory = new FileSystemDatasetFactory(
            allocator, NativeMemoryPool.getDefault(),
            FileFormat.PARQUET, uriNation);
        Dataset dataset = datasetFactory.finish();
        Scanner scanner = dataset.newScan(options);
        ArrowReader readerNation = scanner.scanBatches();
        DatasetFactory datasetFactoryCustomer = new FileSystemDatasetFactory(
            allocator, NativeMemoryPool.getDefault(),
            FileFormat.PARQUET, uriCustomer);
        Dataset datasetCustomer = datasetFactoryCustomer.finish();
        Scanner scannerCustomer = datasetCustomer.newScan(options);
        ArrowReader readerCustomer = scannerCustomer.scanBatches()
    ) {
        // Keys must match the table names referenced by the SQL / Substrait plan.
        Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
        mapTableToArrowReader.put("NATION", readerNation);
        mapTableToArrowReader.put("CUSTOMER", readerCustomer);
        // Serialize the plan once (the original serialized it twice) and copy it into a
        // direct buffer.
        byte[] planBytes = queryTableNationJoinCustomer().toByteArray();
        ByteBuffer substraitPlan = ByteBuffer.allocateDirect(planBytes.length);
        substraitPlan.put(planBytes);
        // Rewind so position/limit describe exactly the serialized plan.
        substraitPlan.flip();
        // Run the query and stream the result batches.
        try (ArrowReader arrowReader = new AceroSubstraitConsumer(
            allocator).runQuery(
            substraitPlan,
            mapTableToArrowReader
        )) {
            while (arrowReader.loadNextBatch()) {
                System.out.print(arrowReader.getVectorSchemaRoot().contentToTSVString());
            }
        }
    } catch (Exception e) {
        // Example code: report the failure and return instead of propagating.
        e.printStackTrace();
    }
}

// Run the two-table join example; results are printed to standard output as TSV.
queryTwoDatasetsThruSubstraitPlanDefinition();
N_NAME    NUMBER_CUSTOMER
PERU    573