Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 180 additions & 87 deletions Cargo.lock

Large diffs are not rendered by default.

33 changes: 33 additions & 0 deletions examples/rust/streams-tcp-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
[package]
name = "streams-tcp-client"
version = "0.1.0"

authors.workspace = true
categories.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true

[dependencies]
anyhow = { workspace = true, features = ["std"] }
bytes = { workspace = true }
clap = { workspace = true, features = [
"color",
"derive",
"error-context",
"help",
"std",
"suggestions",
"usage",
] }
futures = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread"] }
tokio-stream = { workspace = true, features = ["time"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = [
"ansi",
"env-filter",
"fmt",
] }
wit-bindgen-wrpc = { workspace = true }
wrpc-transport = { workspace = true, features = ["net"] }
80 changes: 80 additions & 0 deletions examples/rust/streams-tcp-client/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use core::time::Duration;

use anyhow::Context as _;
use bytes::Bytes;
use clap::Parser;
use futures::{stream, StreamExt as _};
use tokio::{time, try_join};
use tokio_stream::wrappers::IntervalStream;
use tracing::debug;

mod bindings {
wit_bindgen_wrpc::generate!({
with: {
"wrpc-examples:streams/handler": generate
}
});
}

use bindings::wrpc_examples::streams::handler::{echo, Req};

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Address to invoke `wrpc-examples:streams/handler.echo` on
#[arg(default_value = "[::1]:7762")]
addr: String,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt().init();

let Args { addr } = Args::parse();

let wrpc = wrpc_transport::tcp::Client::from(&addr);

let numbers = Box::pin(
stream::iter(1..)
.take(10)
.zip(IntervalStream::new(time::interval(Duration::from_secs(1))))
.map(|(i, _)| i)
.ready_chunks(10),
);

// `stream<u8>` items are chunked using [`Bytes`]
let bytes = Box::pin(
stream::iter(b"foo bar baz")
.zip(IntervalStream::new(time::interval(Duration::from_secs(1))))
.map(|(i, _)| *i)
.ready_chunks(10)
.map(Bytes::from),
);

let (mut numbers, mut bytes, io) = echo(&wrpc, (), Req { numbers, bytes })
.await
.context("failed to invoke `wrpc-examples:streams/handler.echo`")?;
try_join!(
async {
if let Some(io) = io {
debug!("performing async I/O");
io.await.context("failed to complete async I/O")
} else {
Ok(())
}
},
async {
while let Some(item) = numbers.next().await {
eprintln!("numbers: {item:?}");
}
Ok(())
},
async {
while let Some(item) = bytes.next().await {
eprintln!("bytes: {item:?}");
}
Ok(())
}
)?;
Ok(())
}
4 changes: 4 additions & 0 deletions examples/rust/streams-tcp-client/wit/deps.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[streams]
path = "../../../wit/streams"
sha256 = "5064bee90ebea73f1695987191fbbfea71ed2dbb69839814009490b4fbe8e96f"
sha512 = "dfca3844d91c6c8e83fefd7b9511a366b464cf69d017c61b671409cb26dc9490a0e59a8e60ef15b77fdeb4fc1b8d9e6efa11c2fb1a1dabd0141e5e6afe8a59b9"
1 change: 1 addition & 0 deletions examples/rust/streams-tcp-client/wit/deps.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
streams = "../../../wit/streams"
17 changes: 17 additions & 0 deletions examples/rust/streams-tcp-client/wit/deps/streams/streams.wit
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package wrpc-examples:streams;

interface handler {
record req {
numbers: stream<u64>,
bytes: stream<u8>,
}
echo: func(r: req) -> (numbers: stream<u64>, bytes: stream<u8>);
}

world client {
import handler;
}

world server {
export handler;
}
5 changes: 5 additions & 0 deletions examples/rust/streams-tcp-client/wit/world.wit
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package wrpc-examples:streams-rust-client;

world client {
include wrpc-examples:streams/client;
}
32 changes: 32 additions & 0 deletions examples/rust/streams-tcp-server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
[package]
name = "streams-tcp-server"
version = "0.1.0"

authors.workspace = true
categories.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true

[dependencies]
anyhow = { workspace = true, features = ["std"] }
bytes = { workspace = true }
clap = { workspace = true, features = [
"color",
"derive",
"error-context",
"help",
"std",
"suggestions",
"usage",
] }
futures = { workspace = true }
tokio = { workspace = true, features = ["net", "rt-multi-thread", "signal"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = [
"ansi",
"env-filter",
"fmt",
] }
wit-bindgen-wrpc = { workspace = true }
wrpc-transport = { workspace = true, features = ["net"] }
119 changes: 119 additions & 0 deletions examples/rust/streams-tcp-server/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
use core::net::SocketAddr;
use core::pin::{pin, Pin};

use std::sync::Arc;

use anyhow::Context as _;
use bytes::Bytes;
use clap::Parser;
use futures::stream::select_all;
use futures::{Stream, StreamExt as _};
use tokio::task::JoinSet;
use tokio::{select, signal};
use tracing::{debug, error, info, warn};

mod bindings {
wit_bindgen_wrpc::generate!({
with: {
"wrpc-examples:streams/handler": generate,
}
});
}

use bindings::exports::wrpc_examples::streams::handler::Req;

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Address to serve `wrpc-examples:streams/handler.echo` on
#[arg(default_value = "[::1]:7762")]
addr: String,
}

#[derive(Clone, Copy)]
struct Server;

impl bindings::exports::wrpc_examples::streams::handler::Handler<SocketAddr> for Server {
async fn echo(
&self,
_ctx: SocketAddr,
Req { numbers, bytes }: Req,
) -> anyhow::Result<(
Pin<Box<dyn Stream<Item = Vec<u64>> + Send>>,
Pin<Box<dyn Stream<Item = Bytes> + Send>>,
)> {
Ok((numbers, bytes))
}
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt().init();

let Args { addr } = Args::parse();

let lis = tokio::net::TcpListener::bind(&addr)
.await
.with_context(|| format!("failed to bind TCP listener on `{addr}`"))?;
let srv = Arc::new(wrpc_transport::Server::default());
let accept = tokio::spawn({
let srv = Arc::clone(&srv);
async move {
loop {
if let Err(err) = srv.accept(&lis).await {
error!(?err, "failed to accept TCP connection");
}
}
}
});

let invocations = bindings::serve(srv.as_ref(), Server)
.await
.context("failed to serve `wrpc-examples:streams/handler.echo`")?;
// NOTE: This will conflate all invocation streams into a single stream via `futures::stream::SelectAll`,
// to customize this, iterate over the returned `invocations` and set up custom handling per export
let mut invocations = select_all(
invocations
.into_iter()
.map(|(instance, name, invocations)| invocations.map(move |res| (instance, name, res))),
);
let shutdown = signal::ctrl_c();
let mut shutdown = pin!(shutdown);
let mut tasks = JoinSet::new();
loop {
select! {
Some((instance, name, res)) = invocations.next() => {
match res {
Ok(fut) => {
debug!(instance, name, "invocation accepted");
tasks.spawn(async move {
if let Err(err) = fut.await {
warn!(?err, "failed to handle invocation");
} else {
info!(instance, name, "invocation successfully handled");
}
});
}
Err(err) => {
warn!(?err, instance, name, "failed to accept invocation");
}
}
}
Some(res) = tasks.join_next() => {
if let Err(err) = res {
error!(?err, "failed to join task")
}
}
res = &mut shutdown => {
accept.abort();
// wait for all invocations to complete
while let Some(res) = tasks.join_next().await {
if let Err(err) = res {
error!(?err, "failed to join task")
}
}
return res.context("failed to listen for ^C")
}
}
}
}
4 changes: 4 additions & 0 deletions examples/rust/streams-tcp-server/wit/deps.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[streams]
path = "../../../wit/streams"
sha256 = "5064bee90ebea73f1695987191fbbfea71ed2dbb69839814009490b4fbe8e96f"
sha512 = "dfca3844d91c6c8e83fefd7b9511a366b464cf69d017c61b671409cb26dc9490a0e59a8e60ef15b77fdeb4fc1b8d9e6efa11c2fb1a1dabd0141e5e6afe8a59b9"
1 change: 1 addition & 0 deletions examples/rust/streams-tcp-server/wit/deps.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
streams = "../../../wit/streams"
17 changes: 17 additions & 0 deletions examples/rust/streams-tcp-server/wit/deps/streams/streams.wit
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package wrpc-examples:streams;

interface handler {
record req {
numbers: stream<u64>,
bytes: stream<u8>,
}
echo: func(r: req) -> (numbers: stream<u64>, bytes: stream<u8>);
}

world client {
import handler;
}

world server {
export handler;
}
5 changes: 5 additions & 0 deletions examples/rust/streams-tcp-server/wit/world.wit
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package wrpc-examples:streams-rust-server;

world server {
include wrpc-examples:streams/server;
}
Loading