diff --git a/.gitignore b/.gitignore index 440facec..50de7c31 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,5 @@ /.vscode /.idea/ *.iml + +/healthscope/target \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 65f4fca8..cf1c85cc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -414,6 +414,27 @@ dependencies = [ "tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "healthscope" +version = "0.1.0" +dependencies = [ + "chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", + "env_logger 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", + "failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)", + "hyper 0.12.28 (registry+https://github.com/rust-lang/crates.io-index)", + "k8s-openapi 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", + "kube 0.16.1 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "reqwest 0.9.19 (registry+https://github.com/rust-lang/crates.io-index)", + "scylla 0.1.0", + "serde 1.0.90 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_derive 1.0.90 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)", + "spectral 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "http" version = "0.1.17" @@ -507,7 +528,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "k8s-openapi" -version = "0.5.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "base64 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -531,7 +552,7 @@ dependencies = [ [[package]] name = "kube" -version = "0.15.1" +version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "base64 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)", @@ -540,7 +561,7 @@ dependencies = [ "either 1.5.2 (registry+https://github.com/rust-lang/crates.io-index)", "failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "http 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", - "k8s-openapi 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "k8s-openapi 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "openssl 0.10.21 (registry+https://github.com/rust-lang/crates.io-index)", "reqwest 0.9.19 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1183,8 +1204,8 @@ dependencies = [ "env_logger 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", "failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.12.28 (registry+https://github.com/rust-lang/crates.io-index)", - "k8s-openapi 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", - "kube 0.15.1 (registry+https://github.com/rust-lang/crates.io-index)", + "k8s-openapi 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", + "kube 0.16.1 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "regex 1.1.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1764,9 +1785,9 @@ dependencies = [ "checksum indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7e81a7c05f79578dbc15793d8b619db9ba32b4577003ef3af1a91c416798c58d" "checksum iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dbe6e417e7d0975db6512b90796e8ce223145ac4e33c377e4a42882a0e88bb08" "checksum itoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "501266b7edd0174f8530248f87f99c88fbe60ca4ef3dd486835b8d8d53136f7f" -"checksum k8s-openapi 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "67f1f42d95221841fc66fd949591ad8ecbb133274aa739222fd06cef2cf894ef" +"checksum k8s-openapi 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "30e2ac1496d5920d157d0eb5ab453b0af1e6622d5ffa8e7f9c60d435d13342da" "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" -"checksum kube 0.15.1 (registry+https://github.com/rust-lang/crates.io-index)" = "93ccc57b64dfb98b9f3eda0a8e0eac2bf16cec7d64cbc8f1658bc4b5953f2317" +"checksum kube 0.16.1 (registry+https://github.com/rust-lang/crates.io-index)" = "999e096714140913deb79850f33a1cd99c38acf0824305604456ccd0bc2c6cde" "checksum lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" "checksum lazycell 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b294d6fa9ee409a054354afc4352b0b9ef7ca222c69b8812cbea9e7d2bf3783f" "checksum libc 0.2.54 (registry+https://github.com/rust-lang/crates.io-index)" = "c6785aa7dd976f5fbf3b71cfd9cd49d7f783c1ff565a858d71031c6c313aa5c6" diff --git a/Cargo.toml b/Cargo.toml index 6a2bab9d..944581e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,17 +5,22 @@ authors = ["Matt Butcher ", "Jianbo Sun "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +kube = { version = "0.16.1", features = ["openapi"] } +k8s-openapi = { version = "0.5.1", features = ["v1_15"] } +serde = "1.0" +serde_derive = "1.0" +serde_json = "1.0" +failure = "0.1.5" +futures = "0.1.21" +spectral = "0.6" +reqwest = "0.9.17" +log = "0.4" +env_logger = "0.6.1" +hyper = "0.12" +clap = "~2.33" +chrono = "0.4" +scylla = { path = '../' } diff --git a/healthscope/README.md b/healthscope/README.md new file mode 100644 index 00000000..8bb9c355 --- /dev/null +++ b/healthscope/README.md @@ -0,0 +1,12 @@ +# Health Scope Controller + +Health Scope Controller used for periodically check health scope crd and check the health of all the components related. + +## What will health scope controller do? + +1. periodically check all component health and update the CR status. +2. serve as a http server, to output aggregated health information. + +## How to install? + +1. \ No newline at end of file diff --git a/healthscope/src/lib.rs b/healthscope/src/lib.rs new file mode 100644 index 00000000..0b0e1071 --- /dev/null +++ b/healthscope/src/lib.rs @@ -0,0 +1,3 @@ +extern crate chrono; +extern crate futures; +extern crate scylla; diff --git a/healthscope/src/main.rs b/healthscope/src/main.rs new file mode 100644 index 00000000..2c22bfc6 --- /dev/null +++ b/healthscope/src/main.rs @@ -0,0 +1,368 @@ +use chrono::{DateTime, Utc}; +use clap::{App, Arg}; +use env_logger; +use failure::{format_err, Error}; +use futures::task::{current, Task}; +use futures::{future, Async}; +use hyper::rt::Future; +use hyper::service::{service_fn, service_fn_ok}; +use hyper::{Body, Method, Request, Response, Server, StatusCode}; +use kube::api::{ListParams, ObjectList, RawApi}; +use kube::{client::APIClient, config::incluster_config, config::load_kube_config}; +use log::{debug, error, info}; +use scylla::instigator::{combine_name, CONFIG_GROUP, CONFIG_VERSION}; +use scylla::schematic::component_instance::KubeComponentInstance; +use scylla::schematic::scopes::health::{ + ComponentInfo, HealthScopeObject, HealthStatus, HEALTH_SCOPE_CRD, HEALTH_SCOPE_GROUP, + HEALTH_SCOPE_VERSION, +}; + +const DEFAULT_NAMESPACE: &str = "default"; +const DEFAULT_PROBE_INTERVAL: i64 = 30; + +fn kubeconfig() -> kube::Result { + // If env var is set, use in cluster config + if std::env::var("KUBERNETES_PORT").is_ok() { + return incluster_config(); + } + load_kube_config() +} + +fn main() -> Result<(), Error> { + let flags = App::new("healthscope") + .version(env!("CARGO_PKG_VERSION")) + .arg( + Arg::with_name("metrics-addr") + .short("m") + .long("metrics-addr") + .default_value(":8080") + .help("The address the metric endpoint binds to."), + ) + .arg( + Arg::with_name("addr") + .short("p") + .long("endpoint-address") + .default_value(":80") + .help("The address the health scope endpoint binds to."), + ) + .get_matches(); + let metrics_addr = "0.0.0.0".to_owned() + flags.value_of("metrics-addr").unwrap(); + let endpoint_addr = "0.0.0.0".to_owned() + flags.value_of("addr").unwrap(); + + env_logger::init(); + info!("starting server"); + + let top_ns = std::env::var("KUBERNETES_NAMESPACE").unwrap_or_else(|_| DEFAULT_NAMESPACE.into()); + let top_cfg = kubeconfig().expect("Load default kubeconfig"); + + let cfg_watch = top_cfg.clone(); + + let health_scope_watch = std::thread::spawn(move || { + let ns = top_ns.clone(); + let healthscope_resource = RawApi::customResource("healthscopes") + .version("v1alpha1") + .group("core.hydra.io") + .within(ns.as_str()); + let client = APIClient::new(cfg_watch); + let mut cnt = 0; + loop { + let req = healthscope_resource.list(&ListParams::default())?; + match client.request::>(req) { + Ok(health_scopes) => { + for scope in health_scopes.items { + if let Err(res) = aggregate_health(&client, scope, ns.clone()) { + // Log the error and continue. + error!("Error processing event: {:?}", res) + }; + } + } + Err(e) => log::error!("get health scope list err {}", e), + } + cnt = (cnt + 1) % 10; + if cnt == 0 { + debug!("health scope aggregate loop running..."); + } + //FIXME: we could change this to use an informer if we have a runtime controller queue + std::thread::sleep(std::time::Duration::from_secs(1)); + } + }); + + let server = std::thread::spawn(move || { + let addr = endpoint_addr.parse().unwrap(); + info!("Server is running on {}", addr); + hyper::rt::run( + Server::bind(&addr) + .serve(move || service_fn(serve_health)) + .map_err(|e| eprintln!("server error: {}", e)), + ); + }); + + std::thread::spawn(move || { + let addr = metrics_addr.parse().unwrap(); + info!("Health server is running on {}", addr); + hyper::rt::run( + Server::bind(&addr) + .serve(|| { + service_fn_ok(|_req| match (_req.method(), _req.uri().path()) { + (&Method::GET, "/health") => { + debug!("health check"); + Response::new(Body::from("OK")) + } + _ => Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::from("")) + .unwrap(), + }) + }) + .map_err(|e| eprintln!("health server error: {}", e)), + ); + }) + .join() + .unwrap(); + + server.join().unwrap(); + health_scope_watch.join().unwrap() +} + +use std::{ + sync::{Arc, Mutex}, + thread, +}; +pub struct HealthFuture { + shared_state: Arc>, +} + +/// Shared state between the future and the waiting thread +struct SharedState { + /// Whether or not the sleep time has elapsed + completed: bool, + resp: String, + task: Option, +} + +impl Future for HealthFuture { + type Item = Response; + type Error = hyper::Error; + fn poll(&mut self) -> futures::Poll, hyper::Error> { + // Look at the shared state to see if the timer has already completed. + let mut shared_state = self.shared_state.lock().unwrap(); + if shared_state.completed { + Ok(Async::Ready(Response::new(Body::from( + shared_state.resp.clone(), + )))) + } else { + shared_state.task = Some(current()); + Ok(Async::NotReady) + } + } +} + +impl HealthFuture { + /// Create a new `TimerFuture` which will complete after the provided + /// timeout. + pub fn new(instance: String) -> Self { + let shared_state = Arc::new(Mutex::new(SharedState { + completed: false, + task: None, + resp: String::new(), + })); + + // Spawn the new thread + let thread_shared_state = shared_state.clone(); + thread::spawn(move || { + let res = match request_health(instance) { + Ok(status) => status.clone(), + Err(err) => { + error!("{:?}", err); + format!("{}", err) + } + }; + + let mut shared_state = thread_shared_state.lock().unwrap(); + // Signal that the request has completed and wake up the last + // task on which the future was polled, if one exists. + shared_state.completed = true; + shared_state.resp = res; + if let Some(ref task) = shared_state.task { + task.notify() + } + }); + + HealthFuture { shared_state } + } +} + +type BoxFut = Box, Error = hyper::Error> + Send>; +fn serve_health(req: Request) -> BoxFut { + let mut response = Response::new(Body::empty()); + let path = req.uri().path().to_owned(); + match (req.method(), path) { + (&Method::GET, path) => { + let instance = path.trim_start_matches('/').to_string(); + debug!("{} health scope requested", instance); + return Box::new(HealthFuture::new(instance)); + } + _ => *response.status_mut() = StatusCode::NOT_FOUND, + } + Box::new(future::ok(response)) +} + +fn request_health(instance_name: String) -> Result { + let namespace = + std::env::var("KUBERNETES_NAMESPACE").unwrap_or_else(|_| DEFAULT_NAMESPACE.into()); + let cfg = kubeconfig().unwrap(); + println!( + "cfg {:?}, instance {}", + cfg.base_path.clone(), + instance_name + ); + let client = &(APIClient::new(cfg)); + println!("client namespace {}", namespace.clone()); + let healthscope_resource = RawApi::customResource("healthscopes") + .version("v1alpha1") + .group("core.hydra.io") + .within(namespace.as_str()); + let req = healthscope_resource.get(instance_name.as_str())?; + let obj = client.request::(req)?; + let mut health = "healthy"; + obj.status.map(|status| { + status.clone().components.map(|comps| { + comps.iter().for_each(|c| { + if let Some(real_status) = c.status.as_ref() { + if real_status != "healthy" { + health = "unhealthy" + } + }; + }) + }) + }); + Ok(health.to_string()) +} + +fn aggregate_health( + client: &APIClient, + mut event: HealthScopeObject, + namespace: String, +) -> Result<(), Error> { + let interval = event.spec.probe_interval.unwrap_or(DEFAULT_PROBE_INTERVAL); + if !time_to_aggregate(event.status.clone(), interval) { + return Ok(()); + } + info!("start to probe instance: {}", event.metadata.name); + match ( + event.spec.probe_method.as_str(), + event.spec.probe_endpoint.as_str(), + ) { + ("kube-get", ".status") => { + let components = + event + .status + .and_then(|status| status.components) + .and_then(|mut components| { + for c in components.iter_mut() { + c.status = Some(get_health_from_component( + client, + c.clone(), + namespace.clone(), + )) + } + Some(components) + }); + event.status = Some(HealthStatus { + components, + last_aggregate_timestamp: Some(Utc::now().to_rfc3339()), + }); + let pp = kube::api::PatchParams::default(); + let healthscope_resource = RawApi::customResource(HEALTH_SCOPE_CRD) + .version(HEALTH_SCOPE_VERSION) + .group(HEALTH_SCOPE_GROUP) + .within(namespace.as_str()); + let req = healthscope_resource.patch( + event.metadata.clone().name.as_str(), + &pp, + serde_json::to_vec(&event)?, + )?; + client.request::(req)?; + Ok(()) + } + _ => Err(format_err!( + "unknown probe-method {} and probe_endpoint {}", + event.spec.probe_method, + event.spec.probe_endpoint + )), + } +} + +fn get_health_from_component(client: &APIClient, info: ComponentInfo, namespace: String) -> String { + let name = combine_name(info.name, info.instance_name); + let crd_req = RawApi::customResource("componentinstances") + .group(CONFIG_GROUP) + .version(CONFIG_VERSION) + .within(namespace.as_str()); + let req = crd_req.get(name.as_str()).unwrap(); + let res: KubeComponentInstance = match client.request(req) { + Ok(ins) => ins, + Err(e) => { + error!("get component instance failed {}", e); + return "unhealthy".to_string(); + } + }; + res.status.unwrap_or_else(|| "unhealthy".to_string()) +} + +fn time_to_aggregate(status: Option, interval: i64) -> bool { + if interval <= 0 { + return true; + } + if status.is_none() || status.clone().unwrap().last_aggregate_timestamp.is_none() { + return true; + } + let last_aggregate_time = status.unwrap().last_aggregate_timestamp.unwrap(); + let last_time = match DateTime::parse_from_rfc3339(last_aggregate_time.as_str()) { + Ok(last) => last, + Err(e) => { + error!("parse last time err {}", e); + return true; + } + }; + let sys_time = Utc::now(); + let duration = sys_time.signed_duration_since(last_time); + if duration.num_seconds() >= interval { + return true; + } + false +} + +#[cfg(test)] +mod test { + use crate::{request_health, time_to_aggregate}; + use chrono::{Duration, Utc}; + use scylla::schematic::scopes::health::HealthStatus; + + #[test] + fn test_time_to_action() { + assert_eq!(time_to_aggregate(None, 10), true); + let status = Some(HealthStatus { + last_aggregate_timestamp: None, + ..Default::default() + }); + assert_eq!(time_to_aggregate(status, 10), true); + let status = Some(HealthStatus { + last_aggregate_timestamp: Some( + Utc::now() + .checked_sub_signed(Duration::seconds(11)) + .unwrap() + .to_rfc3339(), + ), + ..Default::default() + }); + assert_eq!(time_to_aggregate(status.clone(), 10), true); + assert_eq!(time_to_aggregate(status.clone(), 15), false); + assert_eq!(time_to_aggregate(status.clone(), 0), true); + } + #[test] + fn test_request_health() { + let res = request_health("my-health-scope".to_string()).unwrap(); + assert_eq!(res, "healthy".to_string()) + } +} diff --git a/src/instigator.rs b/src/instigator.rs index 2b82a643..10845e12 100644 --- a/src/instigator.rs +++ b/src/instigator.rs @@ -5,14 +5,15 @@ use log::{debug, info}; use serde_json::json; use std::collections::BTreeMap; +use crate::schematic::variable::Variable; use crate::{ lifecycle::Phase, schematic::{ component::Component, component_instance::KubeComponentInstance, - configuration::ApplicationConfiguration, - configuration::ComponentConfiguration, + configuration::{ApplicationConfiguration, ComponentConfiguration, ScopeBinding}, parameter::{resolve_parameters, resolve_values, ParameterValue}, + scopes::{self, Health, Network, OAMScope}, variable::{get_variable_values, resolve_variables}, OAMStatus, Status, }, @@ -34,7 +35,7 @@ pub const COMPONENT_RECORD_ANNOTATION: &str = "component_record_annotation"; /// Type alias for the results that all instantiation operations return pub type InstigatorResult = Result<(), Error>; -type OpResource = Object; +pub type OpResource = Object; type ParamMap = BTreeMap; /// This error is returned when a component cannot be found. @@ -125,7 +126,18 @@ impl Instigator { component.name.clone(), status.clone() ); - + let mut health_state = "healthy".to_string(); + for (_, v) in status.clone() { + if v != "running" && v != "created" && v != "succeeded" { + health_state = "unhealthy".to_string(); + break; + } + } + self.component_instance_set_status( + component.name.clone(), + inst_name.clone(), + health_state, + )?; // Load all of the traits related to this component. let mut trait_manager = TraitManager { config_name: name.clone(), @@ -173,10 +185,44 @@ impl Instigator { /// The workhorse for Instigator. /// This will execute only Add, Modify, and Delete phases. fn exec(&self, event: OpResource, mut phase: Phase) -> InstigatorResult { - // TODO: - // - Resolve scope bindings let name = event.metadata.name.clone(); + let variables = event.spec.variables.clone().unwrap_or_else(|| vec![]); let owner_ref = config_owner_reference(name.clone(), event.metadata.uid.clone())?; + if event.spec.scopes.is_some() { + let scopes = load_scopes( + self.client.clone(), + self.namespace.clone(), + name.clone(), + event.spec.clone(), + variables.clone(), + )?; + match phase { + Phase::Add => { + for sc in scopes.iter() { + sc.create(owner_ref.clone())?; + } + } + Phase::Modify => { + for sc in scopes.iter() { + sc.modify()?; + } + } + Phase::Delete => { + for sc in scopes.iter() { + sc.delete()?; + } + } + _ => { + return Err(format_err!( + "unknown phase for scopes {:?} on {}", + phase.clone(), + name.clone(), + )) + } + } + // according to the spec, if this is not empty, this AppConfig will be scope instance only, so we could return after we resolved scopes. + return Ok(()); + } let record_ann = event.metadata.annotations.get(COMPONENT_RECORD_ANNOTATION); let mut last_components = get_record_annotation(record_ann)?; @@ -210,16 +256,39 @@ impl Instigator { } else if record.is_none() && phase == Phase::Modify { phase = Phase::Add } + let mut scope_overlap = BTreeMap::new(); + // TODO: if we don't manually add scopes, there are default scopes should be bind + for sc in &component + .application_scopes + .clone() + .unwrap_or_else(|| vec![]) + { + let scopes = + get_scope_instance(sc.clone(), self.namespace.clone(), self.client.clone())?; + for scope in scopes.iter() { + if !scope.allow_overlap() { + if scope_overlap.get(&scope.scope_type()).is_some() { + return Err(format_err!( + "scope {} {} do not allow overlap", + sc.clone(), + scope.scope_type() + )); + } + scope_overlap.insert(scope.scope_type(), true); + } + scope.add(component.clone())?; + } + } component_updated = true; // Resolve variables/parameters - let variables = event.spec.variables.clone().unwrap_or_else(|| vec![]); + let parent = get_variable_values(Some(variables.clone())); let child = component .parameter_values .clone() - .map(|values| resolve_variables(values, variables)) + .map(|values| resolve_variables(values, variables.clone())) .unwrap_or_else(|| Ok(vec![]))?; let params = resolve_parameters( @@ -239,26 +308,27 @@ impl Instigator { component.name.clone(), inst_name.clone(), ); - if ownref.is_err() { - let e = ownref.unwrap_err().to_string(); - if !e.contains("NotFound") { - // Wrap the error to make it clear where we failed - // During deletion, this might indicate that something else - // remove the component instance. - return Err(format_err!( - "{:?} on {}: {}", - phase.clone(), + match ownref { + Err(err) => { + let e = err.to_string(); + if !e.contains("NotFound") { + // Wrap the error to make it clear where we failed + // During deletion, this might indicate that something else + // remove the component instance. + return Err(format_err!( + "{:?} on {}: {}", + phase.clone(), + inst_name.clone(), + e + )); + } + Some(self.create_component_instance( + component.name.clone(), inst_name.clone(), - e - )); + owner_ref.clone(), + )?) } - Some(self.create_component_instance( - component.name.clone(), - inst_name.clone(), - owner_ref.clone(), - )?) - } else { - Some(ownref.unwrap()) + Ok(own) => Some(own), } } _ => None, @@ -364,6 +434,17 @@ impl Instigator { )?; //delete component instance and let owner_reference to delete real resource self.delete_component_instance(component.name.clone(), inst_name.clone())?; + for sc in &component + .application_scopes + .clone() + .unwrap_or_else(|| vec![]) + { + let scopes = + get_scope_instance(sc.clone(), self.namespace.clone(), self.client.clone())?; + for scope in scopes.iter() { + scope.remove(component.clone())?; + } + } } // if no component was updated or this is an delete phase, just return without status change. if !component_updated || phase == Phase::Delete { @@ -582,6 +663,29 @@ impl Instigator { }; Ok(vec![owner]) } + + fn component_instance_set_status( + &self, + component_name: String, + instance_name: String, + status: String, + ) -> Result<(), Error> { + let name = combine_name(component_name, instance_name); + let crd_req = RawApi::customResource("componentinstances") + .group(CONFIG_GROUP) + .version(CONFIG_VERSION) + .within(self.namespace.as_str()); + let req = crd_req.get(name.as_str())?; + let mut res: KubeComponentInstance = self.client.request(req)?; + res.status = Some(status); + let req = crd_req.patch( + name.as_str(), + &PatchParams::default(), + serde_json::to_vec(&res)?, + )?; + let _: KubeComponentInstance = self.client.request(req)?; + Ok(()) + } } /// combine_name combine component name with instance_name, @@ -647,3 +751,118 @@ pub fn get_component_def( pub fn get_values(values: Option>) -> Vec { values.or_else(|| Some(vec![])).unwrap() } + +/// Load application scopes from scope_bindings +/// check if there is not allowed overlap +pub fn load_scopes( + client: APIClient, + namespace: String, + instance_name: String, + spec: ApplicationConfiguration, + variables: Vec, +) -> Result, failure::Error> { + let mut scopes: Vec = vec![]; + for scope_binding in spec.scopes.iter() { + for sc in scope_binding.iter() { + let param = sc + .parameter_values + .clone() + .map(|values| resolve_variables(values, variables.clone())) + .unwrap_or_else(|| Ok(vec![])) + .unwrap(); + let scope = load_scope( + client.clone(), + namespace.clone(), + instance_name.clone(), + &sc, + param.clone(), + )?; + scopes.insert(scopes.len(), scope); + } + } + Ok(scopes) +} + +// load_scope should load scope from k8s crd +// NOTE: this is a temporary solution just return core scope here +fn load_scope( + client: APIClient, + namespace: String, + instance_name: String, + binding: &ScopeBinding, + param: Vec, +) -> Result { + debug!("Scope binding params: {:?}", &binding.parameter_values); + load_scope_by_type( + client.clone(), + namespace, + instance_name, + binding.scope_type.as_str(), + param, + ) +} + +fn load_scope_by_type( + client: APIClient, + namespace: String, + instance_name: String, + scope_type: &str, + param: Vec, +) -> Result { + match scope_type { + scopes::NETWORK_SCOPE => Ok(OAMScope::Network(Network::from_params( + instance_name.clone(), + namespace.clone(), + client.clone(), + param, + )?)), + scopes::HEALTH_SCOPE => Ok(OAMScope::Health(Health::from_params( + instance_name.clone(), + namespace.clone(), + client.clone(), + param, + )?)), + _ => Err(format_err!( + "unknown scope {} type {}", + instance_name, + scope_type + )), + } +} + +type KubeOpsConfig = Object; + +//get_scope_instance load scope instance by load AppConfig object +fn get_scope_instance( + name: String, + ns: String, + client: APIClient, +) -> Result, failure::Error> { + let mut scopes = vec![]; + let resource = RawApi::customResource(CONFIG_CRD) + .within(ns.as_str()) + .group(CONFIG_GROUP) + .version(CONFIG_VERSION); + //init all the existing objects at initiate, this should be done by informer + let req = resource.get(name.as_str())?; + let cfg = client.request::(req)?; + for scope_binding in cfg.spec.scopes.clone().unwrap_or_else(|| vec![]).iter() { + let param = scope_binding + .parameter_values + .clone() + .map(|values| { + resolve_variables(values, cfg.spec.variables.clone().unwrap_or_else(|| vec![])) + }) + .unwrap_or_else(|| Ok(vec![])) + .unwrap(); + let scope = load_scope( + client.clone(), + ns.clone(), + cfg.metadata.name.clone(), + scope_binding, + param, + )?; + scopes.insert(scopes.len(), scope) + } + Ok(scopes) +} diff --git a/src/instigator_test.rs b/src/instigator_test.rs index 53d5c5d5..d362a55d 100644 --- a/src/instigator_test.rs +++ b/src/instigator_test.rs @@ -29,6 +29,7 @@ fn test_record_ann() { instance_name: "inst123".to_string(), parameter_values: None, traits: None, + application_scopes: None, }, }; let cr2 = ComponentRecord { @@ -38,6 +39,7 @@ fn test_record_ann() { instance_name: "inst321".to_string(), parameter_values: None, traits: None, + application_scopes: None, }, }; one.insert("comp1".to_string(), cr.clone()); @@ -61,6 +63,7 @@ fn test_check_diff() { instance_name: "test_inst".to_string(), parameter_values: None, traits: None, + application_scopes: None, }, }; let old_record = ComponentRecord { @@ -70,6 +73,7 @@ fn test_check_diff() { instance_name: "test_inst".to_string(), parameter_values: None, traits: None, + application_scopes: None, }, }; @@ -83,6 +87,7 @@ fn test_check_diff() { instance_name: "test_inst".to_string(), parameter_values: None, traits: None, + application_scopes: None, }, }; assert_eq!(check_diff(Some(new_record2), &old_record), true); @@ -93,6 +98,7 @@ fn test_check_diff() { instance_name: "test_inst".to_string(), parameter_values: Some(vec![]), traits: None, + application_scopes: None, }, }; assert_eq!(check_diff(Some(new_record3), &old_record), true); diff --git a/src/main.rs b/src/main.rs index 1fb7fd2e..8637d98a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -188,7 +188,7 @@ fn handle_event( // For now, as this to be an innocuous albeit annoying error displayed // in the logs, we just filter "AlreadyExists" errors to reduce confusion. Ok(()) - }, + } _ => Err(format_err!("APIError: {:?}", e)), }, } diff --git a/src/schematic.rs b/src/schematic.rs index a3c5e38a..fad90272 100644 --- a/src/schematic.rs +++ b/src/schematic.rs @@ -5,6 +5,7 @@ pub mod component; pub mod component_instance; pub mod configuration; pub mod parameter; +pub mod scopes; pub mod traits; pub mod variable; diff --git a/src/schematic/component_instance.rs b/src/schematic/component_instance.rs index f060c562..8d1e65eb 100644 --- a/src/schematic/component_instance.rs +++ b/src/schematic/component_instance.rs @@ -5,4 +5,4 @@ pub struct ComponentInstance { } /// Convenience type for Kubernetes wrapped ComponentInstance. -pub type KubeComponentInstance = kube::api::Object; +pub type KubeComponentInstance = kube::api::Object; diff --git a/src/schematic/configuration.rs b/src/schematic/configuration.rs index 1a000625..65ee0af2 100644 --- a/src/schematic/configuration.rs +++ b/src/schematic/configuration.rs @@ -16,6 +16,8 @@ pub struct ComponentConfiguration { pub parameter_values: Option>, /// Traits to attach to the component pub traits: Option>, + /// Application Scopes which the component was involved + pub application_scopes: Option>, } /// ApplicationConfiguration is the top-level configuration object in OAM. @@ -38,5 +40,7 @@ pub struct ScopeBinding { #[serde(rename(serialize = "type", deserialize = "type"))] pub scope_type: String, + //TODO this should use Properties here, but we don't have Properties yet, keep consistent with TraitBinding. + #[serde(rename(serialize = "properties", deserialize = "properties"))] pub parameter_values: Option>, } diff --git a/src/schematic/parameter.rs b/src/schematic/parameter.rs index a6647613..f4dd0a24 100644 --- a/src/schematic/parameter.rs +++ b/src/schematic/parameter.rs @@ -188,3 +188,31 @@ pub struct ParameterValue { pub value: Option, pub from_param: Option, } + +pub fn extract_string_params(name: &str, params: Vec) -> Option { + let value = extract_value_params(name, params.clone()); + if let serde_json::Value::String(vs) = value.unwrap_or(Default::default()) { + return Some(vs); + } + None +} + +pub fn extract_number_params( + name: &str, + params: Vec, +) -> Option { + let value = extract_value_params(name, params.clone()); + if let serde_json::Value::Number(vs) = value.unwrap_or(Default::default()) { + return Some(vs); + } + None +} + +pub fn extract_value_params(name: &str, params: Vec) -> Option { + params.iter().find_map(|param_value| { + if param_value.name.eq(name) { + return param_value.clone().value; + } + None + }) +} diff --git a/src/schematic/parameter_test.rs b/src/schematic/parameter_test.rs index 344dfc81..590ba530 100644 --- a/src/schematic/parameter_test.rs +++ b/src/schematic/parameter_test.rs @@ -178,3 +178,45 @@ fn test_resolve_value() { ); assert_eq!("123".to_string(), got.unwrap()); } + +#[test] +fn test_extract_params() { + let mut params = vec![]; + params.insert( + 0, + ParameterValue { + name: "a".to_string(), + value: Some("1".into()), + from_param: None, + }, + ); + params.insert( + 1, + ParameterValue { + name: "b".to_string(), + value: Some("2".into()), + from_param: None, + }, + ); + params.insert( + 2, + ParameterValue { + name: "c".to_string(), + value: Some("3".into()), + from_param: None, + }, + ); + assert_eq!( + Some("1".to_string()), + extract_string_params("a", params.clone()) + ); + assert_eq!( + Some("2".to_string()), + extract_string_params("b", params.clone()) + ); + assert_eq!( + Some("3".to_string()), + extract_string_params("c", params.clone()) + ); + assert_eq!(None, extract_string_params("d", params.clone())); +} diff --git a/src/schematic/scopes.rs b/src/schematic/scopes.rs new file mode 100644 index 00000000..914a1ca0 --- /dev/null +++ b/src/schematic/scopes.rs @@ -0,0 +1,82 @@ +// Re-exports +pub mod health; +pub use crate::schematic::scopes::health::Health; +pub mod network; +use crate::schematic::configuration::ComponentConfiguration; +pub use crate::schematic::scopes::network::Network; +use failure::Error; +use k8s_openapi::apimachinery::pkg::apis::meta::v1 as meta; + +pub const HEALTH_SCOPE: &str = "core.hydra.io/v1alpha1.HealthScope"; +pub const NETWORK_SCOPE: &str = "core.hydra.io/v1alpha1.NetworkScope"; + +/// Scopes describes Hydra application scopes. +/// +/// Application scopes are used to group components together into logical applications +/// by providing different forms of application boundaries with common group behaviors. +/// For example, a health scope will aggregate health states for components and determine whether it's healthy or not. +pub enum OAMScope { + Health(Health), + Network(Network), +} + +fn convert_owner_ref(owner: meta::OwnerReference) -> kube::api::OwnerReference { + kube::api::OwnerReference { + controller: owner.controller.unwrap_or(false), + blockOwnerDeletion: owner.block_owner_deletion.unwrap_or(false), + name: owner.name, + apiVersion: owner.api_version, + kind: owner.kind, + uid: owner.uid, + } +} + +impl OAMScope { + pub fn allow_overlap(&self) -> bool { + match self { + OAMScope::Health(h) => h.allow_overlap(), + OAMScope::Network(n) => n.allow_overlap(), + } + } + pub fn scope_type(&self) -> String { + match self { + OAMScope::Health(h) => h.scope_type(), + OAMScope::Network(n) => n.scope_type(), + } + } + /// create will create a real scope instance + pub fn create(&self, owner: meta::OwnerReference) -> Result<(), Error> { + match self { + OAMScope::Health(h) => h.create(convert_owner_ref(owner.clone())), + OAMScope::Network(n) => n.create(owner.clone()), + } + } + /// modify will modify the scope instance + pub fn modify(&self) -> Result<(), Error> { + match self { + OAMScope::Health(h) => h.modify(), + OAMScope::Network(n) => n.modify(), + } + } + /// delete will delete the scope instance, we can depend on OwnerReference if only k8s objects were created + pub fn delete(&self) -> Result<(), Error> { + match self { + OAMScope::Health(h) => h.delete(), + OAMScope::Network(n) => n.delete(), + } + } + /// add will add a component to this scope + pub fn add(&self, spec: ComponentConfiguration) -> Result<(), Error> { + match self { + OAMScope::Health(h) => h.add(spec), + OAMScope::Network(n) => n.add(spec), + } + } + /// remove will remove component from this scope + pub fn remove(&self, spec: ComponentConfiguration) -> Result<(), Error> { + match self { + OAMScope::Health(h) => h.remove(spec), + OAMScope::Network(n) => n.remove(spec), + } + } +} diff --git a/src/schematic/scopes/health.rs b/src/schematic/scopes/health.rs new file mode 100644 index 00000000..40a65b9c --- /dev/null +++ b/src/schematic/scopes/health.rs @@ -0,0 +1,336 @@ +use crate::schematic::configuration::ComponentConfiguration; +use crate::schematic::parameter::{ + self, extract_number_params, extract_string_params, ParameterValue, +}; +use crate::schematic::scopes::HEALTH_SCOPE; +use failure::Error; +use kube::{api::RawApi, client::APIClient}; +use log::info; + +pub const HEALTH_SCOPE_CRD: &str = "healthscopes"; +pub const HEALTH_SCOPE_GROUP: &str = "core.hydra.io"; +pub const HEALTH_SCOPE_VERSION: &str = "v1alpha1"; +pub const HEALTH_SCOPE_KIND: &str = "HealthScope"; +#[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct HealthScope { + pub probe_method: String, + pub probe_endpoint: String, + pub probe_timeout: Option, + pub probe_interval: Option, + pub failure_rate_threshold: Option, + pub healthy_rate_threshold: Option, + pub health_threshold_percentage: Option, + pub required_healthy_components: Option>, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct ComponentInfo { + pub name: String, + pub instance_name: String, + pub status: Option, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct HealthStatus { + pub components: Option>, + pub last_aggregate_timestamp: Option, +} +impl Default for HealthStatus { + fn default() -> Self { + HealthStatus { + components: None, + last_aggregate_timestamp: None, + } + } +} + +pub type HealthScopeObject = kube::api::Object; + +/// Health scope is defined as https://github.com/microsoft/hydra-spec/blob/master/4.application_scopes.md#health-scope +#[derive(Clone)] +pub struct Health { + client: APIClient, + namespace: String, + pub name: String, + pub allow_component_overlap: bool, + pub probe_method: String, + pub probe_endpoint: String, + pub probe_timeout: Option, + pub probe_interval: Option, + pub failure_rate_threshold: Option, + pub healthy_rate_threshold: Option, + pub health_threshold_percentage: Option, + pub required_healthy_components: Option>, +} + +impl Health { + pub fn from_params( + name: String, + namespace: String, + client: APIClient, + params: Vec, + ) -> Result { + let probe_method = match extract_string_params("probe-method", params.clone()) { + Some(network_id) => network_id, + None => return Err(format_err!("probe-method is not exist")), + }; + let probe_endpoint = match extract_string_params("probe-endpoint", params.clone()) { + Some(network_id) => network_id, + None => return Err(format_err!("probe-endpoint is not exist")), + }; + let probe_timeout = + extract_number_params("probe-timeout", params.clone()).and_then(|v| v.as_i64()); + let probe_interval = + extract_number_params("probe-interval", params.clone()).and_then(|v| v.as_i64()); + let failure_rate_threshold = + extract_number_params("failure-rate-threshold", params.clone()) + .and_then(|v| v.as_f64()); + let healthy_rate_threshold = + extract_number_params("healthy-rate-threshold", params.clone()) + .and_then(|v| v.as_f64()); + let health_threshold_percentage = + extract_number_params("health-threshold-percentage", params.clone()) + .and_then(|v| v.as_f64()); + let required_healthy_components = + parameter::extract_value_params("required-healthy-components", params.clone()) + .and_then(|v| v.as_array().cloned()) + .and_then(|v| { + v.iter() + .map(|x| x.as_str().and_then(|v| Some(v.to_string()))) + .clone() + .collect() + }); + Ok(Health { + name, + namespace, + client, + allow_component_overlap: true, + probe_method, + probe_endpoint, + probe_timeout, + probe_interval, + failure_rate_threshold, + healthy_rate_threshold, + health_threshold_percentage, + required_healthy_components, + }) + } + pub fn allow_overlap(&self) -> bool { + self.allow_component_overlap + } + pub fn scope_type(&self) -> String { + String::from(HEALTH_SCOPE) + } + pub fn create(&self, owner: kube::api::OwnerReference) -> Result<(), Error> { + let pp = kube::api::PostParams::default(); + let mut owners = vec![]; + owners.insert(0, owner); + let scope = HealthScopeObject { + spec: HealthScope { + probe_method: self.probe_method.clone(), + probe_endpoint: self.probe_endpoint.clone(), + probe_timeout: self.probe_timeout.clone(), + probe_interval: self.probe_interval, + failure_rate_threshold: self.failure_rate_threshold, + healthy_rate_threshold: self.healthy_rate_threshold, + health_threshold_percentage: self.health_threshold_percentage, + required_healthy_components: self.required_healthy_components.clone(), + }, + types: kube::api::TypeMeta { + apiVersion: Some(HEALTH_SCOPE_GROUP.to_string() + "/" + HEALTH_SCOPE_VERSION), + kind: Some(HEALTH_SCOPE_KIND.to_string()), + }, + metadata: kube::api::ObjectMeta { + name: self.name.clone(), + ownerReferences: owners, + ..Default::default() + }, + status: None, + }; + let healthscope_resource = RawApi::customResource(HEALTH_SCOPE_CRD) + .version(HEALTH_SCOPE_VERSION) + .group(HEALTH_SCOPE_GROUP) + .within(self.namespace.as_str()); + let req = healthscope_resource.create(&pp, serde_json::to_vec(&scope)?)?; + let err = self + .client + .request::(req) + .err() + .and_then(|e| { + if e.api_error() + .and_then(|api_err| { + if api_err.reason == "AlreadyExists" { + return Some(()); + } + None + }) + .is_some() + { + return None; + } + Some(e) + }); + if err.is_some() { + return Err(err.unwrap().into()); + } + info!("health scope {} created", self.name.clone()); + Ok(()) + } + pub fn modify(&self) -> Result<(), Error> { + Err(format_err!("health scope modify not implemented")) + } + /// let OwnerReference delete + pub fn delete(&self) -> Result<(), Error> { + Ok(()) + } + pub fn add(&self, spec: ComponentConfiguration) -> Result<(), Error> { + let mut obj = self.get_obj()?; + let mut components = self.remove_one(spec.clone(), obj.status.clone()); + components.insert( + components.len(), + ComponentInfo { + name: spec.name.clone(), + instance_name: spec.instance_name.clone(), + status: None, + }, + ); + obj.status = Some(HealthStatus { + components: Some(components), + ..Default::default() + }); + info!( + "add component {} to health scope {}", + spec.name.clone(), + self.name.clone() + ); + self.patch_obj(obj) + } + pub fn remove(&self, spec: ComponentConfiguration) -> Result<(), Error> { + let mut obj = self.get_obj()?; + let components = self.remove_one(spec.clone(), obj.status.clone()); + obj.status = Some(HealthStatus { + components: Some(components), + ..Default::default() + }); + self.patch_obj(obj) + } + + pub fn get_obj(&self) -> Result { + let healthscope_resource = RawApi::customResource(HEALTH_SCOPE_CRD) + .version(HEALTH_SCOPE_VERSION) + .group(HEALTH_SCOPE_GROUP) + .within(self.namespace.as_str()); + let req = healthscope_resource.get(self.name.as_str())?; + Ok(self.client.request::(req)?) + } + fn remove_one( + &self, + spec: ComponentConfiguration, + status: Option, + ) -> Vec { + let mut components = vec![]; + if let Some(status) = status { + for comp in status.components.unwrap_or_else(|| vec![]).iter() { + if comp.name == spec.name && comp.instance_name == spec.instance_name { + continue; + } + components.insert(components.len(), comp.clone()) + } + } + components + } + fn patch_obj(&self, obj: HealthScopeObject) -> Result<(), Error> { + let pp = kube::api::PatchParams::default(); + let healthscope_resource = RawApi::customResource(HEALTH_SCOPE_CRD) + .version(HEALTH_SCOPE_VERSION) + .group(HEALTH_SCOPE_GROUP) + .within(self.namespace.as_str()); + let req = healthscope_resource.patch(self.name.as_str(), &pp, serde_json::to_vec(&obj)?)?; + self.client.request::(req)?; + Ok(()) + } +} + +#[cfg(test)] +mod test { + use crate::schematic::parameter::ParameterValue; + use crate::schematic::scopes::{health::Health, HEALTH_SCOPE}; + use kube::client::APIClient; + use kube::config::Configuration; + /// This mock builds a KubeConfig that will not be able to make any requests. + fn mock_kube_config() -> Configuration { + Configuration { + base_path: ".".into(), + client: reqwest::Client::new(), + } + } + #[test] + fn test_create_health() { + let mut params = vec![]; + params.insert( + params.len(), + ParameterValue { + name: "probe-method".to_string(), + value: Some("httpGet".into()), + from_param: None, + }, + ); + params.insert( + params.len(), + ParameterValue { + name: "probe-endpoint".to_string(), + value: Some("/v1/health".into()), + from_param: None, + }, + ); + params.insert( + params.len(), + ParameterValue { + name: "probe-timeout".to_string(), + value: Some(10.into()), + from_param: None, + }, + ); + params.insert( + params.len(), + ParameterValue { + name: "failure-rate-threshold".to_string(), + value: Some(80.into()), + from_param: None, + }, + ); + let mut comps = vec![]; + comps.insert(0, serde_json::Value::from("comp1")); + comps.insert(1, serde_json::Value::from("comp2")); + params.insert( + params.len(), + ParameterValue { + name: "required-healthy-components".to_string(), + value: Some(serde_json::Value::Array(comps)), + from_param: None, + }, + ); + + let net = Health::from_params( + "test-health".to_string(), + "namespace".to_string(), + APIClient::new(mock_kube_config()), + params, + ) + .unwrap(); + assert_eq!(true, net.allow_overlap()); + assert_eq!(HEALTH_SCOPE.to_string(), net.scope_type()); + assert_eq!("test-health".to_string(), net.name); + assert_eq!("httpGet".to_string(), net.probe_method); + assert_eq!("/v1/health".to_string(), net.probe_endpoint); + assert_eq!(Some(10), net.probe_timeout); + assert_eq!(Some(80.0), net.failure_rate_threshold); + let mut comps = vec![]; + comps.insert(0, "comp1".to_string()); + comps.insert(1, "comp2".to_string()); + assert_eq!(Some(comps), net.required_healthy_components); + } +} diff --git a/src/schematic/scopes/network.rs b/src/schematic/scopes/network.rs new file mode 100644 index 00000000..d2ba823d --- /dev/null +++ b/src/schematic/scopes/network.rs @@ -0,0 +1,117 @@ +use crate::schematic::configuration::ComponentConfiguration; +/// Network scope is defined as https://github.com/microsoft/hydra-spec/blob/master/4.application_scopes.md#network-scope +/// Now we don't really implement network scope, this is just a framework as the spec describe. +use crate::schematic::parameter::{extract_string_params, ParameterValue}; +use crate::schematic::scopes::NETWORK_SCOPE; +use failure::{format_err, Error}; +use k8s_openapi::apimachinery::pkg::apis::meta::v1 as meta; +use kube::client::APIClient; + +#[derive(Clone)] +pub struct Network { + client: APIClient, + namespace: String, + pub name: String, + pub allow_component_overlap: bool, + pub network_id: String, + pub subnet_id: String, + pub internet_gateway_type: Option, +} + +impl Network { + pub fn from_params( + name: String, + namespace: String, + client: APIClient, + params: Vec, + ) -> Result { + let network_id = match extract_string_params("network-id", params.clone()) { + Some(network_id) => network_id, + None => return Err(format_err!("network-id is not exist")), + }; + let subnet_id = match extract_string_params("subnet-id", params.clone()) { + Some(network_id) => network_id, + None => return Err(format_err!("subnet-id is not exist")), + }; + Ok(Network { + network_id, + subnet_id, + name, + namespace, + client, + internet_gateway_type: extract_string_params("internet-gateway-type", params.clone()), + allow_component_overlap: false, + }) + } + pub fn allow_overlap(&self) -> bool { + self.allow_component_overlap + } + pub fn scope_type(&self) -> String { + String::from(NETWORK_SCOPE) + } + pub fn create(&self, _owner: meta::OwnerReference) -> Result<(), Error> { + Err(format_err!("network scope create not implemented")) + } + pub fn modify(&self) -> Result<(), Error> { + Err(format_err!("network scope modify not implemented")) + } + /// could let OwnerReference delete + pub fn delete(&self) -> Result<(), Error> { + Err(format_err!("network scope delete not implemented")) + } + pub fn add(&self, _spec: ComponentConfiguration) -> Result<(), Error> { + Err(format_err!("network scope add component not implemented")) + } + pub fn remove(&self, _spec: ComponentConfiguration) -> Result<(), Error> { + Err(format_err!( + "network scope remove component not implemented" + )) + } +} + +#[cfg(test)] +mod test { + use crate::schematic::parameter::ParameterValue; + use crate::schematic::scopes::{Network, NETWORK_SCOPE}; + use kube::client::APIClient; + use kube::config::Configuration; + /// This mock builds a KubeConfig that will not be able to make any requests. + fn mock_kube_config() -> Configuration { + Configuration { + base_path: ".".into(), + client: reqwest::Client::new(), + } + } + + #[test] + fn test_create_network() { + let mut params = vec![]; + params.insert( + 0, + ParameterValue { + name: "network-id".to_string(), + value: Some("nid".into()), + from_param: None, + }, + ); + params.insert( + 1, + ParameterValue { + name: "subnet-id".to_string(), + value: Some("sid".into()), + from_param: None, + }, + ); + let net = Network::from_params( + "test-net".to_string(), + "namespace".to_string(), + APIClient::new(mock_kube_config()), + params, + ) + .unwrap(); + assert_eq!(false, net.allow_overlap()); + assert_eq!(NETWORK_SCOPE.to_string(), net.scope_type()); + assert_eq!("nid".to_string(), net.network_id); + assert_eq!("sid".to_string(), net.subnet_id); + } +} diff --git a/src/schematic/traits/ingress.rs b/src/schematic/traits/ingress.rs index 4cde2caa..e374d11f 100644 --- a/src/schematic/traits/ingress.rs +++ b/src/schematic/traits/ingress.rs @@ -129,8 +129,8 @@ impl TraitImplementation for Ingress { if let Some(status) = ingress.status { if let Some(_lbstatus) = status.load_balancer { - //we can just put Created to status, or combine Hostname and IP to status. - resource.insert(key.clone(), "Created".to_string()); + //we can just put created to status, or combine Hostname and IP to status. + resource.insert(key.clone(), "created".to_string()); return Some(resource); } }