Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/pubsub/examples/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ pub async fn run_subscription_samples(

subscription::delete_subscription::sample(&client, &project_id, &id).await?;

let id = random_subscription_id();
subscription_names.push(format!("projects/{project_id}/subscriptions/{id}"));
subscription::create_subscription_exactly_once::sample(&client, &project_id, topic_id, &id)
Comment thread
dbolduc marked this conversation as resolved.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need some cleanup here to delete the subscription ? The previous samples run the delete sample as the last one, which works as a clean up.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh I see, run_subscription_samples receives a subscription_names list to be filled with things to be deleted later.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, but you're right that this deserves a comment for context.

My next PR is going to add a comment around the delete subscription line that says it's just for testing a sample, and that all the other deletes are performed by the caller.

.await?;
subscriber::exactly_once::sample(&project_id, &id).await?;

Ok(())
}

Expand Down
1 change: 1 addition & 0 deletions src/pubsub/examples/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod exactly_once;
pub mod quickstart_subscriber;
pub mod subscriber_stream;
60 changes: 60 additions & 0 deletions src/pubsub/examples/src/subscriber/exactly_once.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// [START pubsub_subscriber_exactly_once]
use google_cloud_pubsub::client::Subscriber;
use google_cloud_pubsub::subscriber::handler::Handler;
use std::time::Duration;

pub async fn sample(project_id: &str, subscription_id: &str) -> anyhow::Result<()> {
let subscription_name = format!("projects/{project_id}/subscriptions/{subscription_id}");
let client = Subscriber::builder().build().await?;
let mut stream = client.subscribe(subscription_name).build();

// Terminate the example after 10 seconds.
let shutdown_token = stream.shutdown_token();
let shutdown = tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(10)).await;
shutdown_token.shutdown().await;
});

let mut tasks = Vec::new();
println!("Listening for messages...");
while let Some((m, Handler::ExactlyOnce(h))) = stream.next().await.transpose()? {
println!("Received message: {m:?}");
tasks.push(tokio::spawn(async move {
// We spawn a background task so that receiving the next message
// is not blocked on the server confirming the ack for this
// message.
//
Comment thread
dbolduc marked this conversation as resolved.
// You might choose to use a work queue in your application.
match h.confirmed_ack().await {
Ok(()) => println!(
"Confirmed ack for message={m:?}. The message will not be redelivered."
),
Err(e) => println!("Failed to confirm ack for message={m:?} with error={e:?}"),
}
}));
}
shutdown.await?;
println!("Done listening for messages");

for t in tasks {
t.await?;
}
println!("Done acking messages");

Ok(())
}
// [END pubsub_subscriber_exactly_once]
1 change: 1 addition & 0 deletions src/pubsub/examples/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@
// limitations under the License.

pub mod create_pull_subscription;
pub mod create_subscription_exactly_once;
pub mod delete_subscription;
pub mod list_subscriptions;
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// [START pubsub_create_subscription_with_exactly_once_delivery]
use google_cloud_pubsub::client::SubscriptionAdmin;

pub async fn sample(
client: &SubscriptionAdmin,
project_id: &str,
topic_id: &str,
subscription_id: &str,
) -> anyhow::Result<()> {
let subscription_name = format!("projects/{project_id}/subscriptions/{subscription_id}");
let topic_name = format!("projects/{project_id}/topics/{topic_id}");

let subscription = client
.create_subscription()
.set_name(subscription_name)
.set_topic(topic_name)
.set_enable_exactly_once_delivery(true)
.send()
.await?;

println!("successfully created subscription {subscription:?}");
Ok(())
}
// [END pubsub_create_subscription_with_exactly_once_delivery]
Loading