Skip to content
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
9027754
Redo
Jun 30, 2025
bf18abf
Tests
Jun 30, 2025
a40af0c
Remove print
Jun 30, 2025
38f5be6
Simplify
Jun 30, 2025
4763170
Feedback
Jul 1, 2025
6a95dc5
Feedback
Jul 1, 2025
ee8fafd
Feedback
Jul 1, 2025
9af9940
Cleanup
Jul 1, 2025
1cd1311
Update context.go
smaye81 Jul 1, 2025
93cacf8
Feedback
Jul 1, 2025
adc81b1
Interceptors
Jul 1, 2025
8397865
Interceptor tests
Jul 1, 2025
bc250b0
Feedback
Jul 1, 2025
0a44db9
Feedback
Jul 1, 2025
94dbb48
Update header setting
Jul 1, 2025
57e8698
Fix responseWrapper docs
Jul 1, 2025
e422ba2
Fix again
Jul 1, 2025
3cdb5e1
Update tests
Jul 1, 2025
6a3ed80
Style
Jul 1, 2025
e14c0d7
Move func
Jul 2, 2025
a2e3f4e
Fix server stream tests
Jul 3, 2025
b83ca03
Rename context methods and always create a new call info when using e…
Jul 16, 2025
7d37a59
Interceptor tests
Jul 21, 2025
3cadcc1
Side quest tests
Jul 21, 2025
b2d9bce
Extensive testing for simple and generic APIs using callinfo
Jul 21, 2025
153acb7
Implement simple for client streaming on handler
Jul 17, 2025
021a0cb
Implement simple for client streaming on client
Jul 17, 2025
7995c00
Implement simple for bidi streaming on client
Jul 17, 2025
d12c6c9
Make client/bidi stream fallible for simple
Jul 17, 2025
e4026f2
Fix benchmark/example test
Jul 17, 2025
e34c8c8
Redo
Jun 30, 2025
194fb35
Tests
Jun 30, 2025
39380da
Remove print
Jun 30, 2025
36bc4bc
Simplify
Jun 30, 2025
c4b878f
Feedback
Jul 1, 2025
6c5253b
Feedback
Jul 1, 2025
3f509d8
Feedback
Jul 1, 2025
c78eb94
Cleanup
Jul 1, 2025
279b452
Update context.go
smaye81 Jul 1, 2025
7baf7be
Feedback
Jul 1, 2025
8d92bae
Interceptors
Jul 1, 2025
329c9e5
Interceptor tests
Jul 1, 2025
c674148
Feedback
Jul 1, 2025
3935be9
Feedback
Jul 1, 2025
a295d10
Update header setting
Jul 1, 2025
1d30ca6
Fix responseWrapper docs
Jul 1, 2025
a2e8814
Fix again
Jul 1, 2025
d8102c0
Update tests
Jul 1, 2025
8de8390
Style
Jul 1, 2025
32520e7
Move func
Jul 2, 2025
d5ccf16
Fix server stream tests
Jul 3, 2025
cd7dc92
Rename context methods and always create a new call info when using e…
Jul 16, 2025
45ee1ce
Interceptor tests
Jul 21, 2025
1011799
Side quest tests
Jul 21, 2025
9143ba0
Extensive testing for simple and generic APIs using callinfo
Jul 21, 2025
fa4e176
Feedback
Jul 23, 2025
8e8dc71
Merge branch 'simple' into simple
smaye81 Jul 23, 2025
0b48c6a
Merge pull request #1 from jchadwick-buf/simple
smaye81 Jul 23, 2025
f131a46
Add full host of tests for all RPC types and simple vs. generics API.
Jul 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 50 additions & 23 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ func NewClient[Req, Res any](httpClient HTTPClient, url string, options ...Clien
conn := client.protocolClient.NewConn(ctx, unarySpec, request.Header())
conn.onRequestSend(func(r *http.Request) {
request.setRequestMethod(r.Method)
callInfo, ok := clientCallInfoFromContext(ctx)
if ok {
callInfo.method = r.Method
}
})
// Send always returns an io.EOF unless the error is from the client-side.
// We want the user to continue to call Receive in those cases to get the
Expand All @@ -100,6 +104,7 @@ func NewClient[Req, Res any](httpClient HTTPClient, url string, options ...Clien
return response, conn.CloseResponse()
})
if interceptor := config.Interceptor; interceptor != nil {
// interceptor is the full chain of all interceptors provided
unaryFunc = interceptor.WrapUnary(unaryFunc)
}
client.callUnary = func(ctx context.Context, request *Request[Req]) (*Response[Res], error) {
Expand All @@ -109,6 +114,23 @@ func NewClient[Req, Res any](httpClient HTTPClient, url string, options ...Clien
request.spec = unarySpec
request.peer = client.protocolClient.Peer()
protocolClient.WriteRequestHeader(StreamTypeUnary, request.Header())

// Also set them in the context if there's a call info present
callInfo, callInfoOk := clientCallInfoFromContext(ctx)
if callInfoOk {
callInfo.peer = request.Peer()
callInfo.spec = request.Spec()
// A client could have set request headers in the call info OR the request wrapper
// So if a callInfo exists in context, merge any headers from there into the request wrapper
// so that all headers are sent in the request
mergeHeaders(request.Header(), callInfo.requestHeader)

// Copy the call info into a sentinel value. This is so we can compare
// the sentinel value against the call info in context. If they're different,
// we can stop the request. This protects against changing the context in interceptors.
ctx = context.WithValue(ctx, sentinelContextKey{}, callInfo)
}

response, err := unaryFunc(ctx, request)
if err != nil {
return nil, err
Expand All @@ -117,6 +139,12 @@ func NewClient[Req, Res any](httpClient HTTPClient, url string, options ...Clien
if !ok {
return nil, errorf(CodeInternal, "unexpected client response type %T", response)
}
if callInfoOk {
// Wrap the response and set it into the context callinfo
callInfo.responseSource = &responseWrapper[Res]{
response: typed,
}
}
return typed, nil
}
return client
Expand All @@ -130,19 +158,6 @@ func (c *Client[Req, Res]) CallUnary(ctx context.Context, request *Request[Req])
return c.callUnary(ctx, request)
}

// CallUnarySimple calls a request-response procedure using the function signature
// associated with the "simple" generation option.
//
// This option eliminates the [Request] and [Response] wrappers, and instead uses the
// context.Context to propagate information such as headers.
func (c *Client[Req, Res]) CallUnarySimple(ctx context.Context, requestMsg *Req) (*Res, error) {
response, err := c.CallUnary(ctx, requestFromContext(ctx, requestMsg))
if response != nil {
return response.Msg, err
}
return nil, err
}

// CallClientStream calls a client streaming procedure.
func (c *Client[Req, Res]) CallClientStream(ctx context.Context) *ClientStreamForClient[Req, Res] {
if c.err != nil {
Expand All @@ -159,12 +174,33 @@ func (c *Client[Req, Res]) CallServerStream(ctx context.Context, request *Reques
if c.err != nil {
return nil, c.err
}
callInfo, callInfoOk := clientCallInfoFromContext(ctx)
// Set values in the context if there's a call info present
if callInfoOk {
// Copy the call info into a sentinel value. This is so we can compare
// the sentinel value against the call info in context. If they're different,
// we can stop the request. This protects against changing the context in interceptors.
ctx = context.WithValue(ctx, sentinelContextKey{}, callInfo)
}
conn := c.newConn(ctx, StreamTypeServer, func(r *http.Request) {
request.method = r.Method
})
request.spec = conn.Spec()
request.peer = conn.Peer()
request.spec = conn.Spec()

// Set values in the context if there's a call info present
if callInfoOk {
callInfo.peer = conn.Peer()
callInfo.spec = conn.Spec()
callInfo.responseSource = conn

// Merge any callInfo request headers first, then do the request.
// so that context headers show first in the list of headers
mergeHeaders(conn.RequestHeader(), callInfo.RequestHeader())
}

mergeHeaders(conn.RequestHeader(), request.header)

// Send always returns an io.EOF unless the error is from the client-side.
// We want the user to continue to call Receive in those cases to get the
// full error from the server-side.
Expand All @@ -182,15 +218,6 @@ func (c *Client[Req, Res]) CallServerStream(ctx context.Context, request *Reques
}, nil
}

// CallServerStreamSimple calls a server streaming procedure using the function signature
Comment thread
jhump marked this conversation as resolved.
// associated with the "simple" generation option.
//
// This option eliminates the [Request] wrapper, and instead uses the context.Context to
// propagate information such as headers.
func (c *Client[Req, Res]) CallServerStreamSimple(ctx context.Context, requestMsg *Req) (*ServerStreamForClient[Res], error) {
return c.CallServerStream(ctx, requestFromContext(ctx, requestMsg))
}

// CallBidiStream calls a bidirectional streaming procedure.
func (c *Client[Req, Res]) CallBidiStream(ctx context.Context) *BidiStreamForClient[Req, Res] {
if c.err != nil {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions cmd/protoc-gen-connect-go/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,15 +384,19 @@ func generateClientMethod(g *protogen.GeneratedFile, method *protogen.Method, na
g.P("return c.", unexport(method.GoName), ".CallClientStream(ctx)")
case !isStreamingClient && isStreamingServer:
if simple {
g.P("return c.", unexport(method.GoName), ".CallServerStreamSimple(ctx, req)")
g.P("return c.", unexport(method.GoName), ".CallServerStream(ctx, ", connectPackage.Ident("NewRequest"), "(req))")
} else {
g.P("return c.", unexport(method.GoName), ".CallServerStream(ctx, req)")
}
case isStreamingClient && isStreamingServer:
g.P("return c.", unexport(method.GoName), ".CallBidiStream(ctx)")
default:
if simple {
g.P("return c.", unexport(method.GoName), ".CallUnarySimple(ctx, req)")
g.P("response, err := c.", unexport(method.GoName), ".CallUnary(ctx, ", connectPackage.Ident("NewRequest"), "(req))")
g.P("if response != nil {")
g.P("return response.Msg, err")
g.P("}")
g.P("return nil, err")
} else {
g.P("return c.", unexport(method.GoName), ".CallUnary(ctx, req)")
}
Expand Down
56 changes: 41 additions & 15 deletions connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,11 +211,6 @@ func (r *Request[_]) setRequestMethod(method string) {
r.method = method
}

// setHeader sets the request header to the given value.
func (r *Request[_]) setHeader(header http.Header) {
r.header = header
}

// AnyRequest is the common method set of every [Request], regardless of type
// parameter. It's used in unary interceptors.
//
Expand Down Expand Up @@ -287,16 +282,6 @@ func (r *Response[_]) Trailer() http.Header {
return r.trailer
}

// setHeader sets the response header.
func (r *Response[_]) setHeader(header http.Header) {
r.header = header
}

// setTrailer sets the response trailer.
func (r *Response[_]) setTrailer(trailer http.Header) {
r.trailer = trailer
}

// internalOnly implements AnyResponse.
func (r *Response[_]) internalOnly() {}

Expand Down Expand Up @@ -383,6 +368,47 @@ type hasHTTPMethod interface {
getHTTPMethod() string
}

// errStreamingClientConn is a sentinel error implementation of StreamingClientConn.
type errStreamingClientConn struct {
err error
}

func (c *errStreamingClientConn) Receive(msg any) error {
return c.err
}

func (c *errStreamingClientConn) Spec() Spec {
return Spec{}
}

func (c *errStreamingClientConn) Peer() Peer {
return Peer{}
}

func (c *errStreamingClientConn) Send(msg any) error {
return c.err
}

func (c *errStreamingClientConn) CloseRequest() error {
return c.err
}

func (c *errStreamingClientConn) CloseResponse() error {
return c.err
}

func (c *errStreamingClientConn) RequestHeader() http.Header {
return make(http.Header)
}

func (c *errStreamingClientConn) ResponseHeader() http.Header {
return make(http.Header)
}

func (c *errStreamingClientConn) ResponseTrailer() http.Header {
return make(http.Header)
}

// receiveUnaryResponse unmarshals a message from a StreamingClientConn, then
// envelopes the message and attaches headers and trailers. It attempts to
// consume the response stream and isn't appropriate when receiving multiple
Expand Down
Loading