diff --git a/src/pubsub/examples/src/lib.rs b/src/pubsub/examples/src/lib.rs index d9510bb93a..6b3f166aaa 100644 --- a/src/pubsub/examples/src/lib.rs +++ b/src/pubsub/examples/src/lib.rs @@ -57,6 +57,12 @@ pub async fn run_subscription_samples( subscription::delete_subscription::sample(&client, &project_id, &id).await?; + let id = random_subscription_id(); + subscription_names.push(format!("projects/{project_id}/subscriptions/{id}")); + subscription::create_subscription_exactly_once::sample(&client, &project_id, topic_id, &id) + .await?; + subscriber::exactly_once::sample(&project_id, &id).await?; + Ok(()) } diff --git a/src/pubsub/examples/src/subscriber.rs b/src/pubsub/examples/src/subscriber.rs index 8d8adcb068..2fa3467d59 100644 --- a/src/pubsub/examples/src/subscriber.rs +++ b/src/pubsub/examples/src/subscriber.rs @@ -12,5 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod exactly_once; pub mod quickstart_subscriber; pub mod subscriber_stream; diff --git a/src/pubsub/examples/src/subscriber/exactly_once.rs b/src/pubsub/examples/src/subscriber/exactly_once.rs new file mode 100644 index 0000000000..0aae27640a --- /dev/null +++ b/src/pubsub/examples/src/subscriber/exactly_once.rs @@ -0,0 +1,60 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// [START pubsub_subscriber_exactly_once] +use google_cloud_pubsub::client::Subscriber; +use google_cloud_pubsub::subscriber::handler::Handler; +use std::time::Duration; + +pub async fn sample(project_id: &str, subscription_id: &str) -> anyhow::Result<()> { + let subscription_name = format!("projects/{project_id}/subscriptions/{subscription_id}"); + let client = Subscriber::builder().build().await?; + let mut stream = client.subscribe(subscription_name).build(); + + // Terminate the example after 10 seconds. + let shutdown_token = stream.shutdown_token(); + let shutdown = tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(10)).await; + shutdown_token.shutdown().await; + }); + + let mut tasks = Vec::new(); + println!("Listening for messages..."); + while let Some((m, Handler::ExactlyOnce(h))) = stream.next().await.transpose()? { + println!("Received message: {m:?}"); + tasks.push(tokio::spawn(async move { + // We spawn a background task so that receiving the next message + // is not blocked on the server confirming the ack for this + // message. + // + // You might choose to use a work queue in your application. + match h.confirmed_ack().await { + Ok(()) => println!( + "Confirmed ack for message={m:?}. The message will not be redelivered." + ), + Err(e) => println!("Failed to confirm ack for message={m:?} with error={e:?}"), + } + })); + } + shutdown.await?; + println!("Done listening for messages"); + + for t in tasks { + t.await?; + } + println!("Done acking messages"); + + Ok(()) +} +// [END pubsub_subscriber_exactly_once] diff --git a/src/pubsub/examples/src/subscription.rs b/src/pubsub/examples/src/subscription.rs index 964766e8f4..ec37121a09 100644 --- a/src/pubsub/examples/src/subscription.rs +++ b/src/pubsub/examples/src/subscription.rs @@ -13,5 +13,6 @@ // limitations under the License. pub mod create_pull_subscription; +pub mod create_subscription_exactly_once; pub mod delete_subscription; pub mod list_subscriptions; diff --git a/src/pubsub/examples/src/subscription/create_subscription_exactly_once.rs b/src/pubsub/examples/src/subscription/create_subscription_exactly_once.rs new file mode 100644 index 0000000000..ad8dcfb0c7 --- /dev/null +++ b/src/pubsub/examples/src/subscription/create_subscription_exactly_once.rs @@ -0,0 +1,38 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// [START pubsub_create_subscription_with_exactly_once_delivery] +use google_cloud_pubsub::client::SubscriptionAdmin; + +pub async fn sample( + client: &SubscriptionAdmin, + project_id: &str, + topic_id: &str, + subscription_id: &str, +) -> anyhow::Result<()> { + let subscription_name = format!("projects/{project_id}/subscriptions/{subscription_id}"); + let topic_name = format!("projects/{project_id}/topics/{topic_id}"); + + let subscription = client + .create_subscription() + .set_name(subscription_name) + .set_topic(topic_name) + .set_enable_exactly_once_delivery(true) + .send() + .await?; + + println!("successfully created subscription {subscription:?}"); + Ok(()) +} +// [END pubsub_create_subscription_with_exactly_once_delivery]