Added support for Amazon Bedrock models

This commit is contained in:
João Galego
2024-10-18 14:17:14 +01:00
parent f49de420cf
commit 1fc55b18d5
4 changed files with 181 additions and 0 deletions

.gitignore (new file)

@@ -0,0 +1,4 @@
__pycache__
*.egg-info
dickens/
book.txt

LightRAG Bedrock demo script (new file)

@@ -0,0 +1,48 @@
"""
LightRAG meets Amazon Bedrock ⛰️
"""
import os
from lightrag import LightRAG, QueryParam
from lightrag.llm import bedrock_complete, bedrock_embedding
from lightrag.utils import EmbeddingFunc
WORKING_DIR = "./dickens"
if not os.path.exists(WORKING_DIR):
os.mkdir(WORKING_DIR)
rag = LightRAG(
working_dir=WORKING_DIR,
llm_model_func=bedrock_complete,
llm_model_name="anthropic.claude-3-haiku-20240307-v1:0",
node2vec_params = {
'dimensions': 1024,
'num_walks': 10,
'walk_length': 40,
'window_size': 2,
'iterations': 3,
'random_seed': 3
},
embedding_func=EmbeddingFunc(
embedding_dim=1024,
max_token_size=8192,
func=lambda texts: bedrock_embedding(texts)
)
)
with open("./book.txt") as f:
rag.insert(f.read())
# Naive search
print(rag.query("What are the top themes in this story?", param=QueryParam(mode="naive")))
# Local search
print(rag.query("What are the top themes in this story?", param=QueryParam(mode="local")))
# Global search
print(rag.query("What are the top themes in this story?", param=QueryParam(mode="global")))
# Hybrid search
print(rag.query("What are the top themes in this story?", param=QueryParam(mode="hybrid")))
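
A minimal pre-flight sketch (not part of this commit): the demo assumes valid AWS credentials and a region with access to the chosen Claude and Titan models are already configured, for example via "aws configure" or environment variables. The values below are placeholders.

# Hypothetical setup only; any standard AWS credential source works.
import os

os.environ.setdefault("AWS_ACCESS_KEY_ID", "<access-key-id>")
os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "<secret-access-key>")
os.environ.setdefault("AWS_DEFAULT_REGION", "us-east-1")  # any region where the Bedrock models are enabled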

lightrag/llm.py

@@ -1,4 +1,6 @@
import os
import json
import aioboto3
import numpy as np
import ollama
from openai import AsyncOpenAI, APIConnectionError, RateLimitError, Timeout
@@ -48,6 +50,54 @@ async def openai_complete_if_cache(
    )
    return response.choices[0].message.content
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
retry=retry_if_exception_type((RateLimitError, APIConnectionError, Timeout)),
)
async def bedrock_complete_if_cache(
    model, prompt, system_prompt=None, history_messages=[], base_url=None,
    aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None, **kwargs
) -> str:
    # Explicit credential arguments take effect only when the corresponding
    # AWS_* environment variables are not already set
    os.environ['AWS_ACCESS_KEY_ID'] = os.environ.get('AWS_ACCESS_KEY_ID', aws_access_key_id)
    os.environ['AWS_SECRET_ACCESS_KEY'] = os.environ.get('AWS_SECRET_ACCESS_KEY', aws_secret_access_key)
    os.environ['AWS_SESSION_TOKEN'] = os.environ.get('AWS_SESSION_TOKEN', aws_session_token)
hashing_kv: BaseKVStorage = kwargs.pop("hashing_kv", None)
messages = []
messages.extend(history_messages)
messages.append({'role': "user", 'content': [{'text': prompt}]})
args = {
'modelId': model,
'messages': messages
}
if system_prompt:
args['system'] = [{'text': system_prompt}]
if hashing_kv is not None:
args_hash = compute_args_hash(model, messages)
if_cache_return = await hashing_kv.get_by_id(args_hash)
if if_cache_return is not None:
return if_cache_return["return"]
session = aioboto3.Session()
async with session.client("bedrock-runtime") as bedrock_async_client:
response = await bedrock_async_client.converse(**args, **kwargs)
if hashing_kv is not None:
await hashing_kv.upsert({
args_hash: {
'return': response['output']['message']['content'][0]['text'],
'model': model
}
})
return response['output']['message']['content'][0]['text']
async def hf_model_if_cache(
    model, prompt, system_prompt=None, history_messages=[], **kwargs
) -> str:
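
For reference, a sketch (not from the diff itself) of the message format the new bedrock_complete_if_cache helper assembles for the Bedrock Converse API: each turn is a role plus a list of content blocks, so any history passed in is expected to follow the same shape.

# Illustrative only: the structure sent via bedrock_async_client.converse(**args)
history_messages = [
    {"role": "user", "content": [{"text": "Who wrote A Christmas Carol?"}]},
    {"role": "assistant", "content": [{"text": "Charles Dickens."}]},
]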
@@ -145,6 +195,19 @@ async def gpt_4o_mini_complete(
        **kwargs,
    )
async def bedrock_complete(
prompt, system_prompt=None, history_messages=[], **kwargs
) -> str:
return await bedrock_complete_if_cache(
"anthropic.claude-3-sonnet-20240229-v1:0",
prompt,
system_prompt=system_prompt,
history_messages=history_messages,
**kwargs,
)
async def hf_model_complete(
    prompt, system_prompt=None, history_messages=[], **kwargs
) -> str:
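
As a quick usage illustration (assumed, not part of the commit), the new bedrock_complete wrapper can also be awaited directly outside of LightRAG; it defaults to the Claude 3 Sonnet model hard-coded above.

# Hedged sketch: direct call to the new completion wrapper
import asyncio
from lightrag.llm import bedrock_complete

async def main():
    answer = await bedrock_complete(
        "Name one recurring theme in Dickens' novels.",
        system_prompt="Answer in a single sentence.",
    )
    print(answer)

asyncio.run(main())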
@@ -186,6 +249,71 @@ async def openai_embedding(texts: list[str], model: str = "text-embedding-3-smal
    return np.array([dp.embedding for dp in response.data])
# @wrap_embedding_func_with_attrs(embedding_dim=1024, max_token_size=8192)
# @retry(
# stop=stop_after_attempt(3),
# wait=wait_exponential(multiplier=1, min=4, max=10),
# retry=retry_if_exception_type((RateLimitError, APIConnectionError, Timeout)), # TODO: fix exceptions
# )
async def bedrock_embedding(
texts: list[str], model: str = "amazon.titan-embed-text-v2:0",
aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None) -> np.ndarray:
os.environ['AWS_ACCESS_KEY_ID'] = os.environ.get('AWS_ACCESS_KEY_ID', aws_access_key_id)
os.environ['AWS_SECRET_ACCESS_KEY'] = os.environ.get('AWS_SECRET_ACCESS_KEY', aws_secret_access_key)
os.environ['AWS_SESSION_TOKEN'] = os.environ.get('AWS_SESSION_TOKEN', aws_session_token)
session = aioboto3.Session()
async with session.client("bedrock-runtime") as bedrock_async_client:
if (model_provider := model.split(".")[0]) == "amazon":
embed_texts = []
for text in texts:
if "v2" in model:
body = json.dumps({
'inputText': text,
# 'dimensions': embedding_dim,
'embeddingTypes': ["float"]
})
elif "v1" in model:
body = json.dumps({
'inputText': text
})
else:
raise ValueError(f"Model {model} is not supported!")
response = await bedrock_async_client.invoke_model(
modelId=model,
body=body,
accept="application/json",
contentType="application/json"
)
                response_body = json.loads(await response.get('body').read())
embed_texts.append(response_body['embedding'])
elif model_provider == "cohere":
body = json.dumps({
'texts': texts,
'input_type': "search_document",
'truncate': "NONE"
})
            response = await bedrock_async_client.invoke_model(
                modelId=model,
                body=body,
                accept="application/json",
                contentType="application/json"
            )
            response_body = json.loads(await response.get('body').read())
embed_texts = response_body['embeddings']
else:
raise ValueError(f"Model provider '{model_provider}' is not supported!")
return np.array(embed_texts)
async def hf_embedding(texts: list[str], tokenizer, embed_model) -> np.ndarray:
    input_ids = tokenizer(texts, return_tensors='pt', padding=True, truncation=True).input_ids
    with torch.no_grad():
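
A small usage sketch for the new embedding helper (assumed, not part of the commit): it presumes the default Titan Text Embeddings V2 model is enabled in the configured region and, as elsewhere in this diff, that AWS credentials are available in the environment.

# Hedged example: embed two sentences and inspect the result shape
import asyncio
from lightrag.llm import bedrock_embedding

async def main():
    vectors = await bedrock_embedding([
        "Marley was dead, to begin with.",
        "There is no doubt whatever about that.",
    ])
    print(vectors.shape)  # expected (2, 1024) with amazon.titan-embed-text-v2:0

asyncio.run(main())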

requirements.txt

@@ -1,3 +1,4 @@
aioboto3
openai
tiktoken
networkx