Skip to content

Commit

Permalink
return JSON ok response for hec receiver raw path (open-telemetry#29875)
Browse files Browse the repository at this point in the history
**Description:**
Adds enhancement for splunk hec receiver raw path
- previously it returns no response in http body: it nows returns json
response

**Link to tracking Issue:**

open-telemetry#29745
  • Loading branch information
splunkericl committed Dec 15, 2023
1 parent 5ddd0ba commit ce75979
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 39 deletions.
27 changes: 27 additions & 0 deletions .chloggen/splunkhecreceiver-return-success-body.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: splunkhecreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Returns json response in raw endpoint when it is successful

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [20766]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
17 changes: 12 additions & 5 deletions receiver/splunkhecreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ const (
// Centralizing some HTTP and related string constants.
gzipEncoding = "gzip"
httpContentEncodingHeader = "Content-Encoding"
httpContentTypeHeader = "Content-Type"
httpJSONTypeHeader = "application/json"
)

var (
Expand Down Expand Up @@ -229,6 +231,14 @@ func (r *splunkReceiver) Shutdown(context.Context) error {
return err
}

func (r *splunkReceiver) writeSuccessResponse(ctx context.Context, resp http.ResponseWriter, eventCount int) {
resp.Header().Set(httpContentTypeHeader, httpJSONTypeHeader)
resp.WriteHeader(http.StatusOK)
if _, err := resp.Write(okRespBody); err != nil {
r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, eventCount, err)
}
}

func (r *splunkReceiver) handleRawReq(resp http.ResponseWriter, req *http.Request) {
ctx := req.Context()
ctx = r.obsrecv.StartLogsOp(ctx)
Expand Down Expand Up @@ -292,7 +302,7 @@ func (r *splunkReceiver) handleRawReq(resp http.ResponseWriter, req *http.Reques
if consumerErr != nil {
r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, slLen, consumerErr)
} else {
resp.WriteHeader(http.StatusOK)
r.writeSuccessResponse(ctx, resp, ld.LogRecordCount())
r.obsrecv.EndLogsOp(ctx, metadata.Type, slLen, nil)
}
}
Expand Down Expand Up @@ -404,10 +414,7 @@ func (r *splunkReceiver) handleReq(resp http.ResponseWriter, req *http.Request)
}
}

resp.WriteHeader(http.StatusOK)
if _, err := resp.Write(okRespBody); err != nil {
r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, len(events)+len(metricEvents), err)
}
r.writeSuccessResponse(ctx, resp, len(events)+len(metricEvents))
}

func (r *splunkReceiver) createResourceCustomizer(req *http.Request) func(resource pcommon.Resource) {
Expand Down
89 changes: 55 additions & 34 deletions receiver/splunkhecreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk"
)

func assertHecSuccessResponse(t *testing.T, resp *http.Response, body any) {
status := resp.StatusCode
assert.Equal(t, http.StatusOK, status)
assert.Equal(t, httpJSONTypeHeader, resp.Header.Get(httpContentTypeHeader))
assert.Equal(t, map[string]any{"code": float64(0), "text": "Success"}, body)
}

func Test_splunkhecreceiver_NewLogsReceiver(t *testing.T) {
defaultConfig := createDefaultConfig().(*Config)
emptyEndpointConfig := createDefaultConfig().(*Config)
Expand Down Expand Up @@ -167,14 +174,15 @@ func Test_splunkhecReceiver_handleReq(t *testing.T) {
tests := []struct {
name string
req *http.Request
assertResponse func(t *testing.T, status int, body any)
assertResponse func(t *testing.T, resp *http.Response, body any)
assertSink func(t *testing.T, sink *consumertest.LogsSink)
assertMetricsSink func(t *testing.T, sink *consumertest.MetricsSink)
}{
{
name: "incorrect_method",
req: httptest.NewRequest("PUT", "http://localhost/foo", nil),
assertResponse: func(t *testing.T, status int, body any) {
assertResponse: func(t *testing.T, resp *http.Response, body any) {
status := resp.StatusCode
assert.Equal(t, http.StatusBadRequest, status)
assert.Equal(t, "Only \"POST\" method is supported", body)
},
Expand All @@ -188,7 +196,8 @@ func Test_splunkhecReceiver_handleReq(t *testing.T) {
req.Header.Set("Content-Type", "application/not-json")
return req
}(),
assertResponse: func(t *testing.T, status int, body any) {
assertResponse: func(t *testing.T, resp *http.Response, body any) {
status := resp.StatusCode
assert.Equal(t, http.StatusOK, status)
assert.Equal(t, map[string]any{
"text": "Success",
Expand All @@ -203,7 +212,8 @@ func Test_splunkhecReceiver_handleReq(t *testing.T) {
req.Header.Set("Content-Encoding", "superzipper")
return req
}(),
assertResponse: func(t *testing.T, status int, body any) {
assertResponse: func(t *testing.T, resp *http.Response, body any) {
status := resp.StatusCode
assert.Equal(t, http.StatusUnsupportedMediaType, status)
assert.Equal(t, `"Content-Encoding" must be "gzip" or empty`, body)
},
Expand All @@ -214,7 +224,8 @@ func Test_splunkhecReceiver_handleReq(t *testing.T) {
req := httptest.NewRequest("POST", "http://localhost/foo", bytes.NewReader([]byte{1, 2, 3, 4}))
return req
}(),
assertResponse: func(t *testing.T, status int, body any) {
assertResponse: func(t *testing.T, resp *http.Response, body any) {
status := resp.StatusCode
assert.Equal(t, http.StatusBadRequest, status)
assert.Equal(t, map[string]any{"code": float64(6), "text": "Invalid data format"}, body)
},
Expand All @@ -225,7 +236,8 @@ func Test_splunkhecReceiver_handleReq(t *testing.T) {
req := httptest.NewRequest("POST", "http://localhost/foo", bytes.NewReader(nil))
return req
}(),
assertResponse: func(t *testing.T, status int, body any) {
assertResponse: func(t *testing.T, resp *http.Response, body any) {
status := resp.StatusCode
assert.Equal(t, http.StatusBadRequest, status)
assert.Equal(t, map[string]any{"code": float64(5), "text": "No data"}, body)
},
Expand All @@ -238,7 +250,8 @@ func Test_splunkhecReceiver_handleReq(t *testing.T) {
req := httptest.NewRequest("POST", "http://localhost/foo", bytes.NewReader(msgBytes))
return req
}(),
assertResponse: func(t *testing.T, status int, body any) {
assertResponse: func(t *testing.T, resp *http.Response, body any) {
status := resp.StatusCode
assert.Equal(t, http.StatusBadRequest, status)
assert.Equal(t, map[string]any{"code": float64(6), "text": "Invalid data format"}, body)
},
Expand All @@ -253,7 +266,8 @@ func Test_splunkhecReceiver_handleReq(t *testing.T) {
req := httptest.NewRequest("POST", "http://localhost/foo", bytes.NewReader(msgBytes))
return req
}(),
assertResponse: func(t *testing.T, status int, body any) {
assertResponse: func(t *testing.T, resp *http.Response, body any) {
status := resp.StatusCode
assert.Equal(t, http.StatusBadRequest, status)
assert.Equal(t, map[string]any{"code": float64(12), "text": "Event field is required"}, body)
},
Expand All @@ -268,7 +282,8 @@ func Test_splunkhecReceiver_handleReq(t *testing.T) {
req := httptest.NewRequest("POST", "http://localhost/foo", bytes.NewReader(msgBytes))
return req
}(),
assertResponse: func(t *testing.T, status int, body any) {
assertResponse: func(t *testing.T, resp *http.Response, body any) {
status := resp.StatusCode
assert.Equal(t, http.StatusBadRequest, status)
assert.Equal(t, map[string]any{"code": float64(13), "text": "Event field cannot be blank"}, body)
},
Expand All @@ -281,9 +296,8 @@ func Test_splunkhecReceiver_handleReq(t *testing.T) {
req := httptest.NewRequest("POST", "http://localhost/foo", bytes.NewReader(msgBytes))
return req
}(),
assertResponse: func(t *testing.T, status int, body any) {
assert.Equal(t, http.StatusOK, status)
assert.Equal(t, map[string]any{"code": float64(0), "text": "Success"}, body)
assertResponse: func(t *testing.T, resp *http.Response, body any) {
assertHecSuccessResponse(t, resp, body)
},
assertSink: func(t *testing.T, sink *consumertest.LogsSink) {
assert.Equal(t, 1, len(sink.AllLogs()))
Expand All @@ -300,9 +314,8 @@ func Test_splunkhecReceiver_handleReq(t *testing.T) {
req := httptest.NewRequest("POST", "http://localhost/foo", bytes.NewReader(msgBytes))
return req
}(),
assertResponse: func(t *testing.T, status int, body any) {
assert.Equal(t, http.StatusOK, status)
assert.Equal(t, map[string]any{"code": float64(0), "text": "Success"}, body)
assertResponse: func(t *testing.T, resp *http.Response, body any) {
assertHecSuccessResponse(t, resp, body)
},
assertSink: func(t *testing.T, sink *consumertest.LogsSink) {
assert.Equal(t, 0, len(sink.AllLogs()))
Expand All @@ -327,9 +340,8 @@ func Test_splunkhecReceiver_handleReq(t *testing.T) {
req.Header.Set("Content-Encoding", "gzip")
return req
}(),
assertResponse: func(t *testing.T, status int, body any) {
assert.Equal(t, http.StatusOK, status)
assert.Equal(t, map[string]any{"code": float64(0), "text": "Success"}, body)
assertResponse: func(t *testing.T, resp *http.Response, body any) {
assertHecSuccessResponse(t, resp, body)
},
},
{
Expand All @@ -342,7 +354,8 @@ func Test_splunkhecReceiver_handleReq(t *testing.T) {
req.Header.Set("Content-Encoding", "gzip")
return req
}(),
assertResponse: func(t *testing.T, status int, body any) {
assertResponse: func(t *testing.T, resp *http.Response, body any) {
status := resp.StatusCode
assert.Equal(t, http.StatusBadRequest, status)
assert.Equal(t, `Error on gzip body`, body)
},
Expand Down Expand Up @@ -373,7 +386,7 @@ func Test_splunkhecReceiver_handleReq(t *testing.T) {
fmt.Println(string(respBytes))
assert.NoError(t, json.Unmarshal(respBytes, &body))

tt.assertResponse(t, resp.StatusCode, body)
tt.assertResponse(t, resp, body)
if tt.assertSink != nil {
tt.assertSink(t, sink)
}
Expand Down Expand Up @@ -928,12 +941,13 @@ func Test_splunkhecReceiver_handleRawReq(t *testing.T) {
tests := []struct {
name string
req *http.Request
assertResponse func(t *testing.T, status int, body any)
assertResponse func(t *testing.T, resp *http.Response, body any)
}{
{
name: "incorrect_method",
req: httptest.NewRequest("PUT", "http://localhost/foo", nil),
assertResponse: func(t *testing.T, status int, body any) {
assertResponse: func(t *testing.T, resp *http.Response, body any) {
status := resp.StatusCode
assert.Equal(t, http.StatusBadRequest, status)
assert.Equal(t, `Only "POST" method is supported`, body)
},
Expand All @@ -945,7 +959,8 @@ func Test_splunkhecReceiver_handleRawReq(t *testing.T) {
req.Header.Set("Content-Type", "application/not-json")
return req
}(),
assertResponse: func(t *testing.T, status int, body any) {
assertResponse: func(t *testing.T, resp *http.Response, body any) {
status := resp.StatusCode
assert.Equal(t, http.StatusOK, status)
},
},
Expand All @@ -956,7 +971,8 @@ func Test_splunkhecReceiver_handleRawReq(t *testing.T) {
req.Header.Set("Content-Encoding", "superzipper")
return req
}(),
assertResponse: func(t *testing.T, status int, body any) {
assertResponse: func(t *testing.T, resp *http.Response, body any) {
status := resp.StatusCode
assert.Equal(t, http.StatusUnsupportedMediaType, status)
assert.Equal(t, `"Content-Encoding" must be "gzip" or empty`, body)
},
Expand All @@ -967,7 +983,8 @@ func Test_splunkhecReceiver_handleRawReq(t *testing.T) {
req := httptest.NewRequest("POST", "http://localhost/foo", bytes.NewReader(nil))
return req
}(),
assertResponse: func(t *testing.T, status int, body any) {
assertResponse: func(t *testing.T, resp *http.Response, body any) {
status := resp.StatusCode
assert.Equal(t, http.StatusBadRequest, status)
assert.Equal(t, map[string]any{"code": float64(5), "text": "No data"}, body)
},
Expand All @@ -979,7 +996,8 @@ func Test_splunkhecReceiver_handleRawReq(t *testing.T) {
req := httptest.NewRequest("POST", "http://localhost/foo", strings.NewReader("foo\nbar"))
return req
}(),
assertResponse: func(t *testing.T, status int, body any) {
assertResponse: func(t *testing.T, resp *http.Response, body any) {
status := resp.StatusCode
assert.Equal(t, http.StatusOK, status)
},
},
Expand All @@ -991,8 +1009,8 @@ func Test_splunkhecReceiver_handleRawReq(t *testing.T) {
req := httptest.NewRequest("POST", "http://localhost/foo", bytes.NewReader(msgBytes))
return req
}(),
assertResponse: func(t *testing.T, status int, body any) {
assert.Equal(t, http.StatusOK, status)
assertResponse: func(t *testing.T, resp *http.Response, body any) {
assertHecSuccessResponse(t, resp, body)
},
},
{
Expand All @@ -1011,8 +1029,8 @@ func Test_splunkhecReceiver_handleRawReq(t *testing.T) {
req.Header.Set("Content-Encoding", "gzip")
return req
}(),
assertResponse: func(t *testing.T, status int, body any) {
assert.Equal(t, http.StatusOK, status)
assertResponse: func(t *testing.T, resp *http.Response, body any) {
assertHecSuccessResponse(t, resp, body)
},
},
{
Expand All @@ -1025,7 +1043,8 @@ func Test_splunkhecReceiver_handleRawReq(t *testing.T) {
req.Header.Set("Content-Encoding", "gzip")
return req
}(),
assertResponse: func(t *testing.T, status int, body any) {
assertResponse: func(t *testing.T, resp *http.Response, body any) {
status := resp.StatusCode
assert.Equal(t, http.StatusBadRequest, status)
assert.Equal(t, `Error on gzip body`, body)
},
Expand All @@ -1044,7 +1063,8 @@ func Test_splunkhecReceiver_handleRawReq(t *testing.T) {

return req
}(),
assertResponse: func(t *testing.T, status int, body any) {
assertResponse: func(t *testing.T, resp *http.Response, body any) {
status := resp.StatusCode
assert.Equal(t, http.StatusBadRequest, status)
assert.Equal(t, map[string]any{"code": float64(6), "text": "Invalid data format"}, body)
},
Expand All @@ -1063,7 +1083,8 @@ func Test_splunkhecReceiver_handleRawReq(t *testing.T) {

return req
}(),
assertResponse: func(t *testing.T, status int, body any) {
assertResponse: func(t *testing.T, resp *http.Response, body any) {
status := resp.StatusCode
assert.Equal(t, http.StatusBadRequest, status)
assert.Equal(t, map[string]any{"code": float64(6), "text": "Invalid data format"}, body)
},
Expand Down Expand Up @@ -1094,7 +1115,7 @@ func Test_splunkhecReceiver_handleRawReq(t *testing.T) {
assert.NoError(t, json.Unmarshal(respBytes, &body))
}

tt.assertResponse(t, resp.StatusCode, body)
tt.assertResponse(t, resp, body)
})
}
}
Expand Down

0 comments on commit ce75979

Please sign in to comment.