Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support concat and change to picker #248

Merged
merged 1 commit into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,13 +217,24 @@ schema:
alternativeIndices:
- 7
- 8

# concatItems examples
schema:
type: vertex
vertex:
vid:
concatItems:
- "abc"
- 1
function: hash
```

##### `schema.vertex.vid`

**Optional**. Describes the vertex ID column and the function used for the vertex ID.

* `index`: **Optional**. The column number in the CSV file, starting from 0. The default value is 0.
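* `concatItems`: **Optional**. A list of items that are concatenated to form the vertex ID. Each item is either a `string` or an `int`, and the two kinds can be mixed: a `string` is used as a constant, and an `int` is the index of the CSV column whose value is used. All items are concatenated in the order listed. If set, the above `index` has no effect.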
* `function`: **Optional**. Functions to generate the VIDs. Currently, we only support function `hash` and `uuid`.
* `type`: **Optional**. The type for VIDs. The default value is `string`.
* `prefix`: **Optional**. Adds a prefix to the original vid. When `function` is also specified, `prefix` is applied to the original vid before `function`.
Expand Down
10 changes: 10 additions & 0 deletions README_zh-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,23 @@ schema:
alternativeIndices:
- 7
- 8
# concatItems examples
schema:
type: vertex
vertex:
vid:
concatItems:
- "abc"
- 1
function: hash
```

##### `schema.vertex.vid`

**可选**。描述点 VID 所在的列和使用的函数。

- `index`:**可选**。在 CSV 文件中的列标,从 0 开始计数。默认值 0。
- `concatItems`:**可选**。连接项可以是 `string`、`int` 或者两者混合。`string` 代表常量,`int` 代表列的索引。所有项按顺序连接。如果设置了该项,上面的 `index` 将不生效。
- `function`:**可选**。用来生成 VID 时的函数,有 `hash` 和 `uuid` 两种函数可选。
- `prefix`: **可选**。给 原始vid 添加的前缀,当同时指定了 `function` 时, 生成 VID 的方法是先添加 `prefix` 前缀, 再用 `function`生成 VID。

Expand Down
32 changes: 32 additions & 0 deletions examples/v1/example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,38 @@ files:
- name: name
type: string

- path: ./course.csv
failDataPath: ./err/course-concat.csv
batchSize: 2
inOrder: true
type: csv
csv:
withHeader: false
withLabel: false
schema:
type: vertex
vertex:
vid:
type: int
concatItems: # "c1{index0}c2{index1}2"
- "c1"
- 0
- c2
- 1
- "2"
function: hash
tags:
- name: course
props:
- name: name
type: string
- name: credits
type: int
- name: building
props:
- name: name
type: string

- path: ./course-with-header.csv
failDataPath: ./err/course-with-header.csv
batchSize: 2
Expand Down
31 changes: 31 additions & 0 deletions examples/v2/example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,37 @@ files:
- name: name
type: string

- path: ./course.csv
failDataPath: ./err/course-concat.csv
batchSize: 2
inOrder: true
type: csv
csv:
withHeader: false
withLabel: false
schema:
type: vertex
vertex:
vid:
type: string
concatItems: # "c1{index0}c2{index1}2"
- "c1"
- 0
- c2
- 1
- "2"
tags:
- name: course
props:
- name: name
type: string
- name: credits
type: int
- name: building
props:
- name: name
type: string

- path: ./course-with-header.csv
failDataPath: ./err/course-with-header.csv
batchSize: 2
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/vesoft-inc/nebula-go/v3 v3.0.0-20220425030225-cdb52399b40a
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1 // indirect
gopkg.in/yaml.v3 v3.0.1
)

go 1.13
153 changes: 94 additions & 59 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/vesoft-inc/nebula-importer/pkg/base"
ierrors "github.com/vesoft-inc/nebula-importer/pkg/errors"
"github.com/vesoft-inc/nebula-importer/pkg/logger"
"github.com/vesoft-inc/nebula-importer/pkg/picker"
"gopkg.in/yaml.v2"
)

Expand Down Expand Up @@ -58,13 +59,16 @@ type Prop struct {
NullValue string `json:"nullValue" yaml:"nullValue"`
AlternativeIndices []int `json:"alternativeIndices" yaml:"alternativeIndices"`
DefaultValue *string `json:"defaultValue" yaml:"defaultValue"`
picker picker.Picker
}

type VID struct {
Index *int `json:"index" yaml:"index"`
Function *string `json:"function" yaml:"function"`
Type *string `json:"type" yaml:"type"`
Prefix *string `json:"prefix" yaml:"prefix"`
Index *int `json:"index" yaml:"index"`
ConcatItems []interface{} `json:"concatItems" yaml:"concatItems"` // only string and int is support, int is for Index
Function *string `json:"function" yaml:"function"`
Type *string `json:"type" yaml:"type"`
Prefix *string `json:"prefix" yaml:"prefix"`
picker picker.Picker
}

type Rank struct {
Expand Down Expand Up @@ -494,7 +498,6 @@ func (s *Schema) validateAndReset(prefix string) error {
func (v *VID) ParseFunction(str string) (err error) {
i := strings.Index(str, "(")
j := strings.Index(str, ")")
err = nil
if i < 0 && j < 0 {
v.Function = nil
v.Type = &kDefaultVidType
Expand Down Expand Up @@ -533,25 +536,11 @@ func (v *VID) String(vid string) string {
}

func (v *VID) FormatValue(record base.Record) (string, error) {
if len(record) <= *v.Index {
return "", fmt.Errorf("vid index(%d) out of range record length(%d)", *v.Index, len(record))
}
vid := record[*v.Index]
if v.Prefix != nil {
vid = *v.Prefix + vid
}
if v.Function == nil || *v.Function == "" {
if err := checkVidFormat(vid, *v.Type == "int"); err != nil {
return "", err
}
if *v.Type == "string" {
return fmt.Sprintf("%q", vid), nil
} else {
return vid, nil
}
} else {
return fmt.Sprintf("%s(%q)", *v.Function, vid), nil
value, err := v.picker.Pick(record)
if err != nil {
return "", err
}
return value.Val, nil
}

func (v *VID) checkFunction(prefix string) error {
Expand Down Expand Up @@ -585,7 +574,48 @@ func (v *VID) validateAndReset(prefix string, defaultVal int) error {
v.Type = &kDefaultVidType
logger.Log.Warnf("Not set %s.Type, reset to default value `%s'", prefix, *v.Type)
}
return nil

return v.InitPicker()
}

// InitPicker builds the picker that FormatValue uses to produce the VID from
// a record and stores it on v.
//
// The picker source is selected as follows:
//   - ConcatItems is non-empty: a non-empty Prefix (if any) is added first as
//     a constant, then every item in order; an int item is added as a column
//     index and a string item as a constant. Any other item type is an error.
//   - otherwise, a non-empty Prefix: the prefix constant followed by the
//     column at Index.
//   - otherwise: the single column at Index.
//
// When no function is configured and Type equals "int" (case-insensitive),
// the picked value is additionally checked with checkVidFormat.
//
// Type is dereferenced unconditionally, and Index is dereferenced whenever
// ConcatItems is empty, so both must already be set.
func (v *VID) InitPicker() error {
	cfg := picker.Config{
		Type:     *v.Type,
		Function: v.Function,
	}

	prefixSet := v.Prefix != nil && *v.Prefix != ""

	switch {
	case len(v.ConcatItems) > 0:
		if prefixSet {
			cfg.ConcatItems.AddConstant(*v.Prefix)
		}
		for pos, raw := range v.ConcatItems {
			if column, ok := raw.(int); ok {
				cfg.ConcatItems.AddIndex(column)
				continue
			}
			if constant, ok := raw.(string); ok {
				cfg.ConcatItems.AddConstant(constant)
				continue
			}
			return fmt.Errorf("ConcatItems only support int or string, but the %d is %v", pos, raw)
		}
	case prefixSet:
		cfg.ConcatItems.AddConstant(*v.Prefix)
		cfg.ConcatItems.AddIndex(*v.Index)
	default:
		cfg.Indices = []int{*v.Index}
	}

	noFunction := v.Function == nil || *v.Function == ""
	if noFunction && strings.EqualFold(*v.Type, "int") {
		// The parameter is named so that it does not shadow the receiver v.
		cfg.CheckOnPost = func(picked *picker.Value) error {
			return checkVidFormat(picked.Val, true)
		}
	}

	built, err := cfg.Build()
	v.picker = built
	return err
}

func (r *Rank) validateAndReset(prefix string, defaultVal int) error {
Expand Down Expand Up @@ -690,22 +720,23 @@ func (e *Edge) validateAndReset(prefix string) error {
if e.Name == nil {
return fmt.Errorf("Please configure edge name in: %s.name", prefix)
}
if e.SrcVID != nil {
if err := e.SrcVID.validateAndReset(fmt.Sprintf("%s.srcVID", prefix), 0); err != nil {
return err
}
} else {

if e.SrcVID == nil {
index := 0
e.SrcVID = &VID{Index: &index, Type: &kDefaultVidType}
}
if e.DstVID != nil {
if err := e.DstVID.validateAndReset(fmt.Sprintf("%s.dstVID", prefix), 1); err != nil {
return err
}
} else {
if err := e.SrcVID.validateAndReset(fmt.Sprintf("%s.srcVID", prefix), 0); err != nil {
return err
}

if e.DstVID == nil {
index := 1
e.DstVID = &VID{Index: &index, Type: &kDefaultVidType}
}
if err := e.DstVID.validateAndReset(fmt.Sprintf("%s.dstVID", prefix), 1); err != nil {
return err
}

start := 2
if e.Rank != nil {
if err := e.Rank.validateAndReset(fmt.Sprintf("%s.rank", prefix), 2); err != nil {
Expand Down Expand Up @@ -792,14 +823,13 @@ func (v *Vertex) validateAndReset(prefix string) error {
// if v.Tags == nil {
// return fmt.Errorf("Please configure %.tags", prefix)
// }
if v.VID != nil {
if err := v.VID.validateAndReset(fmt.Sprintf("%s.vid", prefix), 0); err != nil {
return err
}
} else {
if v.VID == nil {
index := 0
v.VID = &VID{Index: &index, Type: &kDefaultVidType}
}
if err := v.VID.validateAndReset(fmt.Sprintf("%s.vid", prefix), 0); err != nil {
return err
}
j := 1
for i := range v.Tags {
if v.Tags[i] != nil {
Expand Down Expand Up @@ -834,28 +864,11 @@ func (p *Prop) IsGeographyType() bool {
}

func (p *Prop) FormatValue(record base.Record) (string, error) {
r, isNull, err := p.getValue(record)
value, err := p.picker.Pick(record)
if err != nil {
return "", err
}
if isNull {
return r, err
}
if p.IsStringType() {
return fmt.Sprintf("%q", r), nil
}
if p.IsDateOrTimeType() {
if p.IsTimestampType() && reTimestampInteger.MatchString(r) {
return fmt.Sprintf("%s(%s)", strings.ToLower(*p.Type), r), nil
}
return fmt.Sprintf("%s(%q)", strings.ToLower(*p.Type), r), nil
}
// Only support wkt for geography currently
if p.IsGeographyType() {
return fmt.Sprintf("ST_GeogFromText(%q)", r), nil
}

return r, nil
return value.Val, nil
}

func (p *Prop) getValue(record base.Record) (string, bool, error) {
Expand Down Expand Up @@ -903,7 +916,29 @@ func (p *Prop) validateAndReset(prefix string, val int) error {
return fmt.Errorf("Invalid prop index: %d, name: %s, type: %s", *p.Index, *p.Name, *p.Type)
}
}
return nil
return p.InitPicker()
}

// InitPicker builds the picker that FormatValue uses to produce the property
// value from a record and stores it on p.
//
// The picker always reads the column at Index. When the prop is Nullable it
// also reads AlternativeIndices after Index, treats a value equal to
// NullValue as null, uses dbNULL as the null value, and passes DefaultValue
// through. Index and Type are dereferenced unconditionally, so both must
// already be set.
func (p *Prop) InitPicker() error {
	columns := []int{*p.Index}
	if p.Nullable {
		// Appending an empty AlternativeIndices leaves columns unchanged.
		columns = append(columns, p.AlternativeIndices...)
	}

	cfg := picker.Config{
		Indices: columns,
		Type:    *p.Type,
	}

	if p.Nullable {
		// The closure reads p.NullValue at call time, not at build time.
		cfg.Nullable = func(raw string) bool {
			return raw == p.NullValue
		}
		cfg.NullValue = dbNULL
		cfg.DefaultValue = p.DefaultValue
	}

	built, err := cfg.Build()
	p.picker = built
	return err
}

func (t *Tag) FormatValues(record base.Record) (string, bool, error) {
Expand Down
Loading