
Ollama Python 調(diào)用:本地大模型的高效交互方式
24/11/LLM異步API調(diào)用/LLM_API調(diào)用/agenerate/llm_api_example.py
https://github.com/JieShenAI/csdn/blob/main/24/11/LLM%E5%BC%82%E6%AD%A5API%E8%B0%83%E7%94%A8/LLM_API%E8%B0%83%E7%94%A8/agenerate/llm_api_example.py
py
和 jupyter.ipynb
的不同寫法;agenerate
支持異步 ,對(duì)比invoke
不支持異步新建一個(gè).env
文件,在其中存放BASE_URL
和API_KEY
裝包
pip install aiolimiter
以下是一個(gè)簡單的異步編程 Demo,展示如何通過協(xié)程和令牌池并發(fā)處理任務(wù):
import random
import asyncio
from uuid import uuid4
from tqdm import tqdm
from dataclasses import dataclass
from aiolimiter import AsyncLimiter
# 創(chuàng)建限速器,每秒最多發(fā)出 5 個(gè)請(qǐng)求
limiter = AsyncLimiter(10, 1)
@dataclass
class
Token:
uid: str
idx: int
cnt: int = 0
# 將 connect_web 改為異步函數(shù)
async
def
llm_api(data):
t = random.randint(0, 2)
# 使用 asyncio.sleep, 模擬API調(diào)用
await asyncio.sleep(t)
return data * 10
# 保持 call_api 異步
async
def
call_api(token, data, rate_limit_seconds=0.5):
token.cnt += 1
async
with limiter:
await asyncio.sleep(rate_limit_seconds)
return
await llm_api(data)
workders = 1
tokens = [Token(uid=str(uuid4()), idx=i) for i in range(workders)]
async
def
_run_task_with_progress(task, pbar):
"""包裝任務(wù)以更新進(jìn)度條"""
result = await task
pbar.update(1)
return result
# 主函數(shù)
async
def
main():
nums = 100
data = [i for i in range(nums)]
results = [call_api(tokens[int(i % workders)], item) for i, item in enumerate(data)]
# 使用 tqdm 創(chuàng)建一個(gè)進(jìn)度條
with tqdm(total=len(results)) as pbar:
# 使用 asyncio.gather 并行執(zhí)行任務(wù)
results = await asyncio.gather(
*(_run_task_with_progress(task, pbar) for task in results)
)
return results
# 運(yùn)行程序
result = asyncio.run(main())
print(result)
在使用異步協(xié)程時(shí),一定要限速,不然會(huì)被封。
limiter = AsyncLimiter(5, 1)
, 創(chuàng)建限速器,每秒最多發(fā)出 5 個(gè)請(qǐng)求。tokens[int(i % workders)
令牌輪轉(zhuǎn),避免同一個(gè)token訪問頻率過高被封。
假如 AsyncLimiter 限速 每秒15條請(qǐng)求,令牌池中有3個(gè)token,那么相當(dāng)于每個(gè)token的請(qǐng)求速度降低到了每秒5(15 / 3)條請(qǐng)求。每個(gè)token的頻率降低了,但是總的頻率還是很高的。
建議:最好使用多個(gè)平臺(tái)的API接口。服務(wù)商能夠看到我們主機(jī)的IP,即便使用了多個(gè)token,但是IP是同一個(gè),容易被封IP。目前API的服務(wù)器提供商很多,咱們用多個(gè)平臺(tái)的 API 對(duì)服務(wù)商也好,壓力散布到多個(gè)服務(wù)商,不用只霍霍一家服務(wù)商。
使用tqdm
與_run_task_with_progress
結(jié)合構(gòu)建進(jìn)度條
asyncio.gather
函數(shù)用于并行運(yùn)行多個(gè)協(xié)程,并在所有協(xié)程完成后返回結(jié)果。利用asyncio.gather實(shí)現(xiàn)一個(gè)進(jìn)度條工具,創(chuàng)建一個(gè)協(xié)程來更新進(jìn)度條,同時(shí)使用asyncio.gather來跟蹤其他協(xié)程的完成情況。
使用 tqdm 創(chuàng)建一個(gè)進(jìn)度條對(duì)象 pbar,并設(shè)置 total 為任務(wù)的數(shù)量。
使用 asyncio.gather 并行執(zhí)行所有任務(wù),同時(shí)通過 _run_task_with_progress 包裝每個(gè)任務(wù)以更新進(jìn)度條。
await
是錯(cuò)誤的,正確的做法是構(gòu)建任務(wù)列表,然后通過asyncio.gather
并發(fā)執(zhí)行任務(wù)。result = [await call_api(tokens[int(i % workers)], item) for i, item in enumerate(data)]
result = [call_api(tokens[int(i % workers)], item) for i, item in enumerate(data)]
asyncio.gather
并發(fā)運(yùn)行任務(wù)可以充分利用異步特性,縮短總執(zhí)行時(shí)間。下面的代碼展示了如何使用多個(gè) API 密鑰組成的令牌池來優(yōu)化 LLM API 調(diào)用。我們以.env
文件存儲(chǔ) API 密鑰為例。
創(chuàng)建.env
文件,存放多個(gè)api key 構(gòu)成令牌池
API_KEY=sk-xxx,sk-xxx,sk-xxx
utils.py
import re
import json
import random
import time
from typing import Union, Dict
def
generate_arithmetic_expression(num: int):
"""
num: 幾個(gè)操作符
"""
# 定義操作符和數(shù)字范圍,除法
operators = ['+', '-', '*']
expression = f"{random.randint(1, 100)}
{random.choice(operators)}
{random.randint(1, 100)}"
num -= 1
for _ in range(num):
expression = f"{expression}
{random.choice(operators)}
{random.randint(1, 100)}"
result = eval(expression)
expression = expression.replace('*', 'x')
return expression, result
def
re_parse_json(text) -> Union[Dict, None]:
# 提取 JSON 內(nèi)容
json_match = re.search(r'\{.*?\}', text, re.DOTALL)
if json_match:
json_data = json_match.group(0)
response_data = json.loads(json_data)
return response_data
print(f"異常:\n{text}")
return
None
def
calculate_time_difference(start_time, end_time):
elapsed_time = end_time - start_time
hours, rem = divmod(elapsed_time, 3600)
minutes, seconds = divmod(rem, 60)
milliseconds = (elapsed_time - int(elapsed_time)) * 1000
print(
f"executed in {int(hours):02}:{int(minutes):02}:{int(seconds):02}.{int(milliseconds):03} (h:m:s.ms)"
)
def
time_logger(func):
def
wrapper(*args, **kwargs):
start_time = time.time() # 記錄開始時(shí)間
result = func(*args, **kwargs) # 執(zhí)行目標(biāo)函數(shù)
end_time = time.time() # 記錄結(jié)束時(shí)間
elapsed_time = end_time - start_time
hours, rem = divmod(elapsed_time, 3600)
minutes, seconds = divmod(rem, 60)
milliseconds = (elapsed_time - int(elapsed_time)) * 1000
print(
f"Function '{func.__name__}' executed in {int(hours):02}:{int(minutes):02}:{int(seconds):02}.{int(milliseconds):03} (h:m:s.ms)")
return result
return wrapper
# 測(cè)試生成
if __name__ == "__main__":
expr, res = generate_arithmetic_expression(4)
print(f"生成的運(yùn)算表達(dá)式: {expr}")
print(f"計(jì)算結(jié)果: {res}")
異步協(xié)程核心代碼:
import asyncio
import os
import time
from tqdm import tqdm
from dataclasses import dataclass, field
from typing import List, Tuple, TypedDict
from aiolimiter import AsyncLimiter
# 創(chuàng)建限速器,每秒最多發(fā)出 5 個(gè)請(qǐng)求
limiter = AsyncLimiter(5, 1)
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from utils import (
generate_arithmetic_expression,
re_parse_json,
calculate_time_difference,
)
@dataclass
class
LLMAPI:
"""
大模型API的調(diào)用類
"""
base_url: str
api_key: str # 每個(gè)API的key不一樣
uid: int
cnt: int = 0
# 統(tǒng)計(jì)每個(gè)API被調(diào)用了多少次
llm: ChatOpenAI = field(init=False) # 自動(dòng)創(chuàng)建的對(duì)象,不需要用戶傳入
def
__post_init__(self):
# 初始化 llm 對(duì)象
self.llm = self.create_llm()
def
create_llm(self):
# 創(chuàng)建 llm 對(duì)象
return ChatOpenAI(
model="gpt-4o-mini",
base_url=self.base_url,
api_key=self.api_key,
)
async
def
agenerate(self, text):
self.cnt += 1
res = await self.llm.agenerate([text])
return res
async
def
call_llm(llm: LLMAPI, text: str):
# 異步協(xié)程 限速
async
with limiter:
res = await llm.agenerate(text)
return res
async
def
_run_task_with_progress(task, pbar):
"""包裝任務(wù)以更新進(jìn)度條"""
result = await task
pbar.update(1)
return result
async
def
run_api(llms: List[LLMAPI], data: List[str]) -> Tuple[List[str], List[LLMAPI]]:
results = [call_llm(llms[i % len(llms)], text) for i, text in enumerate(data)]
# 使用 tqdm 創(chuàng)建一個(gè)進(jìn)度條
with tqdm(total=len(results)) as pbar:
# 使用 asyncio.gather 并行執(zhí)行任務(wù)
results = await asyncio.gather(
*[_run_task_with_progress(task, pbar) for task in results]
)
return results, llms
if __name__ == "__main__":
load_dotenv()
# 四則運(yùn)算提示詞模板
prompt_template = """
請(qǐng)將以下表達(dá)式的計(jì)算結(jié)果返回為 JSON 格式:
{{
"expression": "{question}",
"infer": ?
}}
"""
questions = []
labels = []
for _ in range(10000):
question, label = generate_arithmetic_expression(2)
questions.append(prompt_template.format(question=question))
labels.append(label)
start_time = time.time()
# for jupyter
# results, llms = await run_api(api_keys, questions)
api_keys = os.getenv("API_KEY").split(",")
base_url = os.getenv("BASE_URL")
# 創(chuàng)建LLM
llms = [LLMAPI(base_url=base_url, api_key=key, uid=i) for i, key in enumerate(api_keys)]
results, llms = asyncio.run(run_api(llms, questions))
right = 0
# 大模型回答正確
except_cnt = 0
# 大模型不按照json格式返回結(jié)果
not_equal = 0
# 大模型解答錯(cuò)誤
for q, res, label in zip(questions, results, labels):
res = res.generations[0][0].text
try:
res = re_parse_json(res)
if res is
None:
except_cnt += 1
continue
res = res.get("infer", None)
if res is
None:
except_cnt += 1
continue
res = int(res)
if res == label:
right += 1
else:
not_equal += 1
except Exception as e:
print(e)
print(f"question:{q}\nresult:{res}")
print("accuracy: {}%".format(right / len(questions) * 100))
end_time = time.time()
calculate_time_difference(start_time, end_time)
print(right, except_cnt, not_equal)
上述是大模型進(jìn)行四則運(yùn)算實(shí)戰(zhàn)的代碼,雖然寫的內(nèi)容有點(diǎn)多了,但是相信大家看完還是會(huì)有所收獲的。
如果大家想直接將其應(yīng)用到自己的代碼中,建議瀏覽run_api
函數(shù)。仿照上述類似的流程完成代碼的編寫即可實(shí)現(xiàn)。
如下圖是API調(diào)用的網(wǎng)頁后臺(tái)數(shù)據(jù),其在短時(shí)間內(nèi),發(fā)出了多個(gè)請(qǐng)求。如果不使用協(xié)程,則必須收到上一個(gè)請(qǐng)求的結(jié)果后,才能發(fā)送下一個(gè)請(qǐng)求。
在異步協(xié)程不限速時(shí),在90條四則運(yùn)算進(jìn)行推理,對(duì)比花費(fèi)的時(shí)間:
1個(gè)key | 3個(gè)key | |
---|---|---|
invoke | 5分半 | / |
agenerate | 15秒 | 15秒 |
invoke
不支持異步,agenerate
支持異步。
在異步協(xié)程不限速的情況下,發(fā)現(xiàn)使用1個(gè)key和多key的運(yùn)行時(shí)間是一樣的。這是因?yàn)椴幌匏俚那闆r下,會(huì)在第一時(shí)間把所有的請(qǐng)求發(fā)出去,令牌池效果體現(xiàn)不出來。
只有在對(duì)異步協(xié)程限速的情況下,才能體現(xiàn)出令牌池的效果。在上文的限速部分進(jìn)行了細(xì)致的舉例說明。
若只使用一個(gè)令牌,對(duì)它限速,確保不讓服務(wù)商封號(hào),使用異步協(xié)程保持在一個(gè)恰當(dāng)?shù)乃俣?,比較省事。注冊(cè)很多賬號(hào),也很磨人。
上圖是運(yùn)行程序輸出的結(jié)果:
如上圖的進(jìn)度條所示,20秒跑完100條數(shù)據(jù),平均每秒處理4.88條數(shù)據(jù),大模型計(jì)算四則運(yùn)算的準(zhǔn)確率 85%(只在100條數(shù)據(jù)上實(shí)驗(yàn),會(huì)有波動(dòng))。
print(right, except_cnt, not_equal)
的輸出結(jié)果是 85 0 15,大模型計(jì)算正確85條數(shù)據(jù),異常0條,計(jì)算錯(cuò)誤15條。
Question是輸入到大模型的提示詞,LLM Infer是大模型生成的答案,label 是真實(shí)的結(jié)果。
在實(shí)驗(yàn)中發(fā)現(xiàn),上述提示詞讓大模型做四則運(yùn)算的準(zhǔn)確率不夠高。本文更新了一版提示詞后,準(zhǔn)確率達(dá)到98%。
我不想花時(shí)間琢磨提示詞的編寫,故提示詞也是讓大模型自己生成的。
你是一名擅長數(shù)學(xué)運(yùn)算的助手,負(fù)責(zé)逐步推理并解決四則運(yùn)算問題。請(qǐng)按照以下步驟進(jìn)行:
1. 閱讀并理解問題。
2. 分步計(jì)算,逐步解決問題。
3. 給出最終的結(jié)果。
4. 按照 JSON 格式輸出結(jié)果,包括:
- reason: 詳細(xì)的推理過程。
- infer: 最終的計(jì)算結(jié)果。
問題:{問題描述}
請(qǐng)給出分析和結(jié)果。
使用上述提示詞后,準(zhǔn)確率達(dá)到98%。
通過異步編程結(jié)合令牌池的設(shè)計(jì),可以顯著提高大模型 API 的調(diào)用效率。關(guān)鍵在于:
asyncio
管理異步任務(wù)。asyncio.gather
并發(fā)執(zhí)行。這一思路可以應(yīng)用于需要高并發(fā)的場(chǎng)景,例如自然語言處理、實(shí)時(shí)數(shù)據(jù)處理等,助力開發(fā)者構(gòu)建高效的 AI 應(yīng)用系統(tǒng)。
文章轉(zhuǎn)自微信公眾號(hào)@AI悠閑區(qū)
Ollama Python 調(diào)用:本地大模型的高效交互方式
探索海洋數(shù)據(jù)的寶庫:Amentum海洋數(shù)據(jù)探測(cè)API的潛力
Jenkins API和Docker快速上手指南
HapiJS 身份驗(yàn)證 : 使用 JWT 保護(hù)您的 API
使用 Axios 在 React 中創(chuàng)建集中式 API 客戶端文件
Cursor + Devbox 進(jìn)階開發(fā)實(shí)踐:從 Hello World 到 One API
火山引擎如何接入API:從入門到實(shí)踐的技術(shù)指南
什么是聚類分析?
通過API監(jiān)控提高API穩(wěn)定性
對(duì)比大模型API的內(nèi)容創(chuàng)意新穎性、情感共鳴力、商業(yè)轉(zhuǎn)化潛力
一鍵對(duì)比試用API 限時(shí)免費(fèi)