Skip to content

Commit

Permalink
Merge pull request #108 from AlexStocks/master
Browse files Browse the repository at this point in the history
[add] get raw log interface
  • Loading branch information
shabicheng authored Nov 11, 2020
2 parents 9c99819 + ec3d3d5 commit 982232d
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 18 deletions.
2 changes: 2 additions & 0 deletions client_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ type ClientInterface interface {
// GetLogs query logs with [from, to) time range
GetLogs(project, logstore string, topic string, from int64, to int64, queryExp string,
maxLineNum int64, offset int64, reverse bool) (*GetLogsResponse, error)
GetLogLines(project, logstore string, topic string, from int64, to int64, queryExp string,
maxLineNum int64, offset int64, reverse bool) (*GetLogLinesResponse, error)

// #################### Index Operations #####################
// CreateIndex ...
Expand Down
7 changes: 7 additions & 0 deletions client_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,13 @@ func (c *Client) GetLogs(project, logstore string, topic string, from int64, to
return ls.GetLogs(topic, from, to, queryExp, maxLineNum, offset, reverse)
}

// GetLogLines ...
func (c *Client) GetLogLines(project, logstore string, topic string, from int64, to int64, queryExp string,
maxLineNum int64, offset int64, reverse bool) (*GetLogLinesResponse, error) {
ls := convertLogstore(c, project, logstore)
return ls.GetLogLines(topic, from, to, queryExp, maxLineNum, offset, reverse)
}

// CreateIndex ...
func (c *Client) CreateIndex(project, logstore string, index Index) error {
ls := convertLogstore(c, project, logstore)
Expand Down
2 changes: 1 addition & 1 deletion example/consumer/copy_data/copy_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,6 @@ func process(shardId int, logGroupList *sls.LogGroupList) string {
fmt.Println(err)
}
}
fmt.Println("shardId %v processing works sucess", shardId)
fmt.Printf("shardId %v processing works sucess\n", shardId)
return ""
}
65 changes: 48 additions & 17 deletions log_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,9 +518,9 @@ func (s *LogStore) GetHistograms(topic string, from int64, to int64, queryExp st
return &getHistogramsResponse, nil
}

// GetLogs query logs with [from, to) time range
func (s *LogStore) GetLogs(topic string, from int64, to int64, queryExp string,
maxLineNum int64, offset int64, reverse bool) (*GetLogsResponse, error) {
// getLogs query logs with [from, to) time range
func (s *LogStore) getLogs(topic string, from int64, to int64, queryExp string,
maxLineNum int64, offset int64, reverse bool) (*http.Response, []byte, *GetLogsResponse, error) {

h := map[string]string{
"x-log-bodyrawsize": "0",
Expand All @@ -538,30 +538,24 @@ func (s *LogStore) GetLogs(topic string, from int64, to int64, queryExp string,
urlVal.Add("query", queryExp)

uri := fmt.Sprintf("/logstores/%s?%s", s.Name, urlVal.Encode())

r, err := request(s.project, "GET", uri, h, nil)
if err != nil {
return nil, NewClientError(err)
return nil, nil, nil, NewClientError(err)
}
defer r.Body.Close()

body, _ := ioutil.ReadAll(r.Body)
if r.StatusCode != http.StatusOK {
err := new(Error)
if jErr := json.Unmarshal(body, err); jErr != nil {
return nil, NewBadResponseError(string(body), r.Header, r.StatusCode)
return nil, nil, nil, NewBadResponseError(string(body), r.Header, r.StatusCode)
}
return nil, err
}

logs := []map[string]string{}
err = json.Unmarshal(body, &logs)
if err != nil {
return nil, NewBadResponseError(string(body), r.Header, r.StatusCode)
return nil, nil, nil, err
}

count, err := strconv.ParseInt(r.Header[GetLogsCountHeader][0], 10, 32)
if err != nil {
return nil, err
return nil, nil, nil, err
}
var contents string
if _, ok := r.Header[GetLogsQueryInfo]; ok {
Expand All @@ -573,15 +567,52 @@ func (s *LogStore) GetLogs(topic string, from int64, to int64, queryExp string,
if sqlHeaderArray, ok := r.Header[HasSQLHeader]; ok && len(sqlHeaderArray) > 0 && sqlHeaderArray[0] == "true" {
hasSQL = true
}
getLogsResponse := GetLogsResponse{

return r, body, &GetLogsResponse{
Progress: r.Header[ProgressHeader][0],
Count: count,
Logs: logs,
Contents: contents,
HasSQL: hasSQL,
}, nil
}

// GetJsonLogs query logs with [from, to) time range
func (s *LogStore) GetLogLines(topic string, from int64, to int64, queryExp string,
maxLineNum int64, offset int64, reverse bool) (*GetLogLinesResponse, error) {

rsp, b, logRsp, err := s.getLogs(topic, from, to, queryExp, maxLineNum, offset, reverse)
if err != nil {
return nil, err
}
var logs []json.RawMessage
err = json.Unmarshal(b, &logs)
if err != nil {
return nil, NewBadResponseError(string(b), rsp.Header, rsp.StatusCode)
}

lineRsp := GetLogLinesResponse{
GetLogsResponse: *logRsp,
Lines: logs,
}

return &lineRsp, nil
}

// GetLogs query logs with [from, to) time range
func (s *LogStore) GetLogs(topic string, from int64, to int64, queryExp string,
maxLineNum int64, offset int64, reverse bool) (*GetLogsResponse, error) {

rsp, b, logRsp, err := s.getLogs(topic, from, to, queryExp, maxLineNum, offset, reverse)
if err == nil && len(b) != 0 {
logs := []map[string]string{}
err = json.Unmarshal(b, &logs)
if err != nil {
return nil, NewBadResponseError(string(b), rsp.Header, rsp.StatusCode)
}
logRsp.Logs = logs
}

return &getLogsResponse, nil
return logRsp, err
}

// GetContextLogs ...
Expand Down
7 changes: 7 additions & 0 deletions model.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ type GetLogsResponse struct {
HasSQL bool `json:"hasSQL"`
}

// GetLogLinesResponse defines response from GetLogLines call
// note: GetLogLinesResponse.Logs is nil when use GetLogLinesResponse
type GetLogLinesResponse struct {
GetLogsResponse
Lines []json.RawMessage
}

func (resp *GetLogsResponse) IsComplete() bool {
return strings.ToLower(resp.Progress) == "complete"
}
Expand Down
11 changes: 11 additions & 0 deletions token_auto_update_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,17 @@ func (c *TokenAutoUpdateClient) GetLogs(project, logstore string, topic string,
return
}

func (c *TokenAutoUpdateClient) GetLogLines(project, logstore string, topic string, from int64, to int64, queryExp string,
maxLineNum int64, offset int64, reverse bool) (r *GetLogLinesResponse, err error) {
for i := 0; i < c.maxTryTimes; i++ {
r, err = c.logClient.GetLogLines(project, logstore, topic, from, to, queryExp, maxLineNum, offset, reverse)
if !c.processError(err) {
return
}
}
return
}

func (c *TokenAutoUpdateClient) CreateIndex(project, logstore string, index Index) (err error) {
for i := 0; i < c.maxTryTimes; i++ {
err = c.logClient.CreateIndex(project, logstore, index)
Expand Down

0 comments on commit 982232d

Please sign in to comment.