Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ll/vectara #783

Merged
merged 9 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/eidolon_ai_sdk/agent/doc_manager/document_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ class DocumentManagerSpec(BaseModel):
name: str = Field(description="The name of the document manager (used to name database collections).")
recheck_frequency: int = Field(default=60, description="The number of seconds between checks.")
loader: AnnotatedReference[DocumentLoader]
doc_processor: AnnotatedReference[DocumentProcessor]
concurrency: int = Field(default=8, description="The number of concurrent tasks to run.")
doc_processor: AnnotatedReference[DocumentProcessor]


class DocumentManager(Specable[DocumentManagerSpec]):
Expand Down
18 changes: 10 additions & 8 deletions sdk/eidolon_ai_sdk/agent/doc_manager/document_processor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from typing import Iterable

from opentelemetry import trace
from pydantic import BaseModel
Expand All @@ -9,10 +10,12 @@
from eidolon_ai_sdk.agent.doc_manager.parsers.base_parser import DocumentParser, DataBlob
from eidolon_ai_sdk.agent.doc_manager.transformer.document_transformer import DocumentTransformer
from eidolon_ai_sdk.agent_os import AgentOS
from eidolon_ai_sdk.memory.document import Document
from eidolon_ai_sdk.system.reference_model import Specable, AnnotatedReference
from eidolon_ai_sdk.util.async_wrapper import make_async

tracer = trace.get_tracer(__name__)
logger = logging.getLogger("eidolon")


class DocumentProcessorSpec(BaseModel):
Expand All @@ -28,14 +31,13 @@ def __init__(self, **kwargs):
Specable.__init__(self, **kwargs)
self.parser = self.spec.parser.instantiate()
self.splitter = self.spec.splitter.instantiate()
self.logger = logging.getLogger("eidolon")

@make_async
def parse(self, data: bytes, mimetype: str, path: str):
def parse(self, data: bytes, mimetype: str, path: str) -> Iterable[Document]:
return self.parser.parse(DataBlob.from_bytes(data=data, mimetype=mimetype, path=path))

@make_async
def split(self, docs):
def split(self, docs) -> Iterable[Document]:
return self.splitter.transform_documents(docs)

async def addFile(self, collection_name: str, file_info: FileInfo):
Expand All @@ -55,16 +57,16 @@ async def addFile(self, collection_name: str, file_info: FileInfo):
},
)
if len(docs) == 0:
self.logger.debug(f"File contained no text {file_info.path}")
logger.debug(f"File contained no text {file_info.path}")
return
with tracer.start_as_current_span("record similarity"):
await AgentOS.similarity_memory.add(collection_name, docs)
self.logger.debug(f"Added file {file_info.path}")
logger.debug(f"Added file {file_info.path}")
except Exception as e:
if self.logger.isEnabledFor(logging.DEBUG):
self.logger.warning(f"Failed to parse file {file_info.path}", exc_info=True)
if logger.isEnabledFor(logging.DEBUG):
logger.warning(f"Failed to parse file {file_info.path}", exc_info=True)
else:
self.logger.warning(f"Failed to parse file {file_info.path} ({e})")
logger.warning(f"Failed to parse file {file_info.path} ({e})")

async def removeFile(self, collection_name: str, path: str):
with tracer.start_as_current_span("remove file"):
Expand Down
93 changes: 93 additions & 0 deletions sdk/eidolon_ai_sdk/agent/vectara_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import copy
import json
import os
from typing import Annotated
from urllib.parse import urljoin

from fastapi import Body
from httpx import AsyncClient
from httpx_sse import EventSource
from pydantic import BaseModel, Field

from eidolon_ai_client.events import StringOutputEvent, StartStreamContextEvent, ObjectOutputEvent, \
EndStreamContextEvent, AgentStateEvent
from eidolon_ai_sdk.agent.agent import register_action
from eidolon_ai_sdk.system.processes import MongoDoc
from eidolon_ai_sdk.system.reference_model import Specable


# NOTE(review): seeds a placeholder key so the module can be imported without
# credentials (presumably for tests/docs builds); any real request made with
# "test" will fail auth — confirm this is intentional and real deployments set
# VECTARA_API_KEY before use.
os.environ.setdefault("VECTARA_API_KEY", "test")


class VectaraAgentSpec(BaseModel):
    """
    An agent backed by Vectara. Requires the VECTARA_API_KEY environment variable to be set for authentication.
    """

    # Vectara corpus this agent converses over.
    corpus_key: str
    # Action description surfaced by the framework; presumably "{{ corpus_key }}"
    # is templated downstream — TODO confirm against register_action handling.
    description: str = "Search documents related to {{ corpus_key }}"
    # Base API endpoint; override for self-hosted or regional deployments.
    vectara_url: str = "https://api.vectara.io/"
    # Free-form overrides merged into the create-chat / create-turn request body.
    body_overrides: dict = Field(
        default={},
        description="Arguments to use when creating / continuing a chat. See https://docs.vectara.com/docs/rest-api/create-chat for more information.",
    )
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agent configuration lives here. Overrides allow the chat-creation / turn interface to be arbitrarily customized, since we don't know how people will want to use this. The contract is the same for both endpoints.



# We need to store chat_id / process_id mappings since Vectara doesn't have metadata / query concepts
class VectaraDoc(MongoDoc):
    """Persistent record linking an eidolon process to its Vectara chat."""

    # Collection name used by the MongoDoc persistence layer.
    collection = "vectara_docs"
    # The eidolon process this chat belongs to.
    process_id: str
    # Chat id returned by Vectara when the first turn creates the chat.
    vectara_chat_id: str
    # Extra per-chat metadata. NOTE(review): a shared mutable default is only safe
    # if MongoDoc is a pydantic model that copies defaults per instance — confirm.
    metadata: dict = {}


class VectaraAgent(Specable[VectaraAgentSpec]):
    """Conversational agent that proxies each turn to the Vectara chat API.

    The first turn for a process creates a Vectara chat and records its id in a
    VectaraDoc; later turns for the same process continue that chat.
    """

    @property
    def _token(self):
        # Read at call time (not import time) so the env var can be set late.
        return os.environ["VECTARA_API_KEY"]

    @property
    def _headers(self):
        # Vectara authenticates via the x-api-key header; we request an SSE stream.
        return {
            'Content-Type': 'application/json',
            'Accept': 'text/event-stream',
            'x-api-key': self._token
        }

    def _url(self, suffix):
        # urljoin with an absolute-path suffix keeps the host from vectara_url
        # and replaces any path component.
        return urljoin(self.spec.vectara_url, suffix)

    @register_action("initialized", "idle", description=lambda agent, _: agent.spec.description)
    async def converse(self, process_id, question: Annotated[str, Body()]):
        """Send `question` as a chat turn and stream Vectara's SSE reply as events.

        Yields StringOutputEvents for generated text, ObjectOutputEvents (in the
        "response_info" context) for search results and consistency scores, and
        finishes with an AgentStateEvent returning the process to "idle".
        """
        # Deep-copy so per-request mutations never leak back into the shared spec.
        body = copy.deepcopy(self.spec.body_overrides)
        # Guarantee at least one corpus entry, then default each corpus to our
        # corpus_key without clobbering explicit overrides.
        body.setdefault("search", {}).setdefault("corpora", [{}])
        for corpus in body["search"]["corpora"]:
            corpus.setdefault("corpus_key", self.spec.corpus_key)
        body["query"] = question
        body["stream_response"] = True

        # Existing doc means this process already has a Vectara chat to continue.
        doc = await VectaraDoc.find_one(query=dict(process_id=process_id))
        async with AsyncClient() as client:
            response = await client.post(
                # No doc yet -> create a chat; otherwise append a turn to it.
                url=self._url("/v2/chats" if not doc else f"/v2/chats/{doc.vectara_chat_id}/turns"),
                headers=self._headers,
                json=body,
            )
            response.raise_for_status()

            yield StartStreamContextEvent(context_id="response_info", title="Response Information")
            try:
                async for sse_event in EventSource(response).aiter_sse():
                    if sse_event.event == "chat_info":
                        # First turn only: persist the chat id for later turns.
                        if not doc:
                            data = json.loads(sse_event.data)
                            doc = await VectaraDoc.create(process_id=process_id, vectara_chat_id=data['chat_id'])
                    elif sse_event.event == "generation_chunk":
                        data = json.loads(sse_event.data)
                        yield StringOutputEvent(content=data['generation_chunk'])
                    elif sse_event.event == "search_results":
                        yield ObjectOutputEvent(stream_context="response_info", content=result)
                    elif sse_event.event == "factual_consistency_score":
                        yield ObjectOutputEvent(stream_context="response_info", content=json.loads(sse_event.data))
            finally:
                # Always close the side-channel context, even if the stream errors.
                yield EndStreamContextEvent(context_id="response_info")

        yield AgentStateEvent(state="idle")
14 changes: 9 additions & 5 deletions sdk/eidolon_ai_sdk/builtins/code_builtins.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from eidolon_ai_sdk.agent.tot_agent.checker import ToTChecker
from eidolon_ai_sdk.agent.tot_agent.thought_generators import ThoughtGenerationStrategy, ProposePromptStrategy
from eidolon_ai_sdk.agent.tot_agent.tot_agent import TreeOfThoughtsAgent
from eidolon_ai_sdk.agent.vectara_agent import VectaraAgent
from eidolon_ai_sdk.agent_os_interfaces import FileMemory, SymbolicMemory, SimilarityMemory, SecurityManager
from eidolon_ai_sdk.apu.longterm_memory_unit import LongTermMemoryUnit
from eidolon_ai_sdk.apu.agent_io import IOUnit
Expand All @@ -60,6 +61,7 @@
from eidolon_ai_sdk.builtins.components.opentelemetry import OpenTelemetryManager, CustomSampler, NoopSpanExporter
from eidolon_ai_sdk.builtins.components.usage import UsageMiddleware
from eidolon_ai_sdk.builtins.logic_units.api_logic_unit import ApiLogicUnit
from eidolon_ai_sdk.builtins.logic_units.vectara import VectaraSearch
from eidolon_ai_sdk.builtins.logic_units.web_search import WebSearch, Browser, Search
from eidolon_ai_sdk.memory.azure_file_memory import AzureFileMemory
from eidolon_ai_sdk.memory.s3_file_memory import S3FileMemory
Expand Down Expand Up @@ -122,7 +124,6 @@ def named_builtins() -> List[ReferenceResource]:
Tuples map the name of the first element to the name of the second.
Single types map the name of first element to it's fqn.
"""

builtin_list = [
AgentMachine,
# security manager
Expand All @@ -145,6 +146,11 @@ def named_builtins() -> List[ReferenceResource]:
RetrieverAgent,
AutonomousSpeechAgent,
SqlAgent,
WebScrapingAgent,
WebSearchAgent,
WebResearcher,
VectaraAgent,
APIAgent,
# apu
(APU, ConversationalAPU),
ConversationalAPU,
Expand All @@ -162,11 +168,8 @@ def named_builtins() -> List[ReferenceResource]:
Search,
Browser,
Retriever,
WebScrapingAgent,
WebSearchAgent,
WebResearcher,
ApiLogicUnit,
APIAgent,
VectaraSearch,
# machine components
(SymbolicMemory, MongoSymbolicMemory),
MongoSymbolicMemory,
Expand Down Expand Up @@ -249,4 +252,5 @@ def named_builtins() -> List[ReferenceResource]:
# config objects
ReplayConfig,
]

return [_to_resource(maybe_tuple) for maybe_tuple in builtin_list if maybe_tuple]
83 changes: 83 additions & 0 deletions sdk/eidolon_ai_sdk/builtins/logic_units/vectara.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import os
from urllib.parse import urljoin

from httpx import AsyncClient
from pydantic import BaseModel, Field

from eidolon_ai_sdk.apu.logic_unit import LogicUnit, llm_function
from eidolon_ai_sdk.system.reference_model import Specable


class VectaraSearchSpec(BaseModel):
    """
    A logic unit for searching in Vectara. Requires the VECTARA_API_KEY environment variable to be set for authentication.
    """

    # Corpus all queries and document reads are scoped to.
    corpus_key: str = Field(description="The corpus key to search in.")
    # Tool description shown to the LLM; formatted via str.format(corpus_key=...)
    # in the @llm_function decorator.
    description: str = Field(
        default="Search documents related to {corpus_key}.",
        description="Description of the tool presented to LLM. Will be formatted with corpus_key.",
    )
    # Base API endpoint; override for self-hosted or regional deployments.
    vectara_url: str = "https://api.vectara.io/"
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Vectara tool for agentic RAG. Allows agents to 1) query arbitrary search terms and 2) read full documents as desired (the document id is retrieved from search results).



class VectaraSearch(Specable[VectaraSearchSpec], LogicUnit):
    """Logic unit exposing Vectara corpus search and full-document reads to the LLM."""

    @property
    def _token(self):
        # Read at call time (not import time) so the env var can be set late.
        return os.environ["VECTARA_API_KEY"]

    @property
    def _headers(self):
        # Vectara authenticates via the x-api-key header.
        return {
            'Accept': 'application/json',
            'x-api-key': self._token
        }

    def _url(self, suffix):
        # urljoin with an absolute-path suffix keeps the host from vectara_url.
        return urljoin(self.spec.vectara_url, suffix)

    @llm_function(description=lambda lu, _: lu.spec.description.format(corpus_key=lu.spec.corpus_key))
    async def query(self, query: str, limit: int = 10, offset: int = 0):
        """Search the configured corpus.

        Returns a dict with "search_results" (text + document_id per hit) and
        "documents" (document_id -> title, where available).
        """
        payload = {
            "query": query,
            "search": {
                "limit": limit,
                "offset": offset,
            },
        }
        async with AsyncClient() as client:
            response = await client.post(
                url=self._url(f"/v2/corpora/{self.spec.corpus_key}/query"),
                headers=self._headers,
                json=payload,
            )
            response.raise_for_status()
            hits = response.json()["search_results"]
            search_results = []
            documents = {}
            for hit in hits:
                search_results.append({"text": hit.get("text"), "document_id": hit.get("document_id")})
                documents[hit.get("document_id")] = hit.get("document_metadata", {}).get("title")
            return {"search_results": search_results, "documents": documents}

    @llm_function()
    async def read_document(self, document_id: str):
        """Fetch a full document and reassemble its parts into titled sections."""
        async with AsyncClient() as client:
            response = await client.get(
                url=self._url(f"/v2/corpora/{self.spec.corpus_key}/documents/{document_id}"),
                headers=self._headers,
            )
            response.raise_for_status()
            document = response.json()

        # Group consecutive body parts under the most recent title part.
        sections = []
        for part in document["parts"]:
            meta = part["metadata"]
            if meta.get("is_title", False):
                section = {"title": part["text"], "content": []}
                if "title_level" in meta:
                    section["title_level"] = meta["title_level"]
                sections.append(section)
            else:
                # Body text before any title goes into an untitled leading section.
                if not sections:
                    sections.append({"title": "", "content": []})
                sections[-1]["content"].append(part["text"])
        for section in sections:
            section["content"] = "".join(section["content"])

        return {
            "document_id": document_id,
            "metadata": document["metadata"],
            "sections": sections,
        }
Loading
Loading