Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 18 additions & 36 deletions super-agent/src/k8s/client.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use super::{
error::K8sError::{self},
reflectors::{
DynamicObjectReflector, K8sControllerReflector, ReflectorBuilder, ResourceWithReflector,
},
reflectors::{Reflector, ReflectorBuilder, ResourceWithReflector},
};
use crate::super_agent::config::helm_release_type_meta;
use k8s_openapi::api::apps::v1::{DaemonSet, Deployment, ReplicaSet, StatefulSet};
Expand Down Expand Up @@ -162,7 +160,7 @@ pub struct AsyncK8sClient {

struct Dynamic {
object_api: Api<DynamicObject>,
object_reflector: DynamicObjectReflector,
object_reflector: Reflector<DynamicObject>,
}

struct Reflectors {
Expand All @@ -173,10 +171,10 @@ struct Reflectors {

#[allow(dead_code)]
struct ControllerReflectors {
daemon_sets: K8sControllerReflector<DaemonSet>,
deployments: K8sControllerReflector<Deployment>,
replica_sets: K8sControllerReflector<ReplicaSet>,
stateful_sets: K8sControllerReflector<StatefulSet>,
daemon_sets: Reflector<DaemonSet>,
deployments: Reflector<Deployment>,
replica_sets: Reflector<ReplicaSet>,
stateful_sets: Reflector<StatefulSet>,
}

impl ResourceWithReflector for DaemonSet {}
Expand All @@ -190,8 +188,6 @@ impl AsyncK8sClient {
/// If loading from the inCluster config fail we fall back to kube-config
/// This will respect the `$KUBECONFIG` envvar, but otherwise default to `~/.kube/config`.
/// Not leveraging infer() to check inClusterConfig first
///
///
pub async fn try_new(namespace: String) -> Result<Self, K8sError> {
debug!("trying inClusterConfig for k8s client");

Expand Down Expand Up @@ -231,17 +227,13 @@ impl AsyncK8sClient {
) -> Result<Self, K8sError> {
Self::try_new(namespace.clone())
.await?
.with_reflectors(&namespace, cr_type_metas)
.with_reflectors(cr_type_metas)
.await
}

async fn with_reflectors(
mut self,
namespace: &str,
cr_type_metas: Vec<TypeMeta>,
) -> Result<Self, K8sError> {
async fn with_reflectors(mut self, cr_type_metas: Vec<TypeMeta>) -> Result<Self, K8sError> {
let dynamics = self.create_dynamic_reflectors(cr_type_metas).await?;
let controllers = self.create_controllers(namespace).await?;
let controllers = self.create_controllers().await?;
self.reflectors = Some(Reflectors {
dynamics,
controllers,
Expand Down Expand Up @@ -274,7 +266,7 @@ impl AsyncK8sClient {
tm.to_owned(),
Dynamic {
object_api: Api::default_namespaced_with(self.client.to_owned(), &ar),
object_reflector: reflector_builder.dynamic_object_reflector(&ar).await?,
object_reflector: reflector_builder.try_build_with_api_resource(&ar).await?,
},
);
}
Expand All @@ -284,22 +276,14 @@ impl AsyncK8sClient {
/// Set up reflectors for DaemonSets, Deployments, StatefulSets, and ReplicaSets.
///
/// This function initializes a reflector for each supported resource type within the provided namespace.
async fn create_controllers(&self, namespace: &str) -> Result<ControllerReflectors, K8sError> {
async fn create_controllers(&self) -> Result<ControllerReflectors, K8sError> {
let reflector_builder = ReflectorBuilder::new(self.client.clone());

Ok(ControllerReflectors {
daemon_sets: reflector_builder
.try_new_resource_reflector::<DaemonSet>(namespace)
.await?,
deployments: reflector_builder
.try_new_resource_reflector::<Deployment>(namespace)
.await?,
replica_sets: reflector_builder
.try_new_resource_reflector::<ReplicaSet>(namespace)
.await?,
stateful_sets: reflector_builder
.try_new_resource_reflector::<StatefulSet>(namespace)
.await?,
daemon_sets: reflector_builder.try_build::<DaemonSet>().await?,
deployments: reflector_builder.try_build::<Deployment>().await?,
replica_sets: reflector_builder.try_build::<ReplicaSet>().await?,
stateful_sets: reflector_builder.try_build::<StatefulSet>().await?,
})
}

Expand Down Expand Up @@ -616,7 +600,7 @@ pub(crate) mod test {
kind: "Foo".to_string(),
};
let k = get_mocked_client(Scenario::APIResource)
.with_reflectors("default", vec![tm.clone()])
.with_reflectors(vec![tm.clone()])
.await
.unwrap();

Expand Down Expand Up @@ -652,7 +636,7 @@ pub(crate) mod test {
};

let k = get_mocked_client(Scenario::APIResource)
.with_reflectors("default", vec![tm_not_existing, tm.clone()])
.with_reflectors(vec![tm_not_existing, tm.clone()])
.await
.unwrap();

Expand All @@ -672,10 +656,8 @@ pub(crate) mod test {

#[tokio::test]
async fn test_client_build_with_reflectors_and_get_resources() {
let namespace = "default";

let async_client = get_mocked_client(Scenario::APIResource)
.with_reflectors(namespace, vec![])
.with_reflectors(vec![])
.await
.unwrap();

Expand Down
Loading