Skip to content

feat: optional SSE in REST protocol#176

Open
joroshiba wants to merge 3 commits into
connectrpc:mainfrom
joroshiba:sse-support
Open

feat: optional SSE in REST protocol#176
joroshiba wants to merge 3 commits into
connectrpc:mainfrom
joroshiba:sse-support

Conversation

@joroshiba

@joroshiba joroshiba commented Sep 30, 2025

Copy link
Copy Markdown

This adds support for SSE streaming on REST Protocol via an option, but is not enabled by default. When using WithRESTServerSentEvents(), only streaming rpcs using REST protocol when request header Accept is set to text/event-stream are affected.

When SSE is being used:

  • on connect an event open is fired
  • rpc streamed responses written with a data containing the JSON serialized protobuf response object in the data field of the event.
  • error events are generated when error encountered
  • when the stream is closed the complete event is fired

There are SSE specific field configurations available for event id and retry. To support these fields, custom directives are parsed out of the google.api.http response_body annotation field, to configure these fields to be set as specifed for each data sent.

  • SSE_EVENT=<field_name> will change the event name on the SSE to be derived from the field specified
  • SSE_ID=<field_name>, will change the event id to be derived from the field specified. If this is not set no ID is specified, when it is the id field will be blank unless the attribute has a value on the rendered data.
  • SSE_RETRY=<field_name>, will set the retry field based on the value, assuming it is all ascii numbers.
  • SSE_OMIT will, if any of the above are set, omit the fields specified from the data json body.

Related issue:
#161

Signed-off-by: Jordan Oroshiba <jordan@astria.org>
…support via protobuf annotation directives

Signed-off-by: Jordan Oroshiba <jordan@astria.org>
Signed-off-by: Jordan Oroshiba <jordan@astria.org>

@emcfarlane emcfarlane left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for taking the time to put this PR together. I can see the need for having a custom encoding to support streaming use cases but have concerns about the approach. For extending the definition of google.api.http it would be better suited as a custom plugin (see this issue: #159), where you would be able to configure the streaming behaviour to match exactly what your clients expect. To add this behaviour to vanguard we would require the behaviour to be as generally applicable as possible. We don't however currently support pluggable interfaces.

Comment thread README.md
rpc WatchBooks(WatchBooksRequest) returns (stream BookUpdate) {
option (google.api.http) = {
get: "/v1/{parent=shelves/*}/books:watch"
response_body: "SSE_EVENT=type,SSE_ID=sequence,SSE_OMIT"

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This response_body is not a valid field path. It breaks the semantic meaning describe in google.api.http. Any other tool that interprets HttpRule annotations will misinterpret this.

Comment thread README.md
Comment on lines +170 to +173
**Available directives:**
- `SSE_EVENT=field_name` - Extract `field_name` from each message to use as the SSE event type
- `SSE_ID=field_name` - Extract `field_name` from each message to use as the SSE event ID
- `SSE_OMIT` - Remove the extracted fields from the JSON data payload

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A custom annotation would be more idiomatic and avoid overloading the response_body.

Comment thread transcoder.go
Comment on lines +2327 to +2329
var jsonData map[string]interface{}
needsFiltering := s.omitExtractedFields && (s.eventField != "" || s.eventIDField != "" || s.retryField != "")
err := json.Unmarshal(data, &jsonData)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This unmarshals the already marshalled data to extract and filter and then must re-encode. This has overhead but also correctness issues, the map[string]any will re-order the fields.

Comment thread protocol_rest.go
if r.useSSE && !isErr {
headers["Content-Type"] = []string{contentTypeSSE}
headers["Cache-Control"] = []string{"no-cache"}
headers["Connection"] = []string{"keep-alive"}

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep-alive should not be set for HTTP/2.

Comment thread protocol_rest.go
buf.WriteString(",")
}
first = false
buf.WriteString(`"` + k + `":"` + v + `"`)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This must escape the key and values for headers to avoid breaking encoding.

Comment thread transcoder.go
eventIDField string // Field name to extract from message for event ID
retryField string // Field name to extract from message for retry interval
omitExtractedFields bool // Whether to remove extracted fields from data payload
responseType protoreflect.MessageType

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

responseType is never used.

Comment thread transcoder.go
buf.WriteString(sseNewline)

// Write to delegate
_, err := s.delegate.Write(buf.Bytes())

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to call Flush()?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants