Skip to content

Commit

Permalink
Ll/vectara (#783)
Browse files Browse the repository at this point in the history
Add vectara support in Eidolon.

1. Adds vectara logic unit for simple agentic rag
2. Adds a Vectara agent utilizing the built-in chat interface.

Will add docs in a follow-up PR if we like the contract.
  • Loading branch information
LukeLalor committed Sep 18, 2024
1 parent 56971f5 commit e2fccf4
Show file tree
Hide file tree
Showing 11 changed files with 7,134 additions and 14 deletions.
2 changes: 1 addition & 1 deletion sdk/eidolon_ai_sdk/agent/doc_manager/document_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ class DocumentManagerSpec(BaseModel):
name: str = Field(description="The name of the document manager (used to name database collections).")
recheck_frequency: int = Field(default=60, description="The number of seconds between checks.")
loader: AnnotatedReference[DocumentLoader]
doc_processor: AnnotatedReference[DocumentProcessor]
concurrency: int = Field(default=8, description="The number of concurrent tasks to run.")
doc_processor: AnnotatedReference[DocumentProcessor]


class DocumentManager(Specable[DocumentManagerSpec]):
Expand Down
18 changes: 10 additions & 8 deletions sdk/eidolon_ai_sdk/agent/doc_manager/document_processor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from typing import Iterable

from opentelemetry import trace
from pydantic import BaseModel
Expand All @@ -9,10 +10,12 @@
from eidolon_ai_sdk.agent.doc_manager.parsers.base_parser import DocumentParser, DataBlob
from eidolon_ai_sdk.agent.doc_manager.transformer.document_transformer import DocumentTransformer
from eidolon_ai_sdk.agent_os import AgentOS
from eidolon_ai_sdk.memory.document import Document
from eidolon_ai_sdk.system.reference_model import Specable, AnnotatedReference
from eidolon_ai_sdk.util.async_wrapper import make_async

tracer = trace.get_tracer(__name__)
logger = logging.getLogger("eidolon")


class DocumentProcessorSpec(BaseModel):
Expand All @@ -28,14 +31,13 @@ def __init__(self, **kwargs):
Specable.__init__(self, **kwargs)
self.parser = self.spec.parser.instantiate()
self.splitter = self.spec.splitter.instantiate()
self.logger = logging.getLogger("eidolon")

@make_async
def parse(self, data: bytes, mimetype: str, path: str):
def parse(self, data: bytes, mimetype: str, path: str) -> Iterable[Document]:
return self.parser.parse(DataBlob.from_bytes(data=data, mimetype=mimetype, path=path))

@make_async
def split(self, docs):
def split(self, docs) -> Iterable[Document]:
return self.splitter.transform_documents(docs)

async def addFile(self, collection_name: str, file_info: FileInfo):
Expand All @@ -55,16 +57,16 @@ async def addFile(self, collection_name: str, file_info: FileInfo):
},
)
if len(docs) == 0:
self.logger.debug(f"File contained no text {file_info.path}")
logger.debug(f"File contained no text {file_info.path}")
return
with tracer.start_as_current_span("record similarity"):
await AgentOS.similarity_memory.add(collection_name, docs)
self.logger.debug(f"Added file {file_info.path}")
logger.debug(f"Added file {file_info.path}")
except Exception as e:
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.warning(f"Failed to parse file {file_info.path}", exc_info=True)
if logger.isEnabledFor(logging.DEBUG):
logger.warning(f"Failed to parse file {file_info.path}", exc_info=True)
else:
self.logger.warning(f"Failed to parse file {file_info.path} ({e})")
logger.warning(f"Failed to parse file {file_info.path} ({e})")

async def removeFile(self, collection_name: str, path: str):
with tracer.start_as_current_span("remove file"):
Expand Down
93 changes: 93 additions & 0 deletions sdk/eidolon_ai_sdk/agent/vectara_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import copy
import json
import os
from typing import Annotated
from urllib.parse import urljoin

from fastapi import Body
from httpx import AsyncClient
from httpx_sse import EventSource
from pydantic import BaseModel, Field

from eidolon_ai_client.events import StringOutputEvent, StartStreamContextEvent, ObjectOutputEvent, \
EndStreamContextEvent, AgentStateEvent
from eidolon_ai_sdk.agent.agent import register_action
from eidolon_ai_sdk.system.processes import MongoDoc
from eidolon_ai_sdk.system.reference_model import Specable


# NOTE: removed debug leftover `os.environ.setdefault("VECTARA_API_KEY", "test")`.
# The spec docstring states VECTARA_API_KEY is required for authentication;
# silently defaulting it to a dummy value masks misconfiguration and delays the
# failure to an opaque 401 on the first Vectara request. With it removed, an
# unset key fails fast with a KeyError in `_token`.


class VectaraAgentSpec(BaseModel):
    """
    An agent backed by Vectara. Requires the VECTARA_API_KEY environment variable to be set for authentication.
    """

    # Vectara corpus to chat against; substituted into every corpus entry of the search body.
    corpus_key: str
    # Human/LLM-facing description of the agent. NOTE(review): nothing in this file
    # renders the "{{ corpus_key }}" placeholder — confirm templating happens upstream.
    description: str = "Search documents related to {{ corpus_key }}"
    # Base API URL; paths are joined with urljoin, so the trailing slash matters.
    vectara_url: str = "https://api.vectara.io/"
    body_overrides: dict = Field({}, description="Arguments to use when creating / continuing a chat. See https://docs.vectara.com/docs/rest-api/create-chat for more information.")


# We need to store chat-id / process-id mappings since Vectara doesn't have metadata / query concepts
class VectaraDoc(MongoDoc):
    # Mongo collection backing this document type.
    collection = "vectara_docs"
    # Eidolon process this chat belongs to (lookup key in `converse`).
    process_id: str
    # Vectara-side chat id, captured from the first "chat_info" SSE event.
    vectara_chat_id: str
    # Free-form extras; unused by the code visible here.
    metadata: dict = {}


class VectaraAgent(Specable[VectaraAgentSpec]):
    """Conversational agent that delegates each turn to Vectara's chat API and
    re-emits the SSE response as Eidolon streaming events."""

    @property
    def _token(self):
        # Read per request; raises KeyError if VECTARA_API_KEY is unset.
        return os.environ["VECTARA_API_KEY"]

    @property
    def _headers(self):
        # 'Accept: text/event-stream' asks the chat endpoint for SSE output.
        return {
            'Content-Type': 'application/json',
            'Accept': 'text/event-stream',
            'x-api-key': self._token
        }

    def _url(self, suffix):
        # Join an API path onto the configured base URL.
        return urljoin(self.spec.vectara_url, suffix)

    @register_action("initialized", "idle", description=lambda agent, _: agent.spec.description)
    async def converse(self, process_id, question: Annotated[str, Body()]):
        """Send `question` to this process's Vectara chat and stream the reply.

        Creates a new Vectara chat on the first turn (persisting the chat id in a
        VectaraDoc keyed by process_id) and posts a new turn on later calls.
        Yields StringOutputEvent chunks for generated text, ObjectOutputEvents
        (search results and factual-consistency score) under the "response_info"
        stream context, and finally an AgentStateEvent returning to "idle".
        """
        # Start from user-supplied overrides, then fill in required fields
        # without clobbering anything the user set explicitly.
        body = copy.deepcopy(self.spec.body_overrides)
        body.setdefault("search", {}).setdefault("corpora", [{}])
        for corpus in body["search"]["corpora"]:
            # Every corpus entry gets the configured corpus_key unless overridden.
            corpus.setdefault("corpus_key", self.spec.corpus_key)
        body["query"] = question
        body["stream_response"] = True

        # An existing doc means this process already has a Vectara chat to continue.
        doc = await VectaraDoc.find_one(query=dict(process_id=process_id))
        async with AsyncClient() as client:
            response = await client.post(
                url=self._url("/v2/chats" if not doc else f"/v2/chats/{doc.vectara_chat_id}/turns"),
                headers=self._headers,
                json=body,
            )
            response.raise_for_status()
            # NOTE(review): client.post buffers the whole body before EventSource
            # parses it, so events are replayed from the buffer rather than
            # streamed live — confirm whether client.stream(...) was intended.

            yield StartStreamContextEvent(context_id="response_info", title="Response Information")
            try:
                async for sse_event in EventSource(response).aiter_sse():
                    if sse_event.event == "chat_info":
                        if not doc:
                            # First turn of a new chat: remember the Vectara chat id.
                            data = json.loads(sse_event.data)
                            doc = await VectaraDoc.create(process_id=process_id, vectara_chat_id=data['chat_id'])
                    elif sse_event.event == "generation_chunk":
                        data = json.loads(sse_event.data)
                        yield StringOutputEvent(content=data['generation_chunk'])
                    elif sse_event.event == "search_results":
                        for result in json.loads(sse_event.data)["search_results"]:
                            yield ObjectOutputEvent(stream_context="response_info", content=result)
                    elif sse_event.event == "factual_consistency_score":
                        yield ObjectOutputEvent(stream_context="response_info", content=json.loads(sse_event.data))
            finally:
                # Close the stream context even if SSE parsing fails mid-stream.
                yield EndStreamContextEvent(context_id="response_info")

        yield AgentStateEvent(state="idle")
14 changes: 9 additions & 5 deletions sdk/eidolon_ai_sdk/builtins/code_builtins.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from eidolon_ai_sdk.agent.tot_agent.checker import ToTChecker
from eidolon_ai_sdk.agent.tot_agent.thought_generators import ThoughtGenerationStrategy, ProposePromptStrategy
from eidolon_ai_sdk.agent.tot_agent.tot_agent import TreeOfThoughtsAgent
from eidolon_ai_sdk.agent.vectara_agent import VectaraAgent
from eidolon_ai_sdk.agent_os_interfaces import FileMemory, SymbolicMemory, SimilarityMemory, SecurityManager
from eidolon_ai_sdk.apu.longterm_memory_unit import LongTermMemoryUnit
from eidolon_ai_sdk.apu.agent_io import IOUnit
Expand All @@ -60,6 +61,7 @@
from eidolon_ai_sdk.builtins.components.opentelemetry import OpenTelemetryManager, CustomSampler, NoopSpanExporter
from eidolon_ai_sdk.builtins.components.usage import UsageMiddleware
from eidolon_ai_sdk.builtins.logic_units.api_logic_unit import ApiLogicUnit
from eidolon_ai_sdk.builtins.logic_units.vectara import VectaraSearch
from eidolon_ai_sdk.builtins.logic_units.web_search import WebSearch, Browser, Search
from eidolon_ai_sdk.memory.azure_file_memory import AzureFileMemory
from eidolon_ai_sdk.memory.s3_file_memory import S3FileMemory
Expand Down Expand Up @@ -122,7 +124,6 @@ def named_builtins() -> List[ReferenceResource]:
Tuples map the name of the first element to the name of the second.
Single types map the name of first element to it's fqn.
"""

builtin_list = [
AgentMachine,
# security manager
Expand All @@ -145,6 +146,11 @@ def named_builtins() -> List[ReferenceResource]:
RetrieverAgent,
AutonomousSpeechAgent,
SqlAgent,
WebScrapingAgent,
WebSearchAgent,
WebResearcher,
VectaraAgent,
APIAgent,
# apu
(APU, ConversationalAPU),
ConversationalAPU,
Expand All @@ -162,11 +168,8 @@ def named_builtins() -> List[ReferenceResource]:
Search,
Browser,
Retriever,
WebScrapingAgent,
WebSearchAgent,
WebResearcher,
ApiLogicUnit,
APIAgent,
VectaraSearch,
# machine components
(SymbolicMemory, MongoSymbolicMemory),
MongoSymbolicMemory,
Expand Down Expand Up @@ -249,4 +252,5 @@ def named_builtins() -> List[ReferenceResource]:
# config objects
ReplayConfig,
]

return [_to_resource(maybe_tuple) for maybe_tuple in builtin_list if maybe_tuple]
83 changes: 83 additions & 0 deletions sdk/eidolon_ai_sdk/builtins/logic_units/vectara.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import os
from urllib.parse import urljoin

from httpx import AsyncClient
from pydantic import BaseModel, Field

from eidolon_ai_sdk.apu.logic_unit import LogicUnit, llm_function
from eidolon_ai_sdk.system.reference_model import Specable


class VectaraSearchSpec(BaseModel):
    """
    A logic unit for searching in Vectara. Requires the VECTARA_API_KEY environment variable to be set for authentication.
    """

    corpus_key: str = Field(description="The corpus key to search in.")
    # Unlike VectaraAgentSpec, this one IS rendered: str.format(corpus_key=...) in the
    # @llm_function description lambda below.
    description: str = Field("Search documents related to {corpus_key}.", description="Description of the tool presented to LLM. Will be formatted with corpus_key.")
    # Base API URL; paths are joined with urljoin, so the trailing slash matters.
    vectara_url: str = "https://api.vectara.io/"


class VectaraSearch(Specable[VectaraSearchSpec], LogicUnit):
    """Logic unit exposing Vectara corpus search and document retrieval to the LLM.

    Authenticates with the VECTARA_API_KEY environment variable on every call.
    """

    @property
    def _token(self):
        # Read lazily per request; raises KeyError if the variable is unset.
        return os.environ["VECTARA_API_KEY"]

    @property
    def _headers(self):
        return {
            'Accept': 'application/json',
            'x-api-key': self._token
        }

    def _url(self, suffix):
        # Join an API path onto the configured base URL.
        return urljoin(self.spec.vectara_url, suffix)

    @llm_function(description=lambda lu, _: lu.spec.description.format(corpus_key=lu.spec.corpus_key))
    async def query(self, query: str, limit: int = 10, offset: int = 0):
        """Search the configured corpus.

        Returns a dict with the raw hits (text + document_id) and a mapping of
        document_id -> title so the LLM can follow up with read_document.
        """
        payload = {
            "query": query,
            "search": {
                "limit": limit,
                "offset": offset,
            },
        }
        async with AsyncClient() as client:
            response = await client.post(
                url=self._url(f"/v2/corpora/{self.spec.corpus_key}/query"),
                headers=self._headers,
                json=payload,
            )
            response.raise_for_status()
            hits = response.json()["search_results"]
            search_results = []
            titles = {}
            for hit in hits:
                search_results.append(dict(text=hit.get("text"), document_id=hit.get("document_id")))
                titles[hit.get("document_id")] = hit.get("document_metadata", {}).get("title")
            return dict(search_results=search_results, documents=titles)

    @llm_function()
    async def read_document(self, document_id: str):
        """Fetch a document by id and reassemble its parts into titled sections."""
        async with AsyncClient() as client:
            response = await client.get(
                url=self._url(f"/v2/corpora/{self.spec.corpus_key}/documents/{document_id}"),
                headers=self._headers,
            )
        response.raise_for_status()
        document = response.json()

        sections = []
        for part in document["parts"]:
            meta = part["metadata"]
            if meta.get("is_title", False):
                # A title part opens a new section.
                section = dict(title=part["text"], content=[])
                if "title_level" in meta:
                    section["title_level"] = meta["title_level"]
                sections.append(section)
            else:
                if not sections:
                    # Body text appearing before any title goes into an untitled leading section.
                    sections.append(dict(title="", content=[]))
                sections[-1]["content"].append(part["text"])
        for section in sections:
            section["content"] = "".join(section["content"])

        return dict(
            document_id=document_id,
            metadata=document["metadata"],
            sections=sections,
        )
Loading

0 comments on commit e2fccf4

Please sign in to comment.