diff --git a/go.mod b/go.mod index 70292c0..8491abb 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/google/go-cmp v0.6.0 github.com/stretchr/testify v1.8.4 golang.org/x/net v0.38.0 - golang.org/x/sync v0.12.0 + golang.org/x/sync v0.16.0 google.golang.org/genproto/googleapis/api v0.0.0-20250414145226-207652e42e2e google.golang.org/genproto/googleapis/rpc v0.0.0-20250414145226-207652e42e2e google.golang.org/grpc v1.71.0 @@ -22,6 +22,6 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect golang.org/x/sys v0.31.0 // indirect - golang.org/x/text v0.23.0 // indirect + golang.org/x/text v0.28.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index d74e00d..f561d62 100644 --- a/go.sum +++ b/go.sum @@ -752,8 +752,8 @@ golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw= -golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= +golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -839,8 +839,8 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= -golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= +golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng= +golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/internal/examples/fileserver/main.go b/internal/examples/fileserver/main.go index 07e4a95..97a3395 100644 --- a/internal/examples/fileserver/main.go +++ b/internal/examples/fileserver/main.go @@ -18,12 +18,15 @@ import ( "bytes" "context" "flag" + "fmt" "html/template" + "io" "io/fs" "log" "mime" "net/http" "os" + "path" "path/filepath" "connectrpc.com/connect" @@ -31,6 +34,7 @@ import ( testv1 "connectrpc.com/vanguard/internal/gen/vanguard/test/v1" "connectrpc.com/vanguard/internal/gen/vanguard/test/v1/testv1connect" "google.golang.org/genproto/googleapis/api/httpbody" + "google.golang.org/protobuf/types/known/emptypb" ) func main() { @@ -40,10 +44,13 @@ func main() { if err := flagset.Parse(os.Args[1:]); err != nil { log.Fatal(err) } - + fs := os.DirFS(*directory) // Create Connect handler. serviceHandler := &ContentService{ - FS: os.DirFS(*directory), + RWFS: &prefixFS{ + FS: fs, + prefix: *directory, + }, } // And wrap it with Vanguard. service := vanguard.NewService(testv1connect.NewContentServiceHandler(serviceHandler)) @@ -57,6 +64,31 @@ func main() { log.Fatal(http.ListenAndServe(":"+*port, handler)) } +type RWFS interface { + fs.FS + Create(name string) (RWFile, error) +} + +type RWFile interface { + fs.File + Write([]byte) (int, error) +} + +// PrefixFS is something like os.DirFS() +// but now only wraps Create and Open +type prefixFS struct { + fs.FS + prefix string +} + +func (pf *prefixFS) Create(name string) (RWFile, error) { + return os.Create(path.Join(pf.prefix, name)) +} + +func (pf *prefixFS) Open(name string) (fs.File, error) { + return os.Open(path.Join(pf.prefix, name)) +} + var indexHTMLTemplate = template.Must(template.New("http").Parse(` @@ -78,7 +110,9 @@ var indexHTMLTemplate = template.Must(template.New("http").Parse(` type ContentService struct { testv1connect.UnimplementedContentServiceHandler - fs.FS + // Since std io fs.Fs is a read-only fs abstraction + // For some ops like uploading , we need to modify the user file system + RWFS } func (c *ContentService) Index(_ context.Context, req *connect.Request[testv1.IndexRequest]) (*connect.Response[httpbody.HttpBody], error) { @@ -101,7 +135,7 @@ func (c *ContentService) Index(_ context.Context, req *connect.Request[testv1.In var data []byte if !stat.IsDir() { contentType = mime.TypeByExtension(filepath.Ext(name)) - data, err = fs.ReadFile(c.FS, name) + data, err = fs.ReadFile(c.RWFS, name) if err != nil { return nil, err } @@ -113,7 +147,7 @@ func (c *ContentService) Index(_ context.Context, req *connect.Request[testv1.In Title: name, Files: make(map[string]string), } - entries, err := fs.ReadDir(c.FS, name) + entries, err := fs.ReadDir(c.RWFS, name) if err != nil { return nil, err } @@ -132,3 +166,101 @@ func (c *ContentService) Index(_ context.Context, req *connect.Request[testv1.In Data: data, }), nil } + +// Upload impls the connect RPC stream upload mechanism +// Common Usage(curl): +// +// ```bash +// echo "hello from nace" > hello.txt +// curl -X POST --data-binary "@hello.txt" -H "Content-Type: application/octet-stream" localhost:8100/upload_hello.txt:upload +// ``` +func (c *ContentService) Upload( + ctx context.Context, + stream *connect.ClientStream[testv1.UploadRequest], +) (*connect.Response[emptypb.Empty], error) { + if !stream.Receive() { + if err := stream.Err(); err != nil { + return nil, err + } + return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("no upload message received")) + } + + msg := stream.Msg() + filename := msg.GetFilename() + + // NOTE: we currently will truncate/overwrite the file if it already exists + file, err := c.RWFS.Create(filename) + if err != nil { + return nil, err + } + defer file.Close() + + // OPEN ME for debugging + // log.Printf("Upload: filename=%q contentType=%q size=%d", + // msg.GetFilename(), + // msg.GetFile().GetContentType(), + // len(msg.GetFile().GetData()), + // ) + + if _, err := file.Write(msg.GetFile().GetData()); err != nil { + return nil, err + } + + for stream.Receive() { + msg := stream.Msg() + // NOTE: The demo currently only impl the same filename uploading + if msg.GetFilename() != filename { + return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("filename changed during upload: %q", msg.GetFilename())) + } + if _, err := file.Write(msg.GetFile().GetData()); err != nil { + return nil, err + } + } + + if err := stream.Err(); err != nil { + return nil, err + } + + return connect.NewResponse(&emptypb.Empty{}), nil +} + +// Download impls the connect RPC stream download stream +// Common Usage(curl): +// +// ```bash +// curl -X GET -H "Content-Type: application/json" http://localhost:8100/upload_hello.txt:download > download_hello.txt +// ``` +func (c *ContentService) Download( + ctx context.Context, + req *connect.Request[testv1.DownloadRequest], + stream *connect.ServerStream[testv1.DownloadResponse], +) error { + file, err := c.RWFS.Open(req.Msg.Filename) + if err != nil { + return err + } + defer file.Close() + + const largeEnoughSize = 42 * 1024 + + buf := make([]byte, largeEnoughSize) + for { + n, readErr := file.Read(buf) + if n > 0 { + if err := stream.Send(&testv1.DownloadResponse{ + File: &httpbody.HttpBody{ + ContentType: "application/octet-stream", + Data: buf[:n], + }, + }); err != nil { + return err + } + } + if readErr == io.EOF { + return nil + } + if readErr != nil { + return readErr + } + } +}