Skip to content

Commit

Permalink
Feature support s3 and add examples (#252)
Browse files Browse the repository at this point in the history
* update

* add test case

* init

---------

Co-authored-by: fanzhonghao.fzh@alibaba-inc.com <fanzhonghao.fzh@alibaba-inc.com>
  • Loading branch information
fanzhonghao and fanzhonghao.fzh@alibaba-inc.com authored Jan 25, 2024
1 parent 3ab030e commit bb56350
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 0 deletions.
14 changes: 14 additions & 0 deletions client_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const (
DataSourceKafka DataSourceType = "Kafka"
DataSourceCMS DataSourceType = "AliyunCloudMonitor"
DataSourceGeneral DataSourceType = "General"
DataSourceS3 DataSourceType = "S3"

OSSDataFormatTypeLine OSSDataFormatType = "Line"
OSSDataFormatTypeMultiline OSSDataFormatType = "Multiline"
Expand Down Expand Up @@ -196,6 +197,19 @@ type (
VpcId string `json:"vpcId"`
}

S3Source struct {
DataSource
AWSAccessKey string `json:"awsAccessKey"`
AWSAccessKeySecret string `json:"awsAccessKeySecret"`
AWSRegion string `json:"awsRegion"`
Bucket string `json:"bucket"`
Prefix string `json:"prefix,omitempty"`
Pattern string `json:"pattern,omitempty"`
CompressionCodec string `json:"compressionCodec,omitempty"`
Encoding string `json:"encoding,omitempty"`
Format interface{} `json:"format,omitempty"`
}

// ingestion JDBC source
AliyunBssSource struct {
DataSource
Expand Down
3 changes: 3 additions & 0 deletions example/ingestion/kafka_ingestion_sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ func main() {
BootStrapServers: "123.123.123.123:9092", // TODO
ValueType: "json", // TODO
FromPosition: "lastest", // TODO
Communication: "{\"protocol\":\"sasl_ssl\",\"sasl\":{\"password\":\"yyy\",\"mechanism\":\"plain\",\"username\":\"xxx\"}}",
NameResolutions: "{\"localhost\":\"127.0.0.1\"}",
VpcId: "vpc-asdasdas",
}
source_tmp, _ := json.Marshal(&kafkaSource)
var source map[string]interface{}
Expand Down
67 changes: 67 additions & 0 deletions example/ingestion/s3_ingestion_sample.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package main

import (
"encoding/json"
"fmt"
sls "github.com/aliyun/aliyun-log-go-sdk"
"github.com/aliyun/aliyun-log-go-sdk/example/util"
)

func main() {

Check failure on line 10 in example/ingestion/s3_ingestion_sample.go

View workflow job for this annotation

GitHub Actions / build

main redeclared in this block
fmt.Println("create s3 ingestion sample begin")
logstoreName := util.LogStoreName
project := util.ProjectName
client := util.Client
base := sls.BaseJob{
Name: "ingest-s3-test2", // TODO
DisplayName: "s3 bucket import", // TODO
Description: "test-s3", // TODO
Type: "Ingestion", // default
}
sj := sls.ScheduledJob{
BaseJob: base,
Schedule: &sls.Schedule{
Type: "Resident", // default
},
}

s3Source := sls.S3Source{
DataSource: sls.DataSource{DataSourceType: sls.DataSourceS3},
AWSAccessKey: util.AWSAccessKey,
AWSAccessKeySecret: util.AWSAccessKeySecret,
AWSRegion: "", // TODO
Bucket: "", // TODO
Prefix: "", // TODO
Format: map[string]string{
"type": "json",
"encoding": "UTF-8",
"interval": "5m",
},
CompressionCodec: "none",
}
source_tmp, _ := json.Marshal(&s3Source)
var source map[string]interface{}
_ = json.Unmarshal(source_tmp, &source)

for k, v := range source {
if v == nil {
delete(source, k)
}
}

ingestion := &sls.Ingestion{
ScheduledJob: sj,
IngestionConfiguration: &sls.IngestionConfiguration{
Version: "v2.0",
LogStore: logstoreName,
NumberOfInstance: 0,
DataSource: source,
},
}
if err := client.CreateIngestion(project, ingestion); err != nil {
fmt.Println(err.Error())
} else {
fmt.Println("create s3 ingestion over")
}

}

0 comments on commit bb56350

Please sign in to comment.