diff --git a/.gitignore b/.gitignore
new file mode 100644
index 00000000..cb457220
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,4 @@
+__pycache__
+*.egg-info
+dickens/
+book.txt
\ No newline at end of file
diff --git a/examples/lightrag_bedrock_demo.py b/examples/lightrag_bedrock_demo.py
new file mode 100644
index 00000000..c515922e
--- /dev/null
+++ b/examples/lightrag_bedrock_demo.py
@@ -0,0 +1,41 @@
+"""
+LightRAG meets Amazon Bedrock ⛰️
+"""
+
+import os
+import logging
+
+from lightrag import LightRAG, QueryParam
+from lightrag.llm import bedrock_complete, bedrock_embedding
+from lightrag.utils import EmbeddingFunc
+
+logging.getLogger("aiobotocore").setLevel(logging.WARNING)
+
+WORKING_DIR = "./dickens"
+if not os.path.exists(WORKING_DIR):
+    os.mkdir(WORKING_DIR)
+
+rag = LightRAG(
+    working_dir=WORKING_DIR,
+    llm_model_func=bedrock_complete,
+    llm_model_name="Anthropic Claude 3 Haiku // Amazon Bedrock",
+    embedding_func=EmbeddingFunc(
+        embedding_dim=1024,
+        max_token_size=8192,
+        func=bedrock_embedding
+    )
+)
+
+with open("./book.txt", 'r', encoding='utf-8') as f:
+    rag.insert(f.read())
+
+for mode in ["naive", "local", "global", "hybrid"]:
+    print("\n+-" + "-" * len(mode) + "-+")
+    print(f"| {mode.capitalize()} |")
+    print("+-" + "-" * len(mode) + "-+\n")
+    print(
+        rag.query(
+            "What are the top themes in this story?",
+            param=QueryParam(mode=mode)
+        )
+    )
diff --git a/lightrag/llm.py b/lightrag/llm.py
index 7328a583..48defb4d 100644
--- a/lightrag/llm.py
+++ b/lightrag/llm.py
@@ -1,4 +1,9 @@
 import os
+import copy
+import json
+import botocore
+import aioboto3
+import botocore.errorfactory
 import numpy as np
 import ollama
 from openai import AsyncOpenAI, APIConnectionError, RateLimitError, Timeout
@@ -48,6 +53,81 @@ async def openai_complete_if_cache(
     )
     return response.choices[0].message.content
 
+
+class BedrockError(Exception):
+    """Generic error for issues related to Amazon Bedrock"""
+
+
+@retry(
+    stop=stop_after_attempt(5),
+    wait=wait_exponential(multiplier=1, max=60),
+    retry=retry_if_exception_type((BedrockError)),
+)
+async def bedrock_complete_if_cache(
+    model, prompt, system_prompt=None, history_messages=[],
+    aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None, **kwargs
+) -> str:
+    os.environ['AWS_ACCESS_KEY_ID'] = os.environ.get('AWS_ACCESS_KEY_ID', aws_access_key_id)
+    os.environ['AWS_SECRET_ACCESS_KEY'] = os.environ.get('AWS_SECRET_ACCESS_KEY', aws_secret_access_key)
+    os.environ['AWS_SESSION_TOKEN'] = os.environ.get('AWS_SESSION_TOKEN', aws_session_token)
+
+    # Fix message history format
+    messages = []
+    for history_message in history_messages:
+        message = copy.copy(history_message)
+        message['content'] = [{'text': message['content']}]
+        messages.append(message)
+
+    # Add user prompt
+    messages.append({'role': "user", 'content': [{'text': prompt}]})
+
+    # Initialize Converse API arguments
+    args = {
+        'modelId': model,
+        'messages': messages
+    }
+
+    # Define system prompt
+    if system_prompt:
+        args['system'] = [{'text': system_prompt}]
+
+    # Map and set up inference parameters
+    inference_params_map = {
+        'max_tokens': "maxTokens",
+        'top_p': "topP",
+        'stop_sequences': "stopSequences"
+    }
+    if (inference_params := list(set(kwargs) & set(['max_tokens', 'temperature', 'top_p', 'stop_sequences']))):
+        args['inferenceConfig'] = {}
+        for param in inference_params:
+            args['inferenceConfig'][inference_params_map.get(param, param)] = kwargs.pop(param)
+
+    hashing_kv: BaseKVStorage = kwargs.pop("hashing_kv", None)
+    if hashing_kv is not None:
+        args_hash = compute_args_hash(model, messages)
+        if_cache_return = await hashing_kv.get_by_id(args_hash)
+        if if_cache_return is not None:
+            return if_cache_return["return"]
+
+    # Call model via Converse API
+    session = aioboto3.Session()
+    async with session.client("bedrock-runtime") as bedrock_async_client:
+
+        try:
+            response = await bedrock_async_client.converse(**args, **kwargs)
+        except Exception as e:
+            raise BedrockError(e)
+
+        if hashing_kv is not None:
+            await hashing_kv.upsert({
+                args_hash: {
+                    'return': response['output']['message']['content'][0]['text'],
+                    'model': model
+                }
+            })
+
+        return response['output']['message']['content'][0]['text']
+
 async def hf_model_if_cache(
     model, prompt, system_prompt=None, history_messages=[], **kwargs
 ) -> str:
@@ -145,6 +225,19 @@ async def gpt_4o_mini_complete(
         **kwargs,
     )
 
+
+async def bedrock_complete(
+    prompt, system_prompt=None, history_messages=[], **kwargs
+) -> str:
+    return await bedrock_complete_if_cache(
+        "anthropic.claude-3-haiku-20240307-v1:0",
+        prompt,
+        system_prompt=system_prompt,
+        history_messages=history_messages,
+        **kwargs,
+    )
+
+
 async def hf_model_complete(
     prompt, system_prompt=None, history_messages=[], **kwargs
 ) -> str:
@@ -186,6 +279,71 @@ async def openai_embedding(texts: list[str], model: str = "text-embedding-3-smal
     return np.array([dp.embedding for dp in response.data])
 
 
+# @wrap_embedding_func_with_attrs(embedding_dim=1024, max_token_size=8192)
+# @retry(
+#     stop=stop_after_attempt(3),
+#     wait=wait_exponential(multiplier=1, min=4, max=10),
+#     retry=retry_if_exception_type((RateLimitError, APIConnectionError, Timeout)),  # TODO: fix exceptions
+# )
+async def bedrock_embedding(
+    texts: list[str], model: str = "amazon.titan-embed-text-v2:0",
+    aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None) -> np.ndarray:
+    os.environ['AWS_ACCESS_KEY_ID'] = os.environ.get('AWS_ACCESS_KEY_ID', aws_access_key_id)
+    os.environ['AWS_SECRET_ACCESS_KEY'] = os.environ.get('AWS_SECRET_ACCESS_KEY', aws_secret_access_key)
+    os.environ['AWS_SESSION_TOKEN'] = os.environ.get('AWS_SESSION_TOKEN', aws_session_token)
+
+    session = aioboto3.Session()
+    async with session.client("bedrock-runtime") as bedrock_async_client:
+
+        if (model_provider := model.split(".")[0]) == "amazon":
+            embed_texts = []
+            for text in texts:
+                if "v2" in model:
+                    body = json.dumps({
+                        'inputText': text,
+                        # 'dimensions': embedding_dim,
+                        'embeddingTypes': ["float"]
+                    })
+                elif "v1" in model:
+                    body = json.dumps({
+                        'inputText': text
+                    })
+                else:
+                    raise ValueError(f"Model {model} is not supported!")
+
+                response = await bedrock_async_client.invoke_model(
+                    modelId=model,
+                    body=body,
+                    accept="application/json",
+                    contentType="application/json"
+                )
+
+                response_body = await response.get('body').json()
+
+                embed_texts.append(response_body['embedding'])
+        elif model_provider == "cohere":
+            body = json.dumps({
+                'texts': texts,
+                'input_type': "search_document",
+                'truncate': "NONE"
+            })
+
+            response = await bedrock_async_client.invoke_model(
+                modelId=model,
+                body=body,
+                accept="application/json",
+                contentType="application/json"
+            )
+
+            response_body = json.loads(await response.get('body').read())
+
+            embed_texts = response_body['embeddings']
+        else:
+            raise ValueError(f"Model provider '{model_provider}' is not supported!")
+
+    return np.array(embed_texts)
+
+
 async def hf_embedding(texts: list[str], tokenizer, embed_model) -> np.ndarray:
     input_ids = tokenizer(texts, return_tensors='pt', padding=True, truncation=True).input_ids
     with torch.no_grad():
diff --git a/requirements.txt b/requirements.txt
index f7dcd787..a1054692 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,4 @@
+aioboto3
 openai
 tiktoken
 networkx