wget https://apache.mirrors.nublue.co.uk/kafka/x.x.x/kafka_x.x-x.x.x.tgz
# Extract files
tar -xzf kafka_x.x-x.x.x.tgz
# Move to a convenient directory
9
mv kafka_x.x-x.x.x /usr/local/kafka

啟動 Kafka 服務(wù): 啟動 Kafka 代理服務(wù)和 Zookeeper 服務(wù)。

# Start Zookeeper
/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
# Start Kafka Broker
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties

創(chuàng)建 Kafka 主題: 創(chuàng)建生產(chǎn)者將寫入、消費者將讀取的主題

/usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic flight-data

方案 2:設(shè)置亞馬遜 MSK

創(chuàng)建 Amazon MSK 群集: 轉(zhuǎn)到 Amazon MSK 控制臺并創(chuàng)建一個新群集。選擇要使用的 Kafka 版本,并指定所需的代理數(shù)量。

設(shè)置網(wǎng)絡(luò): 確保您的 MSK 群集設(shè)置在 VPC 中,并具有適當(dāng)?shù)淖泳W(wǎng)和安全組配置,以允許來自 EC2 實例或 Lambda 功能的流量。

創(chuàng)建 Kafka 主題: 使用 AWS CLI 或 MSK 控制臺創(chuàng)建所需的 Kafka 主題:

aws kafka create-topic --cluster-arn "ClusterArn" --topic-name "flight-data" --partitions 1 --replication-factor 3

安全與監(jiān)控

無論選擇哪種設(shè)置方法,都要確保

一旦完成 Kafka 設(shè)置,您就可以生成和消費與飛行數(shù)據(jù)相關(guān)的消息,從而實現(xiàn)實時分析和決策過程。 Kafka 將充當(dāng)數(shù)據(jù)攝取的中心樞紐,處理高吞吐量并確保數(shù)據(jù)在架構(gòu)的不同組件之間可靠傳輸。

2.將數(shù)據(jù)寫入 AWS RDS 實例

設(shè)置好 Kafka 集群后,下一步就是將數(shù)據(jù)寫入 AWS RDS 實例。為此,您可以使用帶有 JDBC sink 連接器的 Kafka Connect,這樣就可以直接將數(shù)據(jù)從 Kafka 主題流式傳輸?shù)?RDS 表中。

設(shè)置 AWS RDS 實例

啟動 RDS 實例: 從 AWS 管理控制臺啟動一個新的 RDS 實例。選擇你喜歡的 SQL 數(shù)據(jù)庫引擎,如 MySQL、PostgreSQL 或 SQL Server。

配置數(shù)據(jù)庫: 設(shè)置實例類、存儲、VPC、安全組和數(shù)據(jù)庫名稱等參數(shù)。確保允許來自 Kafka Connect 節(jié)點的入站流量使用數(shù)據(jù)庫端口(例如,MySQL 為 3306)。

創(chuàng)建數(shù)據(jù)庫表: 使用數(shù)據(jù)庫客戶端連接到 RDS 實例,創(chuàng)建用于存儲 Kafka 數(shù)據(jù)的表。例如,您可以為航班數(shù)據(jù)創(chuàng)建一個表:

CREATE TABLE flight_data (
id SERIAL PRIMARY KEY,
aircraft_id VARCHAR(255),
timestamp BIGINT,
altitude INT,
speed INT,
heading INT,
...
);
18

配置 Kafka 連接

安裝 Kafka Connect: 如果 Kafka 安裝中尚未包含 Kafka Connect,請安裝 Kafka Connect。在安裝了 Kafka 的 EC2 實例上,可以使用 Confluent Hub 客戶端安裝 Kafka Connect JDBC 連接器:

confluent-hub install confluentinc/kafka-connect-jdbc:latest

配置 JDBC Sink 連接器: 為 JDBC sink 連接器創(chuàng)建 Kafka Connect 配置文件。您需要指定詳細(xì)信息,如 RDS 端點、數(shù)據(jù)庫憑據(jù)、要寫入的表以及自動創(chuàng)建表格等任何附加行為。

name=rds-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=flight-data
connection.url=jdbc:mysql://your-rds-endpoint:3306/your-database
connection.user=your-username
connection.password=your-password
auto.create=true
insert.mode=upsert
pk.mode=record_key
pk.fields=id

啟動 Kafka Connect: 使用 JDBC sink 配置運行 Kafka Connect Worker。

   /usr/local/kafka/bin/connect-standalone.sh /usr/local/kafka/config/connect-standalone.properties /path/to/your-jdbc-sink-connector.properties

此過程將開始把數(shù)據(jù)從 Kafka 中的 flight-data 主題流式傳輸?shù)?RDS 實例中的 flight_data 表。自動創(chuàng)建=true “配置允許 Kafka Connect 根據(jù)主題模式在 RDS 中自動創(chuàng)建表格。

監(jiān)控和優(yōu)化數(shù)據(jù)流

監(jiān)控 Kafka Connect: 密切關(guān)注 Kafka Connect 日志,確保數(shù)據(jù)正確高效地流動。留意可能表明數(shù)據(jù)類型、網(wǎng)絡(luò)連接或權(quán)限問題的錯誤或警告。

優(yōu)化性能: 根據(jù)數(shù)據(jù)量和速度,您可能需要調(diào)整 Kafka Connect 和 RDS 實例的性能。這可能涉及調(diào)整 Kafka Connect 中的任務(wù)數(shù)量、為 RDS 表編制索引或擴展 RDS 實例。

確保數(shù)據(jù)一致性: 實施檢查,確保寫入 RDS 的數(shù)據(jù)與 Kafka 中的數(shù)據(jù)一致。這可能包括比較計數(shù)、校驗和,或使用 Debezium 等工具進(jìn)行變更數(shù)據(jù)捕獲 (CDC)。

按照這些步驟,您可以有效地將 Apache Kafka 中的實時數(shù)據(jù)寫入 AWS RDS 實例,使下游應(yīng)用程序能夠根據(jù)最新的飛行數(shù)據(jù)執(zhí)行分析、生成報告或觸發(fā)事件。

3.使用 AWS Lambda 從 RDS 讀取數(shù)據(jù)

AWS Lambda 可用于從 AWS RDS 實例讀取數(shù)據(jù),并將其提供給各種應(yīng)用程序或端點。Lambda 函數(shù)是無服務(wù)器的,這意味著它們可以根據(jù)需求自動擴展

配置 AWS Lambda 執(zhí)行角色

創(chuàng)建 IAM 角色: 轉(zhuǎn)到 IAM 控制臺,使用 AWSLambdaVPCAccessExecutionRole 策略創(chuàng)建一個新角色。此角色允許 Lambda 在 Amazon CloudWatch Logs 中執(zhí)行和創(chuàng)建日志流。

附加 RDS 訪問策略: 創(chuàng)建并向 IAM 角色附加策略,授予 Lambda 函數(shù)訪問 RDS 數(shù)據(jù)庫的權(quán)限。

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"rds-db:connect"
],
"Resource": [
"arn:aws:rds:region:account-id:db:db-instance-name"
]
}
]
}

創(chuàng)建 Lambda 函數(shù)

定義函數(shù): 在 AWS Lambda 控制臺中,從頭開始創(chuàng)建一個新函數(shù)。選擇符合您首選編程語言的運行時,如 Node.js 或 Python。

設(shè)置 VPC: 配置連接到 VPC 的功能,指定可訪問 RDS 實例的子網(wǎng)和安全組。

執(zhí)行查詢邏輯: 編寫連接到 RDS 實例的功能代碼,并執(zhí)行 SQL 查詢以獲取所需數(shù)據(jù)。

下面是一個使用 pymysql 的 Python 示例:

import json
import pymysql
# Configuration values
endpoint = 'your-rds-instance-endpoint'
username = 'your-username'
password = 'your-password'
database_name = 'your-database-name'
# Connection
connection = pymysql.connect(host=endpoint, user=username, passwd=password, db=database_name)

def lambda_handler(event, context):
with connection.cursor() as cursor:
cursor.execute('SELECT * FROM flight_data;')
result = cursor.fetchall()
return {
'statusCode': 200,
'body': json.dumps(result)
}
40

部署函數(shù): 配置函數(shù)和編寫代碼后,點擊 AWS Lambda 控制臺中的 “部署 “按鈕部署函數(shù)。

計劃定期調(diào)用或按需觸發(fā)

定時輪詢: 如果需要定期輪詢 RDS 以獲取新數(shù)據(jù),可以使用 Amazon EventBridge(前身為 CloudWatch Events)按計劃觸發(fā) Lambda 函數(shù)。

按需調(diào)用: 對于按需訪問,您可以將 API Gateway 設(shè)置為觸發(fā)器,以便在出現(xiàn) HTTP 請求時調(diào)用 Lambda 函數(shù)。

錯誤處理和重試

實施錯誤處理: 確保您的 Lambda 函數(shù)具有 try-catch 塊,以處理任何數(shù)據(jù)庫連接問題或查詢錯誤。

配置死鎖隊列 (DLQ): 設(shè)置 DLQ 以捕獲和分析調(diào)用失敗。

優(yōu)化性能

連接池: 使用 RDS 代理或在 Lambda 函數(shù)中實施連接池,以重復(fù)使用數(shù)據(jù)庫連接,減少每次函數(shù)調(diào)用都要建立新連接的開銷。

內(nèi)存和超時: 根據(jù)查詢的復(fù)雜程度和預(yù)期執(zhí)行時間調(diào)整 Lambda 函數(shù)的內(nèi)存和超時設(shè)置,以優(yōu)化性能和成本。

監(jiān)控和調(diào)試

監(jiān)控日志: 使用 Amazon CloudWatch 監(jiān)控日志,并針對 Lambda 函數(shù)執(zhí)行過程中可能出現(xiàn)的任何錯誤或性能問題設(shè)置警報。

跟蹤和調(diào)試: 利用 AWS X-Ray 跟蹤和調(diào)試 Lambda 函數(shù)調(diào)用 RDS 查詢時發(fā)生的情況。

按照這些步驟,您的 AWS Lambda 函數(shù)就能高效地從 AWS RDS 實例讀取數(shù)據(jù)。 這種設(shè)置可實現(xiàn)數(shù)據(jù)請求的無服務(wù)器處理,為從 RDS 實例向應(yīng)用架構(gòu)的其他部分提供數(shù)據(jù)提供了一個可擴展且經(jīng)濟高效的解決方案。

4.使用 API 網(wǎng)關(guān)向網(wǎng)絡(luò)應(yīng)用程序傳送數(shù)據(jù)

AWS API Gateway 是應(yīng)用程序從后端服務(wù)訪問數(shù)據(jù)、業(yè)務(wù)邏輯或功能的前門。 通過將 API Gateway 與 AWS Lambda 集成(AWS Lambda 反過來又從 AWS RDS 實例讀取數(shù)據(jù)),您可以高效地將實時數(shù)據(jù)饋送到您的 Web 應(yīng)用程序。下面將逐步介紹如何設(shè)置:

在 API Gateway 中創(chuàng)建新的 API

導(dǎo)航至 API Gateway: 轉(zhuǎn)到 AWS 管理控制臺,選擇 API Gateway,然后選擇創(chuàng)建新的 API。

選擇 REST API: 選擇 “REST”,它適用于無服務(wù)器架構(gòu)和網(wǎng)絡(luò)應(yīng)用程序。點擊 “構(gòu)建”。

配置 API: 為應(yīng)用程序接口提供名稱,并設(shè)置端點類型等其他配置。對于大多數(shù)網(wǎng)絡(luò)應(yīng)用程序來說,區(qū)域端點是合適的。

定義新資源和方法

創(chuàng)建資源: 在 API Gateway 控制臺中,在您的 API 下創(chuàng)建一個新資源。該資源代表一個實體(例如,flightData),并將成為 API URL(/flightData)的一部分。

創(chuàng)建 GET 方法: 為資源附加一個 GET 方法。網(wǎng)絡(luò)應(yīng)用程序?qū)⑹褂迷摲椒z索數(shù)據(jù)。

將 GET 方法與 AWS Lambda 集成

與 Lambda 集成: 對于 GET 方法集成類型,請選擇 Lambda 函數(shù)。指定從 RDS 實例讀取數(shù)據(jù)的 Lambda 函數(shù)的區(qū)域和名稱。

部署 API: 將您的 API 部署到新的或現(xiàn)有的階段。部署后,您就可以從互聯(lián)網(wǎng)訪問您的 API。請注意部署時提供的調(diào)用 URL。

啟用 CORS(跨源資源共享)

如果您的網(wǎng)絡(luò)應(yīng)用程序與您的 API 托管在不同的域上,則需要在您的 API 網(wǎng)關(guān)上啟用 CORS:

  1. 選擇資源:在 API Gateway 控制臺中選擇資源(如 “flightData”)。
  2. 啟用 CORS:選擇 “操作 “下拉菜單并單擊 “啟用 CORS”。根據(jù)應(yīng)用程序的要求輸入允許的方法、標(biāo)頭和起源,然后部署更改。

在網(wǎng)絡(luò)應(yīng)用程序中使用應(yīng)用程序接口

使用調(diào)用 URL: 在網(wǎng)絡(luò)應(yīng)用程序中,使用 API Gateway 部署中的 invoke URL 向 /flightData 資源發(fā)出 GET 請求。您可以使用 JavaScript 的 fetch API、Axios 或任何 HTTP 客戶端庫。

 fetch('https://your-api-id.execute-api.region.amazonaws.com/stage/flightData')
.then(response => response.json())
.then(data => console.log(data))
.catch(error => console.error('Error fetching data:', error));

顯示數(shù)據(jù): 接收數(shù)據(jù)后,根據(jù)需要在網(wǎng)絡(luò)應(yīng)用程序的用戶界面中處理和顯示數(shù)據(jù)。

6.監(jiān)控和保護 API

保護和監(jiān)控由 Apache Kafka、AWS RDS、AWS Lambda 和 API Gateway 組成的數(shù)據(jù)管道對于確保數(shù)據(jù)完整性、保密性和系統(tǒng)可靠性至關(guān)重要。下面介紹如何保護和監(jiān)控管道的每個組件:

確保管道安全

  1. 卡夫卡安全:
    1. 加密: 使用 TLS 加密 Kafka 中間商和客戶端之間傳輸?shù)臄?shù)據(jù)。
    2. 身份驗證: 實施 SASL/SCRAM 或相互 TLS (mTLS),以進(jìn)行客戶端-代理驗證。
    3. 授權(quán): 使用 Kafka 的 ACL 控制對主題的訪問,確保只有經(jīng)過授權(quán)的服務(wù)才能生成或消費消息。
  2. AWS RDS 安全性:
    1. 加密: 使用 AWS 密鑰管理服務(wù) (KMS) 啟用靜態(tài)加密,并在傳輸過程中通過 SSL 連接到 RDS 實例執(zhí)行加密。
    2. 網(wǎng)絡(luò)安全: 將 RDS 實例置于 VPC 中的私有子網(wǎng)中,并使用安全組限制對已知 IP 或服務(wù)的訪問。
    3. 訪問管理: 使用 IAM 角色和數(shù)據(jù)庫憑證,在授予數(shù)據(jù)庫訪問權(quán)限時遵循權(quán)限最小原則。
  3. AWS Lambda 安全性:
    1. IAM 角色: 為 Lambda 函數(shù)分配 IAM 角色,并賦予其執(zhí)行任務(wù)所需的最小權(quán)限集。
    2. 環(huán)境變量: 使用 AWS KMS 將數(shù)據(jù)庫憑據(jù)等敏感信息存儲在加密的環(huán)境變量中。
    3. VPC 配置: 如果您的 Lambda 函數(shù)訪問 VPC 中的資源,請將其配置為 VPC,使其與公共互聯(lián)網(wǎng)訪問隔離。
  4. API 網(wǎng)關(guān)安全:
    1. API 密鑰: 使用 API 密鑰是控制 API 訪問的一種簡單方法。
    2. IAM 權(quán)限: 利用 AWS IAM 角色和策略實現(xiàn)更精細(xì)的訪問控制。
    3. Lambda 授權(quán)器: 為 JWT 或 OAuth 令牌驗證實施 Lambda 授權(quán)器,以保護您的 API 端點。
    4. 節(jié)流: 設(shè)置節(jié)流規(guī)則,保護后端服務(wù)免受流量高峰和拒絕服務(wù) (DoS) 攻擊。

監(jiān)測管道

  1. 卡夫卡監(jiān)控:
    1. 使用 LinkedIn 的 Cruise Control、Confluent Control Center 或 Kafka Manager 等開源替代工具進(jìn)行集群管理和監(jiān)控。
    2. 監(jiān)控關(guān)鍵指標(biāo),如消息吞吐量、代理延遲和消費者滯后。
  2. AWS RDS 監(jiān)控:
    1. 利用 Amazon CloudWatch 監(jiān)控 RDS 實例。關(guān)鍵指標(biāo)包括 CPU 利用率、連接數(shù)、讀/寫 IOPS 和存儲使用情況。
    2. 啟用 “增強監(jiān)控”,以更詳細(xì)地查看數(shù)據(jù)庫引擎的性能和活動。
  3. AWS Lambda 監(jiān)控:
    1. 使用 Amazon CloudWatch 監(jiān)控函數(shù)調(diào)用、錯誤和執(zhí)行持續(xù)時間。
    2. 使用 AWS X-Ray 進(jìn)行跟蹤,深入了解函數(shù)的執(zhí)行流程和性能。
  4. API 網(wǎng)關(guān)監(jiān)控:
    1. 利用 CloudWatch 監(jiān)控 API 網(wǎng)關(guān)指標(biāo),如 API 調(diào)用數(shù)量、延遲和 4XX/5XX 錯誤。
    2. 啟用 CloudWatch 日志,以記錄 API 的所有請求和響應(yīng),用于調(diào)試和合規(guī)性目的。

安全和監(jiān)控的最佳做法

保護和監(jiān)控數(shù)據(jù)管道是一個持續(xù)的過程,需要隨時了解最佳實踐和不斷變化的威脅。 通過實施強大的安全措施和監(jiān)控系統(tǒng),您可以保護數(shù)據(jù)并確保數(shù)據(jù)管道的可靠性和性能。

原文鏈接: https://dzone.com/articles/Best-Practices-for-Kafka-AWS-RDS-Lambda-and-API-Gateway-Integration

上一篇:

反應(yīng)過度數(shù)據(jù)暴露:示例和預(yù)防

下一篇:

12 條基于風(fēng)險的 API 安全控制指南
#你可能也喜歡這些API文章!

我們有何不同?

API服務(wù)商零注冊

多API并行試用

數(shù)據(jù)驅(qū)動選型,提升決策效率

查看全部API→
??

熱門場景實測,選對API

#AI文本生成大模型API

對比大模型API的內(nèi)容創(chuàng)意新穎性、情感共鳴力、商業(yè)轉(zhuǎn)化潛力

25個渠道
一鍵對比試用API 限時免費

#AI深度推理大模型API

對比大模型API的邏輯推理準(zhǔn)確性、分析深度、可視化建議合理性

10個渠道
一鍵對比試用API 限時免費