-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathhttp.go
More file actions
471 lines (416 loc) · 13.3 KB
/
http.go
File metadata and controls
471 lines (416 loc) · 13.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
package getstream
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"mime/multipart"
"net/http"
"net/url"
"os"
"reflect"
"strconv"
"strings"
"time"
)
// logRequest writes the request line, host, headers and body of req to the
// client's debug logger.
//
// Logging the body consumes it, so the bytes are buffered in memory and
// req.Body is replaced with a fresh reader over the same bytes; the request
// can still be sent afterwards. The original body is closed once it has been
// drained, because after the replacement nothing else holds a reference to it.
//
// NOTE(review): every header is logged verbatim, including Authorization.
func (c *Client) logRequest(req *http.Request) {
	c.logger.Debug("---> %s %s", req.Method, req.URL.String())
	c.logger.Debug("Host: %s", req.Host)
	for key, values := range req.Header {
		c.logger.Debug("%s: %s", key, strings.Join(values, ", "))
	}
	if req.Body == nil {
		return
	}

	body, err := io.ReadAll(req.Body)
	if err != nil {
		// The signature cannot report the failure, so surface it in the log
		// instead of dropping it; whatever was read is still forwarded.
		c.logger.Error("Error reading request body for logging: %+v", err)
	}
	if closeErr := req.Body.Close(); closeErr != nil {
		c.logger.Error("Error closing request body after logging: %+v", closeErr)
	}
	req.Body = io.NopCloser(bytes.NewReader(body))
	c.logger.Debug("\n%s", string(body))
}
// logResponse writes the status line, the headers and the already-read body
// of an HTTP response to the debug logger, along with the supplied duration.
func (c *Client) logResponse(resp *http.Response, body []byte, duration time.Duration) {
	status := resp.StatusCode
	c.logger.Debug("<--- %d %s (%s)", status, http.StatusText(status), duration)

	for name := range resp.Header {
		joined := strings.Join(resp.Header[name], ", ")
		c.logger.Debug("%s: %s", name, joined)
	}

	c.logger.Debug("\n%s", string(body))
}
// StreamError represents an error response returned by the API. It is decoded
// from the JSON body of a failed response and implements the error interface.
type StreamError struct {
	Code int `json:"code"`
	// Message is the text returned by Error. When the error body is not valid
	// JSON, parseResponse stores the raw body here instead.
	Message string `json:"message"`
	ExceptionFields map[string]string `json:"exception_fields,omitempty"`
	// StatusCode is decoded from the "StatusCode" key of the body; parseResponse
	// also sets it from the HTTP status code when the body is not valid JSON.
	StatusCode int `json:"StatusCode"`
	Duration string `json:"duration"`
	MoreInfo string `json:"more_info"`
	// RateLimit is never (un)marshalled (tag "-"); parseResponse fills it from
	// the response headers.
	RateLimit *RateLimitInfo `json:"-"`
}
// Error implements the error interface by returning the error's Message.
func (e StreamError) Error() string {
	return e.Message
}
// StreamResponse is the base response returned to the client. It pairs the
// decoded payload with the rate limit information of the response.
type StreamResponse[T any] struct {
	RateLimitInfo *RateLimitInfo `json:"ratelimit"`
	// Data holds the decoded response payload.
	Data T
}
// parseResponse turns an already-read HTTP response into a typed result.
//
// For status codes of 400 and above the body is decoded into a StreamError,
// which is returned as the error (by value). If the body is not valid JSON the
// raw body becomes the error message. In both cases the error carries the rate
// limit information built from the response headers, and its StatusCode falls
// back to the HTTP status code when the body did not supply one.
//
// For any other status code the body is unmarshalled into result and returned
// wrapped in a StreamResponse together with the rate limit information.
func parseResponse[GResponse any](c *Client, resp *http.Response, body []byte, result *GResponse) (*StreamResponse[GResponse], error) {
	statusCode := resp.StatusCode
	c.logger.Debug("Status Code: %d", statusCode)

	// 4xx and 5xx are the HTTP error classes; everything below 400 is not an
	// error response.
	if statusCode >= http.StatusBadRequest {
		var apiErr StreamError
		if err := json.Unmarshal(body, &apiErr); err != nil {
			// Not a JSON error document: expose the raw body instead.
			apiErr.Message = string(body)
			apiErr.StatusCode = statusCode
		}
		if apiErr.StatusCode == 0 {
			apiErr.StatusCode = statusCode
		}
		apiErr.RateLimit = NewRateLimitFromHeaders(resp.Header)
		return nil, apiErr
	}

	// Attempt to unmarshal the response into the result.
	if err := json.Unmarshal(body, result); err != nil {
		return nil, fmt.Errorf("failed to unmarshal response body: %w", err)
	}

	// Add rate limit info to the result.
	return addRateLimitInfo(resp.Header, result)
}
// requestURL constructs the full request URL from the client's base URL, the
// path template, its path parameters and the query values.
//
// The client's API key is always appended as the "api_key" query parameter.
// The query values are copied first, so the caller's url.Values is never
// modified and can safely be reused for another request.
//
// It returns an error when the combined base URL and path cannot be parsed.
func (c *Client) requestURL(path string, values url.Values, pathParams map[string]string) (string, error) {
	path = buildPath(path, pathParams)
	u, err := url.Parse(c.baseUrl + path)
	if err != nil {
		return "", fmt.Errorf("url.Parse: %w", err)
	}

	query := make(url.Values, len(values)+1)
	for key, vals := range values {
		query[key] = append([]string(nil), vals...)
	}
	query.Add("api_key", c.apiKey)
	c.logger.Debug("Query parameters: %v", query)
	u.RawQuery = query.Encode()

	// Named fullURL rather than url so the net/url package is not shadowed.
	fullURL := u.String()
	c.logger.Debug("Full URL: %s", fullURL)
	return fullURL, nil
}
// buildPath fills the "{name}" placeholders of path with the values from
// pathParams, escaping each value so it is safe inside a single URL path
// segment (a space becomes "%20", a slash becomes "%2F").
//
// The escaped values are written to a fresh map: pathParams itself is left
// untouched, so passing the same map again yields the same path instead of a
// double-escaped one. A nil pathParams returns path unchanged.
func buildPath(path string, pathParams map[string]string) string {
	if pathParams == nil {
		return path
	}
	escaped := make(map[string]string, len(pathParams))
	for k, v := range pathParams {
		escaped[k] = url.PathEscape(v)
	}
	return replaceParams(path, escaped)
}

// replaceParams replaces every "{key}" placeholder in path with the
// corresponding value from pathParams. Values are inserted verbatim; no
// escaping happens here.
func replaceParams(path string, pathParams map[string]string) string {
	for k, v := range pathParams {
		path = strings.ReplaceAll(path, "{"+k+"}", v)
	}
	return path
}
// newRequest creates an HTTP request for method and path with the client's
// standard headers applied.
//
// GET requests never carry a body. For other methods the body depends on the
// dynamic type of data:
//   - nil interface: no body;
//   - io.ReadCloser: used as the body as is;
//   - io.Reader: wrapped in a no-op closer;
//   - one of the upload request types: delegated to createMultipartRequest;
//   - anything else: marshalled to JSON.
//
// It returns an error when the URL cannot be built, the request cannot be
// created, the multipart body cannot be assembled, or data cannot be
// marshalled to JSON.
//
// NOTE(review): a typed nil pointer does not match `case nil`; it reaches the
// default branch and is marshalled by encoding/json like any other value.
func newRequest[T any](c *Client, ctx context.Context, method, path string, params url.Values, data T, pathParams map[string]string) (*http.Request, error) {
	u, err := c.requestURL(path, params, pathParams)
	if err != nil {
		return nil, err
	}

	r, err := http.NewRequestWithContext(ctx, method, u, http.NoBody)
	if err != nil {
		return nil, err
	}
	c.setHeaders(r)

	// Do not set body if the method is GET.
	if method == http.MethodGet {
		r.Body = nil
		c.logger.Debug("GET request: No body set")
		return r, nil
	}

	// Handle other methods with body.
	c.logger.Debug("Method: %s, Data: %#v (Type: %T)", method, data, data)
	switch t := any(data).(type) {
	case nil:
		c.logger.Debug("Data is nil")
		r.Body = nil
	case io.ReadCloser:
		c.logger.Debug("Data is io.ReadCloser")
		r.Body = t
	case io.Reader:
		c.logger.Debug("Data is io.Reader")
		r.Body = io.NopCloser(t)
	case *UploadFileRequest, *UploadImageRequest, *UploadChannelFileRequest, *UploadChannelImageRequest:
		return c.createMultipartRequest(r, t)
	default:
		c.logger.Debug("Data is of type %T, attempting to marshal to JSON", t)
		b, err := json.Marshal(data)
		if err != nil {
			// Sending the request without its payload would silently change
			// its meaning, so report the failure to the caller instead.
			return nil, fmt.Errorf("failed to marshal request body: %w", err)
		}
		r.Body = io.NopCloser(bytes.NewReader(b))
		c.logger.Debug("Request body set with JSON: %s", string(b))
	}
	return r, nil
}
// getFileContent resolves the data source for an upload.
//
// An explicitly supplied fileContent always wins and is returned unchanged.
// Otherwise fileName is opened from disk and the open file is returned; the
// caller owns that file. When neither is provided an error is returned.
func getFileContent(fileName string, fileContent io.Reader) (io.Reader, error) {
	switch {
	case fileContent != nil:
		return fileContent, nil
	case fileName == "":
		return nil, fmt.Errorf("either file name or file content must be provided")
	}

	f, openErr := os.Open(fileName)
	if openErr != nil {
		return nil, fmt.Errorf("failed to open file: %w", openErr)
	}
	return f, nil
}
// createMultipartRequest turns r into a multipart/form-data upload request.
//
// data must be one of *UploadFileRequest, *UploadImageRequest,
// *UploadChannelFileRequest or *UploadChannelImageRequest; any other type is
// rejected. The request's File field names the file on disk to upload. The
// form is written in this order:
//   - "upload_sizes" (image requests only, when non-empty), JSON-encoded;
//   - "user" (when set), JSON-encoded;
//   - "file", the content of the named file.
//
// The whole form is buffered in memory, installed as the body of r, and the
// Content-Type header is set to the matching multipart boundary. The file is
// closed before returning, since its content has been copied into the buffer.
//
// It returns an error when the request type is unsupported, File is nil, the
// file cannot be opened or read, or a form field cannot be encoded or written.
func (c *Client) createMultipartRequest(r *http.Request, data any) (*http.Request, error) {
	// Collect what differs between the request types. user and uploadSizes
	// are only assigned when the source field is set, so a plain nil check on
	// the interface is reliable below.
	var (
		file        *string
		user        any
		uploadSizes any
	)
	switch req := data.(type) {
	case *UploadFileRequest:
		file = req.File
		if req.User != nil {
			user = req.User
		}
	case *UploadImageRequest:
		file = req.File
		if len(req.UploadSizes) > 0 {
			uploadSizes = req.UploadSizes
		}
		if req.User != nil {
			user = req.User
		}
	case *UploadChannelFileRequest:
		file = req.File
		if req.User != nil {
			user = req.User
		}
	case *UploadChannelImageRequest:
		file = req.File
		if len(req.UploadSizes) > 0 {
			uploadSizes = req.UploadSizes
		}
		if req.User != nil {
			user = req.User
		}
	default:
		return nil, fmt.Errorf("unsupported request type for multipart: %T", data)
	}

	if file == nil {
		return nil, fmt.Errorf("file name must be provided")
	}
	fileName := *file

	fileContent, err := getFileContent(fileName, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to open file: %w", err)
	}
	// getFileContent opens the file for us; release the descriptor on every
	// return path. The file is only read, so a Close error carries no data
	// loss and is deliberately ignored.
	if closer, ok := fileContent.(io.Closer); ok {
		defer closer.Close()
	}

	var buf bytes.Buffer
	writer := multipart.NewWriter(&buf)

	if uploadSizes != nil {
		if err := writeMultipartJSONField(writer, "upload_sizes", uploadSizes); err != nil {
			return nil, err
		}
	}
	if user != nil {
		if err := writeMultipartJSONField(writer, "user", user); err != nil {
			return nil, err
		}
	}

	// Add file field.
	fileWriter, err := writer.CreateFormFile("file", fileName)
	if err != nil {
		return nil, fmt.Errorf("failed to create form file: %w", err)
	}
	if _, err := io.Copy(fileWriter, fileContent); err != nil {
		return nil, fmt.Errorf("failed to copy file content: %w", err)
	}
	// Close writes the terminating boundary; it must happen before the buffer
	// is used as the body.
	if err := writer.Close(); err != nil {
		return nil, fmt.Errorf("failed to close multipart writer: %w", err)
	}

	// Update request body and content type.
	r.Body = io.NopCloser(&buf)
	r.Header.Set("Content-Type", writer.FormDataContentType())

	c.logger.Debug("Created multipart request with file: %s", fileName)
	return r, nil
}

// writeMultipartJSONField JSON-encodes value and writes it to w as the form
// field called name.
func writeMultipartJSONField(w *multipart.Writer, name string, value any) error {
	encoded, err := json.Marshal(value)
	if err != nil {
		return fmt.Errorf("failed to marshal %s: %w", name, err)
	}
	if err := w.WriteField(name, string(encoded)); err != nil {
		return fmt.Errorf("failed to write %s field: %w", name, err)
	}
	return nil
}
// setHeaders applies the headers every request carries: the client version,
// the authorization token and the auth type. Content-Type defaults to JSON
// but is left alone when the request already declares one.
func (c *Client) setHeaders(r *http.Request) {
	h := r.Header

	if existing := h.Get("Content-Type"); existing == "" {
		h.Set("Content-Type", "application/json")
	}

	h.Set("X-Stream-Client", versionHeader())
	h.Set("Authorization", c.authToken)
	h.Set("Stream-Auth-Type", "jwt")
}
// StructToMapWithTags returns a map from the value of the struct tag tagName
// to the field value, for every exported field of input that carries that
// tag. input must be a struct or a pointer to a struct; otherwise an error is
// returned together with an empty map.
//
// The tag value is used verbatim as the map key. Pointer fields are stored as
// pointers (they are not dereferenced), so nil pointers appear in the map.
func StructToMapWithTags(input any, tagName string) (map[string]any, error) {
	result := make(map[string]any)
	err := extractFields(reflect.ValueOf(input), tagName, result)
	return result, err
}

// extractFields copies the tagged fields of the struct held by val into
// result, keyed by the value of the tagName tag. Only the top level of the
// struct is inspected; nested structs are stored as values and not descended
// into.
func extractFields(val reflect.Value, tagName string, result map[string]any) error {
	// Check if the input is a pointer and get the actual value. A nil pointer
	// yields the zero Value, which fails the struct check below.
	if val.Kind() == reflect.Ptr {
		val = val.Elem()
	}

	// Ensure the provided input is a struct.
	if val.Kind() != reflect.Struct {
		return fmt.Errorf("input must be a struct or a pointer to a struct")
	}

	typ := val.Type()
	for i := 0; i < val.NumField(); i++ {
		field := val.Field(i)
		// Interface panics on values obtained from unexported fields, so a
		// tag on such a field is ignored.
		if !field.CanInterface() {
			continue
		}
		if tag, ok := typ.Field(i).Tag.Lookup(tagName); ok {
			result[tag] = field.Interface()
		}
	}
	return nil
}
// extractQueryParams builds URL query values from the fields of v that carry
// a `query` struct tag. The tag value is the parameter name and the field
// value is encoded with EncodeValueToQueryParam. Fields holding a nil pointer
// are omitted.
//
// v may be a struct or a pointer to a struct. A nil v (untyped nil, or a nil
// pointer, map, slice, and so on) yields empty values. Any other non-struct
// input panics with the error from StructToMapWithTags, as that is a
// programming error.
func extractQueryParams(v any) url.Values {
	if v == nil {
		return url.Values{}
	}
	// IsNil panics for kinds that cannot be nil (for example a struct passed
	// by value), so only consult it for nilable kinds.
	switch rv := reflect.ValueOf(v); rv.Kind() {
	case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice, reflect.UnsafePointer:
		if rv.IsNil() {
			return url.Values{}
		}
	}

	fields, err := StructToMapWithTags(v, "query")
	if err != nil {
		panic(err)
	}

	values := url.Values{}
	for name, fieldValue := range fields {
		rv := reflect.ValueOf(fieldValue)
		if rv.Kind() == reflect.Ptr && rv.IsNil() {
			continue
		}
		values.Set(name, EncodeValueToQueryParam(fieldValue))
	}
	return values
}
// EncodeValueToQueryParam returns the string representation of a value ready
// to be used as a query param.
//
// Encoding by kind:
//   - nil (untyped nil or a nil pointer): the empty string;
//   - pointer: the encoding of the pointed-to value;
//   - string, integers, floats, bool: their plain decimal/text form;
//   - slice: the encodings of the elements joined with "," (e.g. ids=a,b,c);
//   - map, struct: their JSON encoding;
//   - anything else: fmt's %v formatting.
//
// It panics if a map or struct cannot be marshalled to JSON.
func EncodeValueToQueryParam(value any) string {
	val := reflect.ValueOf(value)
	switch val.Kind() {
	case reflect.Invalid:
		// reflect.ValueOf(nil) has no value to call Interface on.
		return ""
	case reflect.Ptr:
		if val.IsNil() {
			return ""
		}
		return EncodeValueToQueryParam(val.Elem().Interface())
	case reflect.String:
		return val.String()
	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
		return strconv.FormatInt(val.Int(), 10)
	case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
		return strconv.FormatUint(val.Uint(), 10)
	case reflect.Float32, reflect.Float64:
		// Format with the value's own precision: a float32 formatted as a
		// 64-bit float prints its widening noise (0.1 -> 0.10000000149011612).
		return strconv.FormatFloat(val.Float(), 'f', -1, val.Type().Bits())
	case reflect.Bool:
		return strconv.FormatBool(val.Bool())
	case reflect.Slice:
		// For query params, slices of primitives should be comma-separated (e.g. ids=a,b,c)
		parts := make([]string, val.Len())
		for i := range parts {
			parts[i] = EncodeValueToQueryParam(val.Index(i).Interface())
		}
		return strings.Join(parts, ",")
	case reflect.Map, reflect.Struct:
		b, err := json.Marshal(value)
		if err != nil {
			panic(err)
		}
		return string(b)
	default:
		return fmt.Sprintf("%v", val.Interface())
	}
}
// MakeRequest performs one API call end to end: it builds the request from
// the given method, path, query params, payload and path parameters, logs it,
// sends it with the client's HTTP client, reads the whole response body, logs
// the response with the elapsed time, and hands the body to parseResponse to
// be decoded into response.
//
// Errors from building or sending the request are returned as is; a failure
// to read the response body is wrapped.
func MakeRequest[GRequest any, GResponse any](c *Client, ctx context.Context, method, path string, params url.Values, data *GRequest, response *GResponse, pathParams map[string]string) (*StreamResponse[GResponse], error) {
	req, err := newRequest(c, ctx, method, path, params, data, pathParams)
	if err != nil {
		return nil, err
	}
	c.logRequest(req)

	startedAt := time.Now()
	httpResp, err := c.httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer httpResp.Body.Close()

	payload, readErr := io.ReadAll(httpResp.Body)
	if readErr != nil {
		return nil, fmt.Errorf("failed to read HTTP response: %w", readErr)
	}

	// The elapsed time covers sending the request and reading the full body.
	c.logResponse(httpResp, payload, time.Since(startedAt))

	return parseResponse(c, httpResp, payload, response)
}
// addRateLimitInfo wraps the decoded result in a StreamResponse and attaches
// the rate limit information built from the response headers. The returned
// error is always nil.
func addRateLimitInfo[Gresponse any](headers http.Header, result *Gresponse) (*StreamResponse[Gresponse], error) {
	wrapped := new(StreamResponse[Gresponse])
	wrapped.RateLimitInfo = NewRateLimitFromHeaders(headers)
	wrapped.Data = *result
	return wrapped, nil
}