Support spl (#251)
* update readme

* scan mode: fix flow_limit for consumer groups

* README add Query

* v2 change to pullLogMeta

* fix name and the location of flow_limit

* fix review

* revert V2 response & mark some methods as Deprecated

* name fix

* address reviewer comments

* deprecated V2

* deprecated fix

* fix put/pull link

* [bugfix] returning 403 needs to trigger flow_limit

* add utils.ParseHeaderInt to simplify code

---------

Co-authored-by: crimson <1291463831@qq.com>
Co-authored-by: Crimson <39024757+crimson-gao@users.noreply.github.com>
Co-authored-by: 泓逸 <laoguihong.lgh@alibaba-inc.com>
4 people authored Jan 29, 2024
1 parent bb56350 commit 4da4cf6
Showing 13 changed files with 160 additions and 66 deletions.
12 changes: 8 additions & 4 deletions README.md
@@ -38,6 +38,14 @@ go get -u github.com/aliyun/aliyun-log-go-sdk
Client = sls.CreateNormalInterfaceV2(Endpoint, credentialsProvider)
```

To guard against configuration errors, you can verify after creating the Client that it can successfully call the SLS API:
```go
_, err := Client.ListProject()
if err != nil {
panic(err)
}
```

2. **Create a project**

See the [log_project.go](https://github.com/aliyun/aliyun-log-go-sdk/blob/master/example/project/log_project.go) file
@@ -97,8 +105,6 @@ go get -u github.com/aliyun/aliyun-log-go-sdk

5. **Write data**

See [put_log.go](https://github.com/aliyun/aliyun-log-go-sdk/blob/master/example/loghub/put_log.go)

This shows a simple example of sending data with the SDK's native API. We do not recommend writing to a logstore through the API directly; instead, use the [producer](https://github.com/aliyun/aliyun-log-go-sdk/tree/master/producer) package provided by the SDK, which compresses data automatically and offers a safe-shutdown mechanism so that no data is lost, as sketched below.
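
For comparison, a minimal producer sketch (hedged: it assumes the producer package's GetProducerConfig / InitProducer / GenerateLog / SendLog / SafeClose API; the endpoint, credentials, project, and logstore values are placeholders):

```go
package main

import (
	"time"

	"github.com/aliyun/aliyun-log-go-sdk/producer"
)

func main() {
	// Placeholder endpoint and credentials - replace with your own.
	producerConfig := producer.GetProducerConfig()
	producerConfig.Endpoint = "cn-hangzhou.log.aliyuncs.com"
	producerConfig.AccessKeyID = "your-access-key-id"
	producerConfig.AccessKeySecret = "your-access-key-secret"

	producerInstance := producer.InitProducer(producerConfig)
	producerInstance.Start()

	// GenerateLog builds an *sls.Log from a timestamp and key-value pairs.
	log := producer.GenerateLog(uint32(time.Now().Unix()),
		map[string]string{"content": "hello sls"})
	if err := producerInstance.SendLog("your-project", "your-logstore",
		"topic", "127.0.0.1", log); err != nil {
		panic(err)
	}

	// SafeClose flushes buffered logs before exit, so no data is lost.
	producerInstance.SafeClose()
}
```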

```go
@@ -143,8 +149,6 @@ go get -u github.com/aliyun/aliyun-log-go-sdk

6. **Read data**

See [pull_log.go](https://github.com/aliyun/aliyun-log-go-sdk/blob/master/example/loghub/pull_log.go)

This shows how to pull data using the SDK's native API calls. We do not recommend reading and consuming logstore data this way; instead, use the SDK's [consumer](https://github.com/aliyun/aliyun-log-go-sdk/tree/master/consumer) consumer group to pull data. Consumer groups provide automatic load balancing and failure retry, and they save pull checkpoints automatically so that pulling again does not fetch duplicate data. A minimal sketch follows.
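
A minimal consumer-group sketch (hedged: it assumes the consumerLibrary's LogHubConfig and InitConsumerWorkerWithCheckpointTracker API, as also used in the query demo later in this diff; all names and credentials are placeholders):

```go
package main

import (
	"fmt"
	"os"
	"os/signal"

	sls "github.com/aliyun/aliyun-log-go-sdk"
	consumerLibrary "github.com/aliyun/aliyun-log-go-sdk/consumer"
)

func process(shardId int, logGroupList *sls.LogGroupList,
	checkpointTracker consumerLibrary.CheckPointTracker) (string, error) {
	fmt.Println(shardId, logGroupList)
	checkpointTracker.SaveCheckPoint(false) // queue the checkpoint; committed later
	return "", nil
}

func main() {
	option := consumerLibrary.LogHubConfig{
		Endpoint:          "cn-hangzhou.log.aliyuncs.com", // placeholder
		AccessKeyID:       "your-access-key-id",
		AccessKeySecret:   "your-access-key-secret",
		Project:           "your-project",
		Logstore:          "your-logstore",
		ConsumerGroupName: "your-consumer-group",
		ConsumerName:      "consumer-1",
		CursorPosition:    consumerLibrary.BEGIN_CURSOR,
	}
	consumerWorker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option, process)
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt)
	consumerWorker.Start()
	<-ch
	consumerWorker.StopAndWait() // graceful shutdown keeps checkpoints consistent
}
```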

```go
4 changes: 4 additions & 0 deletions client_interface.go
@@ -264,14 +264,18 @@ type ClientInterface interface {
// The nextCursor is the next cursor that can be used to read logs next time.
GetLogsBytes(project, logstore string, shardID int, cursor, endCursor string,
logGroupMaxCount int) (out []byte, nextCursor string, err error)
// Deprecated: Use GetLogsBytesWithQuery instead.
GetLogsBytesV2(plr *PullLogRequest) (out []byte, nextCursor string, err error)
GetLogsBytesWithQuery(plr *PullLogRequest) (out []byte, plm *PullLogMeta, err error)
// PullLogs gets logs from the shard specified by shardId according to cursor and endCursor.
// The logGroupMaxCount is the max number of logGroups that can be returned.
// The nextCursor is the next cursor that can be used to read logs next time.
// @note if you want to pull logs continuously, set endCursor = ""
PullLogs(project, logstore string, shardID int, cursor, endCursor string,
logGroupMaxCount int) (gl *LogGroupList, nextCursor string, err error)
// Deprecated: Use PullLogsWithQuery instead.
PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, nextCursor string, err error)
PullLogsWithQuery(plr *PullLogRequest) (gl *LogGroupList, plm *PullLogMeta, err error)
// GetHistograms queries logs within the [from, to) time range
GetHistograms(project, logstore string, topic string, from int64, to int64, queryExp string) (*GetHistogramsResponse, error)
// GetLogs queries logs within the [from, to) time range
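A hedged usage sketch of the new query-aware pull (the PullLogRequest field names follow this diff and the surrounding code; ShardID as the exact field name is an assumption, and the project, logstore, and query values are placeholders):

```go
// Sketch: pull logs with a rule-based query and read the returned metadata.
// `client` would come from sls.CreateNormalInterfaceV2, `cursor` from GetCursor.
func pullWithQuery(client sls.ClientInterface, cursor string) error {
	plr := &sls.PullLogRequest{
		Project:          "your-project",  // placeholder
		Logstore:         "your-logstore", // placeholder
		ShardID:          0,               // assumed field name
		Cursor:           cursor,
		LogGroupMaxCount: 100,
		Query:            "* | where level = 'ERROR'", // placeholder rule
	}
	gl, plm, err := client.PullLogsWithQuery(plr)
	if err != nil {
		return err
	}
	fmt.Println("logGroups:", len(gl.LogGroups),
		"nextCursor:", plm.NextCursor,
		"rawSizeBeforeQuery:", plm.RawSizeBeforeQuery)
	return nil
}
```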
10 changes: 10 additions & 0 deletions client_store.go
@@ -202,6 +202,11 @@ func (c *Client) GetLogsBytesV2(plr *PullLogRequest) (out []byte, nextCursor str
return ls.GetLogsBytesV2(plr)
}

func (c *Client) GetLogsBytesWithQuery(plr *PullLogRequest) (out []byte, plm *PullLogMeta, err error) {
ls := convertLogstore(c, plr.Project, plr.Logstore)
return ls.GetLogsBytesWithQuery(plr)
}

// PullLogs gets logs from the shard specified by shardId according to cursor and endCursor.
// The logGroupMaxCount is the max number of logGroups that can be returned.
// The nextCursor is the next cursor that can be used to read logs next time.
@@ -217,6 +222,11 @@ func (c *Client) PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, nextCursor s
return ls.PullLogsV2(plr)
}

func (c *Client) PullLogsWithQuery(plr *PullLogRequest) (gl *LogGroupList, plm *PullLogMeta, err error) {
ls := convertLogstore(c, plr.Project, plr.Logstore)
return ls.PullLogsWithQuery(plr)
}

// GetHistograms queries logs within the [from, to) time range
func (c *Client) GetHistograms(project, logstore string, topic string, from int64, to int64, queryExp string) (*GetHistogramsResponse, error) {
ls := convertLogstore(c, project, logstore)
1 change: 1 addition & 0 deletions consumer/README.md
@@ -64,6 +64,7 @@ LogHubConfig is the configuration class provided to users for configuring the consumption strategy; you can …
|SecurityToken|aliyun SecurityToken|Optional, see https://help.aliyun.com/document_detail/47277.html|
|AutoCommitDisabled|Whether to disable the SDK's automatic checkpoint commit|Optional, not disabled by default|
|AutoCommitIntervalInMS|Interval for automatically committing checkpoints|Optional, in milliseconds, default 60s|
|Query|Filter rule; when consuming based on rules, the corresponding rule must be set, e.g. `*\| where a = 'xxx'`|Optional|
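
For illustration, a sketch of setting the rule in code (all other fields are placeholders, elided here):

```go
option := consumerLibrary.LogHubConfig{
	// ... endpoint, credentials, project, logstore, consumer group ...
	Query: "* | where a = 'xxx'", // rule is applied server-side before logs are returned
}
```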

2. **Override the consumption logic**

2 changes: 1 addition & 1 deletion consumer/config.go
@@ -14,7 +14,7 @@ type LogHubConfig struct {
//:param CredentialsProvider: CredentialsProvider that providers credentials(AccessKeyID, AccessKeySecret, StsToken)
//:param Project:
//:param Logstore:
//:param Query:
//:param Query: Filter rule. When consuming based on rules, the corresponding rule must be set, e.g. *| where a = 'xxx'
//:param ConsumerGroupName:
//:param ConsumerName:
//:param CursorPosition: This option is used for initialization and will be ignored once the consumer group is created and each shard has started to be consumed.
17 changes: 3 additions & 14 deletions consumer/consumer_client.go
@@ -131,8 +131,7 @@ func (consumer *ConsumerClient) getCursor(shardId int, from string) (string, err
return cursor, err
}

func (consumer *ConsumerClient) pullLogs(shardId int, cursor string) (gl *sls.LogGroupList, nextCursor string, rawSize int, err error) {
var logBytes []byte
func (consumer *ConsumerClient) pullLogs(shardId int, cursor string) (gl *sls.LogGroupList, plm *sls.PullLogMeta, err error) {
plr := &sls.PullLogRequest{
Project: consumer.option.Project,
Logstore: consumer.option.Logstore,
@@ -141,20 +140,10 @@ func (consumer *ConsumerClient) pullLogs(shardId int, cursor string) (gl *sls.Lo
Cursor: cursor,
LogGroupMaxCount: consumer.option.MaxFetchLogGroupCount,
}
if plr.Query != "" {
plr.PullMode = "scan_on_stream"
}
for retry := 0; retry < 3; retry++ {
logBytes, nextCursor, err = consumer.client.GetLogsBytesV2(plr)
if err == nil {
rawSize = len(logBytes)
gl, err = sls.LogsBytesDecode(logBytes)
if err == nil {
break
}
}
gl, plm, err = consumer.client.PullLogsWithQuery(plr)
if err != nil {
slsError, ok := err.(sls.Error)
slsError, ok := err.(*sls.Error)
if ok {
level.Warn(consumer.logger).Log("msg", "shard pull logs failed, occur sls error",
"shard", shardId,
37 changes: 23 additions & 14 deletions consumer/shard_worker.go
@@ -10,17 +10,19 @@ import (
)

type ShardConsumerWorker struct {
client *ConsumerClient
consumerCheckPointTracker *DefaultCheckPointTracker
shutdownFlag bool
lastFetchLogGroupList *sls.LogGroupList
nextFetchCursor string
lastFetchGroupCount int
lastFetchTime time.Time
lastFetchRawSize int
consumerStatus string
processor Processor
shardId int
client *ConsumerClient
consumerCheckPointTracker *DefaultCheckPointTracker
shutdownFlag bool
lastFetchLogGroupList *sls.LogGroupList
nextFetchCursor string
lastFetchGroupCount int
lastFetchGroupCountBeforeQuery int
lastFetchTime time.Time
lastFetchRawSize int
lastFetchRawSizeBeforeQuery int
consumerStatus string
processor Processor
shardId int
// TODO: refine to channel
isCurrentDone bool
logger log.Logger
@@ -145,14 +147,21 @@ func (consumer *ShardConsumerWorker) updateStatus(success bool) {
}

func (consumer *ShardConsumerWorker) shouldFetch() bool {
if consumer.lastFetchGroupCount >= consumer.client.option.MaxFetchLogGroupCount || consumer.lastFetchRawSize >= 4*1024*1024 {
lastFetchRawSize := consumer.lastFetchRawSize
lastFetchGroupCount := consumer.lastFetchGroupCount

if consumer.client.option.Query != "" {
lastFetchRawSize = consumer.lastFetchRawSizeBeforeQuery
lastFetchGroupCount = consumer.lastFetchGroupCountBeforeQuery
}
if lastFetchGroupCount >= consumer.client.option.MaxFetchLogGroupCount || lastFetchRawSize >= 4*1024*1024 {
return true
}
duration := time.Since(consumer.lastFetchTime)
if consumer.lastFetchGroupCount < 100 && consumer.lastFetchRawSize < 1024*1024 {
if lastFetchGroupCount < 100 && lastFetchRawSize < 1024*1024 {
// The time used here is in milliseconds.
return duration > 500*time.Millisecond
} else if consumer.lastFetchGroupCount < 500 && consumer.lastFetchRawSize < 2*1024*1024 {
} else if lastFetchGroupCount < 500 && lastFetchRawSize < 2*1024*1024 {
return duration > 200*time.Millisecond
} else {
return duration > 50*time.Millisecond
16 changes: 13 additions & 3 deletions consumer/tasks.go
@@ -49,16 +49,26 @@ func (consumer *ShardConsumerWorker) nextFetchTask() error {
// update last fetch time, for control fetch frequency
consumer.lastFetchTime = time.Now()

logGroup, nextCursor, rawSize, err := consumer.client.pullLogs(consumer.shardId, consumer.nextFetchCursor)
logGroup, pullLogMeta, err := consumer.client.pullLogs(consumer.shardId, consumer.nextFetchCursor)
if err != nil {
return err
}
// set cursors so the user can decide whether to save, according to the execution of `process`
consumer.consumerCheckPointTracker.setCurrentCursor(consumer.nextFetchCursor)
consumer.lastFetchLogGroupList = logGroup
consumer.nextFetchCursor = nextCursor
consumer.lastFetchRawSize = rawSize
consumer.nextFetchCursor = pullLogMeta.NextCursor
consumer.lastFetchRawSize = pullLogMeta.RawSize
consumer.lastFetchGroupCount = GetLogGroupCount(consumer.lastFetchLogGroupList)
if consumer.client.option.Query != "" {
consumer.lastFetchRawSizeBeforeQuery = pullLogMeta.RawSizeBeforeQuery
consumer.lastFetchGroupCountBeforeQuery = pullLogMeta.RawDataCountBeforeQuery
if consumer.lastFetchRawSizeBeforeQuery == -1 {
consumer.lastFetchRawSizeBeforeQuery = 0
}
if consumer.lastFetchGroupCountBeforeQuery == -1 {
consumer.lastFetchGroupCountBeforeQuery = 0
}
}
consumer.consumerCheckPointTracker.setNextCursor(consumer.nextFetchCursor)
level.Debug(consumer.logger).Log(
"shardId", consumer.shardId,
10 changes: 8 additions & 2 deletions example/consumer/query_demo/simple_demo_with_query.go
@@ -25,7 +25,8 @@ func main() {
ConsumerName: "",
// This option is used for initialization and will be ignored once the consumer group is created and each shard has started to be consumed.
// Could be "begin", "end", or a specific timestamp; it refers to the log receiving time.
CursorPosition: consumerLibrary.BEGIN_CURSOR,
CursorPosition: consumerLibrary.SPECIAL_TIMER_CURSOR,
CursorStartTime: 1706077849,
// Query is for log pre-handling before return to client, more info refer to https://www.alibabacloud.com/help/zh/sls/user-guide/rule-based-consumption
Query: "* | where cast(body_bytes_sent as bigint) > 14000",
}
@@ -43,7 +44,12 @@
// Fill in your consumption logic here. Be careful not to change the function's parameters or return values,
// otherwise errors will be reported.
func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) (string, error) {
fmt.Println(shardId, logGroupList)
for _, logGroup := range logGroupList.LogGroups {
for _, log := range logGroup.Logs {
fmt.Println("log_content: ", log.Contents)
}
}
fmt.Printf("shardId %v processing works sucess\n", shardId)
checkpointTracker.SaveCheckPoint(false)
return "", nil
}
61 changes: 39 additions & 22 deletions log_store.go
@@ -448,10 +448,16 @@ func (s *LogStore) GetLogsBytes(shardID int, cursor, endCursor string,
return s.GetLogsBytesV2(plr)
}

// Deprecated: use GetLogsBytesWithQuery instead
func (s *LogStore) GetLogsBytesV2(plr *PullLogRequest) ([]byte, string, error) {
out, plm, err := s.GetLogsBytesWithQuery(plr)
return out, plm.NextCursor, err
}

// GetLogsBytesWithQuery gets logs binary data from the shard specified by shardId according to cursor and endCursor.
// The logGroupMaxCount is the max number of logGroups that can be returned.
// The nextCursor is the next cursor that can be used to read logs next time.
func (s *LogStore) GetLogsBytesV2(plr *PullLogRequest) (out []byte, nextCursor string, err error) {
func (s *LogStore) GetLogsBytesWithQuery(plr *PullLogRequest) (out []byte, pullLogMeta *PullLogMeta, err error) {
h := map[string]string{
"x-log-bodyrawsize": "0",
"Accept": "application/x-protobuf",
@@ -463,12 +469,12 @@ func (s *LogStore) GetLogsBytesV2(plr *PullLogRequest) (out []byte, nextCursor s

r, err := request(s.project, "GET", uri, h, nil)
if err != nil {
return nil, "", err
return
}
defer r.Body.Close()
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, "", err
return
}

if r.StatusCode != http.StatusOK {
@@ -494,28 +500,34 @@ func (s *LogStore) GetLogsBytesV2(plr *PullLogRequest) (out []byte, nextCursor s
err = fmt.Errorf("unexpected compress type:%v", v[0])
return
}

pullLogMeta = &PullLogMeta{}
v, ok = r.Header["X-Log-Cursor"]
if !ok || len(v) == 0 {
err = fmt.Errorf("can't find 'x-log-cursor' header")
return
}
nextCursor = v[0]

v, ok = r.Header["X-Log-Bodyrawsize"]
if !ok || len(v) == 0 {
err = fmt.Errorf("can't find 'x-log-bodyrawsize' header")
return
}
bodyRawSize, err := strconv.Atoi(v[0])
pullLogMeta.NextCursor = v[0]
pullLogMeta.RawSize, err = ParseHeaderInt(r, "X-Log-Bodyrawsize")
if err != nil {
return nil, "", err
return
}

out = make([]byte, bodyRawSize)
if bodyRawSize != 0 {
if pullLogMeta.RawSize > 0 {
out = make([]byte, pullLogMeta.RawSize)
len := 0
if len, err = lz4.UncompressBlock(buf, out); err != nil || len != bodyRawSize {
if len, err = lz4.UncompressBlock(buf, out); err != nil || len != pullLogMeta.RawSize {
return
}
}
// If query is not empty, extract more headers
if plr.Query != "" {
// raw size before data processing
pullLogMeta.RawSizeBeforeQuery, err = ParseHeaderInt(r, "X-Log-Rawdatasize")
if err != nil {
return
}
// lines before data processing
pullLogMeta.RawDataCountBeforeQuery, err = ParseHeaderInt(r, "X-Log-Rawdatacount")
if err != nil {
return
}
}
@@ -549,19 +561,24 @@ func (s *LogStore) PullLogs(shardID int, cursor, endCursor string,
return s.PullLogsV2(plr)
}

func (s *LogStore) PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, nextCursor string, err error) {
// Deprecated: use PullLogsWithQuery instead
func (s *LogStore) PullLogsV2(plr *PullLogRequest) (*LogGroupList, string, error) {
gl, plm, err := s.PullLogsWithQuery(plr)
return gl, plm.NextCursor, err
}

out, nextCursor, err := s.GetLogsBytesV2(plr)
func (s *LogStore) PullLogsWithQuery(plr *PullLogRequest) (gl *LogGroupList, plm *PullLogMeta, err error) {
out, plm, err := s.GetLogsBytesWithQuery(plr)
if err != nil {
return nil, "", err
return nil, nil, err
}

gl, err = LogsBytesDecode(out)
if err != nil {
return nil, "", err
return nil, nil, err
}

return gl, nextCursor, nil
return
}

// GetHistograms queries logs within the [from, to) time range
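The utils.ParseHeaderInt helper used above is not shown in this diff; the following is only a plausible sketch inferred from its call sites, not the actual implementation. Returning -1 for an absent header would explain the -1 sentinel checks in consumer/tasks.go:

```go
// Hypothetical sketch of utils.ParseHeaderInt (an assumption, not the real code).
func ParseHeaderInt(r *http.Response, headerName string) (int, error) {
	v, ok := r.Header[headerName]
	if !ok || len(v) == 0 {
		// Assumption: a missing optional header yields -1 rather than an error.
		return -1, nil
	}
	return strconv.Atoi(v[0])
}
```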
15 changes: 10 additions & 5 deletions model.go
@@ -50,7 +50,8 @@ type PullLogRequest struct {
EndCursor string
LogGroupMaxCount int
Query string
PullMode string
// Deprecated: PullMode is not used
PullMode string
}

func (plr *PullLogRequest) ToURLParams() url.Values {
@@ -63,14 +64,18 @@ func (plr *PullLogRequest) ToURLParams() url.Values {
}
if plr.Query != "" {
urlVal.Add("query", plr.Query)
urlVal.Add("pullMode", "scan_on_stream")
}
if plr.PullMode != "" {
urlVal.Add("pullMode", plr.PullMode)
}

return urlVal
}

type PullLogMeta struct {
NextCursor string
RawSize int
RawSizeBeforeQuery int
RawDataCountBeforeQuery int
}

// GetHistogramsResponse defines response from GetHistograms call
type SingleHistogram struct {
Progress string `json:"progress"`