
public interface EventListener
{
    // Handles query-created events; the QueryCreatedEvent parameter carries the details of the newly created query.
    default void queryCreated(QueryCreatedEvent queryCreatedEvent)
    {
    }

    // Handles query-completed events; the QueryCompletedEvent parameter carries the details of the finished query.
    default void queryCompleted(QueryCompletedEvent queryCompletedEvent)
    {
    }

    // Handles split-completed events; the SplitCompletedEvent parameter carries the details of the finished split.
    default void splitCompleted(SplitCompletedEvent splitCompletedEvent)
    {
    }
}
The following simple example shows how to implement a custom Event Listener. The goal is to build a listener that records status information about query jobs and sends it to Kafka.
package com.ds;

import com.facebook.presto.spi.eventlistener.EventListener;
import com.facebook.presto.spi.eventlistener.EventListenerFactory;

import java.util.Map;

public class QueryEventListenerFactory implements EventListenerFactory {
    @Override
    public String getName() {
        return "query-event-listener";
    }

    // Works directly with the Map<String, String> configuration that Presto passes in.
    @Override
    public EventListener create(Map<String, String> config) {
        // Check the Kafka bootstrap servers configuration
        if (!config.containsKey("kafka.bootstrap.servers")) {
            throw new RuntimeException("event-listener.properties file missing kafka.bootstrap.servers");
        }
        // Check the Kafka topic configuration
        if (!config.containsKey("kafka.topic")) {
            throw new RuntimeException("event-listener.properties file missing kafka.topic");
        }
        return new QueryEventListener(config);
    }
}
Code overview:
1. The QueryEventListenerFactory class implements the EventListenerFactory interface, which is the factory Presto uses to create event listeners.
2. The getName method returns the string "query-event-listener", the name of this factory.
3. The create method receives the configuration as a Map<String, String> and uses it to build the event listener. It first checks that the configuration contains both "kafka.bootstrap.servers" and "kafka.topic"; if either is missing, it throws a RuntimeException.
4. If the configuration is complete, it constructs a new QueryEventListener with the configuration and returns it as the event listener.
5. Note: during startup Presto reads the .properties configuration files under the etc directory and passes the entries to EventListenerFactory.create as a Map<String, String>. QueryEventListenerFactory therefore reads the settings it needs from this map, and throws an exception if a required entry is not found.
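The validation pattern in create can be factored into a small reusable helper. This is a sketch, not part of the original plugin; the helper name `require` is hypothetical:

```java
import java.util.Map;

public class ConfigCheck {
    // Returns the value for key, or fails with the same message style the factory uses.
    static String require(Map<String, String> config, String key) {
        String value = config.get(key);
        if (value == null) {
            throw new RuntimeException("event-listener.properties file missing " + key);
        }
        return value;
    }
}
```

With such a helper, create would collapse to two require calls followed by the QueryEventListener construction, and the error message stays uniform for every missing key.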
Next, implement the Plugin interface:
package com.ds;

import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.eventlistener.EventListenerFactory;

import java.util.Arrays;

public class QueryEventPlugin implements Plugin {
    @Override
    public Iterable<EventListenerFactory> getEventListenerFactories() {
        EventListenerFactory listenerFactory = new QueryEventListenerFactory();
        return Arrays.asList(listenerFactory);
    }
}
Code overview: QueryEventPlugin implements the Plugin interface and exposes the QueryEventListenerFactory through getEventListenerFactories, which is how Presto discovers the listener factory. The listener itself is implemented as follows:
package com.ds;

import com.facebook.presto.spi.eventlistener.EventListener;
import com.facebook.presto.spi.eventlistener.QueryCompletedEvent;
import com.facebook.presto.spi.eventlistener.QueryCreatedEvent;
import com.facebook.presto.spi.eventlistener.SplitCompletedEvent;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class QueryEventListener implements EventListener {
    private KafkaProducer<String, String> producer;
    private String kafkaTopic;
    private Map<String, String> config;

    public QueryEventListener(Map<String, String> config) {
        this.config = new HashMap<>();
        this.config.putAll(config);
        init();
    }

    private void init() {
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", config.get("kafka.bootstrap.servers"));
        kafkaProps.put("key.serializer", StringSerializer.class);
        kafkaProps.put("value.serializer", StringSerializer.class);
        this.producer = new KafkaProducer<>(kafkaProps);
        this.kafkaTopic = config.get("kafka.topic");
    }

    @Override
    public void queryCreated(QueryCreatedEvent queryCreatedEvent) {
    }

    @Override
    public void queryCompleted(QueryCompletedEvent queryCompletedEvent) {
        String queryId = queryCompletedEvent.getMetadata().getQueryId();
        String querySql = queryCompletedEvent.getMetadata().getQuery();
        String queryState = queryCompletedEvent.getMetadata().getQueryState();
        String queryUser = queryCompletedEvent.getContext().getUser();
        long createTime = queryCompletedEvent.getCreateTime().toEpochMilli();
        long endTime = queryCompletedEvent.getEndTime().toEpochMilli();
        long startTime = queryCompletedEvent.getExecutionStartTime().toEpochMilli();
        long analysisTime = queryCompletedEvent.getStatistics().getAnalysisTime().orElse(Duration.ZERO)
                .toMillis();
        long cpuTime = queryCompletedEvent.getStatistics().getCpuTime().toMillis();
        long queuedTime = queryCompletedEvent.getStatistics().getQueuedTime().toMillis();
        long wallTime = queryCompletedEvent.getStatistics().getWallTime().toMillis();
        int completedSplits = queryCompletedEvent.getStatistics().getCompletedSplits();
        double cumulativeMemory = queryCompletedEvent.getStatistics().getCumulativeMemory();
        long outputBytes = queryCompletedEvent.getStatistics().getOutputBytes();
        long outputRows = queryCompletedEvent.getStatistics().getOutputRows();
        long totalBytes = queryCompletedEvent.getStatistics().getTotalBytes();
        long totalRows = queryCompletedEvent.getStatistics().getTotalRows();
        long writtenBytes = queryCompletedEvent.getStatistics().getWrittenOutputBytes();
        long writtenRows = queryCompletedEvent.getStatistics().getWrittenOutputRows();
        // Build the completed-query payload
        String eventData = "{" +
                "\"queryId\": \"" + queryId + "\"," +
                "\"querySql\": \"" + querySql + "\"," +
                "\"queryState\": \"" + queryState + "\"," +
                "\"queryUser\": \"" + queryUser + "\"," +
                "\"createTime\": " + createTime + "," +
                "\"endTime\": " + endTime + "," +
                "\"startTime\": " + startTime + "," +
                "\"analysisTime\": " + analysisTime + "," +
                "\"cpuTime\": " + cpuTime + "," +
                "\"queuedTime\": " + queuedTime + "," +
                "\"wallTime\": " + wallTime + "," +
                "\"completedSplits\": " + completedSplits + "," +
                "\"cumulativeMemory\": " + cumulativeMemory + "," +
                "\"outputBytes\": " + outputBytes + "," +
                "\"outputRows\": " + outputRows + "," +
                "\"totalBytes\": " + totalBytes + "," +
                "\"totalRows\": " + totalRows + "," +
                "\"writtenBytes\": " + writtenBytes + "," +
                "\"writtenRows\": " + writtenRows +
                "}";
        // Send to the Kafka topic
        ProducerRecord<String, String> completedRecord = new ProducerRecord<>(kafkaTopic, eventData);
        producer.send(completedRecord);
        queryCompletedEvent.getFailureInfo().ifPresent(queryFailureInfo -> {
            int code = queryFailureInfo.getErrorCode().getCode();
            String name = queryFailureInfo.getErrorCode().getName();
            String failureType = queryFailureInfo.getFailureType().orElse("").toUpperCase();
            String failureHost = queryFailureInfo.getFailureHost().orElse("").toUpperCase();
            String failureMessage = queryFailureInfo.getFailureMessage().orElse("").toUpperCase();
            String failureTask = queryFailureInfo.getFailureTask().orElse("").toUpperCase();
            String failuresJson = queryFailureInfo.getFailuresJson();
            // Build the failure payload as a JSON string
            String failureData = "{"
                    + "\"code\": " + code + ","
                    + "\"name\": \"" + name + "\","
                    + "\"failureType\": \"" + failureType + "\","
                    + "\"failureHost\": \"" + failureHost + "\","
                    + "\"failureMessage\": \"" + failureMessage + "\","
                    + "\"failureTask\": \"" + failureTask + "\","
                    + "\"failuresJson\": \"" + failuresJson + "\""
                    + "}";
            // Send to the Kafka topic
            ProducerRecord<String, String> failureRecord = new ProducerRecord<>(kafkaTopic, failureData);
            producer.send(failureRecord);
        });
    }

    @Override
    public void splitCompleted(SplitCompletedEvent splitCompletedEvent) {
        long createTime = splitCompletedEvent.getCreateTime().toEpochMilli();
        long endTime = splitCompletedEvent.getEndTime().orElse(Instant.MIN).toEpochMilli();
        String payload = splitCompletedEvent.getPayload();
        String queryId = splitCompletedEvent.getQueryId();
        String stageId = splitCompletedEvent.getStageId();
        long startTime = splitCompletedEvent.getStartTime().orElse(Instant.MIN).toEpochMilli();
        String taskId = splitCompletedEvent.getTaskId();
        long completedDataSizeBytes = splitCompletedEvent.getStatistics().getCompletedDataSizeBytes();
        long completedPositions = splitCompletedEvent.getStatistics().getCompletedPositions();
        long completedReadTime = splitCompletedEvent.getStatistics().getCompletedReadTime().toMillis();
        long cpuTime = splitCompletedEvent.getStatistics().getCpuTime().toMillis();
        long queuedTime = splitCompletedEvent.getStatistics().getQueuedTime().toMillis();
        long wallTime = splitCompletedEvent.getStatistics().getWallTime().toMillis();
        // Build the split event payload
        String splitEventData = "{ "
                + "\"createTime\": " + createTime + ", "
                + "\"endTime\": " + endTime + ", "
                + "\"payload\": \"" + payload + "\", "
                + "\"queryId\": \"" + queryId + "\", "
                + "\"stageId\": \"" + stageId + "\", "
                + "\"startTime\": " + startTime + ", "
                + "\"taskId\": \"" + taskId + "\", "
                + "\"completedDataSizeBytes\": " + completedDataSizeBytes + ", "
                + "\"completedPositions\": " + completedPositions + ", "
                + "\"completedReadTime\": " + completedReadTime + ", "
                + "\"cpuTime\": " + cpuTime + ", "
                + "\"queuedTime\": " + queuedTime + ", "
                + "\"wallTime\": " + wallTime
                + " }";
        // Send to the Kafka topic
        ProducerRecord<String, String> splitRecord = new ProducerRecord<>(kafkaTopic, splitEventData);
        producer.send(splitRecord);
    }

    public void close() {
        if (producer != null) {
            producer.close();
        }
    }
}
Code overview:
In the QueryEventListener class, a KafkaProducer object and a few other instance fields are declared first.
The constructor takes a config parameter that initializes the listener's configuration; the Kafka producer connection is established during initialization.
The init method initializes the Kafka producer, configures the Kafka connection settings, and sets the Kafka topic.
The queryCreated method handles query-created events; no handling logic is implemented here.
The queryCompleted method handles query-completed events: it extracts a set of details from the incoming queryCompletedEvent, builds them into a JSON payload, and sends it to the Kafka topic. If the query failed, the failure details are sent to the Kafka topic as well.
The splitCompleted method handles split-completed events: it extracts a set of details from the incoming splitCompletedEvent, builds them into a JSON payload, and sends it to the Kafka topic.
The close method closes the Kafka producer connection.
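One caveat with the hand-concatenated JSON above: if the SQL text or a failure message contains double quotes or newlines, the resulting string is no longer valid JSON. A minimal escaping helper, shown here as a sketch rather than part of the original listing, avoids that:

```java
public class JsonEscape {
    // Escapes the characters that commonly break naive JSON string concatenation.
    static String escape(String s) {
        StringBuilder sb = new StringBuilder(s.length());
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:   sb.append(c);
            }
        }
        return sb.toString();
    }
}
```

Each string value (for example querySql) would then be wrapped as escape(querySql) before concatenation; in production code a JSON library such as Jackson is the more robust choice.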
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.ds</groupId>
    <artifactId>PrestoHook</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>com.facebook.presto</groupId>
            <artifactId>presto-spi</artifactId>
            <version>0.275</version>
            <scope>compile</scope>
        </dependency>
        <!-- Kafka dependencies -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.0</version> <!-- replace with the Kafka version you use -->
        </dependency>
    </dependencies>
    <!-- packaging plugins -->
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Packaging notes:
In Presto, the Service Provider Interface (SPI) is used to extend Presto with additional functionality, such as loading connectors, functions, types, and system access controls. SPI implementations are loaded through metadata files, which lets Presto discover and load these extension points dynamically.
To extend Presto through the SPI, create a metadata file under src/main/resources/META-INF/services/. The file name must match the fully qualified name of the interface being extended: for the plugin interface it is com.facebook.presto.spi.Plugin, and its content is com.ds.QueryEventPlugin.
The metadata file lists the fully qualified names of the classes implementing the interface, in this case the class implementing com.facebook.presto.spi.Plugin. When Presto starts, it reads this metadata file and loads the plugins listed in it. With the assembly plugin above, mvn package produces a jar-with-dependencies artifact for deployment. Finally, configure the listener in event-listener.properties:
event-listener.name=query-event-listener
kafka.bootstrap.servers=10.82.4.11:9092
kafka.topic=prestojob
Note: the plugin must be deployed to every Presto node, and the Presto service must be restarted for it to take effect.
Running the query "select 1" through Presto produces the following log record in Kafka:
{"queryId": "20240604_090914_00005_bb9j6","querySql": "select 1","queryState": "FINISHED","queryUser": "presto","createTime": 1717492154262,"endTime": 1717492154326,"startTime": 1717492154264,"analysisTime": 16,"cpuTime": 6,"queuedTime": 1,"wallTime": 21,"completedSplits": 17,"cumulativeMemory": 0.0,"outputBytes": 5,"outputRows": 1,"totalBytes": 0,"totalRows": 0,"writtenBytes": 0,"writtenRows": 0}
From there you can consume the data in Kafka directly and implement whatever custom processing you need.
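For a quick sanity check on the consumer side, individual fields can be pulled out of such a flat record with plain JDK string handling. This is a sketch (the method name stringField is hypothetical) that assumes the exact `"key": "value"` layout produced by the listener:

```java
public class EventFieldReader {
    // Extracts a string-valued field from a flat JSON object like the sample record above.
    // Assumes values contain no escaped quotes; use a real JSON parser for anything more.
    static String stringField(String json, String key) {
        String marker = "\"" + key + "\": \"";
        int start = json.indexOf(marker);
        if (start < 0) {
            return null;
        }
        start += marker.length();
        int end = json.indexOf('"', start);
        return json.substring(start, end);
    }
}
```

For example, stringField applied to the sample record above with key "queryState" yields "FINISHED".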
A custom Event Listener is a powerful and flexible feature of the Presto ecosystem: it improves the observability of the system and provides a way to integrate with external systems. The walkthrough and examples in this article should help you explore and implement custom Event Listeners in your own Presto deployment, to better serve your business needs and optimize your data pipelines.
Reprinted from the WeChat official account @滌生大數(shù)據(jù).