From 705087529524ec96602435cd5eb736f0632e1d89 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jo=C3=A3o=20Galego?=
Date: Fri, 18 Oct 2024 14:17:14 +0100
Subject: [PATCH] Added support for Amazon Bedrock models

---
 .gitignore                        |   4 +
 examples/lightrag_bedrock_demo.py |  48 +++++++++++
 lightrag/llm.py                   | 128 ++++++++++++++++++++++++++++++
 requirements.txt                  |   1 +
 4 files changed, 181 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 examples/lightrag_bedrock_demo.py

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 00000000..cb457220
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,4 @@
+__pycache__
+*.egg-info
+dickens/
+book.txt
\ No newline at end of file
diff --git a/examples/lightrag_bedrock_demo.py b/examples/lightrag_bedrock_demo.py
new file mode 100644
index 00000000..36ec3857
--- /dev/null
+++ b/examples/lightrag_bedrock_demo.py
@@ -0,0 +1,48 @@
+"""
+LightRAG meets Amazon Bedrock ⛰️
+"""
+
+import os
+
+from lightrag import LightRAG, QueryParam
+from lightrag.llm import bedrock_complete, bedrock_embedding
+from lightrag.utils import EmbeddingFunc
+
+WORKING_DIR = "./dickens"
+
+if not os.path.exists(WORKING_DIR):
+    os.mkdir(WORKING_DIR)
+
+rag = LightRAG(
+    working_dir=WORKING_DIR,
+    llm_model_func=bedrock_complete,
+    llm_model_name="anthropic.claude-3-haiku-20240307-v1:0",
+    node2vec_params={
+        'dimensions': 1024,
+        'num_walks': 10,
+        'walk_length': 40,
+        'window_size': 2,
+        'iterations': 3,
+        'random_seed': 3
+    },
+    embedding_func=EmbeddingFunc(
+        embedding_dim=1024,
+        max_token_size=8192,
+        func=bedrock_embedding
+    )
+)
+
+with open("./book.txt") as f:
+    rag.insert(f.read())
+
+# Naive search
+print(rag.query("What are the top themes in this story?", param=QueryParam(mode="naive")))
+
+# Local search
+print(rag.query("What are the top themes in this story?", param=QueryParam(mode="local")))
+
+# Global search
+print(rag.query("What are the top themes in this story?", param=QueryParam(mode="global")))
+
+# Hybrid search
+print(rag.query("What are the top themes in this story?", param=QueryParam(mode="hybrid")))
diff --git a/lightrag/llm.py b/lightrag/llm.py
index 7328a583..8fc0da2e 100644
--- a/lightrag/llm.py
+++ b/lightrag/llm.py
@@ -1,4 +1,6 @@
 import os
+import json
+import aioboto3
 import numpy as np
 import ollama
 from openai import AsyncOpenAI, APIConnectionError, RateLimitError, Timeout
@@ -48,6 +50,54 @@ async def openai_complete_if_cache(
     )
     return response.choices[0].message.content

+@retry(
+    stop=stop_after_attempt(3),
+    wait=wait_exponential(multiplier=1, min=4, max=10),
+    retry=retry_if_exception_type((RateLimitError, APIConnectionError, Timeout)),
+)
+async def bedrock_complete_if_cache(
+    model, prompt, system_prompt=None, history_messages=[], base_url=None,
+    aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None, **kwargs
+) -> str:
+    os.environ['AWS_ACCESS_KEY_ID'] = os.environ.get('AWS_ACCESS_KEY_ID', aws_access_key_id)
+    os.environ['AWS_SECRET_ACCESS_KEY'] = os.environ.get('AWS_SECRET_ACCESS_KEY', aws_secret_access_key)
+    os.environ['AWS_SESSION_TOKEN'] = os.environ.get('AWS_SESSION_TOKEN', aws_session_token)
+
+    hashing_kv: BaseKVStorage = kwargs.pop("hashing_kv", None)
+
+    messages = []
+    messages.extend(history_messages)
+    messages.append({'role': "user", 'content': [{'text': prompt}]})
+
+    args = {
+        'modelId': model,
+        'messages': messages
+    }
+
+    if system_prompt:
+        args['system'] = [{'text': system_prompt}]
+
+    if hashing_kv is not None:
+        args_hash = compute_args_hash(model, messages)
+        if_cache_return = await hashing_kv.get_by_id(args_hash)
+        if if_cache_return is not None:
+            return if_cache_return["return"]
+
+    session = aioboto3.Session()
+    async with session.client("bedrock-runtime") as bedrock_async_client:
+
+        response = await bedrock_async_client.converse(**args, **kwargs)
+
+        if hashing_kv is not None:
+            await hashing_kv.upsert({
+                args_hash: {
+                    'return': response['output']['message']['content'][0]['text'],
+                    'model': model
+                }
+            })
+
+        return response['output']['message']['content'][0]['text']
+
 async def hf_model_if_cache(
     model, prompt, system_prompt=None, history_messages=[], **kwargs
 ) -> str:
@@ -145,6 +195,19 @@ async def gpt_4o_mini_complete(
         **kwargs,
     )

+
+async def bedrock_complete(
+    prompt, system_prompt=None, history_messages=[], **kwargs
+) -> str:
+    return await bedrock_complete_if_cache(
+        "anthropic.claude-3-sonnet-20240229-v1:0",
+        prompt,
+        system_prompt=system_prompt,
+        history_messages=history_messages,
+        **kwargs,
+    )
+
+
 async def hf_model_complete(
     prompt, system_prompt=None, history_messages=[], **kwargs
 ) -> str:
@@ -186,6 +249,71 @@ async def openai_embedding(texts: list[str], model: str = "text-embedding-3-smal
     return np.array([dp.embedding for dp in response.data])


+# @wrap_embedding_func_with_attrs(embedding_dim=1024, max_token_size=8192)
+# @retry(
+#     stop=stop_after_attempt(3),
+#     wait=wait_exponential(multiplier=1, min=4, max=10),
+#     retry=retry_if_exception_type((RateLimitError, APIConnectionError, Timeout)),  # TODO: fix exceptions
+# )
+async def bedrock_embedding(
+    texts: list[str], model: str = "amazon.titan-embed-text-v2:0",
+    aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None) -> np.ndarray:
+    os.environ['AWS_ACCESS_KEY_ID'] = os.environ.get('AWS_ACCESS_KEY_ID', aws_access_key_id)
+    os.environ['AWS_SECRET_ACCESS_KEY'] = os.environ.get('AWS_SECRET_ACCESS_KEY', aws_secret_access_key)
+    os.environ['AWS_SESSION_TOKEN'] = os.environ.get('AWS_SESSION_TOKEN', aws_session_token)
+
+    session = aioboto3.Session()
+    async with session.client("bedrock-runtime") as bedrock_async_client:
+
+        if (model_provider := model.split(".")[0]) == "amazon":
+            embed_texts = []
+            for text in texts:
+                if "v2" in model:
+                    body = json.dumps({
+                        'inputText': text,
+                        # 'dimensions': embedding_dim,
+                        'embeddingTypes': ["float"]
+                    })
+                elif "v1" in model:
+                    body = json.dumps({
+                        'inputText': text
+                    })
+                else:
+                    raise ValueError(f"Model {model} is not supported!")
+
+                response = await bedrock_async_client.invoke_model(
+                    modelId=model,
+                    body=body,
+                    accept="application/json",
+                    contentType="application/json"
+                )
+
+                response_body = json.loads(await response['body'].read())
+
+                embed_texts.append(response_body['embedding'])
+        elif model_provider == "cohere":
+            body = json.dumps({
+                'texts': texts,
+                'input_type': "search_document",
+                'truncate': "NONE"
+            })
+
+            response = await bedrock_async_client.invoke_model(
+                modelId=model,
+                body=body,
+                accept="application/json",
+                contentType="application/json"
+            )
+
+            response_body = json.loads(await response['body'].read())
+
+            embed_texts = response_body['embeddings']
+        else:
+            raise ValueError(f"Model provider '{model_provider}' is not supported!")
+
+    return np.array(embed_texts)
+
+
 async def hf_embedding(texts: list[str], tokenizer, embed_model) -> np.ndarray:
     input_ids = tokenizer(texts, return_tensors='pt', padding=True, truncation=True).input_ids
     with torch.no_grad():
diff --git a/requirements.txt b/requirements.txt
index f7dcd787..a1054692 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,4 @@
+aioboto3
 openai
 tiktoken
 networkx
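
Usage sketch (not part of the diff): beyond the LightRAG demo above, the two
new helpers can also be driven directly. This assumes the patch is applied,
AWS credentials are exposed through the standard environment variables
(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN), and the account
has Bedrock access to the default model IDs wired into the helpers
(anthropic.claude-3-sonnet-20240229-v1:0 for completions,
amazon.titan-embed-text-v2:0 for embeddings):

    import asyncio

    from lightrag.llm import bedrock_complete, bedrock_embedding

    async def main():
        # Chat completion through the Bedrock Converse API
        answer = await bedrock_complete(
            "What are the top themes in A Christmas Carol?",
            system_prompt="You are a concise literary analyst.",
        )
        print(answer)

        # Titan Text Embeddings v2 (1024-dimensional by default)
        vectors = await bedrock_embedding(["Marley was dead, to begin with."])
        print(vectors.shape)  # e.g. (1, 1024)

    asyncio.run(main())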