Substrait

arrow-dataset 模組可以透過 Acero 查詢引擎執行 Substrait 計劃。

使用 Substrait 計劃執行查詢

計劃可以透過 URI 引用檔案中的資料，也可以引用「命名表格」，而這類表格必須與計劃一起提供。

以下是一個 Java 程式範例，它使用 Java Substrait 查詢 Parquet 檔案（此範例使用 Substrait Java 專案將 SQL 查詢編譯為 Substrait 計劃）：

import com.google.common.collect.ImmutableList;
import io.substrait.isthmus.SqlToSubstrait;
import io.substrait.proto.Plan;
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.dataset.substrait.AceroSubstraitConsumer;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.calcite.sql.parser.SqlParseException;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/**
 * Example: queries a Parquet file through the Acero engine using a Substrait plan that is
 * compiled from SQL.
 *
 * <p>The Parquet file is scanned into an {@link ArrowReader}. That reader is registered under
 * the table name {@code NATION}, which the plan refers to. Each result batch is printed as TSV.
 */
public class ClientSubstrait {
    /**
     * Scans the Parquet file, runs the Substrait plan against it and prints every result batch.
     *
     * @param args unused
     * @throws RuntimeException wrapping any failure while scanning, planning or executing the query
     */
    public static void main(String[] args) {
        String uri = "file:///data/tpch_parquet/nation.parquet";
        ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
        try (
            BufferAllocator allocator = new RootAllocator();
            DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(),
                    FileFormat.PARQUET, uri);
            Dataset dataset = datasetFactory.finish();
            Scanner scanner = dataset.newScan(options);
            ArrowReader reader = scanner.scanBatches()
        ) {
            // The plan refers to the table by name; bind "NATION" to the scanned data.
            Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
            mapTableToArrowReader.put("NATION", reader);
            // Serialize the plan exactly once, then copy the bytes into a direct buffer for runQuery.
            Plan plan = getPlan();
            byte[] planBytes = plan.toByteArray();
            ByteBuffer substraitPlan = ByteBuffer.allocateDirect(planBytes.length);
            substraitPlan.put(planBytes);
            // Run the query and print each result batch.
            try (ArrowReader arrowReader = new AceroSubstraitConsumer(allocator).runQuery(
                    substraitPlan,
                    mapTableToArrowReader
            )) {
                while (arrowReader.loadNextBatch()) {
                    System.out.println(arrowReader.getVectorSchemaRoot().contentToTSVString());
                }
            }
        } catch (Exception e) {
            // Fail loudly and keep the original cause instead of only printing a stack trace.
            throw new RuntimeException(e);
        }
    }

    /**
     * Compiles {@code SELECT * from nation} into a Substrait plan with the Isthmus SQL converter.
     *
     * @return the Substrait plan for the query
     * @throws SqlParseException if the SQL text or the table DDL fails to parse
     */
    static Plan getPlan() throws SqlParseException {
        String sql = "SELECT * from nation";
        // Table definition (DDL) that the converter resolves the name "nation" against.
        String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, N_NAME CHAR(25), " +
                "N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
        SqlToSubstrait sqlToSubstrait = new SqlToSubstrait();
        return sqlToSubstrait.execute(sql, ImmutableList.of(nation));
    }
}
// Results example:
FieldPath(0)    FieldPath(1)    FieldPath(2)    FieldPath(3)
0               ALGERIA         0               haggle. carefully final deposits detect slyly agai
1               ARGENTINA       1               al foxes promise slyly according to the regular accounts. bold requests alon

使用擴展表達式執行投影和篩選

Dataset 也支援使用 Substrait 的擴展表達式（Extended Expressions）進行投影和篩選。這需要 substrait-java 函式庫。

這個 Java 程式會：

  • 載入包含來自 TPC-H 基準測試的「nation」表格的 Parquet 檔案。

  • 套用篩選器
    • N_NATIONKEY > 18

  • 投影兩個新欄位
    • N_REGIONKEY + 10

    • N_NAME || ' - ' || N_COMMENT

import com.google.common.collect.ImmutableList;
import io.substrait.isthmus.SqlExpressionToSubstrait;
import io.substrait.proto.ExtendedExpression;
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.calcite.sql.parser.SqlParseException;

import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Optional;

/**
 * Example: filters and projects a Parquet dataset using Substrait extended expressions that are
 * compiled from SQL expression strings.
 */
public class ClientSubstraitExtendedExpressionsCookbook {

  /** DDL for the NATION table that the SQL expressions are resolved against. */
  private static final String NATION_SCHEMA =
      "CREATE TABLE NATION (N_NATIONKEY INT NOT NULL, N_NAME VARCHAR, "
          + "N_REGIONKEY INT NOT NULL, N_COMMENT VARCHAR)";

  public static void main(String[] args) throws SqlParseException {
    projectAndFilterDataset();
  }

  /**
   * Scans nation.parquet and keeps only rows where {@code N_NATIONKEY > 18}. It projects
   * {@code N_REGIONKEY + 10} and {@code N_NAME || ' - ' || N_COMMENT}, then prints every batch
   * as TSV.
   *
   * @throws SqlParseException if a filter or projection expression fails to parse
   * @throws RuntimeException wrapping any failure while opening or scanning the dataset
   */
  private static void projectAndFilterDataset() throws SqlParseException {
    String uri = "file:///Users/data/tpch_parquet/nation.parquet";
    ScanOptions options =
        new ScanOptions.Builder(/*batchSize*/ 32768)
            .columns(Optional.empty())
            .substraitFilter(getByteBuffer(new String[]{"N_NATIONKEY > 18"}))
            .substraitProjection(getByteBuffer(new String[]{"N_REGIONKEY + 10",
                "N_NAME || CAST(' - ' as VARCHAR) || N_COMMENT"}))
            .build();
    try (BufferAllocator allocator = new RootAllocator();
         DatasetFactory datasetFactory =
             new FileSystemDatasetFactory(
                 allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
         Dataset dataset = datasetFactory.finish();
         Scanner scanner = dataset.newScan(options);
         ArrowReader reader = scanner.scanBatches()) {
      while (reader.loadNextBatch()) {
        System.out.println(reader.getVectorSchemaRoot().contentToTSVString());
      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Compiles SQL expressions against {@link #NATION_SCHEMA} into a Substrait
   * {@code ExtendedExpression}. It returns the serialized bytes in a direct {@link ByteBuffer}.
   *
   * @param sqlExpression SQL expression strings to convert
   * @return a direct buffer holding the serialized extended expression
   * @throws SqlParseException if an expression or the schema DDL fails to parse
   */
  private static ByteBuffer getByteBuffer(String[] sqlExpression) throws SqlParseException {
    SqlExpressionToSubstrait expressionToSubstrait = new SqlExpressionToSubstrait();
    ExtendedExpression expression =
        expressionToSubstrait.convert(sqlExpression, ImmutableList.of(NATION_SCHEMA));
    // Use the protobuf bytes directly. The old Base64 encode-then-decode round trip was the
    // identity and only cost extra allocations.
    byte[] expressionBytes = expression.toByteArray();
    ByteBuffer byteBuffer = ByteBuffer.allocateDirect(expressionBytes.length);
    byteBuffer.put(expressionBytes);
    return byteBuffer;
  }
}
column-1  column-2
13        ROMANIA - ular asymptotes are about the furious multipliers. express dependencies nag above the ironically ironic account
14        SAUDI ARABIA - ts. silent requests haggle. closely express packages sleep across the blithely
12        VIETNAM - hely enticingly express accounts. even, final
13        RUSSIA -  requests against the platelets use never according to the quickly regular pint
13        UNITED KINGDOM - eans boost carefully special requests. accounts are. carefull
11        UNITED STATES - y final packages. slow foxes cajole quickly. quickly silent platelets breach ironic accounts. unusual pinto be