diff --git a/super-agent/src/k8s/client.rs b/super-agent/src/k8s/client.rs index 5a0cd14502..b55c46a2fa 100644 --- a/super-agent/src/k8s/client.rs +++ b/super-agent/src/k8s/client.rs @@ -1,8 +1,6 @@ use super::{ error::K8sError::{self}, - reflectors::{ - DynamicObjectReflector, K8sControllerReflector, ReflectorBuilder, ResourceWithReflector, - }, + reflectors::{Reflector, ReflectorBuilder, ResourceWithReflector}, }; use crate::super_agent::config::helm_release_type_meta; use k8s_openapi::api::apps::v1::{DaemonSet, Deployment, ReplicaSet, StatefulSet}; @@ -162,7 +160,7 @@ pub struct AsyncK8sClient { struct Dynamic { object_api: Api, - object_reflector: DynamicObjectReflector, + object_reflector: Reflector, } struct Reflectors { @@ -173,10 +171,10 @@ struct Reflectors { #[allow(dead_code)] struct ControllerReflectors { - daemon_sets: K8sControllerReflector, - deployments: K8sControllerReflector, - replica_sets: K8sControllerReflector, - stateful_sets: K8sControllerReflector, + daemon_sets: Reflector, + deployments: Reflector, + replica_sets: Reflector, + stateful_sets: Reflector, } impl ResourceWithReflector for DaemonSet {} @@ -190,8 +188,6 @@ impl AsyncK8sClient { /// If loading from the inCluster config fail we fall back to kube-config /// This will respect the `$KUBECONFIG` envvar, but otherwise default to `~/.kube/config`. /// Not leveraging infer() to check inClusterConfig first - /// - /// pub async fn try_new(namespace: String) -> Result { debug!("trying inClusterConfig for k8s client"); @@ -231,17 +227,13 @@ impl AsyncK8sClient { ) -> Result { Self::try_new(namespace.clone()) .await? - .with_reflectors(&namespace, cr_type_metas) + .with_reflectors(cr_type_metas) .await } - async fn with_reflectors( - mut self, - namespace: &str, - cr_type_metas: Vec, - ) -> Result { + async fn with_reflectors(mut self, cr_type_metas: Vec) -> Result { let dynamics = self.create_dynamic_reflectors(cr_type_metas).await?; - let controllers = self.create_controllers(namespace).await?; + let controllers = self.create_controllers().await?; self.reflectors = Some(Reflectors { dynamics, controllers, @@ -274,7 +266,7 @@ impl AsyncK8sClient { tm.to_owned(), Dynamic { object_api: Api::default_namespaced_with(self.client.to_owned(), &ar), - object_reflector: reflector_builder.dynamic_object_reflector(&ar).await?, + object_reflector: reflector_builder.try_build_with_api_resource(&ar).await?, }, ); } @@ -284,22 +276,14 @@ impl AsyncK8sClient { /// Set up reflectors for DaemonSets, Deployments, StatefulSets, and ReplicaSets. /// /// This function initializes a reflector for each supported resource type within the provided namespace. - async fn create_controllers(&self, namespace: &str) -> Result { + async fn create_controllers(&self) -> Result { let reflector_builder = ReflectorBuilder::new(self.client.clone()); Ok(ControllerReflectors { - daemon_sets: reflector_builder - .try_new_resource_reflector::(namespace) - .await?, - deployments: reflector_builder - .try_new_resource_reflector::(namespace) - .await?, - replica_sets: reflector_builder - .try_new_resource_reflector::(namespace) - .await?, - stateful_sets: reflector_builder - .try_new_resource_reflector::(namespace) - .await?, + daemon_sets: reflector_builder.try_build::().await?, + deployments: reflector_builder.try_build::().await?, + replica_sets: reflector_builder.try_build::().await?, + stateful_sets: reflector_builder.try_build::().await?, }) } @@ -616,7 +600,7 @@ pub(crate) mod test { kind: "Foo".to_string(), }; let k = get_mocked_client(Scenario::APIResource) - .with_reflectors("default", vec![tm.clone()]) + .with_reflectors(vec![tm.clone()]) .await .unwrap(); @@ -652,7 +636,7 @@ pub(crate) mod test { }; let k = get_mocked_client(Scenario::APIResource) - .with_reflectors("default", vec![tm_not_existing, tm.clone()]) + .with_reflectors(vec![tm_not_existing, tm.clone()]) .await .unwrap(); @@ -672,10 +656,8 @@ pub(crate) mod test { #[tokio::test] async fn test_client_build_with_reflectors_and_get_resources() { - let namespace = "default"; - let async_client = get_mocked_client(Scenario::APIResource) - .with_reflectors(namespace, vec![]) + .with_reflectors(vec![]) .await .unwrap(); diff --git a/super-agent/src/k8s/reflectors.rs b/super-agent/src/k8s/reflectors.rs index 17d6191bd2..11d600cce5 100644 --- a/super-agent/src/k8s/reflectors.rs +++ b/super-agent/src/k8s/reflectors.rs @@ -30,7 +30,8 @@ use super::error::K8sError; /// - Be clonable, debuggable, and thread-safe (`Send` and `Sync`). /// /// By implementing this trait for various Kubernetes resources like DaemonSet, Deployment, ReplicaSet, -/// and StatefulSet, we ensure that they can be managed using a generic pattern. +/// and StatefulSet, we ensure that they can be managed using a generic pattern. Besides, we ensure +/// that reflectors can only be built for supported types. pub trait ResourceWithReflector: Resource + Clone @@ -43,14 +44,15 @@ pub trait ResourceWithReflector: { } -/// Reflector builder holds the arguments to build a reflector. Its implementation allows creating a reflector. +/// Reflector builder holds the arguments to build a reflector. +/// Its implementation allows creating a reflector for supported types. /// /// ## Example: /// ```ignore -/// // It depends on `kube::Client` and `kube::discovery::ApiResource` -/// let dynamic_object_reflector = reader::ReflectorBuilder::new(client, "namespace") -/// .with_labels("key=value") -/// .dynamic_object_reflector(api_resource); +/// // We cannot run the example because of the dependencies +/// let builder = reflectors::ReflectorBuilder::new(client); +/// let dynamic_object_reflector = builder.try_build_with_api_resource(api_resource).unwrap(); +/// let deployment_reflector = builder.try_build::().unwrap(); /// ``` pub struct ReflectorBuilder { client: Client, @@ -63,134 +65,63 @@ impl ReflectorBuilder { } /// Builds the DynamicObject reflector using the builder. - pub async fn dynamic_object_reflector( + /// + /// # Arguments + /// * `api_resource` - The [ApiResource] corresponding to the required [DynamicObject]. + /// + /// # Returns + /// Returns the newly built reflector or an error. + + pub async fn try_build_with_api_resource( &self, api_resource: &ApiResource, - ) -> Result { + ) -> Result, K8sError> { // The api consumes the client, so it needs to be owned to allow sharing the builder. let api: Api = Api::default_namespaced_with(self.client.to_owned(), api_resource); + // Initialize the writer for the dynamic type. let writer: Writer = reflector::store::Writer::new(api_resource.to_owned()); - // Selectors need to be cloned since the builder could create more reflectors. - let wc = watcher::Config { - ..Default::default() - }; - - DynamicObjectReflector::new(api, writer, wc).await + Reflector::try_new(api, writer, self.watcher_config()).await } - /// Builds a generic resource reflector using the builder. + /// Builds a reflector using the builder. /// /// # Type Parameters - /// * `K` - Kubernetes resource type implementing the required traits. - /// - /// # Arguments - /// * `namespace` - Namespace in which the resource is located. + /// * `K` - Kubernetes resource type implementing the required trait. /// /// # Returns - /// Returns the newly built generic reflector or an error. - pub async fn try_new_resource_reflector( - &self, - namespace: &str, - ) -> Result, K8sError> + /// Returns the newly built reflector or an error. + pub async fn try_build(&self) -> Result, K8sError> where K: ResourceWithReflector, { - // Create an API instance for the resource type - let api: Api = Api::namespaced(self.client.clone(), namespace); + // Create an API instance for the resource type. + let api: Api = Api::default_namespaced(self.client.clone()); - // Initialize the writer for the resource type + // Initialize the writer for the resource type. let writer: Writer = reflector::store::Writer::default(); - // Clone field and label selectors - let wc = watcher::Config { - ..Default::default() - }; - - // Create and return the generic resource reflector - K8sControllerReflector::new(api, writer, wc).await + Reflector::try_new(api, writer, self.watcher_config()).await } -} - -/// DynamicObjectReflector wraps kube-rs reflectors using any kubernetes object (DynamicObject). -/// The reflector consists of a writer (`reflector::store::Writer`) which reflects all k8s events by means of -/// a watcher, and readers (`reflector::store::Store`) which provide an efficient way to query the corresponding -/// objects. -/// It starts a new routine to start the watcher and keep the reader updated which will be stopped on reflector's drop, -/// and it also holds a reference to a reader which can be safely cloned during the reflector's lifetime. -pub struct DynamicObjectReflector { - reader: reflector::Store, - writer_close_handle: AbortHandle, -} - -impl DynamicObjectReflector { - /// Creates a DynamicObjectReflector (used by [ReflectorBuilder]). - async fn new( - api: Api, - writer: Writer, - wc: watcher::Config, - ) -> Result { - let reader = writer.as_reader(); - let writer_close_handle = start_reflector(api, wc, writer).abort_handle(); - - reader.wait_until_ready().await?; // TODO: should we implement a timeout? - - Ok(DynamicObjectReflector { - reader, - writer_close_handle, - }) - } - - /// Returns a copy of the reader. - // TODO: we are cloning it for now, but we need to check what's the best approach considering its usage. - // We may include additional methods using the reader instead of exposing a copy. - pub fn reader(&self) -> reflector::Store { - self.reader.clone() - } -} -impl Drop for DynamicObjectReflector { - // Abort the reflector's writer task when it drops. - fn drop(&mut self) { - self.writer_close_handle.abort(); + /// Returns the watcher_config to use in reflectors + pub fn watcher_config(&self) -> watcher::Config { + Default::default() } } -fn start_reflector(api: Api, wc: watcher::Config, writer: Writer) -> JoinHandle<()> -where - K: kube::core::Resource + Clone + DeserializeOwned + Debug + Send + Sync + 'static, - K::DynamicType: Eq + std::hash::Hash + Clone, -{ - //let writer = writer_builder(); - tokio::spawn(async move { - watcher(api, wc) - // The watcher recovers automatically from api errores, the backoff could be customized. - .default_backoff() - // All changes are reflected into the writer. - .reflect(writer) - // We need to query the events to start the watcher. - .touched_objects() - .for_each(|o| { - if let Some(e) = o.err() { - warn!("Recoverable error watching k8s events: {}", e) - } - future::ready(()) - }) - .await // The watcher runs indefinitely. - }) -} - /// A generic Kubernetes reflector for resources that implement the `k8s_openapi::Resource` trait and have a namespace scope. /// The `K8sControllerReflector` works by keeping an internal reader-writer pair: /// - The reader keeps a read-only cache of Kubernetes objects. /// - The writer continuously updates the cache based on the API stream. /// /// The writer's async task is aborted when the reflector is dropped. -pub struct K8sControllerReflector +pub struct Reflector where - K: ResourceWithReflector, + K: kube::core::Resource + Clone + DeserializeOwned + Debug + Send + Sync + 'static, + K::DynamicType: Eq + std::hash::Hash + Clone, { /// The read-only store that maintains a cache of Kubernetes objects of type `K`. reader: reflector::Store, @@ -198,21 +129,26 @@ where writer_close_handle: AbortHandle, } -impl K8sControllerReflector +impl Reflector where - K: ResourceWithReflector, + K: kube::core::Resource + Clone + DeserializeOwned + Debug + Send + Sync + 'static, + K::DynamicType: Eq + std::hash::Hash + Clone, { - /// Creates a new `K8sControllerReflector` using the specified API, writer, and watcher config. + /// Creates a new [Reflector] using the specified API, writer, and watcher config. /// /// The function awaits until the cache is fully ready to serve objects. - /// Returns a `Result` with either the initialized `K8sControllerReflector` or an error. - async fn new(api: Api, writer: Writer, wc: watcher::Config) -> Result { + /// Returns a `Result` with either the initialized [Reflector] or an error. + async fn try_new( + api: Api, + writer: Writer, + wc: watcher::Config, + ) -> Result { let reader = writer.as_reader(); - let writer_close_handle = start_reflector(api, wc, writer).abort_handle(); + let writer_close_handle = Self::start_reflector(api, wc, writer).abort_handle(); reader.wait_until_ready().await?; // TODO: should we implement a timeout? - Ok(K8sControllerReflector { + Ok(Reflector { reader, writer_close_handle, }) @@ -222,11 +158,33 @@ where pub fn reader(&self) -> reflector::Store { self.reader.clone() } + + /// Spawns a tokio task waiting for events and updating the provided writer. + /// Returns the task [JoinHandle<()>]. + fn start_reflector(api: Api, wc: watcher::Config, writer: Writer) -> JoinHandle<()> { + tokio::spawn(async move { + watcher(api, wc) + // The watcher recovers automatically from api errors, the backoff could be customized. + .default_backoff() + // All changes are reflected into the writer. + .reflect(writer) + // We need to query the events to start the watcher. + .touched_objects() + .for_each(|o| { + if let Some(e) = o.err() { + warn!("Recoverable error watching k8s events: {}", e) + } + future::ready(()) + }) + .await // The watcher runs indefinitely. + }) + } } -impl Drop for K8sControllerReflector +impl Drop for Reflector where - K: ResourceWithReflector, + K: kube::core::Resource + Clone + DeserializeOwned + Debug + Send + Sync + 'static, + K::DynamicType: Eq + std::hash::Hash + Clone, { /// When dropped, abort the writer task to ensure proper cleanup. fn drop(&mut self) { @@ -238,7 +196,6 @@ where mod test { use super::*; use k8s_openapi::api::apps::v1::Deployment; - use kube::core::GroupVersionKind; use tokio::sync::oneshot::{channel, Sender}; async fn mocked_writer_task(_send: Sender<()>) { @@ -250,30 +207,13 @@ mod test { #[tokio::test] async fn test_reflector_abort_writer_on_drop() { - let gvk = GroupVersionKind::gvk("test.group", "v1", "TestKind"); - // Mocked reader and writer - let writer = reflector::store::Writer::new(ApiResource::from_gvk(&gvk)); - let reader = writer.as_reader(); - let (send, recv) = channel(); - let reflector = DynamicObjectReflector { - reader, - writer_close_handle: tokio::spawn(mocked_writer_task(send)).abort_handle(), - }; - // When the reflector is dropped, it should abort the `writer_task`. Consequently, the channel's receiver - // finished with error . - drop(reflector); - assert!(recv.await.is_err()); - } - - #[tokio::test] - async fn test_generic_reflector_abort_writer_on_drop() { // Create a writer and store using `()`, as `Deployment` has no dynamic type. let (_store, writer) = reflector::store::store::(); let reader = writer.as_reader(); let (send, recv) = channel(); - let reflector = K8sControllerReflector { + let reflector = Reflector { reader, writer_close_handle: tokio::spawn(mocked_writer_task(send)).abort_handle(), };