-
Notifications
You must be signed in to change notification settings - Fork 7
/
fetch-client.ts
177 lines (155 loc) · 4.62 KB
/
fetch-client.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
/** following reference needed to include types for experimental fetch API in Node */
/// <reference lib="dom" />
import { getServiceError, NetworkError } from "../errors";
import { QueryFailure } from "../wire-protocol";
import {
HTTPClient,
HTTPClientOptions,
HTTPRequest,
HTTPResponse,
HTTPStreamRequest,
HTTPStreamClient,
StreamAdapter,
} from "./http-client";
/**
* An implementation for {@link HTTPClient} that uses the native fetch API
*/
export class FetchClient implements HTTPClient, HTTPStreamClient {
#queryURL: string;
#streamURL: string;
#keepalive: boolean;
constructor({ url, fetch_keepalive }: HTTPClientOptions) {
this.#queryURL = new URL("/query/1", url).toString();
this.#streamURL = new URL("/stream/1", url).toString();
this.#keepalive = fetch_keepalive;
}
/** {@inheritDoc HTTPClient.request} */
async request({
data,
headers: requestHeaders,
method,
client_timeout_ms,
}: HTTPRequest): Promise<HTTPResponse> {
const signal =
AbortSignal.timeout === undefined
? (() => {
const controller = new AbortController();
const signal = controller.signal;
setTimeout(() => controller.abort(), client_timeout_ms);
return signal;
})()
: AbortSignal.timeout(client_timeout_ms);
const response = await fetch(this.#queryURL, {
method,
headers: { ...requestHeaders, "Content-Type": "application/json" },
body: JSON.stringify(data),
signal,
keepalive: this.#keepalive,
}).catch((error) => {
throw new NetworkError("The network connection encountered a problem.", {
cause: error,
});
});
const status = response.status;
const responseHeaders: Record<string, string> = {};
response.headers.forEach((value, key) => (responseHeaders[key] = value));
const body = await response.text();
return {
status,
body,
headers: responseHeaders,
};
}
/** {@inheritDoc HTTPStreamClient.stream} */
stream({
data,
headers: requestHeaders,
method,
}: HTTPStreamRequest): StreamAdapter {
const request = new Request(this.#streamURL, {
method,
headers: { ...requestHeaders, "Content-Type": "application/json" },
body: JSON.stringify(data),
keepalive: this.#keepalive,
});
const abortController = new AbortController();
const options = {
signal: abortController.signal,
};
async function* reader() {
const response = await fetch(request, options).catch((error) => {
throw new NetworkError(
"The network connection encountered a problem.",
{
cause: error,
},
);
});
const status = response.status;
if (!(status >= 200 && status < 400)) {
const failure: QueryFailure = await response.json();
throw getServiceError(failure, status);
}
const body = response.body;
if (!body) {
throw new Error("Response body is undefined.");
}
const reader = body.getReader();
try {
for await (const line of readLines(reader)) {
yield line;
}
} catch (error) {
throw new NetworkError(
"The network connection encountered a problem while streaming events.",
{ cause: error },
);
}
}
return {
read: reader(),
close: () => {
abortController.abort("Stream closed by the client.");
},
};
}
/** {@inheritDoc HTTPClient.close} */
close() {
// no actions at this time
}
}
/**
* Get individual lines from the stream
*
* The stream may be broken into arbitrary chunks, but the events are delimited by a newline character.
*
* @param reader - The stream reader
*/
async function* readLines(reader: ReadableStreamDefaultReader<Uint8Array>) {
const textDecoder = new TextDecoder();
let partOfLine = "";
for await (const chunk of readChunks(reader)) {
const chunkText = textDecoder.decode(chunk);
const chunkLines = (partOfLine + chunkText).split("\n");
// Yield all complete lines
for (let i = 0; i < chunkLines.length - 1; i++) {
yield chunkLines[i].trim();
}
// Store the partial line
partOfLine = chunkLines[chunkLines.length - 1];
}
// Yield the remaining partial line if any
if (partOfLine.trim() !== "") {
yield partOfLine;
}
}
async function* readChunks(reader: ReadableStreamDefaultReader<Uint8Array>) {
let done = false;
do {
const readResult = await reader.read();
if (readResult.value !== undefined) {
yield readResult.value;
}
done = readResult.done;
} while (!done);
}