Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[azopenai] EventReader should return an error if the stream is closed #21323

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions sdk/ai/azopenai/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@

### Bugs Fixed

- EventReader, used by GetChatCompletionsStream and GetCompletionsStream for streaming results, would not return an
error if the underlying Body reader was closed or EOF'd before the actual DONE: token arrived. This could result in an
infinite loop for callers. (PR#)

### Other Changes

## 0.1.1 (2023-07-26)
Expand Down
2 changes: 1 addition & 1 deletion sdk/ai/azopenai/custom_models_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestParseResponseError(t *testing.T) {

contentFilterResults := contentFilterErr.ContentFilterResults

// thsi comment was considered violent, so it was filtered.
// this comment was considered violent, so it was filtered.
require.Equal(t, &ContentFilterResultsViolence{
Filtered: to.Ptr(true),
Severity: to.Ptr(ContentFilterSeverityMedium)}, contentFilterResults.Violence)
Expand Down
11 changes: 9 additions & 2 deletions sdk/ai/azopenai/event_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,19 @@ func (er *EventReader[T]) Read() (T, error) {
err := json.Unmarshal([]byte(tokens[1]), &data)
return data, err
default: // Any other event type is an unexpected
return data, errors.New("Unexpected event type: " + tokens[0])
return data, errors.New("unexpected event type: " + tokens[0])
}
// Unreachable
}
}
return *new(T), er.scanner.Err()

scannerErr := er.scanner.Err()

if scannerErr == nil {
return *new(T), errors.New("incomplete stream")
}

return *new(T), scannerErr
}

// Close closes the EventReader and any applicable inner stream state.
Expand Down
17 changes: 16 additions & 1 deletion sdk/ai/azopenai/event_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestEventReader_InvalidType(t *testing.T) {

firstEvent, err := eventReader.Read()
require.Empty(t, firstEvent)
require.EqualError(t, err, "Unexpected event type: invaliddata")
require.EqualError(t, err, "unexpected event type: invaliddata")
}

type badReader struct{}
Expand All @@ -35,8 +35,23 @@ func (br badReader) Read(p []byte) (n int, err error) {

func TestEventReader_BadReader(t *testing.T) {
eventReader := newEventReader[ChatCompletions](io.NopCloser(badReader{}))
defer eventReader.Close()

firstEvent, err := eventReader.Read()
require.Empty(t, firstEvent)
require.ErrorIs(t, io.ErrClosedPipe, err)
}

func TestEventReader_StreamIsClosedBeforeDone(t *testing.T) {
buff := strings.NewReader("data: {}")

eventReader := newEventReader[ChatCompletions](io.NopCloser(buff))

evt, err := eventReader.Read()
require.Empty(t, evt)
require.NoError(t, err)

evt, err = eventReader.Read()
require.Empty(t, evt)
require.EqualError(t, err, "incomplete stream")
}