diff --git a/README.md b/README.md index 739aebed..33030911 100644 --- a/README.md +++ b/README.md @@ -38,6 +38,14 @@ go get -u github.com/aliyun/aliyun-log-go-sdk Client = sls.CreateNormalInterfaceV2(Endpoint, credentialsProvider) ``` + 为了防止出现配置错误,您可以在创建 Client 之后,测试 Client 是否能成功调用 SLS API + ```go + _, err := Client.ListProject() + if err != nil { + panic(err) + } + ``` + 2. **创建project** 参考 [log_project.go](https://github.com/aliyun/aliyun-log-go-sdk/blob/master/example/project/log_project.go)文件 @@ -97,8 +105,6 @@ go get -u github.com/aliyun/aliyun-log-go-sdk 5. **写数据** - 参考[put_log.go](https://github.com/aliyun/aliyun-log-go-sdk/blob/master/example/loghub/put_log.go) - 这里展示了用sdk中原生的API接口去发送数据简单示例,但是我们不推荐用API直接向logstore写入数据,推荐使用SDK 中提供的[producer](https://github.com/aliyun/aliyun-log-go-sdk/tree/master/producer) 包向logstore 写入数据,自动压缩数据并且提供安全退出机制,不会使数据丢失。 ```go @@ -143,8 +149,6 @@ go get -u github.com/aliyun/aliyun-log-go-sdk 6.**读数据** -参考[pull_log.go](https://github.com/aliyun/aliyun-log-go-sdk/blob/master/example/loghub/pull_log.go) - 这里展示了使用SDK中原生API接口调用去拉取数据的方式,我们不推荐使用这种方式去读取消费logstore中的数据,推荐使用SDK中 [consumer](https://github.com/aliyun/aliyun-log-go-sdk/tree/master/consumer) 消费组去拉取数据,消费组提供自动负载均衡以及失败重试等机制,并且会自动保存拉取断点,再次拉取不会拉取重复数据。 ```go diff --git a/client_interface.go b/client_interface.go index 0dee411a..4eb338b0 100644 --- a/client_interface.go +++ b/client_interface.go @@ -264,14 +264,18 @@ type ClientInterface interface { // The nextCursor is the next curosr can be used to read logs at next time. GetLogsBytes(project, logstore string, shardID int, cursor, endCursor string, logGroupMaxCount int) (out []byte, nextCursor string, err error) + // Deprecated: Use GetLogsBytesWithQuery instead. 
GetLogsBytesV2(plr *PullLogRequest) (out []byte, nextCursor string, err error) + GetLogsBytesWithQuery(plr *PullLogRequest) (out []byte, plm *PullLogMeta, err error) // PullLogs gets logs from shard specified by shardId according cursor and endCursor. // The logGroupMaxCount is the max number of logGroup could be returned. // The nextCursor is the next cursor can be used to read logs at next time. // @note if you want to pull logs continuous, set endCursor = "" PullLogs(project, logstore string, shardID int, cursor, endCursor string, logGroupMaxCount int) (gl *LogGroupList, nextCursor string, err error) + // Deprecated: Use PullLogsWithQuery instead. PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, nextCursor string, err error) + PullLogsWithQuery(plr *PullLogRequest) (gl *LogGroupList, plm *PullLogMeta, err error) // GetHistograms query logs with [from, to) time range GetHistograms(project, logstore string, topic string, from int64, to int64, queryExp string) (*GetHistogramsResponse, error) // GetLogs query logs with [from, to) time range diff --git a/client_store.go b/client_store.go index 017da696..09bdbc11 100644 --- a/client_store.go +++ b/client_store.go @@ -202,6 +202,11 @@ func (c *Client) GetLogsBytesV2(plr *PullLogRequest) (out []byte, nextCursor str return ls.GetLogsBytesV2(plr) } +func (c *Client) GetLogsBytesWithQuery(plr *PullLogRequest) (out []byte, plm *PullLogMeta, err error) { + ls := convertLogstore(c, plr.Project, plr.Logstore) + return ls.GetLogsBytesWithQuery(plr) +} + // PullLogs gets logs from shard specified by shardId according cursor and endCursor. // The logGroupMaxCount is the max number of logGroup could be returned. // The nextCursor is the next cursor can be used to read logs at next time. 
@@ -217,6 +222,11 @@ func (c *Client) PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, nextCursor s return ls.PullLogsV2(plr) } +func (c *Client) PullLogsWithQuery(plr *PullLogRequest) (gl *LogGroupList, plm *PullLogMeta, err error) { + ls := convertLogstore(c, plr.Project, plr.Logstore) + return ls.PullLogsWithQuery(plr) +} + // GetHistograms query logs with [from, to) time range func (c *Client) GetHistograms(project, logstore string, topic string, from int64, to int64, queryExp string) (*GetHistogramsResponse, error) { ls := convertLogstore(c, project, logstore) diff --git a/consumer/README.md b/consumer/README.md index 20a21d56..c9808410 100644 --- a/consumer/README.md +++ b/consumer/README.md @@ -64,6 +64,7 @@ LogHubConfig是提供给用户的配置类,用于配置消费策略,您可 |SecurityToken|aliyun SecurityToken|非必填,参考https://help.aliyun.com/document_detail/47277.html| |AutoCommitDisabled|是否禁用sdk自动提交checkpoint|非必填,默认不会禁用| |AutoCommitIntervalInMS|自动提交checkpoint的时间间隔|非必填,单位为MS,默认时间为60s| +|Query|过滤规则 基于规则消费时必须设置对应规则 如 *\| where a = 'xxx'|非必填| 2.**覆写消费逻辑** diff --git a/consumer/config.go b/consumer/config.go index 09713e9a..a3f303e2 100644 --- a/consumer/config.go +++ b/consumer/config.go @@ -14,7 +14,7 @@ type LogHubConfig struct { //:param CredentialsProvider: CredentialsProvider that providers credentials(AccessKeyID, AccessKeySecret, StsToken) //:param Project: //:param Logstore: - //:param Query: + //:param Query: Filter rules Corresponding rules must be set when consuming based on rules, such as *| where a = 'xxx' //:param ConsumerGroupName: //:param ConsumerName: //:param CursorPosition: This options is used for initialization, will be ignored once consumer group is created and each shard has beeen started to be consumed.
diff --git a/consumer/consumer_client.go b/consumer/consumer_client.go index 2234532b..cc6e0597 100644 --- a/consumer/consumer_client.go +++ b/consumer/consumer_client.go @@ -131,8 +131,7 @@ func (consumer *ConsumerClient) getCursor(shardId int, from string) (string, err return cursor, err } -func (consumer *ConsumerClient) pullLogs(shardId int, cursor string) (gl *sls.LogGroupList, nextCursor string, rawSize int, err error) { - var logBytes []byte +func (consumer *ConsumerClient) pullLogs(shardId int, cursor string) (gl *sls.LogGroupList, plm *sls.PullLogMeta, err error) { plr := &sls.PullLogRequest{ Project: consumer.option.Project, Logstore: consumer.option.Logstore, @@ -141,20 +140,10 @@ func (consumer *ConsumerClient) pullLogs(shardId int, cursor string) (gl *sls.Lo Cursor: cursor, LogGroupMaxCount: consumer.option.MaxFetchLogGroupCount, } - if plr.Query != "" { - plr.PullMode = "scan_on_stream" - } for retry := 0; retry < 3; retry++ { - logBytes, nextCursor, err = consumer.client.GetLogsBytesV2(plr) - if err == nil { - rawSize = len(logBytes) - gl, err = sls.LogsBytesDecode(logBytes) - if err == nil { - break - } - } + gl, plm, err = consumer.client.PullLogsWithQuery(plr) if err != nil { - slsError, ok := err.(sls.Error) + slsError, ok := err.(*sls.Error) if ok { level.Warn(consumer.logger).Log("msg", "shard pull logs failed, occur sls error", "shard", shardId, diff --git a/consumer/shard_worker.go b/consumer/shard_worker.go index b9aa66b3..64a9e3f1 100644 --- a/consumer/shard_worker.go +++ b/consumer/shard_worker.go @@ -10,17 +10,19 @@ import ( ) type ShardConsumerWorker struct { - client *ConsumerClient - consumerCheckPointTracker *DefaultCheckPointTracker - shutdownFlag bool - lastFetchLogGroupList *sls.LogGroupList - nextFetchCursor string - lastFetchGroupCount int - lastFetchTime time.Time - lastFetchRawSize int - consumerStatus string - processor Processor - shardId int + client *ConsumerClient + consumerCheckPointTracker *DefaultCheckPointTracker + 
shutdownFlag bool + lastFetchLogGroupList *sls.LogGroupList + nextFetchCursor string + lastFetchGroupCount int + lastFetchGroupCountBeforeQuery int + lastFetchTime time.Time + lastFetchRawSize int + lastFetchRawSizeBeforeQuery int + consumerStatus string + processor Processor + shardId int // TODO: refine to channel isCurrentDone bool logger log.Logger @@ -145,14 +147,21 @@ func (consumer *ShardConsumerWorker) updateStatus(success bool) { } func (consumer *ShardConsumerWorker) shouldFetch() bool { - if consumer.lastFetchGroupCount >= consumer.client.option.MaxFetchLogGroupCount || consumer.lastFetchRawSize >= 4*1024*1024 { + lastFetchRawSize := consumer.lastFetchRawSize + lastFetchGroupCount := consumer.lastFetchGroupCount + + if consumer.client.option.Query != "" { + lastFetchRawSize = consumer.lastFetchRawSizeBeforeQuery + lastFetchGroupCount = consumer.lastFetchGroupCountBeforeQuery + } + if lastFetchGroupCount >= consumer.client.option.MaxFetchLogGroupCount || lastFetchRawSize >= 4*1024*1024 { return true } duration := time.Since(consumer.lastFetchTime) - if consumer.lastFetchGroupCount < 100 && consumer.lastFetchRawSize < 1024*1024 { + if lastFetchGroupCount < 100 && lastFetchRawSize < 1024*1024 { // The time used here is in milliseconds. 
return duration > 500*time.Millisecond - } else if consumer.lastFetchGroupCount < 500 && consumer.lastFetchRawSize < 2*1024*1024 { + } else if lastFetchGroupCount < 500 && lastFetchRawSize < 2*1024*1024 { return duration > 200*time.Millisecond } else { return duration > 50*time.Millisecond diff --git a/consumer/tasks.go b/consumer/tasks.go index 367db164..1df4fd66 100644 --- a/consumer/tasks.go +++ b/consumer/tasks.go @@ -49,16 +49,26 @@ func (consumer *ShardConsumerWorker) nextFetchTask() error { // update last fetch time, for control fetch frequency consumer.lastFetchTime = time.Now() - logGroup, nextCursor, rawSize, err := consumer.client.pullLogs(consumer.shardId, consumer.nextFetchCursor) + logGroup, pullLogMeta, err := consumer.client.pullLogs(consumer.shardId, consumer.nextFetchCursor) if err != nil { return err } // set cursors user to decide whether to save according to the execution of `process` consumer.consumerCheckPointTracker.setCurrentCursor(consumer.nextFetchCursor) consumer.lastFetchLogGroupList = logGroup - consumer.nextFetchCursor = nextCursor - consumer.lastFetchRawSize = rawSize + consumer.nextFetchCursor = pullLogMeta.NextCursor + consumer.lastFetchRawSize = pullLogMeta.RawSize consumer.lastFetchGroupCount = GetLogGroupCount(consumer.lastFetchLogGroupList) + if consumer.client.option.Query != "" { + consumer.lastFetchRawSizeBeforeQuery = pullLogMeta.RawSizeBeforeQuery + consumer.lastFetchGroupCountBeforeQuery = pullLogMeta.RawDataCountBeforeQuery + if consumer.lastFetchRawSizeBeforeQuery == -1 { + consumer.lastFetchRawSizeBeforeQuery = 0 + } + if consumer.lastFetchGroupCountBeforeQuery == -1 { + consumer.lastFetchGroupCountBeforeQuery = 0 + } + } consumer.consumerCheckPointTracker.setNextCursor(consumer.nextFetchCursor) level.Debug(consumer.logger).Log( "shardId", consumer.shardId, diff --git a/example/consumer/query_demo/simple_demo_with_query.go b/example/consumer/query_demo/simple_demo_with_query.go index 06734a35..444f1bd6 100644 --- 
a/example/consumer/query_demo/simple_demo_with_query.go +++ b/example/consumer/query_demo/simple_demo_with_query.go @@ -25,7 +25,8 @@ func main() { ConsumerName: "", // This options is used for initialization, will be ignored once consumer group is created and each shard has been started to be consumed. // Could be "begin", "end", "specific time format in time stamp", it's log receiving time. - CursorPosition: consumerLibrary.BEGIN_CURSOR, + CursorPosition: consumerLibrary.SPECIAL_TIMER_CURSOR, + CursorStartTime: 1706077849, // Query is for log pre-handling before return to client, more info refer to https://www.alibabacloud.com/help/zh/sls/user-guide/rule-based-consumption Query: "* | where cast(body_bytes_sent as bigint) > 14000", } @@ -43,7 +44,12 @@ func main() { // Fill in your consumption logic here, and be careful not to change the parameters of the function and the return value, // otherwise you will report errors. func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) (string, error) { - fmt.Println(shardId, logGroupList) + for _, logGroup := range logGroupList.LogGroups { + for _, log := range logGroup.Logs { + fmt.Println("log_content: ", log.Contents) + } + } + fmt.Printf("shardId %v processing works success\n", shardId) checkpointTracker.SaveCheckPoint(false) return "", nil } diff --git a/log_store.go b/log_store.go index a7be28be..0ceb3016 100644 --- a/log_store.go +++ b/log_store.go @@ -448,10 +448,16 @@ func (s *LogStore) GetLogsBytes(shardID int, cursor, endCursor string, return s.GetLogsBytesV2(plr) } +// Deprecated: use GetLogsBytesWithQuery instead +func (s *LogStore) GetLogsBytesV2(plr *PullLogRequest) ([]byte, string, error) { + out, plm, err := s.GetLogsBytesWithQuery(plr) + return out, plm.NextCursor, err +} + // GetLogsBytes gets logs binary data from shard specified by shardId according cursor and endCursor. // The logGroupMaxCount is the max number of logGroup could be returned.
// The nextCursor is the next curosr can be used to read logs at next time. -func (s *LogStore) GetLogsBytesV2(plr *PullLogRequest) (out []byte, nextCursor string, err error) { +func (s *LogStore) GetLogsBytesWithQuery(plr *PullLogRequest) (out []byte, pullLogMeta *PullLogMeta, err error) { h := map[string]string{ "x-log-bodyrawsize": "0", "Accept": "application/x-protobuf", @@ -463,12 +469,12 @@ func (s *LogStore) GetLogsBytesV2(plr *PullLogRequest) (out []byte, nextCursor s r, err := request(s.project, "GET", uri, h, nil) if err != nil { - return nil, "", err + return } defer r.Body.Close() buf, err := ioutil.ReadAll(r.Body) if err != nil { - return nil, "", err + return } if r.StatusCode != http.StatusOK { @@ -494,28 +500,34 @@ func (s *LogStore) GetLogsBytesV2(plr *PullLogRequest) (out []byte, nextCursor s err = fmt.Errorf("unexpected compress type:%v", v[0]) return } - + pullLogMeta = &PullLogMeta{} v, ok = r.Header["X-Log-Cursor"] if !ok || len(v) == 0 { err = fmt.Errorf("can't find 'x-log-cursor' header") return } - nextCursor = v[0] - - v, ok = r.Header["X-Log-Bodyrawsize"] - if !ok || len(v) == 0 { - err = fmt.Errorf("can't find 'x-log-bodyrawsize' header") - return - } - bodyRawSize, err := strconv.Atoi(v[0]) + pullLogMeta.NextCursor = v[0] + pullLogMeta.RawSize, err = ParseHeaderInt(r, "X-Log-Bodyrawsize") if err != nil { - return nil, "", err + return } - - out = make([]byte, bodyRawSize) - if bodyRawSize != 0 { + if pullLogMeta.RawSize > 0 { + out = make([]byte, pullLogMeta.RawSize) len := 0 - if len, err = lz4.UncompressBlock(buf, out); err != nil || len != bodyRawSize { + if len, err = lz4.UncompressBlock(buf, out); err != nil || len != pullLogMeta.RawSize { + return + } + } + // If query is not nil, extract more headers + if plr.Query != "" { + // RawSizeBeforeQuery before data processing + pullLogMeta.RawSizeBeforeQuery, err = ParseHeaderInt(r, "X-Log-Rawdatasize") + if err != nil { + return + } + //lines before data processing + 
pullLogMeta.RawDataCountBeforeQuery, err = ParseHeaderInt(r, "X-Log-Rawdatacount") + if err != nil { return } } @@ -549,19 +561,24 @@ func (s *LogStore) PullLogs(shardID int, cursor, endCursor string, return s.PullLogsV2(plr) } -func (s *LogStore) PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, nextCursor string, err error) { +// Deprecated: use PullLogsWithQuery instead +func (s *LogStore) PullLogsV2(plr *PullLogRequest) (*LogGroupList, string, error) { + gl, plm, err := s.PullLogsWithQuery(plr) + return gl, plm.NextCursor, err +} - out, nextCursor, err := s.GetLogsBytesV2(plr) +func (s *LogStore) PullLogsWithQuery(plr *PullLogRequest) (gl *LogGroupList, plm *PullLogMeta, err error) { + out, plm, err := s.GetLogsBytesWithQuery(plr) if err != nil { - return nil, "", err + return nil, nil, err } gl, err = LogsBytesDecode(out) if err != nil { - return nil, "", err + return nil, nil, err } - return gl, nextCursor, nil + return } // GetHistograms query logs with [from, to) time range diff --git a/model.go b/model.go index bfa0382c..5e6008c4 100644 --- a/model.go +++ b/model.go @@ -50,7 +50,8 @@ type PullLogRequest struct { EndCursor string LogGroupMaxCount int Query string - PullMode string + // Deprecated: PullMode is not used + PullMode string } func (plr *PullLogRequest) ToURLParams() url.Values { @@ -63,14 +64,18 @@ func (plr *PullLogRequest) ToURLParams() url.Values { } if plr.Query != "" { urlVal.Add("query", plr.Query) + urlVal.Add("pullMode", "scan_on_stream") } - if plr.PullMode != "" { - urlVal.Add("pullMode", plr.PullMode) - } - return urlVal } +type PullLogMeta struct { + NextCursor string + RawSize int + RawSizeBeforeQuery int + RawDataCountBeforeQuery int +} + // GetHistogramsResponse defines response from GetHistograms call type SingleHistogram struct { Progress string `json:"progress"` diff --git a/token_auto_update_client.go b/token_auto_update_client.go index 184162ff..3a4a7467 100644 --- a/token_auto_update_client.go +++ 
b/token_auto_update_client.go @@ -816,6 +816,7 @@ func (c *TokenAutoUpdateClient) GetLogsBytes(project, logstore string, shardID i return c.GetLogsBytesV2(plr) } +// Deprecated: use GetLogsBytesWithQuery instead func (c *TokenAutoUpdateClient) GetLogsBytesV2(plr *PullLogRequest) (out []byte, nextCursor string, err error) { for i := 0; i < c.maxTryTimes; i++ { out, nextCursor, err = c.logClient.GetLogsBytesV2(plr) @@ -826,6 +827,16 @@ func (c *TokenAutoUpdateClient) GetLogsBytesV2(plr *PullLogRequest) (out []byte, return } +func (c *TokenAutoUpdateClient) GetLogsBytesWithQuery(plr *PullLogRequest) (out []byte, plm *PullLogMeta, err error) { + for i := 0; i < c.maxTryTimes; i++ { + out, plm, err = c.logClient.GetLogsBytesWithQuery(plr) + if !c.processError(err) { + return + } + } + return +} + func (c *TokenAutoUpdateClient) PullLogs(project, logstore string, shardID int, cursor, endCursor string, logGroupMaxCount int) (gl *LogGroupList, nextCursor string, err error) { plr := &PullLogRequest{ @@ -839,6 +850,7 @@ func (c *TokenAutoUpdateClient) PullLogs(project, logstore string, shardID int, return c.PullLogsV2(plr) } +// Deprecated: use PullLogsWithQuery instead func (c *TokenAutoUpdateClient) PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, nextCursor string, err error) { for i := 0; i < c.maxTryTimes; i++ { gl, nextCursor, err = c.logClient.PullLogsV2(plr) @@ -849,6 +861,16 @@ func (c *TokenAutoUpdateClient) PullLogsV2(plr *PullLogRequest) (gl *LogGroupLis return } +func (c *TokenAutoUpdateClient) PullLogsWithQuery(plr *PullLogRequest) (gl *LogGroupList, plm *PullLogMeta, err error) { + for i := 0; i < c.maxTryTimes; i++ { + gl, plm, err = c.logClient.PullLogsWithQuery(plr) + if !c.processError(err) { + return + } + } + return +} + func (c *TokenAutoUpdateClient) GetHistograms(project, logstore string, topic string, from int64, to int64, queryExp string) (h *GetHistogramsResponse, err error) { for i := 0; i < c.maxTryTimes; i++ { h, err = 
c.logClient.GetHistograms(project, logstore, topic, from, to, queryExp) diff --git a/utils.go b/utils.go index b1b11795..f9b1f263 100644 --- a/utils.go +++ b/utils.go @@ -1,6 +1,11 @@ package sls -import "strconv" +import ( + "fmt" + "net/http" + "strconv" + "strings" +) func BoolToInt64(b bool) int64 { if b { @@ -25,3 +30,15 @@ func Int64PtrToString(i *int64) string { } return strconv.FormatInt(*i, 10) } + +func ParseHeaderInt(r *http.Response, headerName string) (int, error) { + values := r.Header[headerName] + if len(values) > 0 { + value, err := strconv.Atoi(values[0]) + if err != nil { + return -1, fmt.Errorf("can't parse '%s' header: %v", strings.ToLower(headerName), err) + } + return value, nil + } + return -1, fmt.Errorf("can't find '%s' header", strings.ToLower(headerName)) +}