Substrait¶
Arrow 能夠使用 Substrait 與其他語言進行整合。
查詢資料集¶
Arrow 中的 Substrait 支援服務會結合 資料集 及 substrait-java,並使用 Acero 作為後端,來查詢資料集。
Acero 目前支援
讀取 Arrow、CSV、ORC 及 Parquet 檔案
篩選
投影
聯結
加總
下列是 Java 程式查詢 Parquet 檔案的範例
import com.google.common.collect.ImmutableList;
import io.substrait.isthmus.SqlToSubstrait;
import io.substrait.proto.Plan;
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.dataset.substrait.AceroSubstraitConsumer;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.calcite.sql.parser.SqlParseException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
static Plan queryTableNation() throws SqlParseException {
String sql = "SELECT * FROM NATION WHERE N_NATIONKEY = 17";
String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, N_NAME CHAR(25), " +
"N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
SqlToSubstrait sqlToSubstrait = new SqlToSubstrait();
Plan plan = sqlToSubstrait.execute(sql, ImmutableList.of(nation));
return plan;
}
static void queryDatasetThruSubstraitPlanDefinition() {
String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/nation.parquet";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
BufferAllocator allocator = new RootAllocator();
DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(),
FileFormat.PARQUET, uri);
Dataset dataset = datasetFactory.finish();
Scanner scanner = dataset.newScan(options);
ArrowReader reader = scanner.scanBatches()
) {
Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
mapTableToArrowReader.put("NATION", reader);
// get binary plan
Plan plan = queryTableNation();
ByteBuffer substraitPlan = ByteBuffer.allocateDirect(plan.toByteArray().length);
substraitPlan.put(plan.toByteArray());
// run query
try (ArrowReader arrowReader = new AceroSubstraitConsumer(allocator).runQuery(
substraitPlan,
mapTableToArrowReader
)) {
while (arrowReader.loadNextBatch()) {
System.out.print(arrowReader.getVectorSchemaRoot().contentToTSVString());
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
queryDatasetThruSubstraitPlanDefinition();
N_NATIONKEY N_NAME N_REGIONKEY N_COMMENT
17 PERU 1 platelets. blithely pending dependencies use fluffily across the even pinto beans. carefully silent accoun
您也可以一次查詢多個資料集,並根據某些條件將其聯結起來。舉例來說,我們可以聯結 TPC-H 效能測試中的國家與客戶表格
import com.google.common.collect.ImmutableList;
import io.substrait.isthmus.SqlToSubstrait;
import io.substrait.proto.Plan;
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.dataset.substrait.AceroSubstraitConsumer;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.calcite.sql.parser.SqlParseException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
static Plan queryTableNationJoinCustomer() throws SqlParseException {
String sql = "SELECT n.n_name, COUNT(*) AS NUMBER_CUSTOMER FROM NATION n JOIN CUSTOMER c " +
"ON n.n_nationkey = c.c_nationkey WHERE n.n_nationkey = 17 " +
"GROUP BY n.n_name";
String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, " +
"N_NAME CHAR(25), N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
String customer = "CREATE TABLE CUSTOMER (C_CUSTKEY BIGINT NOT NULL, " +
"C_NAME VARCHAR(25), C_ADDRESS VARCHAR(40), C_NATIONKEY BIGINT NOT NULL, " +
"C_PHONE CHAR(15), C_ACCTBAL DECIMAL, C_MKTSEGMENT CHAR(10), " +
"C_COMMENT VARCHAR(117) )";
SqlToSubstrait sqlToSubstrait = new SqlToSubstrait();
Plan plan = sqlToSubstrait.execute(sql,
ImmutableList.of(nation, customer));
return plan;
}
static void queryTwoDatasetsThruSubstraitPlanDefinition() {
String uriNation = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/nation.parquet";
String uriCustomer = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/customer.parquet";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
BufferAllocator allocator = new RootAllocator();
DatasetFactory datasetFactory = new FileSystemDatasetFactory(
allocator, NativeMemoryPool.getDefault(),
FileFormat.PARQUET, uriNation);
Dataset dataset = datasetFactory.finish();
Scanner scanner = dataset.newScan(options);
ArrowReader readerNation = scanner.scanBatches();
DatasetFactory datasetFactoryCustomer = new FileSystemDatasetFactory(
allocator, NativeMemoryPool.getDefault(),
FileFormat.PARQUET, uriCustomer);
Dataset datasetCustomer = datasetFactoryCustomer.finish();
Scanner scannerCustomer = datasetCustomer.newScan(options);
ArrowReader readerCustomer = scannerCustomer.scanBatches()
) {
// map table to reader
Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
mapTableToArrowReader.put("NATION", readerNation);
mapTableToArrowReader.put("CUSTOMER", readerCustomer);
// get binary plan
Plan plan = queryTableNationJoinCustomer();
ByteBuffer substraitPlan = ByteBuffer.allocateDirect(
plan.toByteArray().length);
substraitPlan.put(plan.toByteArray());
// run query
try (ArrowReader arrowReader = new AceroSubstraitConsumer(
allocator).runQuery(
substraitPlan,
mapTableToArrowReader
)) {
while (arrowReader.loadNextBatch()) {
System.out.print(arrowReader.getVectorSchemaRoot().contentToTSVString());
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
queryTwoDatasetsThruSubstraitPlanDefinition();
N_NAME NUMBER_CUSTOMER
PERU 573