Arrow Flight¶
受益於 Arrow Flight 協定的食譜
運用 Arrow Flight 的簡單 Parquet 儲存服務¶
假設您想要實作一項服務,它可以使用 Arrow Flight 協定儲存、傳送和接收 Parquet 檔案,pyarrow
在 pyarrow.flight
以及特別是透過 pyarrow.flight.FlightServerBase
類別提供一個實作架構。
import pathlib
import pyarrow as pa
import pyarrow.flight
import pyarrow.parquet
class FlightServer(pa.flight.FlightServerBase):
def __init__(self, location="grpc://0.0.0.0:8815",
repo=pathlib.Path("./datasets"), **kwargs):
super(FlightServer, self).__init__(location, **kwargs)
self._location = location
self._repo = repo
def _make_flight_info(self, dataset):
dataset_path = self._repo / dataset
schema = pa.parquet.read_schema(dataset_path)
metadata = pa.parquet.read_metadata(dataset_path)
descriptor = pa.flight.FlightDescriptor.for_path(
dataset.encode('utf-8')
)
endpoints = [pa.flight.FlightEndpoint(dataset, [self._location])]
return pyarrow.flight.FlightInfo(schema,
descriptor,
endpoints,
metadata.num_rows,
metadata.serialized_size)
def list_flights(self, context, criteria):
for dataset in self._repo.iterdir():
yield self._make_flight_info(dataset.name)
def get_flight_info(self, context, descriptor):
return self._make_flight_info(descriptor.path[0].decode('utf-8'))
def do_put(self, context, descriptor, reader, writer):
dataset = descriptor.path[0].decode('utf-8')
dataset_path = self._repo / dataset
data_table = reader.read_all()
pa.parquet.write_table(data_table, dataset_path)
def do_get(self, context, ticket):
dataset = ticket.ticket.decode('utf-8')
dataset_path = self._repo / dataset
return pa.flight.RecordBatchStream(pa.parquet.read_table(dataset_path))
def list_actions(self, context):
return [
("drop_dataset", "Delete a dataset."),
]
def do_action(self, context, action):
if action.type == "drop_dataset":
self.do_drop_dataset(action.body.to_pybytes().decode('utf-8'))
else:
raise NotImplementedError
def do_drop_dataset(self, dataset):
dataset_path = self._repo / dataset
dataset_path.unlink()
範例伺服器顯示 pyarrow.flight.FlightServerBase.list_flights()
,它是負責傳回資料串流清單的方法,可供擷取。
類似的方式,pyarrow.flight.FlightServerBase.get_flight_info()
提供單一特定資料串流的資訊。
接著,我們公開 pyarrow.flight.FlightServerBase.do_get()
,負責實際擷取公開的資料串流,並將它們傳送給客戶端。
如果我們沒有公開建立串流的方式,那麼允許列出和下載資料串流將會毫無用處,這正是 pyarrow.flight.FlightServerBase.do_put()
的責任所在,它負責接收來自客戶端的資料,並處理資料(在本案例中,將其儲存成 parquet 檔案)
這些是最常見的 Arrow Flight 要求,如果我們需要新增更多功能,我們可以使用自訂動作來執行。
在前面的範例中,有一個 drop_dataset
自訂動作。所有自訂動作都是透過 pyarrow.flight.FlightServerBase.do_action()
方法執行的,因此伺服器子類別負責適當地調度它們。在本範例中,當 action.type 為我們預期時,我們會呼叫 do_drop_dataset 方法。
然後,可以使用 pyarrow.flight.FlightServerBase.serve()
啟動我們的伺服器
if __name__ == '__main__':
server = FlightServer()
server._repo.mkdir(exist_ok=True)
server.serve()
啟動伺服器後,我們可以建立一個用戶端來執行請求
import pyarrow as pa
import pyarrow.flight
client = pa.flight.connect("grpc://0.0.0.0:8815")
我們可以建立一個新表,並上傳它,以便儲存在新的 Parquet 檔案中
# Upload a new dataset
data_table = pa.table(
[["Mario", "Luigi", "Peach"]],
names=["Character"]
)
upload_descriptor = pa.flight.FlightDescriptor.for_path("uploaded.parquet")
writer, _ = client.do_put(upload_descriptor, data_table.schema)
writer.write_table(data_table)
writer.close()
上傳後,我們應該可以擷取我們新上傳表的資料
# Retrieve metadata of newly uploaded dataset
flight = client.get_flight_info(upload_descriptor)
descriptor = flight.descriptor
print("Path:", descriptor.path[0].decode('utf-8'), "Rows:", flight.total_records, "Size:", flight.total_bytes)
print("=== Schema ===")
print(flight.schema)
print("==============")
Path: uploaded.parquet Rows: 3 Size: ...
=== Schema ===
Character: string
==============
我們可以擷取資料集的內容
# Read content of the dataset
reader = client.do_get(flight.endpoints[0].ticket)
read_table = reader.read_all()
print(read_table.to_pandas().head())
Character
0 Mario
1 Luigi
2 Peach
完成後,我們可以呼叫自訂動作,來刪除我們新上傳的資料集
# Drop the newly uploaded dataset
client.do_action(pa.flight.Action("drop_dataset", "uploaded.parquet".encode('utf-8')))
若要確認我們的資料集已刪除,我們可以列出伺服器目前儲存的所有 Parquet 檔案
# List existing datasets.
for flight in client.list_flights():
descriptor = flight.descriptor
print("Path:", descriptor.path[0].decode('utf-8'), "Rows:", flight.total_records, "Size:", flight.total_bytes)
print("=== Schema ===")
print(flight.schema)
print("==============")
print("")
串流 Parquet 儲存服務¶
我們可以改善 Parquet 儲存服務,並透過串流資料來避免在記憶體中保留整個資料集。與 PyArrow 中的其他人一樣,Flight 讀取器和寫入器是可以重複執行的,因此讓我們更新之前的伺服器,來利用這項功能
import pathlib
import pyarrow as pa
import pyarrow.flight
import pyarrow.parquet
class FlightServer(pa.flight.FlightServerBase):
def __init__(self, location="grpc://0.0.0.0:8815",
repo=pathlib.Path("./datasets"), **kwargs):
super(FlightServer, self).__init__(location, **kwargs)
self._location = location
self._repo = repo
def _make_flight_info(self, dataset):
dataset_path = self._repo / dataset
schema = pa.parquet.read_schema(dataset_path)
metadata = pa.parquet.read_metadata(dataset_path)
descriptor = pa.flight.FlightDescriptor.for_path(
dataset.encode('utf-8')
)
endpoints = [pa.flight.FlightEndpoint(dataset, [self._location])]
return pyarrow.flight.FlightInfo(schema,
descriptor,
endpoints,
metadata.num_rows,
metadata.serialized_size)
def list_flights(self, context, criteria):
for dataset in self._repo.iterdir():
yield self._make_flight_info(dataset.name)
def get_flight_info(self, context, descriptor):
return self._make_flight_info(descriptor.path[0].decode('utf-8'))
def do_put(self, context, descriptor, reader, writer):
dataset = descriptor.path[0].decode('utf-8')
dataset_path = self._repo / dataset
# Read the uploaded data and write to Parquet incrementally
with dataset_path.open("wb") as sink:
with pa.parquet.ParquetWriter(sink, reader.schema) as writer:
for chunk in reader:
writer.write_table(pa.Table.from_batches([chunk.data]))
def do_get(self, context, ticket):
dataset = ticket.ticket.decode('utf-8')
# Stream data from a file
dataset_path = self._repo / dataset
reader = pa.parquet.ParquetFile(dataset_path)
return pa.flight.GeneratorStream(
reader.schema_arrow, reader.iter_batches())
def list_actions(self, context):
return [
("drop_dataset", "Delete a dataset."),
]
def do_action(self, context, action):
if action.type == "drop_dataset":
self.do_drop_dataset(action.body.to_pybytes().decode('utf-8'))
else:
raise NotImplementedError
def do_drop_dataset(self, dataset):
dataset_path = self._repo / dataset
dataset_path.unlink()
首先,我們修改了 pyarrow.flight.FlightServerBase.do_put()
。我們沒有在寫入之前將所有上傳資料讀取至 pyarrow.Table
中,而是對每個批次進行重複處理,並將其新增至 Parquet 檔案中。
然後,我們修改了 pyarrow.flight.FlightServerBase.do_get()
,將資料串流至用戶端。這是使用 pyarrow.flight.GeneratorStream
,它會採用架構和任何可重複處理或重複處理器。然後,Flight 會重複處理並將每個記錄批次傳送至用戶端,讓我們能夠處理即使無法放入記憶體中的大型 Parquet 檔案。
雖然 GeneratorStream 具備串流數據的優點,這表示 Flight 必須在每次的記錄批次中呼叫 Python 才能進行傳送。相較之下,RecordBatchStream 要求所有資料都必須事先儲存在記憶體中,但在建立後,所有資料傳輸都純粹在 C++ 中處理,而無需呼叫 Python 程式碼。
我們來試試看伺服器。和之前一樣,我們將啟動伺服器
if __name__ == '__main__':
server = FlightServer()
server._repo.mkdir(exist_ok=True)
server.serve()
我們會建立一個客戶端,並且這一次,我們會將批次寫入ライター,就像我們有一串資料,而不是一個儲存在記憶體中的表格。
import pyarrow as pa
import pyarrow.flight
client = pa.flight.connect("grpc://0.0.0.0:8815")
# Upload a new dataset
NUM_BATCHES = 1024
ROWS_PER_BATCH = 4096
upload_descriptor = pa.flight.FlightDescriptor.for_path("streamed.parquet")
batch = pa.record_batch([
pa.array(range(ROWS_PER_BATCH)),
], names=["ints"])
writer, _ = client.do_put(upload_descriptor, batch.schema)
with writer:
for _ in range(NUM_BATCHES):
writer.write_batch(batch)
我們可以在收到後,和之前一樣,將其讀回來。再再一次,我們會從串流中讀取每一批次,而不是將所有批次讀入至一個表格中
# Read content of the dataset
flight = client.get_flight_info(upload_descriptor)
reader = client.do_get(flight.endpoints[0].ticket)
total_rows = 0
for chunk in reader:
total_rows += chunk.data.num_rows
print("Got", total_rows, "rows total, expected", NUM_BATCHES * ROWS_PER_BATCH)
Got 4194304 rows total, expected 4194304
使用使用者名稱/密碼進行身分驗證¶
服務通常需要一個方法來驗證使用者身分並辨識他們的身分。Flight 提供了 實作驗證的幾種方法;最簡單的方法是使用使用者密碼機制。在啟動時,客戶端會使用使用者名稱和密碼向伺服器驗證其自身的身分。伺服器會傳回一個授權令牌,用於納入後續的請求中。
警告
驗證只應該在一個安全的加密管道中使用,也就是說,應該啟用 TLS 。
注意
雖然這個機制被描述成「(HTTP) 基本驗證」,但它並未實際實作 HTTP 驗證 (RFC 7325) 本身。
儘管 Flight 提供了一些介面來實作這樣的機制,但伺服器必須提供實際的實作,如下所示。這裡的實作並不安全,而且僅作為一個最小範例提供。
import base64
import secrets
import pyarrow as pa
import pyarrow.flight
class EchoServer(pa.flight.FlightServerBase):
"""A simple server that just echoes any requests from DoAction."""
def do_action(self, context, action):
return [action.type.encode("utf-8"), action.body]
class BasicAuthServerMiddlewareFactory(pa.flight.ServerMiddlewareFactory):
"""
Middleware that implements username-password authentication.
Parameters
----------
creds: Dict[str, str]
A dictionary of username-password values to accept.
"""
def __init__(self, creds):
self.creds = creds
# Map generated bearer tokens to users
self.tokens = {}
def start_call(self, info, headers):
"""Validate credentials at the start of every call."""
# Search for the authentication header (case-insensitive)
auth_header = None
for header in headers:
if header.lower() == "authorization":
auth_header = headers[header][0]
break
if not auth_header:
raise pa.flight.FlightUnauthenticatedError("No credentials supplied")
# The header has the structure "AuthType TokenValue", e.g.
# "Basic <encoded username+password>" or "Bearer <random token>".
auth_type, _, value = auth_header.partition(" ")
if auth_type == "Basic":
# Initial "login". The user provided a username/password
# combination encoded in the same way as HTTP Basic Auth.
decoded = base64.b64decode(value).decode("utf-8")
username, _, password = decoded.partition(':')
if not password or password != self.creds.get(username):
raise pa.flight.FlightUnauthenticatedError("Unknown user or invalid password")
# Generate a secret, random bearer token for future calls.
token = secrets.token_urlsafe(32)
self.tokens[token] = username
return BasicAuthServerMiddleware(token)
elif auth_type == "Bearer":
# An actual call. Validate the bearer token.
username = self.tokens.get(value)
if username is None:
raise pa.flight.FlightUnauthenticatedError("Invalid token")
return BasicAuthServerMiddleware(value)
raise pa.flight.FlightUnauthenticatedError("No credentials supplied")
class BasicAuthServerMiddleware(pa.flight.ServerMiddleware):
"""Middleware that implements username-password authentication."""
def __init__(self, token):
self.token = token
def sending_headers(self):
"""Return the authentication token to the client."""
return {"authorization": f"Bearer {self.token}"}
class NoOpAuthHandler(pa.flight.ServerAuthHandler):
"""
A handler that implements username-password authentication.
This is required only so that the server will respond to the internal
Handshake RPC call, which the client calls when authenticate_basic_token
is called. Otherwise, it should be a no-op as the actual authentication is
implemented in middleware.
"""
def authenticate(self, outgoing, incoming):
pass
def is_valid(self, token):
return ""
然後我們可以啟動伺服器
if __name__ == '__main__':
server = EchoServer(
auth_handler=NoOpAuthHandler(),
location="grpc://0.0.0.0:8816",
middleware={
"basic": BasicAuthServerMiddlewareFactory({
"test": "password",
})
},
)
server.serve()
接著,我們可以建立一個客戶端並登入
import pyarrow as pa
import pyarrow.flight
client = pa.flight.connect("grpc://0.0.0.0:8816")
token_pair = client.authenticate_basic_token(b'test', b'password')
print(token_pair)
(b'authorization', b'Bearer ...')
針對後續的呼叫,我們將驗證令牌納入呼叫中
action = pa.flight.Action("echo", b"Hello, world!")
options = pa.flight.FlightCallOptions(headers=[token_pair])
for response in client.do_action(action=action, options=options):
print(response.body.to_pybytes())
b'echo'
b'Hello, world!'
如果我們未能這樣做,我們會得到一條驗證錯誤
try:
list(client.do_action(action=action))
except pa.flight.FlightUnauthenticatedError as e:
print("Unauthenticated:", e)
else:
raise RuntimeError("Expected call to fail")
Unauthenticated: No credentials supplied. Detail: Unauthenticated
或者,如果我們在登入時使用了錯誤的憑據,我們也會得到一條錯誤
try:
client.authenticate_basic_token(b'invalid', b'password')
except pa.flight.FlightUnauthenticatedError as e:
print("Unauthenticated:", e)
else:
raise RuntimeError("Expected call to fail")
Unauthenticated: Unknown user or invalid password. Detail: Unauthenticated
使用 TLS 保護連線¶
以下續接我們先前的場景,其中提供給伺服器的網路流量是透過使用者名稱和密碼進行管理,HTTPS (更具體地說,是 TLS) 通訊允許透過加密客戶端和伺服器之間的訊息來增加一層額外的安全性。這是透過憑證來達成。在進行開發期間,最容易的做法是使用自簽憑證。在啟動時,伺服器會載入公鑰和私鑰,而客戶端則使用 TLS 根憑證來驗證伺服器。
注意
在生產環境中,建議使用由憑證授權單位簽署的憑證。
步驟 1 - 產生自簽憑證
使用 dotnet 在 Windows 中或使用 openssl 在 Linux 或 MacOS 中產生自簽證書。另外,可以使用 Arrow 測試資料存放庫 中的自簽證書。根據所產生的檔案,您可能需要將其轉換成 Arrow 伺服器所需的 .crt 和 .key 檔案。其中一種達成此目的的方法是 openssl,請參閱這篇 IBM 文章 以取得更多資訊。
第 2 步 - 執行啟用 TLS 的伺服器
下方程式碼是一個用於接收 TLS 資料的 Arrow 伺服器最小作業範例。
import argparse
import pyarrow
import pyarrow.flight
class FlightServer(pyarrow.flight.FlightServerBase):
def __init__(self, host="localhost", location=None,
tls_certificates=None, verify_client=False,
root_certificates=None, auth_handler=None):
super(FlightServer, self).__init__(
location, auth_handler, tls_certificates, verify_client,
root_certificates)
self.flights = {}
@classmethod
def descriptor_to_key(self, descriptor):
return (descriptor.descriptor_type.value, descriptor.command,
tuple(descriptor.path or tuple()))
def do_put(self, context, descriptor, reader, writer):
key = FlightServer.descriptor_to_key(descriptor)
print(key)
self.flights[key] = reader.read_all()
print(self.flights[key])
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--tls", nargs=2, default=None, metavar=('CERTFILE', 'KEYFILE'))
args = parser.parse_args()
tls_certificates = []
scheme = "grpc+tls"
host = "localhost"
port = "5005"
with open(args.tls[0], "rb") as cert_file:
tls_cert_chain = cert_file.read()
with open(args.tls[1], "rb") as key_file:
tls_private_key = key_file.read()
tls_certificates.append((tls_cert_chain, tls_private_key))
location = "{}://{}:{}".format(scheme, host, port)
server = FlightServer(host, location,
tls_certificates=tls_certificates)
print("Serving on", location)
server.serve()
if __name__ == '__main__':
main()
執行伺服器,您應該會看到 Serving on grpc+tls://127.0.0.1:5005
。
第 3 步 - 安全地連線到伺服器 假設我們想要連線到客戶端並傳送一些資料給它。下列程式碼會使用 TLS 加密安全地將資訊傳送至伺服器。
import argparse
import pyarrow
import pyarrow.flight
import pandas as pd
# Assumes incoming data object is a Pandas Dataframe
def push_to_server(name, data, client):
object_to_send = pyarrow.Table.from_pandas(data)
writer, _ = client.do_put(pyarrow.flight.FlightDescriptor.for_path(name), object_to_send.schema)
writer.write_table(object_to_send)
writer.close()
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--tls-roots', default=None,
help='Path to trusted TLS certificate(s)')
parser.add_argument('--host', default="localhost",
help='Host endpoint')
parser.add_argument('--port', default=5005,
help='Host port')
args = parser.parse_args()
kwargs = {}
with open(args.tls_roots, "rb") as root_certs:
kwargs["tls_root_certs"] = root_certs.read()
client = pyarrow.flight.FlightClient(f"grpc+tls://{args.host}:{args.port}", **kwargs)
data = {'Animal': ['Dog', 'Cat', 'Mouse'], 'Size': ['Big', 'Small', 'Tiny']}
df = pd.DataFrame(data, columns=['Animal', 'Size'])
push_to_server("AnimalData", df, client)
if __name__ == '__main__':
try:
main()
except Exception as e:
print(e)
執行客戶端腳本,您應該會看到伺服器列印出剛剛接收到的資料資訊。
傳播 OpenTelemetry 追蹤¶
透過 OpenTelemetry 進行分散式追蹤,可以在 Flight 服務中收集呼叫層級的效能度量。為了關聯 Flight 客戶端和伺服器中的跨距,必須在兩者之間傳遞追蹤內容。這可以透過 pyarrow.flight.FlightCallOptions
中的標頭手動傳遞,或者可以使用中間軟體自動傳播。
這個範例說明如何透過中間軟體來完成追蹤傳播。客戶端中間軟體需要將追蹤內容注入呼叫標頭中。伺服器中間軟體需要從標頭中提取追蹤內容,並將內容傳遞到新的跨距中。此外,客戶端中間軟體也可以建立新的跨距來計時客戶端端的呼叫。
第 1 步:定義客戶端中間軟體
import pyarrow.flight as flight
from opentelemetry import trace
from opentelemetry.propagate import inject
from opentelemetry.trace.status import StatusCode
class ClientTracingMiddlewareFactory(flight.ClientMiddlewareFactory):
def __init__(self):
self._tracer = trace.get_tracer(__name__)
def start_call(self, info):
span = self._tracer.start_span(f"client.{info.method}")
return ClientTracingMiddleware(span)
class ClientTracingMiddleware(flight.ClientMiddleware):
def __init__(self, span):
self._span = span
def sending_headers(self):
ctx = trace.set_span_in_context(self._span)
carrier = {}
inject(carrier=carrier, context=ctx)
return carrier
def call_completed(self, exception):
if exception:
self._span.record_exception(exception)
self._span.set_status(StatusCode.ERROR)
print(exception)
else:
self._span.set_status(StatusCode.OK)
self._span.end()
第 2 步:定義伺服器中間軟體
import pyarrow.flight as flight
from opentelemetry import trace
from opentelemetry.propagate import extract
from opentelemetry.trace.status import StatusCode
class ServerTracingMiddlewareFactory(flight.ServerMiddlewareFactory):
def __init__(self):
self._tracer = trace.get_tracer(__name__)
def start_call(self, info, headers):
context = extract(headers)
span = self._tracer.start_span(f"server.{info.method}", context=context)
return ServerTracingMiddleware(span)
class ServerTracingMiddleware(flight.ServerMiddleware):
def __init__(self, span):
self._span = span
def call_completed(self, exception):
if exception:
self._span.record_exception(exception)
self._span.set_status(StatusCode.ERROR)
print(exception)
else:
self._span.set_status(StatusCode.OK)
self._span.end()
第 3 步:設定追蹤匯出器、處理器和提供者
伺服器與用戶端都需要使用 OpenTelemetry SDK 設定,才能記錄跨距並將其匯出至某處。 為了範例,我們會將跨距收集到 Python 清單中,但通常會將其設定為匯出至像 Jaeger 等服務。 更多匯出工具的範例,請參閱 OpenTelemetry Exporters。
在此過程中,你需要定義跨距執行的資源。 這項定義至少包含服務名稱,但可以包含其他資訊,例如主機名稱、程序識別碼、服務版本與作業系統。
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
class TestSpanExporter(SpanExporter):
def __init__(self):
self.spans = []
def export(self, spans):
self.spans.extend(spans)
return SpanExportResult.SUCCESS
def configure_tracing():
# Service name is required for most backends,
# and although it's not necessary for console export,
# it's good to set service name anyways.
resource = Resource(attributes={
SERVICE_NAME: "my-service"
})
exporter = TestSpanExporter()
provider = TracerProvider(resource=resource)
processor = SimpleSpanProcessor(exporter)
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
return exporter
步驟 4:將中間層加入伺服器
現在,我們可以在早先的 EchoServer 中使用中間層。
if __name__ == '__main__':
exporter = configure_tracing()
server = EchoServer(
location="grpc://0.0.0.0:8816",
middleware={
"tracing": ServerTracingMiddlewareFactory()
},
)
server.serve()
步驟 5:將中間層加入用戶端
client = pa.flight.connect(
"grpc://0.0.0.0:8816",
middleware=[ClientTracingMiddlewareFactory()],
)
步驟 6:在活動跨距中使用用戶端
當我們在 OpenTelemetry 跨距中使用用戶端進行呼叫時,我們的用戶端中間層會為用戶端 Flight 呼叫建立子跨距,並將跨距內容傳送至伺服器。 我們的伺服器中間層會擷取該追蹤內容,並建立另一個子跨距。
from opentelemetry import trace
# Client would normally also need to configure tracing, but for this example
# the client and server are running in the same Python process.
# exporter = configure_tracing()
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("hello_world") as span:
action = pa.flight.Action("echo", b"Hello, world!")
# Call list() on do_action to drain all results.
list(client.do_action(action=action))
print(f"There are {len(exporter.spans)} spans.")
print(f"The span names are:\n {list(span.name for span in exporter.spans)}.")
print(f"The span status codes are:\n "
f"{list(span.status.status_code for span in exporter.spans)}.")
There are 3 spans.
The span names are:
['server.FlightMethod.DO_ACTION', 'client.FlightMethod.DO_ACTION', 'hello_world'].
The span status codes are:
[<StatusCode.OK: 1>, <StatusCode.OK: 1>, <StatusCode.UNSET: 0>].
預期我們會有三個跨距:一個在用戶端程式碼中,一個在用戶端中間層中,一個在伺服器中間層中。