[ai-cache] Implement a WASM plugin for LLM result retrieval based on vector similarity #1290
update update: note that TLS must not be used when using the HTTP protocol update: add lobechat add: makefile for ai-proxy fix bugs fix bugs fix: redis connection fix: dashvector and dashscope cluster fix: change vdb collection feat: add chroma logic docs: add API description update: no callback version fix: change to callback fix: finish chrome remove: key update: gitignore
Codecov Report
All modified and coverable lines are covered by tests ✅
Additional details and impacted files
@@ Coverage Diff @@
## main #1290 +/- ##
==========================================
+ Coverage 35.91% 44.22% +8.31%
==========================================
Files 69 75 +6
Lines 11576 9823 -1753
==========================================
+ Hits 4157 4344 +187
+ Misses 7104 5150 -1954
- Partials 315 329 +14

}
log.Debugf("unknown message:%s", bodyJson)
return ""
RedisSearchHandler(key, ctx, config, log, stream, true)
Currently all cache logic uses CacheProvider as its entry point. If I don't have Redis locally, can I skip the cache stage and query the VDB directly?
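It seems that both this and the embedding part need to be optional, because some VDBs do not support storing embeddings directly.
In principle we would need to check whether an embeddingProvider and a cacheProvider are configured, dispatch to different entry functions accordingly, and adjust the logic in core.go to match?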
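Can you give an example of a VDB that does not support storing embeddings directly? How would we query it?
We can think some more about the exact changes.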
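Currently all cache logic uses CacheProvider as its entry point. If I don't have Redis locally, can I skip the cache stage and query the VDB directly?
There are two ways to skip the cache stage and query the VDB directly:
- Run a vector query with the vector produced by a user-defined embedding model. This approach requires using an embeddingProvider in the plugin.
- Some VDBs encapsulate the embedding step and support querying directly with a string, for example Chroma [1] and Weaviate [2]. This approach does not need an embeddingProvider.
Should both integration modes be considered here?
[1] https://docs.trychroma.com/reference/py-collection#query
[2] https://weaviate.io/developers/weaviate/quickstart#step-6-queries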
I roughly sketched the whole flow. It may be incomplete and is only for reference:
func onHttpRequestBody() {
	// Pause only when an asynchronous lookup was actually dispatched.
	if err := searchCache(ctx, config, key); err != nil {
		return types.ActionContinue
	}
	return types.ActionPause
}
func searchCache(ctx, config, key) {
	cacheProvider := config.ActiveCacheProvider
	if cacheProvider == nil {
		// No cache configured: go straight to the vector database.
		return searchVectorDb(ctx, config, key)
	}
	cacheProvider.search(key, func (response, err) {
		processCacheResponse(ctx, config, key, response, err)
	})
	return nil
}
func processCacheResponse(ctx, config, key, response, err) {
	if err == nil && response != nil {
		// cache hit
		sendResponse(ctx, response)
		proxywasm.ResumeHttpRequest()
		return
	}
	// err != nil: search failed
	// err == nil && response == nil: cache miss
	err = searchVectorDb(ctx, config, key)
	if err != nil {
		proxywasm.ResumeHttpRequest()
		return
	}
}
func searchVectorDb(ctx, config, key) {
	embeddingProvider := config.ActiveEmbeddingProvider
	if embeddingProvider == nil {
		return errors.New("xxxx")
	}
	embeddingProvider.search(key, func (response, err) {
		processVectorDbResponse(ctx, response, err)
	})
	return nil
}
func processVectorDbResponse(ctx, response, err) {
}
func sendResponse(ctx wrapper.HttpContext, response string) {
	stream := ctx.Get("xxx")
	if stream {
		// send stream response
	} else {
		// send normal response
	}
}
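The control flow in the sketch above can be exercised without a proxy. The following is a minimal, hedged model of it: `Action`, `host`, and `cache` are hypothetical stand-ins for `types.Action`, the proxy-wasm host, and a cache provider, and the callbacks run inline instead of asynchronously. It only illustrates the rule the sketch relies on: a request that is paused must be resumed exactly once, and a request that was never paused must not be resumed at all.

```go
package main

import (
	"errors"
	"fmt"
)

// Action is a stand-in for the proxy-wasm action type (assumption).
type Action int

const (
	ActionContinue Action = iota
	ActionPause
)

// host is a hypothetical recorder for what the plugin asks the proxy to do.
type host struct {
	resumed  int
	response string
}

func (h *host) resume()          { h.resumed++ }
func (h *host) send(body string) { h.response = body }

// cache is a toy cache provider; a nil value models "no cache configured".
type cache map[string]string

// searchVectorDb fails up front when no embedding provider is configured.
func searchVectorDb(h *host, hasEmbedding bool, key string) error {
	if !hasEmbedding {
		return errors.New("no embedding provider configured")
	}
	// Inline "callback": nothing similar was found, so let the request continue.
	h.resume()
	return nil
}

func searchCache(h *host, c cache, hasEmbedding bool, key string) error {
	if c == nil {
		return searchVectorDb(h, hasEmbedding, key)
	}
	// Inline "callback" of the cache lookup, mirroring the sketch above.
	if v, ok := c[key]; ok {
		h.send(v)
		h.resume()
		return nil
	}
	if err := searchVectorDb(h, hasEmbedding, key); err != nil {
		h.resume()
	}
	return nil
}

func onHttpRequestBody(h *host, c cache, hasEmbedding bool, key string) Action {
	if err := searchCache(h, c, hasEmbedding, key); err != nil {
		return ActionContinue
	}
	return ActionPause
}

func main() {
	cases := []struct {
		name         string
		c            cache
		hasEmbedding bool
	}{
		{"no cache, no embedding", nil, false},
		{"cache hit", cache{"q": "a"}, false},
		{"cache miss, no embedding", cache{}, false},
		{"cache miss, embedding", cache{}, true},
	}
	for _, tc := range cases {
		h := &host{}
		action := onHttpRequestBody(h, tc.c, tc.hasEmbedding, "q")
		fmt.Printf("%s: paused=%v resumed=%d\n", tc.name, action == ActionPause, h.resumed)
	}
}
```

In every case of this model, `paused` is true exactly when `resumed` is 1.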
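There are two ways to skip the cache stage and query the VDB directly:
- Run a vector query with the vector produced by a user-defined embedding model. This approach requires using an embeddingProvider in the plugin.
- Some VDBs encapsulate the embedding step and support querying directly with a string, for example Chroma [1] and Weaviate [2]. This approach does not need an embeddingProvider.
Should both integration modes be considered here?
[1] https://docs.trychroma.com/reference/py-collection#query [2] https://weaviate.io/developers/weaviate/quickstart#step-6-queries
Yes, both need to be supported.
For the implementation, consider distinguishing them at the interface level. A concrete provider can choose to implement either the text-based query interface or the embedding-based query interface, and the callbacks share the same function signature. When making the call, core checks which interface the provider implements and decides whether the embedding provider needs to be invoked.
Likewise, if some providers do not support upload, upload can be implemented as a separate interface.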
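A minimal sketch of that interface-level split, assuming hypothetical names throughout: `StringQuerier`, `EmbeddingQuerier`, `Embedder`, `QueryCallback`, and the `query` helper are illustrative and not the plugin's actual API. The point is only that a type switch lets the caller decide whether an embedding step is needed.

```go
package main

import (
	"errors"
	"fmt"
)

// QueryCallback is shared by both query styles (hypothetical signature).
type QueryCallback func(answer string, err error)

// StringQuerier is for providers that embed the text themselves.
type StringQuerier interface {
	QueryString(text string, cb QueryCallback) error
}

// EmbeddingQuerier is for providers that need a precomputed vector.
type EmbeddingQuerier interface {
	QueryEmbedding(emb []float64, cb QueryCallback) error
}

// Embedder stands in for the embedding provider.
type Embedder func(text string) ([]float64, error)

// query checks which interface the provider implements and calls the
// embedder only when the provider cannot accept raw text.
func query(provider interface{}, embed Embedder, text string, cb QueryCallback) error {
	switch p := provider.(type) {
	case StringQuerier:
		return p.QueryString(text, cb)
	case EmbeddingQuerier:
		if embed == nil {
			return errors.New("provider needs embeddings but no embedder is configured")
		}
		emb, err := embed(text)
		if err != nil {
			return err
		}
		return p.QueryEmbedding(emb, cb)
	default:
		return errors.New("provider supports neither query interface")
	}
}

// textOnly and vectorOnly are toy providers that exercise both paths.
type textOnly struct{}

func (textOnly) QueryString(text string, cb QueryCallback) error {
	cb("text:"+text, nil)
	return nil
}

type vectorOnly struct{}

func (vectorOnly) QueryEmbedding(emb []float64, cb QueryCallback) error {
	cb(fmt.Sprintf("vector:%d", len(emb)), nil)
	return nil
}

func main() {
	embed := func(text string) ([]float64, error) {
		return make([]float64, len(text)), nil
	}
	show := func(answer string, err error) { fmt.Println(answer, err) }
	_ = query(textOnly{}, nil, "hi", show)
	_ = query(vectorOnly{}, embed, "hi", show)
}
```

An optional upload capability could be modelled the same way: a separate small interface that the caller probes with a type assertion before uploading.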
if err != nil {
	log.Errorf("Failed to retrieve key: %s from cache, error: %v", key, err)
	proxywasm.ResumeHttpRequest()
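There is a problem here. If execution enters this branch, the function does not return, and the outer onHttpRequestBody does not return types.ActionPause either. I am not sure whether this ResumeHttpRequest call will raise an error, but at the very least the outer request will most likely hang forever.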
Does this mean that a return is needed after proxywasm.ResumeHttpRequest() to make sure the current function exits? It is currently changed to:
err := activeCacheProvider.Get(queryKey, func(response resp.Value) {
	if err := response.Error(); err == nil && !response.IsNull() {
		log.Infof("cache hit, key: %s", key)
		processCacheHit(key, response, stream, ctx, config, log)
	} else {
		if err != nil {
			log.Errorf("error retrieving key: %s from cache, error: %v", key, err)
		}
		if response.IsNull() {
			log.Infof("cache miss, key: %s", key)
		}
		if useSimilaritySearch {
			err = performSimilaritySearch(key, ctx, config, log, key, stream)
			if err != nil {
				log.Errorf("failed to perform similarity search for key: %s, error: %v", key, err)
				proxywasm.ResumeHttpRequest()
				return
			}
		}
		proxywasm.ResumeHttpRequest()
		return
	}
})
This seems to overlap with another PR: #1248. As commented in that PR, the embedding and vector logic is fairly generic, so I suggest moving it into a separate ai-utils directory.
DOMAIN = "dashscope.aliyuncs.com"
PORT = 443
DEFAULT_MODEL_NAME = "text-embedding-v1"
ENDPOINT = "/api/v1/services/embeddings/text-embedding/text-embedding"
These names should include the dashscope keyword. Otherwise it will be hard to pick names when other services are integrated.
Changed to:
const (
	DASHSCOPE_DOMAIN             = "dashscope.aliyuncs.com"
	DASHSCOPE_PORT               = 443
	DASHSCOPE_DEFAULT_MODEL_NAME = "text-embedding-v1"
	DASHSCOPE_ENDPOINT           = "/api/v1/services/embeddings/text-embedding/text-embedding"
)

"fmt"

"github.com/alibaba/higress/plugins/wasm-go/extensions/ai-cache/config"
"github.com/alibaba/higress/plugins/wasm-go/extensions/ai-cache/vector"
"github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper"
"github.com/go-errors/errors"
This import looks wrong.
Changed to:
"errors"
	processCacheHit(key, mostSimilarData.Answer, stream, ctx, config, log)
} else {
	// otherwise, continue to check cache for the most similar key
	CheckCacheForKey(mostSimilarData.Text, ctx, config, log, stream, false)
The error returned here is not handled.
Changed to:
err = CheckCacheForKey(mostSimilarData.Text, ctx, config, log, stream, false)
if err != nil {
	log.Errorf("check cache for key: %s failed, error: %v", mostSimilarData.Text, err)
	proxywasm.ResumeHttpRequest()
}
})

if err != nil {
	log.Errorf("Failed to retrieve key: %s from cache, error: %v", key, err)
Why does this place use %v to format err, while the other place uses %s with err.Error()?
I used to build messages by string concatenation inside errors.New out of habit. I have now unified everything to the %v style:
err = fmt.Errorf("failed to parse response: %v", err)
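For reference, a small self-contained comparison of the formatting styles discussed above. `wrapV`, `wrapS`, `wrapW`, `isWrapped`, and `errBase` are hypothetical helpers written for this illustration. `%v` on an error and `%s` with `err.Error()` produce the same text; `%w` (standard library, Go 1.13 and later) additionally keeps the original error reachable through `errors.Is`. The PR uses `%v`; `%w` is shown only as an alternative and may not suit every toolchain.

```go
package main

import (
	"errors"
	"fmt"
)

// errBase is a sample underlying error used by the demo.
var errBase = errors.New("unexpected EOF")

// wrapV formats the cause with %v, the style adopted in the reply above.
func wrapV(err error) error {
	return fmt.Errorf("failed to parse response: %v", err)
}

// wrapS formats the cause with %s and an explicit Error() call.
func wrapS(err error) error {
	return fmt.Errorf("failed to parse response: %s", err.Error())
}

// wrapW formats the cause with %w, which also records it for errors.Is/As.
func wrapW(err error) error {
	return fmt.Errorf("failed to parse response: %w", err)
}

// isWrapped reports whether inner can be found in outer's error chain.
func isWrapped(outer, inner error) bool {
	return errors.Is(outer, inner)
}

func main() {
	fmt.Println(wrapV(errBase).Error() == wrapS(errBase).Error()) // same message
	fmt.Println(isWrapped(wrapV(errBase), errBase))               // chain is lost with %v
	fmt.Println(isWrapped(wrapW(errBase), errBase))               // chain is kept with %w
}
```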
		log.Errorf("Failed to perform similarity search for key: %s, error: %v", key, err)
	}
}
proxywasm.ResumeHttpRequest()
If the VDB has been queried, shouldn't Resume not be called here?
Changed it to call Resume only when the VDB is not used or when the VDB lookup fails, which seems more reasonable:
// handleCacheResponse processes cache response and handles cache hits and misses.
func handleCacheResponse(key string, response resp.Value, ctx wrapper.HttpContext, log wrapper.Log, stream bool, config config.PluginConfig, useSimilaritySearch bool) {
	if err := response.Error(); err == nil && !response.IsNull() {
		log.Infof("Cache hit for key: %s", key)
		processCacheHit(key, response.String(), stream, ctx, config, log)
		return
	}
	log.Infof("Cache miss for key: %s", key)
	if err := response.Error(); err != nil {
		log.Errorf("Error retrieving key: %s from cache, error: %v", key, err)
	}
	if useSimilaritySearch {
		if err := performSimilaritySearch(key, ctx, config, log, key, stream); err != nil {
			log.Errorf("Failed to perform similarity search for key: %s, error: %v", key, err)
			proxywasm.ResumeHttpRequest()
		}
	} else {
		proxywasm.ResumeHttpRequest()
	}
}
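The branching in handleCacheResponse can be condensed into a small decision function. This is an illustrative model only: `mustResumeHere` and `errSearch` are hypothetical names, and it assumes that the cache-hit path and the similarity-search callback finish the request themselves in their own branches.

```go
package main

import (
	"errors"
	"fmt"
)

// errSearch models a similarity search that could not be started.
var errSearch = errors.New("similarity search failed")

// mustResumeHere reports whether the cache callback itself has to resume the
// paused request, given the outcome of the cache lookup.
func mustResumeHere(cacheHit, useSimilaritySearch bool, searchErr error) bool {
	switch {
	case cacheHit:
		return false // the cache-hit path answers the request
	case !useSimilaritySearch:
		return true // nothing else will run, so resume now
	default:
		return searchErr != nil // resume only if the search did not start
	}
}

func main() {
	fmt.Println(mustResumeHere(true, true, nil))        // false
	fmt.Println(mustResumeHere(false, false, nil))      // true
	fmt.Println(mustResumeHere(false, true, nil))       // false
	fmt.Println(mustResumeHere(false, true, errSearch)) // true
}
```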
// Attempt to upload answer embedding first
if ansEmbUploader, ok := activeVectorProvider.(vector.AnswerEmbeddingUploader); ok {
	log.Infof("[onHttpResponseBody] uploading answer embedding for key: %s", key)
	err := ansEmbUploader.UploadAnswerEmbedding(key, emb, value, ctx, log, nil)
Could this name easily be misread as uploading the embedding of the answer?
The method name and the interface type are renamed to:
UploadAnswerAndEmbedding
AnswerAndEmbeddingUploader
So far so good.
@@ -157,6 +157,13 @@ func (d *DvProvider) QueryEmbedding(
	return err
}

func checkField(fields map[string]interface{}, key string) string {
Would getField be a better name?
Ⅰ. Describe what this PR did
Adds to the ai-cache plugin the ability to recall cached results based on text vector similarity.
Ⅱ. Does this pull request fix one issue?
fixes #1040