Skip to content

Commit

Permalink
drainer: process empty string from tidb in update/delete statement on…
Browse files Browse the repository at this point in the history
… oracle (#1163)

ref #1162
  • Loading branch information
cartersz committed May 25, 2022
1 parent b8a6395 commit ca0eeed
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 2 deletions.
4 changes: 2 additions & 2 deletions pkg/loader/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (dml *DML) buildOracleWhere(builder *strings.Builder, oracleHolderPos int)
if i > 0 {
builder.WriteString(" AND ")
}
if wargs[i] == nil {
if wargs[i] == nil || wargs[i] == "" {
builder.WriteString(escapeName(wnames[i]) + " IS NULL")
} else {
builder.WriteString(fmt.Sprintf("%s = :%d", escapeName(wnames[i]), pOracleHolderPos))
Expand Down Expand Up @@ -380,7 +380,7 @@ func (dml *DML) oracleDeleteNewValueSQL() (sql string, args []interface{}) {
if i > 0 {
builder.WriteString(" AND ")
}
if colValues[i] == nil {
if colValues[i] == nil || colValues[i] == "" {
builder.WriteString(escapeName(colNames[i]) + " IS NULL")
} else {
builder.WriteString(fmt.Sprintf("%s = :%d", colNames[i], oracleHolderPos))
Expand Down
103 changes: 103 additions & 0 deletions pkg/loader/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,45 @@ func (s *SQLSuite) TestOracleUpdateSQL(c *check.C) {
c.Assert(args[3], check.Equals, "pingcap")
}

func (s *SQLSuite) TestOracleUpdateSQLEmptyString(c *check.C) {
dml := DML{
Tp: UpdateDMLType,
Database: "db",
Table: "tbl",
Values: map[string]interface{}{
"ID": 123,
"NAME": "pc",
"OFFER": nil,
},
OldValues: map[string]interface{}{
"ID": 123,
"NAME": "",
"OFFER": nil,
},
info: &tableInfo{
columns: []string{"ID", "NAME", "OFFER"},
},
UpColumnsInfoMap: map[string]*model.ColumnInfo{
"ID": {
FieldType: types.FieldType{Tp: mysql.TypeInt24}},
"NAME": {
FieldType: types.FieldType{Tp: mysql.TypeVarString}},
"OFFER": {
FieldType: types.FieldType{Tp: mysql.TypeVarString}},
},
DestDBType: OracleDB,
}
sql, args := dml.sql()
c.Assert(
sql, check.Equals,
"UPDATE db.tbl SET ID = :1,NAME = :2,OFFER = :3 WHERE ID = :4 AND NAME IS NULL AND OFFER IS NULL AND rownum <=1")
c.Assert(args, check.HasLen, 4)
c.Assert(args[0], check.Equals, 123)
c.Assert(args[1], check.Equals, "pc")
c.Assert(args[2], check.Equals, nil)
c.Assert(args[3], check.Equals, 123)
}

func (s *SQLSuite) TestOracleUpdateSQLPrimaryKey(c *check.C) {
dml := DML{
Tp: UpdateDMLType,
Expand Down Expand Up @@ -379,6 +418,34 @@ func (s *SQLSuite) TestOracleDeleteSQL(c *check.C) {
c.Assert(args[1], check.Equals, "pc")
}

func (s *SQLSuite) TestOracleDeleteSQLEmptyString(c *check.C) {
dml := DML{
Tp: DeleteDMLType,
Database: "db",
Table: "tbl",
Values: map[string]interface{}{
"ID": 123,
"NAME": "",
},
info: &tableInfo{
columns: []string{"ID", "NAME"},
},
UpColumnsInfoMap: map[string]*model.ColumnInfo{
"ID": {
FieldType: types.FieldType{Tp: mysql.TypeInt24}},
"NAME": {
FieldType: types.FieldType{Tp: mysql.TypeVarString}},
},
DestDBType: OracleDB,
}
sql, args := dml.sql()
c.Assert(
sql, check.Equals,
"DELETE FROM db.tbl WHERE ID = :1 AND NAME IS NULL AND rownum <=1")
c.Assert(args, check.HasLen, 1)
c.Assert(args[0], check.Equals, 123)
}

func (s *SQLSuite) TestOracleInsertSQL(c *check.C) {
dml := DML{
Tp: InsertDMLType,
Expand Down Expand Up @@ -568,3 +635,39 @@ func (s *SQLSuite) TestOracleDeleteNewValueSQLWithNoUK(c *check.C) {
c.Assert(args[1], check.Equals, "456")
c.Assert(args[2], check.Equals, "pc")
}

func (s *SQLSuite) TestOracleDeleteNewValueSQLEmptyString(c *check.C) {
dml := DML{
Tp: InsertDMLType,
Database: "db",
Table: "tbl",
Values: map[string]interface{}{
"ID": 123,
"ID2": "456",
"NAME": "",
"C2": nil,
},
info: &tableInfo{
columns: []string{"ID", "ID2", "NAME", "C2"},
},
UpColumnsInfoMap: map[string]*model.ColumnInfo{
"ID": {
FieldType: types.FieldType{Tp: mysql.TypeInt24}},
"ID2": {
FieldType: types.FieldType{Tp: mysql.TypeVarString}},
"NAME": {
FieldType: types.FieldType{Tp: mysql.TypeVarString}},
"C2": {
FieldType: types.FieldType{Tp: mysql.TypeVarString}},
},
DestDBType: OracleDB,
}

sql, args := dml.oracleDeleteNewValueSQL()
c.Assert(
sql, check.Equals,
"DELETE FROM db.tbl WHERE C2 IS NULL AND ID = :1 AND ID2 = :2 AND NAME IS NULL AND rownum <=1")
c.Assert(args, check.HasLen, 2)
c.Assert(args[0], check.Equals, 123)
c.Assert(args[1], check.Equals, "456")
}

0 comments on commit ca0eeed

Please sign in to comment.