Merge pull request #49 from JGalego/feat/bedrock-support

feat: Amazon Bedrock support ⛰️
zrguo committed via GitHub on 2024-10-19 11:30:26 +08:00
4 changed files with 204 additions and 0 deletions

.gitignore (new file)

@@ -0,0 +1,4 @@
__pycache__
*.egg-info
dickens/
book.txt


@@ -0,0 +1,41 @@
"""
LightRAG meets Amazon Bedrock ⛰️
"""
import os
import logging
from lightrag import LightRAG, QueryParam
from lightrag.llm import bedrock_complete, bedrock_embedding
from lightrag.utils import EmbeddingFunc
logging.getLogger("aiobotocore").setLevel(logging.WARNING)
WORKING_DIR = "./dickens"
if not os.path.exists(WORKING_DIR):
os.mkdir(WORKING_DIR)
rag = LightRAG(
working_dir=WORKING_DIR,
llm_model_func=bedrock_complete,
llm_model_name="Anthropic Claude 3 Haiku // Amazon Bedrock",
embedding_func=EmbeddingFunc(
embedding_dim=1024,
max_token_size=8192,
func=bedrock_embedding
)
)
with open("./book.txt", 'r', encoding='utf-8') as f:
rag.insert(f.read())
for mode in ["naive", "local", "global", "hybrid"]:
print("\n+-" + "-" * len(mode) + "-+")
print(f"| {mode.capitalize()} |")
print("+-" + "-" * len(mode) + "-+\n")
print(
rag.query(
"What are the top themes in this story?",
param=QueryParam(mode=mode)
)
)
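Note: the script above resolves AWS credentials through the standard chain used by aioboto3 (environment variables, shared config file, or an instance role). A minimal sketch of the environment it assumes; the key values and Region below are placeholders, not part of this change:

import os

# Placeholders only: substitute real credentials, or rely on ~/.aws/config or an instance role.
os.environ.setdefault("AWS_ACCESS_KEY_ID", "<access-key-id>")
os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "<secret-access-key>")
# Pick a Region where the Claude 3 Haiku and Titan embedding models are enabled.
os.environ.setdefault("AWS_DEFAULT_REGION", "us-east-1")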

lightrag/llm.py

@@ -1,4 +1,9 @@
import os
import copy
import json
import botocore
import aioboto3
import botocore.errorfactory
import numpy as np
import ollama
from openai import AsyncOpenAI, APIConnectionError, RateLimitError, Timeout
@@ -48,6 +53,81 @@ async def openai_complete_if_cache(
    )
    return response.choices[0].message.content


class BedrockError(Exception):
    """Generic error for issues related to Amazon Bedrock"""


@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, max=60),
    retry=retry_if_exception_type(BedrockError),
)
async def bedrock_complete_if_cache(
    model, prompt, system_prompt=None, history_messages=[],
    aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None, **kwargs
) -> str:
    # Prefer credentials already present in the environment; fall back to the explicit arguments
    for env_var, value in (('AWS_ACCESS_KEY_ID', aws_access_key_id),
                           ('AWS_SECRET_ACCESS_KEY', aws_secret_access_key),
                           ('AWS_SESSION_TOKEN', aws_session_token)):
        if value is not None and not os.environ.get(env_var):
            os.environ[env_var] = value

    # Fix message history format: the Converse API expects content as a list of blocks
    messages = []
    for history_message in history_messages:
        message = copy.copy(history_message)
        message['content'] = [{'text': message['content']}]
        messages.append(message)

    # Add user prompt
    messages.append({'role': "user", 'content': [{'text': prompt}]})

    # Initialize Converse API arguments
    args = {
        'modelId': model,
        'messages': messages
    }

    # Define system prompt
    if system_prompt:
        args['system'] = [{'text': system_prompt}]

    # Map inference parameters to their Converse API names ('temperature' passes through as-is)
    inference_params_map = {
        'max_tokens': "maxTokens",
        'top_p': "topP",
        'stop_sequences': "stopSequences"
    }
    if (inference_params := list(set(kwargs) & set(['max_tokens', 'temperature', 'top_p', 'stop_sequences']))):
        args['inferenceConfig'] = {}
        for param in inference_params:
            args['inferenceConfig'][inference_params_map.get(param, param)] = kwargs.pop(param)

    # Return a cached response when one exists for the same model and messages
    hashing_kv: BaseKVStorage = kwargs.pop("hashing_kv", None)
    if hashing_kv is not None:
        args_hash = compute_args_hash(model, messages)
        if_cache_return = await hashing_kv.get_by_id(args_hash)
        if if_cache_return is not None:
            return if_cache_return["return"]

    # Call model via Converse API
    session = aioboto3.Session()
    async with session.client("bedrock-runtime") as bedrock_async_client:
        try:
            response = await bedrock_async_client.converse(**args, **kwargs)
        except Exception as e:
            raise BedrockError(e)

        if hashing_kv is not None:
            await hashing_kv.upsert({
                args_hash: {
                    'return': response['output']['message']['content'][0]['text'],
                    'model': model
                }
            })

        return response['output']['message']['content'][0]['text']


async def hf_model_if_cache(
    model, prompt, system_prompt=None, history_messages=[], **kwargs
) -> str:
@@ -145,6 +225,19 @@ async def gpt_4o_mini_complete(
        **kwargs,
    )


async def bedrock_complete(
    prompt, system_prompt=None, history_messages=[], **kwargs
) -> str:
    return await bedrock_complete_if_cache(
        "anthropic.claude-3-haiku-20240307-v1:0",
        prompt,
        system_prompt=system_prompt,
        history_messages=history_messages,
        **kwargs,
    )


async def hf_model_complete(
    prompt, system_prompt=None, history_messages=[], **kwargs
) -> str:
@@ -186,6 +279,71 @@ async def openai_embedding(texts: list[str], model: str = "text-embedding-3-smal
    return np.array([dp.embedding for dp in response.data])


# @wrap_embedding_func_with_attrs(embedding_dim=1024, max_token_size=8192)
# @retry(
#     stop=stop_after_attempt(3),
#     wait=wait_exponential(multiplier=1, min=4, max=10),
#     retry=retry_if_exception_type((RateLimitError, APIConnectionError, Timeout)),  # TODO: fix exceptions
# )
async def bedrock_embedding(
        texts: list[str], model: str = "amazon.titan-embed-text-v2:0",
        aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None) -> np.ndarray:
    # Prefer credentials already present in the environment; fall back to the explicit arguments
    for env_var, value in (('AWS_ACCESS_KEY_ID', aws_access_key_id),
                           ('AWS_SECRET_ACCESS_KEY', aws_secret_access_key),
                           ('AWS_SESSION_TOKEN', aws_session_token)):
        if value is not None and not os.environ.get(env_var):
            os.environ[env_var] = value

    session = aioboto3.Session()
    async with session.client("bedrock-runtime") as bedrock_async_client:
        if (model_provider := model.split(".")[0]) == "amazon":
            # Titan models embed one text per request
            embed_texts = []
            for text in texts:
                if "v2" in model:
                    body = json.dumps({
                        'inputText': text,
                        # 'dimensions': embedding_dim,
                        'embeddingTypes': ["float"]
                    })
                elif "v1" in model:
                    body = json.dumps({
                        'inputText': text
                    })
                else:
                    raise ValueError(f"Model {model} is not supported!")

                response = await bedrock_async_client.invoke_model(
                    modelId=model,
                    body=body,
                    accept="application/json",
                    contentType="application/json"
                )

                response_body = json.loads(await response['body'].read())
                embed_texts.append(response_body['embedding'])
        elif model_provider == "cohere":
            # Cohere models embed the whole batch in a single request
            body = json.dumps({
                'texts': texts,
                'input_type': "search_document",
                'truncate': "NONE"
            })

            response = await bedrock_async_client.invoke_model(
                modelId=model,
                body=body,
                accept="application/json",
                contentType="application/json"
            )

            response_body = json.loads(await response['body'].read())
            embed_texts = response_body['embeddings']
        else:
            raise ValueError(f"Model provider '{model_provider}' is not supported!")

        return np.array(embed_texts)


async def hf_embedding(texts: list[str], tokenizer, embed_model) -> np.ndarray:
    input_ids = tokenizer(texts, return_tensors='pt', padding=True, truncation=True).input_ids
    with torch.no_grad():
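For reference, a minimal standalone sketch of the two new entry points added above, assuming valid AWS credentials and Bedrock model access in the configured Region (the prompt and the printed shape are illustrative):

import asyncio

from lightrag.llm import bedrock_complete, bedrock_embedding

async def main():
    # Text completion via the Converse API (Claude 3 Haiku by default)
    answer = await bedrock_complete("Name one theme of 'A Christmas Carol'.")
    print(answer)

    # Embeddings via Titan Text Embeddings V2 (1024-dimensional by default)
    vectors = await bedrock_embedding(["Marley was dead: to begin with."])
    print(vectors.shape)  # (1, 1024)

asyncio.run(main())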

requirements.txt

@@ -1,3 +1,4 @@
aioboto3
openai
tiktoken
networkx