Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,7 @@

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.invoke.WrongMethodTypeException;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -55,13 +51,6 @@ public class ApimlInstanceRegistry extends InstanceRegistry {

private static final String EXCEPTION_MESSAGE = "Implementation of InstanceRegistry changed, please verify fix of order sending events";

private MethodHandle handleRegistrationMethod;
private MethodHandle handlerResolveInstanceLeaseDurationMethod;
private MethodHandle handleCancellationMethod;

private MethodHandle register2ArgsMethodHandle;
private MethodHandle register3ArgsMethodHandle;
private MethodHandle cancelMethodHandle;
private MethodHandle replicateToPeersMethodHandle;

private final ApplicationContext appCntx;
Expand All @@ -70,6 +59,8 @@ public class ApimlInstanceRegistry extends InstanceRegistry {
private ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry;
private Set<String> staticRegistrationIds = Collections.synchronizedSet(new HashSet<>());

private final ThreadLocal<Integer> RENEW_CORRECTION = new ThreadLocal<>();

public ApimlInstanceRegistry(
EurekaServerConfig serverConfig,
EurekaClientConfig clientConfig,
Expand All @@ -96,79 +87,26 @@ public ApimlInstanceRegistry(
*/
private void init() {
try {
Method registrationMethod =
InstanceRegistry.class.getDeclaredMethod("handleRegistration",
InstanceInfo.class, int.class, boolean.class
);
registrationMethod.setAccessible(true);
handleRegistrationMethod = MethodHandles.lookup().unreflect(registrationMethod);

Method cancelationMethod =
InstanceRegistry.class.getDeclaredMethod("handleCancelation",
String.class, String.class, boolean.class
);
cancelationMethod.setAccessible(true);
handleCancellationMethod = MethodHandles.lookup().unreflect(cancelationMethod);

Method resolveInstanceLeaseDurationMethod =
InstanceRegistry.class.getDeclaredMethod("resolveInstanceLeaseDuration",
InstanceInfo.class
);
resolveInstanceLeaseDurationMethod.setAccessible(true);
handlerResolveInstanceLeaseDurationMethod = MethodHandles.lookup().unreflect(resolveInstanceLeaseDurationMethod);

Constructor<MethodHandles.Lookup> lookupConstructor = MethodHandles.Lookup.class.getDeclaredConstructor(Class.class);
lookupConstructor.setAccessible(true);
MethodHandles.Lookup lookup = lookupConstructor.newInstance(PeerAwareInstanceRegistryImpl.class);

register2ArgsMethodHandle =
lookup.findSpecial(
PeerAwareInstanceRegistryImpl.class,
"register",
MethodType.methodType(void.class, InstanceInfo.class, boolean.class),
PeerAwareInstanceRegistryImpl.class
);

cancelMethodHandle =
lookup.findSpecial(
PeerAwareInstanceRegistryImpl.class,
"cancel",
MethodType.methodType(boolean.class, String.class, String.class, boolean.class),
PeerAwareInstanceRegistryImpl.class
);

lookup = lookupConstructor.newInstance(AbstractInstanceRegistry.class);

register3ArgsMethodHandle =
lookup.findSpecial(
AbstractInstanceRegistry.class,
"register",
MethodType.methodType(void.class, InstanceInfo.class, int.class, boolean.class),
AbstractInstanceRegistry.class
);
Field registryField = AbstractInstanceRegistry.class.getDeclaredField("registry");
registryField.setAccessible(true);
this.registry = (ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>) registryField.get(this);

Method replicateToPeers = PeerAwareInstanceRegistryImpl.class.getDeclaredMethod("replicateToPeers", Action.class, String.class, String.class, InstanceInfo.class, InstanceStatus.class, boolean.class);
replicateToPeers.setAccessible(true);

replicateToPeersMethodHandle = MethodHandles.lookup().unreflect(replicateToPeers);
} catch (NoSuchFieldException | NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException e) {
} catch (NoSuchFieldException | NoSuchMethodException | IllegalAccessException e) {
throw new IllegalArgumentException(EXCEPTION_MESSAGE, e);
}
}

protected int resolveInstanceLeaseDurationRewritten(final InstanceInfo info) {
try {
return (int) handlerResolveInstanceLeaseDurationMethod.invokeWithArguments(this, info);
} catch (ClassCastException | WrongMethodTypeException e) {
throw new IllegalArgumentException(EXCEPTION_MESSAGE, e);
} catch (RuntimeException re) {
throw re;
} catch (Throwable t) {
throw new IllegalArgumentException(EXCEPTION_MESSAGE, t);
protected void updateRenewsPerMinThreshold() {
Integer correction = RENEW_CORRECTION.get();
if (correction != null) {
this.numberOfRenewsPerMinThreshold += correction;
RENEW_CORRECTION.remove();
}

super.updateRenewsPerMinThreshold();
}

public void peerAwareHeartbeat(InstanceInfo instanceInfo) {
Expand All @@ -191,19 +129,17 @@ public void registerStatically(InstanceInfo instanceInfo, boolean isReplication,
// the maximum lease duration time (Eureka bug: overflow of int during conversion to ms)
int leaseDuration = Integer.MAX_VALUE / 1000;

// temporary register (do not increase count of service to avoid threshold)
synchronized (lock) {
int backup = expectedNumberOfClientsSendingRenews;
try {
register(instanceInfo, leaseDuration, isReplication);
if (peerReplicate) {
replicateToPeersMethodHandle.invokeWithArguments(this, Action.Register, instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null, isReplication);
}
} catch (Throwable e) {
throw new IllegalStateException(EXCEPTION_MESSAGE, e);
} finally {
expectedNumberOfClientsSendingRenews = backup;
try {
// temporary register (do not increase count of service to avoid threshold)
RENEW_CORRECTION.set(-1);
register(instanceInfo, leaseDuration, isReplication);
if (peerReplicate) {
replicateToPeersMethodHandle.invokeWithArguments(this, Action.Register, instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null, isReplication);
}
} catch (Throwable e) {
throw new IllegalStateException(EXCEPTION_MESSAGE, e);
} finally {
RENEW_CORRECTION.remove();
}

// register lease plan to never expired
Expand All @@ -229,34 +165,18 @@ public boolean isExpired(long additionalLeaseMs) {
*/
@Override
public void register(InstanceInfo info, int leaseDuration, boolean isReplication) {
validateInstanceInfo(info);
info = changeServiceId(info);
try {
register3ArgsMethodHandle.invokeWithArguments(this, info, leaseDuration, isReplication);
handleRegistrationMethod.invokeWithArguments(this, info, leaseDuration, isReplication);
} catch (ClassCastException | WrongMethodTypeException e) {
throw new IllegalArgumentException(EXCEPTION_MESSAGE, e);
} catch (RuntimeException re) {
throw re;
} catch (Throwable t) {
throw new IllegalArgumentException(EXCEPTION_MESSAGE, t);
}
validateInstanceInfo(info);
info = changeServiceId(info);

super.register(info, leaseDuration, isReplication);
}

@Override
public void register(InstanceInfo info, final boolean isReplication) {
validateInstanceInfo(info);
info = changeServiceId(info);
try {
register2ArgsMethodHandle.invokeWithArguments(this, info, isReplication);
handleRegistrationMethod.invokeWithArguments(this, info, resolveInstanceLeaseDurationRewritten(info), isReplication);
} catch (ClassCastException | WrongMethodTypeException e) {
throw new IllegalArgumentException(EXCEPTION_MESSAGE, e);
} catch (RuntimeException re) {
throw re;
} catch (Throwable t) {
throw new IllegalArgumentException(EXCEPTION_MESSAGE, t);
}
validateInstanceInfo(info);
info = changeServiceId(info);

super.register(info, isReplication);
}

/**
Expand Down Expand Up @@ -309,25 +229,16 @@ public boolean isRegisterable(InstanceInfo instanceInfo) {

@Override
public boolean cancel(String appName, String serverId, boolean isReplication) {
synchronized (lock) {
int backup = expectedNumberOfClientsSendingRenews;
try {
String[] updatedValues = replaceValues(appName, serverId);
final boolean out = (boolean) cancelMethodHandle.invokeWithArguments(this, updatedValues[0], updatedValues[1], isReplication);
handleCancellationMethod.invokeWithArguments(this, updatedValues[0], updatedValues[1], isReplication);
return out;
} catch (ClassCastException | WrongMethodTypeException e) {
throw new IllegalArgumentException(EXCEPTION_MESSAGE, e);
} catch (RuntimeException re) {
throw re;
} catch (Throwable t) {
throw new IllegalArgumentException(EXCEPTION_MESSAGE, t);
} finally {
if (staticRegistrationIds.removeAll(Optional.ofNullable(registry.get(appName)).orElse(Collections.emptyMap()).keySet())) {
// do not change count of instances if it was registered statically
expectedNumberOfClientsSendingRenews = backup;
}
try {
String[] updatedValues = replaceValues(appName, serverId);

if (staticRegistrationIds.removeAll(Optional.ofNullable(registry.get(appName)).orElse(Collections.emptyMap()).keySet())) {
// do not change count of instances if it was registered statically
RENEW_CORRECTION.set(1);
}
return super.cancel(updatedValues[0], updatedValues[1], isReplication);
} finally {
RENEW_CORRECTION.remove();
}
}

Expand Down
Loading