-
Notifications
You must be signed in to change notification settings - Fork 123
docs(pubsub): add exactly-once samples #5442
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -57,6 +57,12 @@ pub async fn run_subscription_samples( | |
|
|
||
| subscription::delete_subscription::sample(&client, &project_id, &id).await?; | ||
|
|
||
| let id = random_subscription_id(); | ||
| subscription_names.push(format!("projects/{project_id}/subscriptions/{id}")); | ||
| subscription::create_subscription_exactly_once::sample(&client, &project_id, topic_id, &id) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need some cleanup here to delete the subscription ? The previous samples run the delete sample as the last one, which works as a clean up.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oh I see,
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep, but you're right that this deserves a comment for context. My next PR is going to add a comment around the delete subscription line that says it's just for testing a sample, and that all the other deletes are performed by the caller. |
||
| .await?; | ||
| subscriber::exactly_once::sample(&project_id, &id).await?; | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,60 @@ | ||
| // Copyright 2026 Google LLC | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // https://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| // [START pubsub_subscriber_exactly_once] | ||
| use google_cloud_pubsub::client::Subscriber; | ||
| use google_cloud_pubsub::subscriber::handler::Handler; | ||
| use std::time::Duration; | ||
|
|
||
| pub async fn sample(project_id: &str, subscription_id: &str) -> anyhow::Result<()> { | ||
| let subscription_name = format!("projects/{project_id}/subscriptions/{subscription_id}"); | ||
| let client = Subscriber::builder().build().await?; | ||
| let mut stream = client.subscribe(subscription_name).build(); | ||
|
|
||
| // Terminate the example after 10 seconds. | ||
| let shutdown_token = stream.shutdown_token(); | ||
| let shutdown = tokio::spawn(async move { | ||
| tokio::time::sleep(Duration::from_secs(10)).await; | ||
| shutdown_token.shutdown().await; | ||
| }); | ||
|
|
||
| let mut tasks = Vec::new(); | ||
| println!("Listening for messages..."); | ||
| while let Some((m, Handler::ExactlyOnce(h))) = stream.next().await.transpose()? { | ||
| println!("Received message: {m:?}"); | ||
| tasks.push(tokio::spawn(async move { | ||
| // We spawn a background task so that receiving the next message | ||
| // is not blocked on the server confirming the ack for this | ||
| // message. | ||
| // | ||
|
dbolduc marked this conversation as resolved.
|
||
| // You might choose to use a work queue in your application. | ||
| match h.confirmed_ack().await { | ||
| Ok(()) => println!( | ||
| "Confirmed ack for message={m:?}. The message will not be redelivered." | ||
| ), | ||
| Err(e) => println!("Failed to confirm ack for message={m:?} with error={e:?}"), | ||
| } | ||
| })); | ||
| } | ||
| shutdown.await?; | ||
| println!("Done listening for messages"); | ||
|
|
||
| for t in tasks { | ||
| t.await?; | ||
| } | ||
| println!("Done acking messages"); | ||
|
|
||
| Ok(()) | ||
| } | ||
| // [END pubsub_subscriber_exactly_once] | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,38 @@ | ||
| // Copyright 2026 Google LLC | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // https://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| // [START pubsub_create_subscription_with_exactly_once_delivery] | ||
| use google_cloud_pubsub::client::SubscriptionAdmin; | ||
|
|
||
| pub async fn sample( | ||
| client: &SubscriptionAdmin, | ||
| project_id: &str, | ||
| topic_id: &str, | ||
| subscription_id: &str, | ||
| ) -> anyhow::Result<()> { | ||
| let subscription_name = format!("projects/{project_id}/subscriptions/{subscription_id}"); | ||
| let topic_name = format!("projects/{project_id}/topics/{topic_id}"); | ||
|
|
||
| let subscription = client | ||
| .create_subscription() | ||
| .set_name(subscription_name) | ||
| .set_topic(topic_name) | ||
| .set_enable_exactly_once_delivery(true) | ||
| .send() | ||
| .await?; | ||
|
|
||
| println!("successfully created subscription {subscription:?}"); | ||
| Ok(()) | ||
| } | ||
| // [END pubsub_create_subscription_with_exactly_once_delivery] |
Uh oh!
There was an error while loading. Please reload this page.