Skip to content

Commit

Permalink
feat: added new option to initialise null timestamps to time.Now() (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
belitre authored Apr 14, 2020
1 parent c4b06ac commit 106aa82
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 13 deletions.
16 changes: 9 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func ParseToJSONAvro() error {
* `WithTimestampToMillis()` will add milliseconds to timestamps, only works for `logicalType="timestamp-millis"` fields: `{"test": 1571128870}` => `{"test": time.Time(1571128870000)}`
* `WithTimestampToMicros()` will add microseconds to timestamps, only works for `logicalType="timestamp-micros"` fields: `{"test": 1571128870}` => `{"test": time.Time(1571128870000000)}`
* `WithDateTimeFormat(format string)` will try to parse a string to a timestamp using the format specified as param, only works for `logicalType="timestamp-millis"` or `logicalType="timestamp-micros"` fields: `{"test": "2019-10-14T12:45:18Z"}` => (using `time.RFC3339` as format and type `logicalType="timestamp-millis`) => `{"test": time.Time(15710571180000)}`
* `WithNowForNullTimestamp` will set `time.Now()` if the field is null, only works for `logicalType="timestamp-millis"` or `logicalType="timestamp-micros"` fields.

### Supported types

Expand All @@ -165,12 +166,12 @@ Not all the avro types are supported by `avro-kedavro` yet! The current supporte

Unsupported types:

| Avro |
| ------------------ |
| `enum` |
| `fixed` |
| `map` |
| `array` |
| Avro |
| ------- |
| `enum` |
| `fixed` |
| `map` |
| `array` |

### Supported Unions

Expand Down Expand Up @@ -207,4 +208,5 @@ Accepted values in json for timestamps are:
* If the string is a number with decimals: it will be treated as a timestamp where the decimals will be consider fractions of seconds.
* If the selected type is `timestamp-millis` the parser will keep the first three decimals.
* If the selected type is `timestamp-micros` the parser will keep the first six decimals.
* If the string has non-numeric characters: the parser will try to parse the string to `time.Time` using the provided format with the option `WithDateTimeFormat(format string)`
* If the string has non-numeric characters: the parser will try to parse the string to `time.Time` using the provided format with the option `WithDateTimeFormat(format string)`
* `null`: only if `WithNowForNullTimestamp()` option is provided. When the option is provided, if a null is found for a `timestamp-millis` or `timestamp-micros` field, `time.Now()` will be used as value.
115 changes: 115 additions & 0 deletions pkg/kedavro/conversion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,69 @@ func TestTimestampToMillis(t *testing.T) {
result, err = parser.Parse([]byte(jsonRecord))
assert.Error(t, err)
assert.Nil(t, result)

jsonRecord = `
{"bleh": "blah"}
`

parser, err = NewParser(schema)
assert.NoError(t, err)

result, err = parser.Parse([]byte(jsonRecord))
assert.Error(t, err)
assert.Nil(t, result)

jsonRecord = `
{"bleh": "blah"}
`

parser, err = NewParser(schema, WithNowForNullTimestamp())
assert.NoError(t, err)

result, err = parser.Parse([]byte(jsonRecord))
assert.NoError(t, err)

resultAsMap, ok := result.(map[string]interface{})
assert.True(t, ok)

assert.NotNil(t, resultAsMap["test"])

resultTime := resultAsMap["test"].(time.Time)

now := time.Now()
// just checking the returned time is between now and 2 seconds ago...
// if the test took more than 2 seconds... it deserves to fail :D
assert.True(t, now.Before(resultTime.Add(2*time.Second)))
assert.True(t, resultTime.Before(now))

_, err = codec.TextualFromNative(nil, result)
assert.NoError(t, err)

jsonRecord = `
{"test": null}
`

parser, err = NewParser(schema, WithNowForNullTimestamp())
assert.NoError(t, err)

result, err = parser.Parse([]byte(jsonRecord))
assert.NoError(t, err)

resultAsMap, ok = result.(map[string]interface{})
assert.True(t, ok)

assert.NotNil(t, resultAsMap["test"])

resultTime = resultAsMap["test"].(time.Time)

now = time.Now()
// just checking the returned time is between now and 2 seconds ago...
// if the test took more than 2 seconds... it deserves to fail :D
assert.True(t, now.Before(resultTime.Add(2*time.Second)))
assert.True(t, resultTime.Before(now))

_, err = codec.TextualFromNative(nil, result)
assert.NoError(t, err)
}

//nolint
Expand Down Expand Up @@ -675,4 +738,56 @@ func TestTimestampToMicros(t *testing.T) {
result, err = parser.Parse([]byte(jsonRecord))
assert.Error(t, err)
assert.Nil(t, result)

jsonRecord = `
{"bleh": "blah"}
`

parser, err = NewParser(schema, WithNowForNullTimestamp())
assert.NoError(t, err)

result, err = parser.Parse([]byte(jsonRecord))
assert.NoError(t, err)

resultAsMap, ok := result.(map[string]interface{})
assert.True(t, ok)

assert.NotNil(t, resultAsMap["test"])

resultTime := resultAsMap["test"].(time.Time)

now := time.Now()
// just checking the returned time is between now and 2 seconds ago...
// if the test took more than 2 seconds... it deserves to fail :D
assert.True(t, now.Before(resultTime.Add(2*time.Second)))
assert.True(t, resultTime.Before(now))

_, err = codec.TextualFromNative(nil, result)
assert.NoError(t, err)

jsonRecord = `
{"test": null}
`

parser, err = NewParser(schema, WithNowForNullTimestamp())
assert.NoError(t, err)

result, err = parser.Parse([]byte(jsonRecord))
assert.NoError(t, err)

resultAsMap, ok = result.(map[string]interface{})
assert.True(t, ok)

assert.NotNil(t, resultAsMap["test"])

resultTime = resultAsMap["test"].(time.Time)

now = time.Now()
// just checking the returned time is between now and 2 seconds ago...
// if the test took more than 2 seconds... it deserves to fail :D
assert.True(t, now.Before(resultTime.Add(2*time.Second)))
assert.True(t, resultTime.Before(now))

_, err = codec.TextualFromNative(nil, result)
assert.NoError(t, err)
}
6 changes: 6 additions & 0 deletions pkg/kedavro/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ func WithDateTimeFormat(format string) ParserOption {
}
}

func WithNowForNullTimestamp() ParserOption {
return func(o *types.Options) {
o.IsSetNowForNilTimestamp = true
}
}

func NewParser(schemaString string, opts ...ParserOption) (Parser, error) {
s := map[string]interface{}{}

Expand Down
5 changes: 5 additions & 0 deletions pkg/kedavro/primitiveparsers.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,11 @@ func parseLongValue(field *Field, value interface{}) (interface{}, error) {
}

func parseLongField(field *Field, record map[string]interface{}) (interface{}, error) {
if field.LogicalType == types.TimestampMillis || field.LogicalType == types.TimestampMicros {
if v, ok := record[field.Name]; (!ok || v == nil) && field.Opts.IsSetNowForNilTimestamp {
return time.Now(), nil
}
}
return parseWithDefaultValue(field, record, parseLongValue)
}

Expand Down
13 changes: 7 additions & 6 deletions pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ const (
)

type Options struct {
IsStringToNumber bool
IsStringToBool bool
IsTimestampToMillis bool
IsTimestampToMicros bool
IsFormatDateTime bool
DateTimeFormat string
IsStringToNumber bool
IsStringToBool bool
IsTimestampToMillis bool
IsTimestampToMicros bool
IsFormatDateTime bool
IsSetNowForNilTimestamp bool
DateTimeFormat string
}

0 comments on commit 106aa82

Please sign in to comment.