Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions schema/notifications.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1026,6 +1026,8 @@ components:
serviceRef:
type: object
properties:
id:
type: integer
connectUrl:
type: object
description: Payload of type URI
Expand All @@ -1045,6 +1047,8 @@ components:
cryostat:
type: object
additionalProperties: true
agent:
type: boolean
jvmId:
type: string
required:
Expand Down
201 changes: 174 additions & 27 deletions src/main/java/io/cryostat/discovery/KubeEndpointSlicesDiscovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import jakarta.persistence.EntityManager;
import jakarta.persistence.EntityNotFoundException;
import jakarta.persistence.LockModeType;
import jakarta.transaction.Transactional;
import jakarta.transaction.Transactional.TxType;
Expand Down Expand Up @@ -352,14 +353,21 @@ public void handleQueryEvent(NamespaceQueryEvent evt) {

Set<Target> observedTargets = observedTargetsWithHierarchy.keySet();

Target.compare(persistedTargets)
.to(observedTargets)
.removed()
.forEach(
(t) ->
notify(
EndpointDiscoveryEvent.from(
namespace, t, null, EventKind.LOST)));
var removedTargets = Target.compare(persistedTargets).to(observedTargets).removed();

logger.debugv(
"Namespace {0}: Found {1} persisted targets, {2} observed targets, {3}"
+ " removed targets",
namespace,
persistedTargets.size(),
observedTargets.size(),
removedTargets.size());

removedTargets.forEach(
(t) -> {
logger.debugv("Publishing LOST event for target: {0}", t.connectUrl);
notify(EndpointDiscoveryEvent.from(namespace, t, null, EventKind.LOST));
});

Target.compare(persistedTargets)
.to(observedTargets)
Expand Down Expand Up @@ -708,31 +716,93 @@ DiscoveryNode buildOwnershipHierarchy(HasMetadata kubeObj, DiscoveryNode node) {
}

private void pruneOwnerChain(DiscoveryNode nsNode, Target target) {
target = Target.getTargetByConnectUrl(target.connectUrl);
logger.debugv("Pruning owner chain for target: {0}", target.connectUrl);
try {
Target managedTarget = Target.getTargetByConnectUrl(target.connectUrl);

DiscoveryNode child = target.discoveryNode;
while (true) {
DiscoveryNode parent = child.parent;
if (managedTarget == null || managedTarget.id == null) {
logger.debugv("Target already deleted: {0}", target.connectUrl);
return;
}

if (parent == null) {
break;
logger.debugv(
"Found managed target with id {0}, starting pruning from discoveryNode {1}",
managedTarget.id,
managedTarget.discoveryNode != null ? managedTarget.discoveryNode.id : "null");

DiscoveryNode endpointNode = managedTarget.discoveryNode;
if (endpointNode == null || endpointNode.id == null) {
logger.debugv("No discoveryNode for target: {0}", target.connectUrl);
managedTarget.delete();
return;
}

parent.children.remove(child);
child.parent = null;
parent.persist();
endpointNode = entityManager.find(DiscoveryNode.class, endpointNode.id);
logger.debugv(
"Loaded discoveryNode {0} with parent {1}",
endpointNode.id, endpointNode.parent != null ? endpointNode.parent.id : "null");

if (parent.hasChildren()
|| parent.nodeType.equals(KubeDiscoveryNodeType.NAMESPACE.getKind())) {
break;
// Get the parent BEFORE deleting the target, because deletion will cascade
DiscoveryNode child = endpointNode.parent;

// Delete the target first - this will cascade delete its discoveryNode (the
// Endpoint/EndpointSlice)
managedTarget.delete();
entityManager.flush();

// If there's no parent, we're done
if (child == null) {
logger.debugv("No parent node to prune");
return;
}

child = parent;
}
// Walk up the hierarchy, removing childless nodes from their parents
// Rely on orphan removal to delete them, don't call delete() explicitly
while (child != null) {
DiscoveryNode parent = child.parent;

entityManager.flush();
nsNode.persist();
target.delete();
if (parent == null) {
logger.debugv(
"Reached orphaned node {0} (id={1}), relying on orphan removal",
child.name, child.id);
break;
}

entityManager.refresh(parent); // Reload from DB with current children

// Remove child from parent's collection - orphan removal will delete it
parent.children.remove(child);
child.parent = null;

boolean hasChildren = parent.hasChildren();
Comment thread
andrewazores marked this conversation as resolved.
boolean isNamespace =
parent.nodeType.equals(KubeDiscoveryNodeType.NAMESPACE.getKind());
logger.debugv(
"Parent node {0} (id={1}, type={2}): hasChildren={3}, isNamespace={4}",
parent.name, parent.id, parent.nodeType, hasChildren, isNamespace);

if (hasChildren || isNamespace) {
logger.debugv("Stopping pruning at parent node {0}", parent.name);
break;
}

logger.debugv("Continuing to prune parent node {0}", parent.name);
// Remove child from parent's collection - orphan removal will delete it
parent.children.remove(child);
child.parent = null;

// Move up to check the parent too
child = parent;
}

entityManager.flush();
nsNode.persist();

} catch (EntityNotFoundException e) {
logger.debugv("Target was deleted during pruning operation: {0}", target.connectUrl, e);
entityManager.flush();
nsNode.persist();
}
}

/**
Expand Down Expand Up @@ -778,14 +848,70 @@ private DiscoveryNode buildOwnershipHierarchy(
podNode.children.add(targetNode);
targetNode.parent = podNode;

DiscoveryNode rootNode = buildOwnershipHierarchy(pod.getLeft(), podNode);
DiscoveryNode rootNode = buildOwnershipHierarchyWithDbLookup(pod.getLeft(), podNode);

nsNode.children.add(rootNode);
rootNode.parent = nsNode;

return rootNode;
}

/**
 * Builds the ownership hierarchy using database lookups to reuse existing nodes.
 *
 * <p>Starting from the given Kubernetes object, repeatedly resolves the owning object
 * (via {@link #getOwnerNodeWithDbLookup}) and links each node beneath its owner until
 * no further owner exists.
 *
 * @param kubeObj The Kubernetes object to start from
 * @param node The DiscoveryNode representing the Kubernetes object
 * @return The root node of the ownership chain (the topmost owner)
 */
private DiscoveryNode buildOwnershipHierarchyWithDbLookup(
        HasMetadata kubeObj, DiscoveryNode node) {
    Pair<HasMetadata, DiscoveryNode> cursor = Pair.of(kubeObj, node);

    for (Pair<HasMetadata, DiscoveryNode> owner = getOwnerNodeWithDbLookup(cursor);
            owner != null;
            owner = getOwnerNodeWithDbLookup(cursor)) {
        DiscoveryNode ownerNode = owner.getRight();
        DiscoveryNode currentNode = cursor.getRight();

        // Link the current node under its owner, guarding against duplicate
        // membership in the owner's children collection.
        currentNode.parent = ownerNode;
        if (!ownerNode.children.contains(currentNode)) {
            ownerNode.children.add(currentNode);
        }

        cursor = owner;
    }

    return cursor.getRight();
}

/**
 * Gets the owner node using database lookup to reuse existing nodes.
 *
 * <p>Prefers the first OwnerReference whose kind maps to a known discovery node type;
 * if none match, falls back to the first declared owner.
 *
 * @param child Pair of Kubernetes object and its DiscoveryNode
 * @return Pair of owner Kubernetes object and its DiscoveryNode from DB, or null if no owner
 */
private Pair<HasMetadata, DiscoveryNode> getOwnerNodeWithDbLookup(
        Pair<HasMetadata, DiscoveryNode> child) {
    HasMetadata childRef = child.getLeft();
    if (childRef == null) {
        return null;
    }
    List<OwnerReference> owners = childRef.getMetadata().getOwnerReferences();
    if (owners.isEmpty()) {
        return null;
    }
    // Select the first owner with a recognized Kubernetes kind, defaulting to
    // the first owner reference when no kind is recognized.
    OwnerReference owner = owners.get(0);
    for (OwnerReference candidate : owners) {
        if (KubeDiscoveryNodeType.fromKubernetesKind(candidate.getKind()) != null) {
            owner = candidate;
            break;
        }
    }
    String namespace = childRef.getMetadata().getNamespace();
    return queryForNode(namespace, owner.getName(), owner.getKind());
}

private void persistNodeChain(DiscoveryNode nsNode, Target target, DiscoveryNode targetNode) {
List<DiscoveryNode> nodeChain = collectNodeChain(targetNode, nsNode);

Expand Down Expand Up @@ -923,7 +1049,9 @@ private Pair<HasMetadata, DiscoveryNode> queryForNode(
DiscoveryNode.byTypeWithName(
nodeType,
name,
n -> namespace.equals(n.labels.get(DISCOVERY_NAMESPACE_LABEL_KEY)),
n ->
namespace.equals(n.labels.get(DISCOVERY_NAMESPACE_LABEL_KEY))
&& isInKubernetesApiRealm(n),
n -> {
Map<String, String> labels = new HashMap<>();
if (kubeObj != null && kubeObj.getMetadata().getLabels() != null) {
Expand All @@ -936,6 +1064,25 @@ private Pair<HasMetadata, DiscoveryNode> queryForNode(
return Pair.of(kubeObj, node);
}

/**
 * Check if a node belongs to the KubernetesApi Realm by walking up its parent chain.
 *
 * @param node The node to check
 * @return true if the node is in the KubernetesApi Realm, false otherwise
 */
private boolean isInKubernetesApiRealm(DiscoveryNode node) {
    // Ascend the parent chain looking for a REALM-typed node whose name
    // matches this discovery plugin's realm.
    for (DiscoveryNode ancestor = node; ancestor != null; ancestor = ancestor.parent) {
        boolean isRealmNode =
                ancestor.nodeType != null
                        && ancestor.nodeType.equals(NodeType.BaseNodeType.REALM.getKind());
        if (isRealmNode && REALM.equals(ancestor.name)) {
            return true;
        }
    }
    return false;
}

@DisallowConcurrentExecution
static class EndpointsResyncJob implements Job {
@Inject Logger logger;
Expand Down
62 changes: 50 additions & 12 deletions src/main/java/io/cryostat/expressions/MatchExpressionEvaluator.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,34 +177,66 @@ public boolean applies(MatchExpression matchExpression, Target target) throws Sc
public List<Target> getMatchedTargets(MatchExpression matchExpression) {
List<Target> targets =
QuarkusTransaction.joiningExisting()
.call(() -> Target.<Target>listAll())
.parallelStream()
.call(
() -> {
List<Target> allTargets = Target.<Target>listAll();
// Force eager loading of lazy associations before detaching
allTargets.forEach(this::eagerLoadAssociations);
return allTargets;
});
List<Target> matched =
targets.parallelStream()
.filter(
target -> {
try {
return applies(matchExpression, target);
} catch (ScriptException e) {
logger.error(
"Error while processing expression: "
+ matchExpression,
e);
logger.warnv(
e,
"Script error evaluating expression for target {0}"
+ " ({1}): {2}",
target.id,
target.connectUrl,
matchExpression);
return false;
} catch (Exception e) {
logger.warnv(
e,
"Unexpected error evaluating expression for target"
+ " {0} ({1})",
target.id,
target.connectUrl);
return false;
}
})
.collect(Collectors.toList());

var ids = new HashSet<>();
var it = targets.iterator();
// Deduplicate by jvmId
var ids = new HashSet<String>();
var it = matched.iterator();
while (it.hasNext()) {
var t = it.next();
if (ids.contains(t.jvmId)) {
if (t.jvmId != null && ids.contains(t.jvmId)) {
it.remove();
continue;
}
ids.add(t.jvmId);
if (t.jvmId != null) {
ids.add(t.jvmId);
}
}

return targets;
return matched;
}

@SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT")
private void eagerLoadAssociations(Target t) {
    // Touch lazy associations so they are initialized while the entity is still
    // attached; the return values are deliberately discarded (see suppression above).
    var annotations = t.annotations;
    if (annotations != null) {
        annotations.cryostat();
        annotations.platform();
    }
    var labels = t.labels;
    if (labels != null) {
        labels.size();
    }
}

@Name("io.cryostat.rules.MatchExpressionEvaluator.MatchExpressionApplies")
Expand Down Expand Up @@ -315,7 +347,13 @@ private String[] getJfrEventTypeIds(SimplifiedTarget st) {
.toList()
.toArray(new String[0]));
} catch (Exception e) {
Log.error(e);
// Log with more context about the failure to help diagnose issues
Log.warnv(
e,
"Failed to get JFR event types for target {0} ({1}). Match expression"
+ " evaluation may be incomplete.",
st.jvmId,
st.connectUrl);
return new String[0];
}
}
Expand Down
Loading
Loading