diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/batch/MapReduceProgramRunner.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/batch/MapReduceProgramRunner.java index d1a39f718efc..4cce85b42512 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/batch/MapReduceProgramRunner.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/batch/MapReduceProgramRunner.java @@ -231,7 +231,7 @@ public ProgramController run(final Program program, ProgramOptions options) { // tries to access cdap data. For example, writing to a FileSet will fail, as the yarn user will // be running the job, but the data directory will be owned by cdap. if (MapReduceTaskContextProvider.isLocal(hConf) || UserGroupInformation.isSecurityEnabled()) { - mapReduceRuntimeService.start(); + mapReduceRuntimeService.startAsync().awaitRunning(); } else { ProgramRunners.startAsUser(cConf.get(Constants.CFG_HDFS_USER), mapReduceRuntimeService); } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/batch/MapReduceRuntimeService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/batch/MapReduceRuntimeService.java index c858bbf08fb4..fe7dd21c4a1d 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/batch/MapReduceRuntimeService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/batch/MapReduceRuntimeService.java @@ -190,7 +190,7 @@ public void destroy() { } @Override - protected String getServiceName() { + protected String serviceName() { return "MapReduceRunner-" + specification.getName(); } @@ -412,6 +412,7 @@ protected void run() throws Exception { // If we don't sleep, the final stats may not get sent before shutdown. TimeUnit.SECONDS.sleep(2L); + // If the job is not successful, throw exception so that this service will terminate with a failure state // Shutdown will still get executed, but the service will notify failure after that. // However, if it's the job is requested to stop (via triggerShutdown, meaning it's a user action), don't throw @@ -454,7 +455,7 @@ public void run() { } }); t.setDaemon(true); - t.setName(getServiceName()); + t.setName(serviceName()); t.start(); } }; @@ -1012,7 +1013,7 @@ private Location createPluginArchive(Location targetDir) throws IOException { */ private Location copyFileToLocation(File file, Location targetDir) throws IOException { Location targetLocation = targetDir.append(file.getName()).getTempFile(".jar"); - Files.copy(file, Locations.newOutputSupplier(targetLocation)); + Files.asByteSource(file).copyTo(Locations.newByteSink(targetLocation)); return targetLocation; } @@ -1024,8 +1025,7 @@ private Location copyFileToLocation(File file, Location targetDir) throws IOExce private Location copyProgramJar(Location targetDir) throws IOException { Location programJarCopy = targetDir.append("program.jar"); - ByteStreams.copy(Locations.newInputSupplier(programJarLocation), - Locations.newOutputSupplier(programJarCopy)); + Locations.newByteSource(programJarLocation).copyTo(Locations.newByteSink(programJarCopy)); LOG.debug("Copied Program Jar to {}, source: {}", programJarCopy, programJarLocation); return programJarCopy; } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/HttpExceptionHandler.java b/cdap-common/src/main/java/io/cdap/cdap/common/HttpExceptionHandler.java index 5e345694fd8b..c34f83561b81 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/HttpExceptionHandler.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/HttpExceptionHandler.java @@ -16,6 +16,7 @@ package io.cdap.cdap.common; +import com.google.common.base.MoreObjects; import com.google.common.base.Objects; import com.google.common.base.Throwables; import io.cdap.cdap.api.common.HttpErrorStatusProvider; @@ -76,7 +77,7 @@ public void handle(Throwable t, HttpRequest request, HttpResponder responder) { // If it is not some known exception type, response with 500. LOG.error("Unexpected error: request={} {} user={}:", request.method().name(), request.getUri(), - Objects.firstNonNull(SecurityRequestContext.getUserId(), ""), t); + MoreObjects.firstNonNull(SecurityRequestContext.getUserId(), ""), t); responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, Throwables.getRootCause(t).getMessage()); } @@ -84,6 +85,6 @@ public void handle(Throwable t, HttpRequest request, HttpResponder responder) { private void logWithTrace(HttpRequest request, Throwable t) { LOG.trace("Error in handling request={} {} for user={}:", request.method().name(), request.getUri(), - Objects.firstNonNull(SecurityRequestContext.getUserId(), ""), t); + MoreObjects.firstNonNull(SecurityRequestContext.getUserId(), ""), t); } } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/guice/KafkaClientModule.java b/cdap-common/src/main/java/io/cdap/cdap/common/guice/KafkaClientModule.java index a90f9d361a51..674ed49664d8 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/guice/KafkaClientModule.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/guice/KafkaClientModule.java @@ -90,20 +90,20 @@ private static final class ZkClientServiceProvider implements Provider start() { + public Service startAsync() { if (startedCount.getAndIncrement() == 0) { - return super.start(); + return zkClientService.startAsync(); } - return Futures.immediateFuture(State.RUNNING); + return this; } @Override - public ListenableFuture stop() { + public Service stopAsync() { if (startedCount.decrementAndGet() == 0) { - return super.stop(); + return zkClientService.stopAsync(); } - return Futures.immediateFuture(State.TERMINATED); + return this; + } + + @Override + public void awaitRunning() { + zkClientService.awaitRunning(); + } + + @Override + public void awaitRunning(long timeout, TimeUnit unit) throws java.util.concurrent.TimeoutException { + zkClientService.awaitRunning(timeout, unit); + } + + @Override + public void awaitTerminated() { + zkClientService.awaitTerminated(); + } + + @Override + public void awaitTerminated(long timeout, TimeUnit unit) throws java.util.concurrent.TimeoutException { + zkClientService.awaitTerminated(timeout, unit); + } + + @Override + public Throwable failureCause() { + return zkClientService.failureCause(); } }; } @@ -166,12 +193,12 @@ private abstract static class AbstractServiceWithZkClient ext @Override protected final void startUp() throws Exception { - zkClientService.startAndWait(); + zkClientService.startAsync().awaitRunning(); try { - delegate.startAndWait(); + delegate.startAsync().awaitRunning(); } catch (Exception e) { try { - zkClientService.stopAndWait(); + zkClientService.stopAsync().awaitTerminated(); } catch (Exception se) { e.addSuppressed(se); } @@ -182,16 +209,16 @@ protected final void startUp() throws Exception { @Override protected final void shutDown() throws Exception { try { - delegate.stopAndWait(); + delegate.stopAsync().awaitTerminated(); } catch (Exception e) { try { - zkClientService.stopAndWait(); + zkClientService.stopAsync().awaitTerminated(); } catch (Exception se) { e.addSuppressed(se); } throw e; } - zkClientService.stopAndWait(); + zkClientService.stopAsync().awaitTerminated(); } protected T getDelegate() { diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/http/AbstractBodyConsumer.java b/cdap-common/src/main/java/io/cdap/cdap/common/http/AbstractBodyConsumer.java index 9543e33fa80c..6e5fef818456 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/http/AbstractBodyConsumer.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/http/AbstractBodyConsumer.java @@ -74,7 +74,11 @@ public final void handleError(Throwable cause) { try { LOG.error("Failed to handle upload", cause); if (output != null) { - Closeables.closeQuietly(output); + try { + output.close(); + } catch (IOException e) { + LOG.warn("Failed to close output stream for file {}", file, e); + } } onError(cause); // The netty-http framework will response with 500, no need to response in here. diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/http/SpillableBodyConsumer.java b/cdap-common/src/main/java/io/cdap/cdap/common/http/SpillableBodyConsumer.java index 10ee554be82f..fa358a05f4f6 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/http/SpillableBodyConsumer.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/http/SpillableBodyConsumer.java @@ -88,12 +88,18 @@ public void chunk(ByteBuf request, HttpResponder responder) { @Override public void finished(HttpResponder responder) { - Closeables.closeQuietly(outputStream); + if (outputStream != null) { + try { + outputStream.close(); + } catch (IOException e) { + LOG.warn("Failed to close output stream", e); + } + } try (InputStream is = new CombineInputStream(buffer, outputStream == null ? null : spillPath)) { processInput(is, responder); } catch (Exception e) { - Throwables.propagateIfPossible(e); + Throwables.throwIfUnchecked(e); throw new RuntimeException(String.format("Failed to process input from buffer%s", outputStream == null ? "" : " and spill path " + spillPath), e); } finally { @@ -103,7 +109,13 @@ public void finished(HttpResponder responder) { @Override public void handleError(Throwable cause) { - Closeables.closeQuietly(outputStream); + if (outputStream != null) { + try { + outputStream.close(); + } catch (IOException e) { + LOG.warn("Failed to close output stream", e); + } + } cleanup(); } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/internal/guava/ClassPath.java b/cdap-common/src/main/java/io/cdap/cdap/common/internal/guava/ClassPath.java index 2f7d0e9c1fd9..d9152169006a 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/internal/guava/ClassPath.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/internal/guava/ClassPath.java @@ -328,7 +328,7 @@ public String getSimpleName() { String innerClassName = className.substring(lastDollarSign + 1); // local and anonymous classes are prefixed with number (1,2,3...), anonymous classes are // entirely numeric whereas local classes have the user supplied name as a suffix - return CharMatcher.DIGIT.trimLeadingFrom(innerClassName); + return CharMatcher.inRange('0', '9').trimLeadingFrom(innerClassName); } String packageName = getPackageName(); if (packageName.isEmpty()) { diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/io/DFSSeekableInputStream.java b/cdap-common/src/main/java/io/cdap/cdap/common/io/DFSSeekableInputStream.java index 16ca5b46db1c..4ff7ac9d4894 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/io/DFSSeekableInputStream.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/io/DFSSeekableInputStream.java @@ -65,11 +65,25 @@ public boolean seekToNewSource(long targetPos) throws IOException { @Override public void close() throws IOException { + Throwable error = null; try { super.close(); + } catch (Throwable t) { + error = t; + throw t; } finally { if (sizeProvider instanceof Closeable) { - Closeables.closeQuietly((Closeable) sizeProvider); + try { + ((Closeable) sizeProvider).close(); + } catch (IOException e) { + if (error != null) { + error.addSuppressed(e); + } else { + // If super.close() succeeded but the provider failed, + // you should probably still know about it. + throw e; + } + } } } } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/io/DefaultCachingPathProvider.java b/cdap-common/src/main/java/io/cdap/cdap/common/io/DefaultCachingPathProvider.java index 15e0c8988c43..bd4429581a59 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/io/DefaultCachingPathProvider.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/io/DefaultCachingPathProvider.java @@ -25,6 +25,7 @@ import io.cdap.cdap.common.utils.DirUtils; import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; @@ -123,7 +124,9 @@ void clearCache(String fileName, long lastModified) { } String getCacheName(Location location) { - return Hashing.md5().hashString(location.toURI().getPath()).toString() + "-" + return Hashing.md5() + .hashString(location.toURI().getPath(), StandardCharsets.UTF_8) + .toString() + "-" + location.getName(); } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/io/Locations.java b/cdap-common/src/main/java/io/cdap/cdap/common/io/Locations.java index 4eb04253b402..8d3c694bacfa 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/io/Locations.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/io/Locations.java @@ -19,10 +19,8 @@ import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.base.Throwables; -import com.google.common.io.ByteStreams; -import com.google.common.io.Closeables; -import com.google.common.io.InputSupplier; -import com.google.common.io.OutputSupplier; +import com.google.common.io.ByteSink; +import com.google.common.io.ByteSource; import io.cdap.cdap.common.lang.FunctionWithException; import io.cdap.cdap.common.lang.jar.BundleJarUtil; import io.cdap.cdap.common.utils.DirUtils; @@ -33,6 +31,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.UncheckedIOException; import java.lang.reflect.Method; import java.net.URI; import java.net.URISyntaxException; @@ -97,26 +96,33 @@ public int compare(Location o1, Location o2) { }; /** - * Creates a new {@link InputSupplier} that can provides {@link SeekableInputStream} of the given - * path. + * Creates a new {@link ByteSource} that provides {@link SeekableInputStream} of the given path. * * @param fs The {@link org.apache.hadoop.fs.FileSystem} for the given path. - * @param path The path to create {@link io.cdap.cdap.common.io.SeekableInputStream} when - * requested. - * @return A {@link InputSupplier}. + * @param path The path to create {@link SeekableInputStream} when requested. + * @return A {@link ByteSource}. */ - public static InputSupplier newInputSupplier(final FileSystem fs, - final Path path) { - return new InputSupplier() { + public static ByteSource newByteSource(final FileSystem fs, final Path path) { + return new ByteSource() { @Override - public SeekableInputStream getInput() throws IOException { - FSDataInputStream input = fs.open(path); + public SeekableInputStream openStream() throws IOException { + FSDataInputStream input = null; try { + input = fs.open(path); return new DFSSeekableInputStream(input, createDFSStreamSizeProvider(fs, false, path, input)); } catch (Throwable t) { - Closeables.closeQuietly(input); - Throwables.propagateIfInstanceOf(t, IOException.class); + if (input != null) { + try { + input.close(); + } catch (IOException e) { + t.addSuppressed(e); + } + } + Throwables.throwIfUnchecked(t); + if (t instanceof IOException) { + throw (IOException) t; + } throw new IOException(t); } } @@ -124,19 +130,18 @@ public SeekableInputStream getInput() throws IOException { } /** - * Creates a new {@link InputSupplier} that can provides {@link SeekableInputStream} from the - * given location. + * Creates a new {@link ByteSource} that provides {@link SeekableInputStream} from the given location. * * @param location Location for the input stream. - * @return A {@link InputSupplier}. + * @return A {@link ByteSource}. */ - public static InputSupplier newInputSupplier( - final Location location) { - return new InputSupplier() { + public static ByteSource newByteSource(final Location location) { + return new ByteSource() { @Override - public SeekableInputStream getInput() throws IOException { - InputStream input = location.getInputStream(); + public SeekableInputStream openStream() throws IOException { + InputStream input = null; try { + input = location.getInputStream(); if (input instanceof FileInputStream) { return new FileSeekableInputStream((FileInputStream) input); } @@ -147,30 +152,31 @@ public SeekableInputStream getInput() throws IOException { if (locationFactory instanceof FileContextLocationFactory) { final FileContextLocationFactory lf = (FileContextLocationFactory) locationFactory; return lf.getFileContext().getUgi() - .doAs(new PrivilegedExceptionAction() { - @Override - public SeekableInputStream run() throws IOException { - // Disable the FileSystem cache. The FileSystem will be closed when the InputStream is closed - String scheme = lf.getHomeLocation().toURI().getScheme(); - Configuration hConf = new Configuration(lf.getConfiguration()); - hConf.set(String.format("fs.%s.impl.disable.cache", scheme), "true"); - FileSystem fs = FileSystem.get(hConf); - return new DFSSeekableInputStream(dataInput, - createDFSStreamSizeProvider(fs, true, - new Path(location.toURI()), dataInput)); - } + .doAs((PrivilegedExceptionAction) () -> { + String scheme = lf.getHomeLocation().toURI().getScheme(); + Configuration hConf = new Configuration(lf.getConfiguration()); + hConf.set(String.format("fs.%s.impl.disable.cache", scheme), "true"); + FileSystem fs = FileSystem.get(hConf); + return new DFSSeekableInputStream(dataInput, + createDFSStreamSizeProvider(fs, true, + new Path(location.toURI()), dataInput)); }); } - - // This shouldn't happen - // Assumption is if the FS is not a HDFS fs, the location length tells the stream size return new DFSSeekableInputStream(dataInput, location::length); } - throw new IOException("Failed to create SeekableInputStream from location " + location); } catch (Throwable t) { - Closeables.closeQuietly(input); - Throwables.propagateIfInstanceOf(t, IOException.class); + if (input != null) { + try { + input.close(); + } catch (IOException e) { + t.addSuppressed(e); + } + } + Throwables.throwIfUnchecked(t); + if (t instanceof IOException) { + throw (IOException) t; + } throw new IOException(t); } } @@ -343,7 +349,7 @@ private static void expandTarStream(TarArchiveInputStream tis, File targetDir) DirUtils.mkdirs(output); } else { DirUtils.mkdirs(output.getParentFile()); - ByteStreams.copy(tis, com.google.common.io.Files.newOutputStreamSupplier(output)); + com.google.common.io.Files.asByteSink(output).writeFrom(tis); } entry = tis.getNextTarEntry(); } @@ -412,14 +418,18 @@ public T next() throws IOException { } /** - * Creates a new {@link OutputSupplier} that can provides {@link OutputStream} for the given - * location. + * Creates a new {@link ByteSink} that provides {@link OutputStream} for the given location. * * @param location Location for the output. - * @return A {@link OutputSupplier}. + * @return A {@link ByteSink}. */ - public static OutputSupplier newOutputSupplier(final Location location) { - return location::getOutputStream; + public static ByteSink newByteSink(final Location location) { + return new ByteSink() { + @Override + public OutputStream openStream() throws IOException { + return location.getOutputStream(); + } + }; } /** @@ -484,7 +494,7 @@ public static Location getLocationFromAbsolutePath(LocationFactory locationFacto return locationFactory.create(uri); } catch (URISyntaxException e) { // Should not happen. - throw Throwables.propagate(e); + throw new RuntimeException(e); } } @@ -570,7 +580,7 @@ public Method get() { } return getFileLengthMethod; } catch (Exception e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } }); diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/lang/ClassLoaders.java b/cdap-common/src/main/java/io/cdap/cdap/common/lang/ClassLoaders.java index dfc46ccabd46..0325d1f08179 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/lang/ClassLoaders.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/lang/ClassLoaders.java @@ -16,6 +16,7 @@ package io.cdap.cdap.common.lang; +import com.google.common.base.MoreObjects; import com.google.common.base.Objects; import com.google.common.base.Splitter; import com.google.common.base.Throwables; @@ -67,7 +68,7 @@ private ClassLoaders() { */ public static Class loadClass(String className, @Nullable ClassLoader classLoader, Object caller) throws ClassNotFoundException { - ClassLoader cl = Objects.firstNonNull(classLoader, caller.getClass().getClassLoader()); + ClassLoader cl = MoreObjects.firstNonNull(classLoader, caller.getClass().getClassLoader()); return cl.loadClass(className); } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/logging/AbstractLoggingContext.java b/cdap-common/src/main/java/io/cdap/cdap/common/logging/AbstractLoggingContext.java index 11f95f46aaa0..1371caa99206 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/logging/AbstractLoggingContext.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/logging/AbstractLoggingContext.java @@ -16,6 +16,7 @@ package io.cdap.cdap.common.logging; +import com.google.common.base.MoreObjects; import com.google.common.base.Objects; import com.google.common.collect.Maps; import java.lang.reflect.Method; @@ -113,7 +114,7 @@ public Map getSystemTagsAsString() { @Override public String toString() { - return Objects.toStringHelper(this) + return MoreObjects.toStringHelper(this) .add("systemTags", systemTags) .toString(); } @@ -140,7 +141,7 @@ public String getValue() { @Override public String toString() { - return Objects.toStringHelper(this) + return MoreObjects.toStringHelper(this) .add("name", name) .add("value", value) .toString(); diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/resource/ResourceBalancerService.java b/cdap-common/src/main/java/io/cdap/cdap/common/resource/ResourceBalancerService.java index 42283b6e1fcf..6941c3de4120 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/resource/ResourceBalancerService.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/resource/ResourceBalancerService.java @@ -94,13 +94,13 @@ public void leader() { coordinator = new ResourceCoordinator(zk, discoveryServiceClient, new BalancedAssignmentStrategy()); - coordinator.startAndWait(); + coordinator.startAsync().awaitRunning(); } @Override public void follower() { if (coordinator != null) { - coordinator.stopAndWait(); + coordinator.stopAsync().awaitTerminated(); coordinator = null; } } @@ -130,8 +130,8 @@ protected void startUp() throws Exception { Discoverable discoverable = createDiscoverable(serviceName); cancelDiscoverable = discoveryService.register(ResolvingDiscoverable.of(discoverable)); - election.start(); - resourceClient.startAndWait(); + election.startAsync(); + resourceClient.startAsync().awaitRunning(); cancelResourceHandler = resourceClient.subscribe(serviceName, createResourceHandler(discoverable)); @@ -162,7 +162,8 @@ protected void shutDown() throws Exception { LOG.error("Exception while shutting down{}.", serviceName, th); } if (throwable != null) { - throw Throwables.propagate(throwable); + Throwables.throwIfUnchecked(throwable); + throw new RuntimeException(throwable); } LOG.info("Stopped ResourceBalancer {} service.", serviceName); } @@ -181,18 +182,18 @@ public void onChange(Collection partitionReplicas) { LOG.info("Partitions changed {}, service: {}", partitions, serviceName); try { if (service != null) { - service.stopAndWait(); + service.stopAsync().awaitTerminated(); } if (partitions.isEmpty() || !election.isRunning()) { service = null; } else { service = createService(partitions); - service.startAndWait(); + service.startAsync().awaitRunning(); } } catch (Throwable t) { LOG.error("Failed to change partitions, service: {}.", serviceName, t); completion.setException(t); - stop(); + stopAsync(); } } @@ -200,7 +201,7 @@ public void onChange(Collection partitionReplicas) { public void finished(Throwable failureCause) { try { if (service != null) { - service.stopAndWait(); + service.stopAsync().awaitTerminated(); service = null; } completion.set(null); diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/service/AbstractRetryableScheduledService.java b/cdap-common/src/main/java/io/cdap/cdap/common/service/AbstractRetryableScheduledService.java index 47a52840859f..49b6675ee621 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/service/AbstractRetryableScheduledService.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/service/AbstractRetryableScheduledService.java @@ -17,6 +17,7 @@ package io.cdap.cdap.common.service; import com.google.common.util.concurrent.AbstractScheduledService; +import com.google.common.util.concurrent.Service; import io.cdap.cdap.api.retry.RetriesExhaustedException; import io.cdap.cdap.common.logging.LogSamplers; import io.cdap.cdap.common.logging.Loggers; @@ -56,7 +57,7 @@ public abstract class AbstractRetryableScheduledService extends AbstractSchedule */ protected AbstractRetryableScheduledService(RetryStrategy retryStrategy) { this.retryStrategy = retryStrategy; - addListener(new ServiceListenerAdapter() { + addListener(new Service.Listener() { @Override public void failed(State from, Throwable failure) { LOG.error("Scheduled service {} terminated due to failure", getServiceName(), failure); diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/service/RetryOnStartFailureService.java b/cdap-common/src/main/java/io/cdap/cdap/common/service/RetryOnStartFailureService.java index 55f616d5020e..8bbd7bced4ef 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/service/RetryOnStartFailureService.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/service/RetryOnStartFailureService.java @@ -17,12 +17,16 @@ package io.cdap.cdap.common.service; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.MoreObjects; import com.google.common.util.concurrent.AbstractService; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Service; import com.google.common.util.concurrent.Uninterruptibles; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import javax.annotation.Nullable; import org.apache.twill.common.Threads; @@ -68,7 +72,30 @@ public void run() { while (!stopped) { try { - currentDelegate.start().get(); + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference failureCause = new AtomicReference<>(); + currentDelegate.addListener(new Listener() { + @Override + public void running() { + latch.countDown(); + } + + @Override + public void terminated(Service.State from) { + latch.countDown(); + } + + @Override + public void failed(Service.State from, Throwable failure) { + failureCause.set(failure); + latch.countDown(); + } + }, MoreExecutors.directExecutor()); + currentDelegate.startAsync(); + latch.await(); + if (failureCause.get() != null) { + throw failureCause.get(); + } // Only assigned the delegate if and only if the delegate service started successfully startedService = currentDelegate; break; @@ -112,17 +139,18 @@ protected void doStop() { // the setting of the startedService field. When that happens, the stop failure state is not propagated. // Nevertheless, there won't be any service left behind without stopping. if (startedService != null) { - Futures.addCallback(startedService.stop(), new FutureCallback() { + startedService.addListener(new Service.Listener() { @Override - public void onSuccess(State result) { + public void terminated(State from) { notifyStopped(); } @Override - public void onFailure(Throwable t) { - notifyFailed(t); + public void failed(State from, Throwable failure) { + notifyFailed(failure); } - }, Threads.SAME_THREAD_EXECUTOR); + }, MoreExecutors.directExecutor()); + startedService.stopAsync(); return; } @@ -130,7 +158,25 @@ public void onFailure(Throwable t) { // because if the underlying service is not yet started due to failure, it shouldn't affect the stop state // of this retrying service. if (currentDelegate != null) { - currentDelegate.stop().addListener(this::notifyStopped, Threads.SAME_THREAD_EXECUTOR); + Service.State currentState = currentDelegate.state(); + if (currentState == State.FAILED || currentState == State.TERMINATED) { + // Service is already in a terminal state, just notify that we're stopped + notifyStopped(); + return; + } + currentDelegate.addListener(new Service.Listener() { + @Override + public void terminated(State from) { + notifyStopped(); + } + + @Override + public void failed(State from, Throwable failure) { + notifyStopped(); + } + }, MoreExecutors.directExecutor()); + + currentDelegate.stopAsync(); return; } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/service/Services.java b/cdap-common/src/main/java/io/cdap/cdap/common/service/Services.java index 74d619454692..470978f1bd69 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/service/Services.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/service/Services.java @@ -51,9 +51,9 @@ private Services() { public static void startAndWait(Service service, long timeout, TimeUnit timeoutUnit, @Nullable String timeoutErrorMessage) throws TimeoutException, InterruptedException, ExecutionException { - ListenableFuture startFuture = service.start(); + service.startAsync(); try { - startFuture.get(timeout, timeoutUnit); + service.awaitRunning(timeout, timeoutUnit); } catch (TimeoutException e) { LOG.error(timeoutErrorMessage != null ? timeoutErrorMessage : "Timeout while waiting to start service.", e); @@ -62,19 +62,15 @@ public static void startAndWait(Service service, long timeout, TimeUnit timeoutU timeoutException.setStackTrace(e.getStackTrace()); } try { - service.stop(); + service.stopAsync(); } catch (Exception stopException) { LOG.error("Error while trying to stop service: ", stopException); } throw timeoutException; - } catch (InterruptedException e) { - LOG.error("Interrupted while waiting to start service.", e); - try { - service.stop(); - } catch (Exception stopException) { - LOG.error("Error while trying to stop service:", stopException); - } - throw e; + } catch (IllegalStateException e) { + // awaitRunning throws IllegalStateException if the service has failed + Throwable cause = e.getCause(); + throw new ExecutionException(cause != null ? cause : e); } } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/twill/AbstractMasterTwillRunnable.java b/cdap-common/src/main/java/io/cdap/cdap/common/twill/AbstractMasterTwillRunnable.java index d9fd6c75729c..be623fada689 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/twill/AbstractMasterTwillRunnable.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/twill/AbstractMasterTwillRunnable.java @@ -110,7 +110,8 @@ public final void initialize(TwillContext context) { Preconditions.checkArgument(!services.isEmpty(), "Should have at least one service"); LOG.info("Runnable initialized {}", name); } catch (Throwable t) { - throw Throwables.propagate(t); + Throwables.throwIfUnchecked(t); + throw new RuntimeException(t); } } @@ -137,7 +138,9 @@ public void run() { } catch (InterruptedException e) { LOG.debug("Waiting on latch interrupted {}", name); } catch (ExecutionException e) { - throw Throwables.propagate(e.getCause()); + Throwable cause = e.getCause(); + Throwables.throwIfUnchecked(cause); + throw new RuntimeException(cause); } } @@ -153,7 +156,7 @@ public void destroy() { private Service.Listener createServiceListener(final String name, final SettableFuture future) { - return new ServiceListenerAdapter() { + return new Service.Listener() { @Override public void terminated(Service.State from) { LOG.info("Service " + name + " terminated"); diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/twill/NoopTwillController.java b/cdap-common/src/main/java/io/cdap/cdap/common/twill/NoopTwillController.java index 3074529dcf19..8a5e6ab14a1b 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/twill/NoopTwillController.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/twill/NoopTwillController.java @@ -16,6 +16,8 @@ package io.cdap.cdap.common.twill; +import com.google.common.util.concurrent.AbstractIdleService; +import com.google.common.util.concurrent.Service; import java.util.Collections; import java.util.Iterator; import java.util.Map; @@ -26,25 +28,82 @@ import javax.annotation.Nullable; import org.apache.twill.api.Command; import org.apache.twill.api.ResourceReport; +import org.apache.twill.api.RunId; +import org.apache.twill.api.ServiceController; import org.apache.twill.api.TwillController; import org.apache.twill.api.logging.LogEntry; import org.apache.twill.api.logging.LogHandler; import org.apache.twill.common.Cancellable; import org.apache.twill.discovery.Discoverable; import org.apache.twill.discovery.ServiceDiscovered; -import org.apache.twill.internal.AbstractExecutionServiceController; import org.apache.twill.internal.RunIds; /** * A no-op {@link TwillController}. */ -final class NoopTwillController extends AbstractExecutionServiceController implements +final class NoopTwillController extends AbstractIdleService implements TwillController { + private final RunId runId; + NoopTwillController() { - super(RunIds.generate()); + this.runId = RunIds.generate(); + } + + @Override + public Future terminate() { + stopAsync(); + return CompletableFuture.completedFuture(this); + } + + @Override + public void onRunning(final Runnable runnable, Executor executor) { + if (isRunning()) { + executor.execute(runnable); + return; + } + Service.Listener listener = new Service.Listener() { + @Override + public void running() { + runnable.run(); + } + }; + addListener(listener, executor); + } + + @Override + public void onTerminated(final Runnable runnable, Executor executor) { + if (state() == State.TERMINATED || state() == State.FAILED) { + executor.execute(runnable); + return; + } + Service.Listener listener = new Service.Listener() { + @Override + public void terminated(State from) { + runnable.run(); + } + @Override + public void failed(State from, Throwable failure) { + runnable.run(); + } + }; + addListener(listener, executor); } + @Override + public ServiceController.TerminationStatus getTerminationStatus() { + if (state() == State.TERMINATED) { + return ServiceController.TerminationStatus.SUCCEEDED; + } + if (state() == State.FAILED) { + return ServiceController.TerminationStatus.FAILED; + } + return null; // Running or starting + } + @Override + public RunId getRunId() { + return runId; + } @Override public void addLogHandler(LogHandler handler) { // no-op diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/utils/ImmutablePair.java b/cdap-common/src/main/java/io/cdap/cdap/common/utils/ImmutablePair.java index 6e3e9d69f9a8..7a8dea597912 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/utils/ImmutablePair.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/utils/ImmutablePair.java @@ -16,6 +16,7 @@ package io.cdap.cdap.common.utils; +import com.google.common.base.MoreObjects; import com.google.common.base.Objects; /** @@ -83,7 +84,7 @@ public B getSecond() { */ @Override public String toString() { - return Objects.toStringHelper(this) + return MoreObjects.toStringHelper(this) .add("first", first) .add("second", second) .toString(); diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/utils/TimeBoundIterator.java b/cdap-common/src/main/java/io/cdap/cdap/common/utils/TimeBoundIterator.java index 4598513fe11b..6558f6e943fe 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/utils/TimeBoundIterator.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/utils/TimeBoundIterator.java @@ -35,7 +35,7 @@ public class TimeBoundIterator extends AbstractIterator { private final Stopwatch stopwatch; public TimeBoundIterator(Iterator delegate, long timeBoundMillis) { - this(delegate, timeBoundMillis, new Stopwatch()); + this(delegate, timeBoundMillis, Stopwatch.createStarted()); } public TimeBoundIterator(Iterator delegate, long timeBoundMillis, Stopwatch stopwatch) { @@ -49,7 +49,7 @@ public TimeBoundIterator(Iterator delegate, long timeBoundMillis, Stopwatch s @Override protected T computeNext() { - if (stopwatch.elapsedMillis() < timeBoundMillis && delegate.hasNext()) { + if (stopwatch.elapsed().toMillis() < timeBoundMillis && delegate.hasNext()) { return delegate.next(); } return endOfData(); diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/coordination/PartitionReplica.java b/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/coordination/PartitionReplica.java index f9aae9efccdb..37032f9058f1 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/coordination/PartitionReplica.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/coordination/PartitionReplica.java @@ -15,6 +15,7 @@ */ package io.cdap.cdap.common.zookeeper.coordination; +import com.google.common.base.MoreObjects; import com.google.common.base.Objects; import com.google.common.primitives.Ints; import java.util.Comparator; @@ -82,7 +83,7 @@ public int hashCode() { @Override public String toString() { - return Objects.toStringHelper(this) + return MoreObjects.toStringHelper(this) .add("partition", name) .add("replica", replicaId) .toString(); diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/coordination/ResourceCoordinatorClient.java b/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/coordination/ResourceCoordinatorClient.java index 01aeb7996ad7..1f62d56d571e 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/coordination/ResourceCoordinatorClient.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/coordination/ResourceCoordinatorClient.java @@ -28,6 +28,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import io.cdap.cdap.api.common.Bytes; import io.cdap.cdap.common.zookeeper.ZKExtOperations; import java.util.EnumSet; @@ -132,7 +133,7 @@ public ListenableFuture fetchRequirement(String resourceNam return Futures.transform( ZKOperations.ignoreError(zkClient.getData(zkPath), KeeperException.NoNodeException.class, null), - NODE_DATA_TO_REQUIREMENT + NODE_DATA_TO_REQUIREMENT, MoreExecutors.directExecutor() ); } @@ -150,7 +151,7 @@ public ListenableFuture deleteRequirement(String resourceName) { return Futures.transform( ZKOperations.ignoreError(zkClient.delete(zkPath), KeeperException.NoNodeException.class, resourceName), - Functions.constant(resourceName) + Functions.constant(resourceName), MoreExecutors.directExecutor() ); } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/coordination/ResourceRequirement.java b/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/coordination/ResourceRequirement.java index 15ad26371137..f01815938ce9 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/coordination/ResourceRequirement.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/coordination/ResourceRequirement.java @@ -15,6 +15,7 @@ */ package io.cdap.cdap.common.zookeeper.coordination; +import com.google.common.base.MoreObjects; import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSortedSet; @@ -60,7 +61,7 @@ public Set getPartitions() { @Override public String toString() { - return Objects.toStringHelper(this) + return MoreObjects.toStringHelper(this) .add("name", name) .add("partitions", partitions) .toString(); @@ -139,7 +140,7 @@ public int hashCode() { @Override public String toString() { - return Objects.toStringHelper(this) + return MoreObjects.toStringHelper(this) .add("name", name) .add("replicas", replicas) .toString(); diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/election/LeaderElectionInfoService.java b/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/election/LeaderElectionInfoService.java index 15fa2d73582f..c04094cb9be7 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/election/LeaderElectionInfoService.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/election/LeaderElectionInfoService.java @@ -22,6 +22,7 @@ import com.google.common.util.concurrent.AbstractIdleService; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import java.nio.charset.StandardCharsets; import java.util.Collections; @@ -81,9 +82,9 @@ public LeaderElectionInfoService(ZKClient zkClient, String leaderElectionPath) { public SortedMap getParticipants(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { try { - Stopwatch stopwatch = new Stopwatch().start(); + Stopwatch stopwatch = Stopwatch.createStarted(); CountDownLatch readyLatch = readyFuture.get(timeout, unit); - long latchTimeout = Math.max(0, stopwatch.elapsedTime(unit) - timeout); + long latchTimeout = Math.max(0, stopwatch.elapsed(unit) - timeout); readyLatch.await(latchTimeout, unit); } catch (ExecutionException e) { // The ready future never throw on get. If this happen, just return an empty map @@ -237,7 +238,7 @@ public void onFailure(Throwable t) { readyLatch.countDown(); } } - }); + }, MoreExecutors.directExecutor()); } /** diff --git a/cdap-common/src/main/java/io/cdap/cdap/data2/util/TableId.java b/cdap-common/src/main/java/io/cdap/cdap/data2/util/TableId.java index 35c5e51fc9ee..b6391bc80d16 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/data2/util/TableId.java +++ b/cdap-common/src/main/java/io/cdap/cdap/data2/util/TableId.java @@ -16,6 +16,7 @@ package io.cdap.cdap.data2.util; +import com.google.common.base.MoreObjects; import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -70,7 +71,7 @@ public int hashCode() { @Override public String toString() { - return Objects.toStringHelper(this) + return MoreObjects.toStringHelper(this) .add("namespace", namespace) .add("tableName", tableName) .toString(); diff --git a/cdap-common/src/main/java/io/cdap/cdap/internal/app/store/RunRecordDetail.java b/cdap-common/src/main/java/io/cdap/cdap/internal/app/store/RunRecordDetail.java index a2b5d7e945a9..51d070f3a2b6 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/internal/app/store/RunRecordDetail.java +++ b/cdap-common/src/main/java/io/cdap/cdap/internal/app/store/RunRecordDetail.java @@ -15,6 +15,7 @@ */ package io.cdap.cdap.internal.app.store; +import com.google.common.base.MoreObjects; import com.google.common.base.Objects; import com.google.gson.Gson; import com.google.gson.annotations.SerializedName; @@ -186,7 +187,7 @@ public int hashCode() { @Override public String toString() { - return Objects.toStringHelper(this) + return MoreObjects.toStringHelper(this) .add("programRunId", getProgramRunId()) .add("startTs", getStartTs()) .add("runTs", getRunTs()) diff --git a/cdap-common/src/test/java/io/cdap/cdap/common/conf/ZKPropertyStoreTest.java b/cdap-common/src/test/java/io/cdap/cdap/common/conf/ZKPropertyStoreTest.java index 63396168949b..e06e60466b4d 100644 --- a/cdap-common/src/test/java/io/cdap/cdap/common/conf/ZKPropertyStoreTest.java +++ b/cdap-common/src/test/java/io/cdap/cdap/common/conf/ZKPropertyStoreTest.java @@ -39,16 +39,16 @@ public class ZKPropertyStoreTest extends PropertyStoreTestBase { @BeforeClass public static void init() throws IOException { zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); } @AfterClass public static void finish() { - zkClient.stopAndWait(); - zkServer.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); + zkServer.stopAsync().awaitTerminated(); } @Override diff --git a/cdap-common/src/test/java/io/cdap/cdap/common/guice/KafkaClientModuleTest.java b/cdap-common/src/test/java/io/cdap/cdap/common/guice/KafkaClientModuleTest.java index b41436c0b7bb..c15cc76834a8 100644 --- a/cdap-common/src/test/java/io/cdap/cdap/common/guice/KafkaClientModuleTest.java +++ b/cdap-common/src/test/java/io/cdap/cdap/common/guice/KafkaClientModuleTest.java @@ -71,20 +71,20 @@ public void beforeTest() throws Exception { if (kafkaZkNamespace != null) { ZKClientService zkClient = new DefaultZKClientService(zkServer.getConnectionStr(), 2000, null, ImmutableMultimap.of()); - zkClient.startAndWait(); + zkClient.startAsync().awaitTerminated(); zkClient.create("/" + kafkaZkNamespace, null, CreateMode.PERSISTENT); - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); kafkaZkConnect += "/" + kafkaZkNamespace; } kafkaServer = createKafkaServer(kafkaZkConnect, TEMP_FOLDER.newFolder()); - kafkaServer.startAndWait(); + kafkaServer.startAsync().awaitTerminated(); } @After public void afterTest() { - kafkaServer.stopAndWait(); + kafkaServer.stopAsync().awaitTerminated(); zkServer.stopAndWait(); } @@ -101,7 +101,7 @@ public void testWithSharedZkClient() throws Exception { // Get the shared zkclient and start it ZKClientService zkClientService = injector.getInstance(ZKClientService.class); - zkClientService.startAndWait(); + zkClientService.startAsync().awaitTerminated(); final int baseZkConns = getZkConnections(); @@ -109,8 +109,8 @@ public void testWithSharedZkClient() throws Exception { final BrokerService brokerService = injector.getInstance(BrokerService.class); // Start both kafka and broker services, it shouldn't affect the state of the shared zk client - kafkaClientService.startAndWait(); - brokerService.startAndWait(); + kafkaClientService.startAsync().awaitTerminated(); + brokerService.startAsync().awaitTerminated(); // Shouldn't affect the shared zk client state Assert.assertTrue(zkClientService.isRunning()); @@ -127,8 +127,8 @@ public Boolean call() throws Exception { }, 5L, TimeUnit.SECONDS, 100, TimeUnit.MILLISECONDS); // Stop both, still shouldn't affect the state of the shared zk client - kafkaClientService.stopAndWait(); - brokerService.stopAndWait(); + kafkaClientService.stopAsync().awaitTerminated(); + brokerService.stopAsync().awaitTerminated(); // Still shouldn't affect the shared zk client Assert.assertTrue(zkClientService.isRunning()); @@ -136,7 +136,7 @@ public Boolean call() throws Exception { // It still shouldn't increase the number of zk client connections Assert.assertEquals(baseZkConns, getZkConnections()); - zkClientService.stopAndWait(); + zkClientService.stopAsync().awaitTerminated(); } @Test @@ -154,7 +154,7 @@ public void testWithDedicatedZkClient() throws Exception { // Get the shared zkclient and start it ZKClientService zkClientService = injector.getInstance(ZKClientService.class); - zkClientService.startAndWait(); + zkClientService.startAsync().awaitTerminated(); int baseZkConns = getZkConnections(); @@ -162,12 +162,12 @@ public void testWithDedicatedZkClient() throws Exception { final BrokerService brokerService = injector.getInstance(BrokerService.class); // Start the kafka client, it should increase the zk connections by 1 - kafkaClientService.startAndWait(); + kafkaClientService.startAsync().awaitTerminated(); Assert.assertEquals(baseZkConns + 1, getZkConnections()); // Start the broker service, // it shouldn't affect the zk connections, as it share the same zk client with kafka client - brokerService.startAndWait(); + brokerService.startAsync().awaitTerminated(); Assert.assertEquals(baseZkConns + 1, getZkConnections()); // Make sure it is talking to Kafka. @@ -182,17 +182,17 @@ public Boolean call() throws Exception { Assert.assertTrue(zkClientService.isRunning()); // Stop the broker service, it shouldn't affect the zk connections, as it is still used by the kafka client - brokerService.stopAndWait(); + brokerService.stopAsync().awaitTerminated(); Assert.assertEquals(baseZkConns + 1, getZkConnections()); // Stop the kafka client, the zk connections should be reduced by 1 - kafkaClientService.stopAndWait(); + kafkaClientService.stopAsync().awaitTerminated(); Assert.assertEquals(baseZkConns, getZkConnections()); // Still shouldn't affect the shared zk client Assert.assertTrue(zkClientService.isRunning()); - zkClientService.stopAndWait(); + zkClientService.stopAsync().awaitTerminated(); } /** diff --git a/cdap-common/src/test/java/io/cdap/cdap/common/guice/ZkDiscoveryModuleTest.java b/cdap-common/src/test/java/io/cdap/cdap/common/guice/ZkDiscoveryModuleTest.java index c809a29c2824..d106dd5d5211 100644 --- a/cdap-common/src/test/java/io/cdap/cdap/common/guice/ZkDiscoveryModuleTest.java +++ b/cdap-common/src/test/java/io/cdap/cdap/common/guice/ZkDiscoveryModuleTest.java @@ -80,7 +80,7 @@ public void testMasterDiscovery() { ); ZKClientService zkClient = injector.getInstance(ZKClientService.class); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); try { DiscoveryService discoveryService = injector.getInstance(DiscoveryService.class); DiscoveryServiceClient discoveryServiceClient = injector @@ -106,7 +106,7 @@ public void testMasterDiscovery() { } } finally { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } @@ -119,7 +119,7 @@ public void testProgramDiscovery() { ); ZKClientService zkClient = injector.getInstance(ZKClientService.class); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); try { // Register a service using the twill ZKClient. This is to simulate how a user Service program register ProgramId programId = NamespaceId.DEFAULT.app("app").service("service"); @@ -149,7 +149,7 @@ public void testProgramDiscovery() { } } } finally { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } } diff --git a/cdap-common/src/test/java/io/cdap/cdap/common/internal/remote/RemoteTaskExecutorTest.java b/cdap-common/src/test/java/io/cdap/cdap/common/internal/remote/RemoteTaskExecutorTest.java index 5781de705674..5c9f2e168714 100644 --- a/cdap-common/src/test/java/io/cdap/cdap/common/internal/remote/RemoteTaskExecutorTest.java +++ b/cdap-common/src/test/java/io/cdap/cdap/common/internal/remote/RemoteTaskExecutorTest.java @@ -17,6 +17,7 @@ package io.cdap.cdap.common.internal.remote; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.Service; import io.cdap.cdap.api.metrics.MetricsCollectionService; import io.cdap.cdap.api.metrics.MetricsContext; import io.cdap.cdap.api.service.worker.RemoteExecutionException; @@ -39,6 +40,8 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.twill.common.Cancellable; import org.apache.twill.discovery.InMemoryDiscoveryService; import org.junit.After; @@ -90,7 +93,7 @@ public void modify(ChannelPipeline pipeline) { public void beforeTest() { metricCollectors = new HashMap<>(); mockMetricsCollector = createMockMetricsCollectionService(); - mockMetricsCollector.startAndWait(); + mockMetricsCollector.startAsync().awaitRunning(); registered = discoveryService.register(URIScheme.createDiscoverable(Constants.Service.TASK_WORKER, httpService)); } @@ -111,36 +114,57 @@ public byte[] decrypt(byte[] cipherData, byte[] associatedData) throws CipherExc private MetricsCollectionService createMockMetricsCollectionService() { return new MetricsCollectionService() { + private volatile State state = State.NEW; + @Override - public ListenableFuture start() { - return null; + public Service startAsync() { + state = State.RUNNING; + return this; } @Override - public State startAndWait() { - return null; + public Service stopAsync() { + state = State.TERMINATED; + return this; // Must return 'this' for chaining } @Override - public boolean isRunning() { - return false; + public void awaitRunning() { + // No-op: assume started immediately } @Override - public State state() { - return null; + public void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { + // No-op } @Override - public ListenableFuture stop() { + public void awaitTerminated() { + // No-op + } + + @Override + public void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { + // No-op + } + + @Override + public Throwable failureCause() { return null; } @Override - public State stopAndWait() { + public boolean isRunning() { + return false; + } + + @Override + public State state() { return null; } + + @Override public void addListener(final Listener listener, final Executor executor) {} @@ -205,7 +229,7 @@ public void testFailedMetrics() throws Exception { // Exception thrown in the task executor should be in the exception message in the caller Assert.assertEquals("Invalid", e.getMessage()); } - mockMetricsCollector.stopAndWait(); + mockMetricsCollector.stopAsync().awaitTerminated(); Assert.assertSame(1, metricCollectors.size()); //check the metrics are present @@ -224,7 +248,7 @@ public void testSuccessMetrics() throws Exception { RunnableTaskRequest runnableTaskRequest = RunnableTaskRequest.getBuilder(ValidRunnableClass.class.getName()). withParam("param").withNamespace("testNamespace").build(); remoteTaskExecutor.runTask(runnableTaskRequest); - mockMetricsCollector.stopAndWait(); + mockMetricsCollector.stopAsync().awaitTerminated(); Assert.assertSame(1, metricCollectors.size()); //check the metrics are present @@ -249,7 +273,7 @@ public void testRetryMetrics() throws Exception { } catch (Exception e) { // expected } - mockMetricsCollector.stopAndWait(); + mockMetricsCollector.stopAsync().awaitTerminated(); Assert.assertSame(1, metricCollectors.size()); //check the metrics are present diff --git a/cdap-common/src/test/java/io/cdap/cdap/common/resource/ResourceBalancerServiceTest.java b/cdap-common/src/test/java/io/cdap/cdap/common/resource/ResourceBalancerServiceTest.java index 8a5d4b815491..9cd1fc64ad43 100644 --- a/cdap-common/src/test/java/io/cdap/cdap/common/resource/ResourceBalancerServiceTest.java +++ b/cdap-common/src/test/java/io/cdap/cdap/common/resource/ResourceBalancerServiceTest.java @@ -72,13 +72,13 @@ public void testResourceBalancerService() throws Exception { // Simple test for resource balancer does react to discovery changes correct // More detailed tests are in ResourceCoordinatorTest, which the ResourceBalancerService depends on ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); try (ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClient)) { // Test the failure on stop case final TestBalancerService stopFailureService = new TestBalancerService("test", 4, zkClient, discoveryService, discoveryService, false, false); - stopFailureService.startAndWait(); + stopFailureService.startAsync().awaitRunning(); // Should get all four partitions Tasks.waitFor(ImmutableSet.of(0, 1, 2, 3), new Callable>() { @@ -103,20 +103,20 @@ public Integer call() throws Exception { cancellable.cancel(); } } finally { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } @Test public void testServiceStartFailure() throws Exception { ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); try (ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClient)) { // Test the failure on start case final TestBalancerService startFailureService = new TestBalancerService("test", 4, zkClient, discoveryService, discoveryService, true, false); - startFailureService.startAndWait(); + startFailureService.startAsync().awaitRunning(); // The resource balance service should fail Tasks.waitFor(Service.State.FAILED, new Callable() { @@ -126,20 +126,20 @@ public Service.State call() throws Exception { } }, 10, TimeUnit.SECONDS, 100, TimeUnit.MILLISECONDS); } finally { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } @Test public void testServiceStopFailure() throws Exception { ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); try (ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClient)) { // Test the failure on stop case final TestBalancerService stopFailureService = new TestBalancerService("test", 4, zkClient, discoveryService, discoveryService, false, true); - stopFailureService.startAndWait(); + stopFailureService.startAsync().awaitRunning(); // Should get four partitions Tasks.waitFor(ImmutableSet.of(0, 1, 2, 3), new Callable>() { @@ -165,7 +165,7 @@ public Service.State call() throws Exception { cancellable.cancel(); } } finally { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } diff --git a/cdap-common/src/test/java/io/cdap/cdap/common/service/CommandPortServiceTest.java b/cdap-common/src/test/java/io/cdap/cdap/common/service/CommandPortServiceTest.java index 6aee3250ddb2..102c6bbd1d4f 100644 --- a/cdap-common/src/test/java/io/cdap/cdap/common/service/CommandPortServiceTest.java +++ b/cdap-common/src/test/java/io/cdap/cdap/common/service/CommandPortServiceTest.java @@ -62,17 +62,15 @@ public void testCommandPortServer() throws Exception { .build(); final CountDownLatch stopLatch = new CountDownLatch(1); - Futures.addCallback(server.start(), new FutureCallback() { - @Override - public void onSuccess(Service.State result) { - stopLatch.countDown(); - } + try{ + server.startAsync().awaitRunning(); + stopLatch.countDown(); + } catch (IllegalStateException e) { + // Service failed to start + stopLatch.countDown(); + throw e; + } - @Override - public void onFailure(Throwable t) { - stopLatch.countDown(); - } - }); // wait a bit for service to start TimeUnit.SECONDS.sleep(1); @@ -95,7 +93,7 @@ public void onFailure(Throwable t) { } } finally { - server.stopAndWait(); + server.stopAsync().awaitTerminated(); } Assert.assertEquals(10, handler.getCounter()); diff --git a/cdap-common/src/test/java/io/cdap/cdap/common/service/RetryOnStartFailureServiceTest.java b/cdap-common/src/test/java/io/cdap/cdap/common/service/RetryOnStartFailureServiceTest.java index b2233dc07704..dccd1583ff93 100644 --- a/cdap-common/src/test/java/io/cdap/cdap/common/service/RetryOnStartFailureServiceTest.java +++ b/cdap-common/src/test/java/io/cdap/cdap/common/service/RetryOnStartFailureServiceTest.java @@ -36,13 +36,15 @@ */ public class RetryOnStartFailureServiceTest { + private static final long TIMEOUT_SECONDS = 10; + @Test public void testRetrySucceed() throws InterruptedException { CountDownLatch startLatch = new CountDownLatch(1); Service service = new RetryOnStartFailureService( createServiceSupplier(3, startLatch, new CountDownLatch(1), false), RetryStrategies.fixDelay(10, TimeUnit.MILLISECONDS)); - service.startAndWait(); + service.startAsync().awaitRunning(); Assert.assertTrue(startLatch.await(1, TimeUnit.SECONDS)); } @@ -54,14 +56,14 @@ public void testRetryFail() throws InterruptedException { RetryStrategies.limit(10, RetryStrategies.fixDelay(10, TimeUnit.MILLISECONDS))); final CountDownLatch failureLatch = new CountDownLatch(1); - service.addListener(new ServiceListenerAdapter() { + service.addListener(new Service.Listener() { @Override public void failed(Service.State from, Throwable failure) { failureLatch.countDown(); } }, Threads.SAME_THREAD_EXECUTOR); - service.start(); + service.startAsync(); Assert.assertTrue(failureLatch.await(1, TimeUnit.SECONDS)); Assert.assertFalse(startLatch.await(100, TimeUnit.MILLISECONDS)); } @@ -73,9 +75,9 @@ public void testStopWhileRetrying() throws InterruptedException { Service service = new RetryOnStartFailureService( createServiceSupplier(1000, new CountDownLatch(1), failureLatch, false), RetryStrategies.fixDelay(10, TimeUnit.MILLISECONDS)); - service.startAndWait(); + service.startAsync().awaitRunning(); Assert.assertTrue(failureLatch.await(1, TimeUnit.SECONDS)); - service.stopAndWait(); + service.stopAsync().awaitTerminated(); } @Test @@ -85,7 +87,7 @@ public void testStopFailurePropagate() throws InterruptedException, TimeoutExcep final RetryOnStartFailureService service = new RetryOnStartFailureService( createServiceSupplier(0, startLatch, new CountDownLatch(1), true), RetryStrategies.fixDelay(10, TimeUnit.MILLISECONDS)); - service.startAndWait(); + service.startAsync().awaitRunning(); // block until the underlying service started successfully Assert.assertTrue(startLatch.await(1, TimeUnit.SECONDS)); // As documented in the RetryOnStartFailureService, there is a small race after the @@ -99,7 +101,7 @@ public Boolean call() throws Exception { } }, 5, TimeUnit.SECONDS, 100, TimeUnit.MILLISECONDS); try { - service.stopAndWait(); + service.stopAsync().awaitTerminated(); Assert.fail("Expected failure in stopping"); } catch (Exception e) { Assert.assertEquals("Intentional failure to shutdown", Throwables.getRootCause(e).getMessage()); diff --git a/cdap-common/src/test/java/io/cdap/cdap/common/service/RetryableScheduledServiceTest.java b/cdap-common/src/test/java/io/cdap/cdap/common/service/RetryableScheduledServiceTest.java index 2ce4ce226215..6fe1bb793b01 100644 --- a/cdap-common/src/test/java/io/cdap/cdap/common/service/RetryableScheduledServiceTest.java +++ b/cdap-common/src/test/java/io/cdap/cdap/common/service/RetryableScheduledServiceTest.java @@ -45,9 +45,9 @@ protected long runTask() { } }; - service.start(); + service.startAsync(); Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); - service.stopAndWait(); + service.stopAsync().awaitTerminated(); } @Test @@ -59,12 +59,12 @@ protected long runTask() throws Exception { } }; - service.start(); + service.startAsync(); // Wait for the service to fail Tasks.waitFor(Service.State.FAILED, service::state, 5, TimeUnit.SECONDS, 10, TimeUnit.MILLISECONDS); try { - service.stopAndWait(); + service.stopAsync().awaitTerminated(); } catch (Exception e) { // The root cause should be the one throw from the runTask. It should suppressed the retry exhausted exception. Throwable rootCause = Throwables.getRootCause(e); @@ -89,12 +89,12 @@ protected boolean shouldRetry(Exception ex) { } }; - service.start(); + service.startAsync(); // Wait for the service to fail Tasks.waitFor(Service.State.FAILED, service::state, 5, TimeUnit.SECONDS, 10, TimeUnit.MILLISECONDS); try { - service.stopAndWait(); + service.stopAsync().awaitTerminated(); } catch (Exception e) { // The root cause should be the one throw from the runTask. Throwable rootCause = Throwables.getRootCause(e); @@ -118,8 +118,8 @@ protected long runTask() throws Exception { return 1L; } }; - service.start(); + service.startAsync(); Assert.assertTrue(latch.await(3, TimeUnit.SECONDS)); - service.stopAndWait(); + service.stopAsync().awaitTerminated(); } } diff --git a/cdap-common/src/test/java/io/cdap/cdap/common/ssh/SSHSessionTest.java b/cdap-common/src/test/java/io/cdap/cdap/common/ssh/SSHSessionTest.java index 8fcf8a0014b8..10eb8ae35005 100644 --- a/cdap-common/src/test/java/io/cdap/cdap/common/ssh/SSHSessionTest.java +++ b/cdap-common/src/test/java/io/cdap/cdap/common/ssh/SSHSessionTest.java @@ -131,7 +131,7 @@ public void testLocalPortForwarding() throws Exception { // Starts an echo server for testing the port forwarding EchoServer echoServer = new EchoServer(); - echoServer.startAndWait(); + echoServer.startAsync().awaitRunning(); try { // Creates the DataConsumer for receiving data and validating the lifecycle StringBuilder received = new StringBuilder(); @@ -185,7 +185,7 @@ public void finished() { } } finally { - echoServer.stopAndWait(); + echoServer.stopAsync().awaitTerminated(); } } @@ -193,7 +193,7 @@ public void finished() { public void testForwardingOnSessionClose() throws Exception { EchoServer echoServer = new EchoServer(); - echoServer.startAndWait(); + echoServer.startAsync().awaitRunning(); try { SSHConfig sshConfig = getSSHConfig(); AtomicBoolean finished = new AtomicBoolean(false); @@ -236,7 +236,7 @@ public void finished() { } } finally { - echoServer.stopAndWait(); + echoServer.stopAsync().awaitTerminated(); } } @@ -244,7 +244,7 @@ public void finished() { public void testRemotePortForwarding() throws Exception { EchoServer echoServer = new EchoServer(); - echoServer.startAndWait(); + echoServer.startAsync().awaitRunning(); try { SSHConfig sshConfig = getSSHConfig(); @@ -264,7 +264,7 @@ public void testRemotePortForwarding() throws Exception { } } } finally { - echoServer.stopAndWait(); + echoServer.stopAsync().awaitTerminated(); } } @@ -320,7 +320,13 @@ protected void run() throws IOException { } catch (IOException e) { LOG.error("Exception raised from the EchoServer handling thread", e); } finally { - Closeables.closeQuietly(socket); + try{ + if(socket != null && !socket.isClosed()){ + socket.shutdownInput(); + } + } catch (IOException e) { + // ignore + } } }); @@ -337,7 +343,13 @@ protected void run() throws IOException { @Override protected void triggerShutdown() { stopped = true; - Closeables.closeQuietly(serverSocket); + try{ + if(serverSocket != null && !serverSocket.isClosed()){ + serverSocket.close(); + } + } catch (IOException e) { + // ignore + } } } } diff --git a/cdap-common/src/test/java/io/cdap/cdap/common/utils/TimeBoundIteratorTest.java b/cdap-common/src/test/java/io/cdap/cdap/common/utils/TimeBoundIteratorTest.java index 34758d971a5a..cfc5446698a9 100644 --- a/cdap-common/src/test/java/io/cdap/cdap/common/utils/TimeBoundIteratorTest.java +++ b/cdap-common/src/test/java/io/cdap/cdap/common/utils/TimeBoundIteratorTest.java @@ -34,7 +34,7 @@ public class TimeBoundIteratorTest { @Test public void testTimeBoundNotHit() { SettableTicker ticker = new SettableTicker(0); - Stopwatch stopwatch = new Stopwatch(ticker); + Stopwatch stopwatch = Stopwatch.createStarted(ticker); List list = new ArrayList<>(); list.add(0); @@ -54,7 +54,7 @@ public void testTimeBoundNotHit() { @Test public void testTimeBoundImmediatelyHit() { SettableTicker ticker = new SettableTicker(0); - Stopwatch stopwatch = new Stopwatch(ticker); + Stopwatch stopwatch = Stopwatch.createStarted(ticker); List list = new ArrayList<>(); list.add(0); @@ -70,7 +70,7 @@ public void testTimeBoundImmediatelyHit() { @Test public void testEarlyStop() { SettableTicker ticker = new SettableTicker(0); - Stopwatch stopwatch = new Stopwatch(ticker); + Stopwatch stopwatch = Stopwatch.createStarted(ticker); List list = new ArrayList<>(); list.add(0); diff --git a/cdap-common/src/test/java/io/cdap/cdap/common/zookeeper/ZKExtOperationsTest.java b/cdap-common/src/test/java/io/cdap/cdap/common/zookeeper/ZKExtOperationsTest.java index 4abd581060df..3ee982697dff 100644 --- a/cdap-common/src/test/java/io/cdap/cdap/common/zookeeper/ZKExtOperationsTest.java +++ b/cdap-common/src/test/java/io/cdap/cdap/common/zookeeper/ZKExtOperationsTest.java @@ -68,8 +68,8 @@ public void testGetAndSet() throws Exception { ZKClientService zkClient1 = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); ZKClientService zkClient2 = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - zkClient1.startAndWait(); - zkClient2.startAndWait(); + zkClient1.startAsync().awaitTerminated(); + zkClient2.startAsync().awaitRunning(); // First a node would get created since no node there. ZKExtOperations.updateOrCreate(zkClient1, path, new Function() { @@ -134,15 +134,15 @@ public Integer apply(@Nullable Integer input) { Assert.assertNull(result); - zkClient1.stopAndWait(); - zkClient2.stopAndWait(); + zkClient1.stopAsync().awaitTerminated(); + zkClient2.stopAsync().awaitTerminated(); } @Test public void testCreateOrSet() throws Exception { String path = "/parent/testCreateOrSet"; ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); // Create with "1" Assert.assertEquals(1, ZKExtOperations.createOrSet(zkClient, path, @@ -156,14 +156,14 @@ public void testCreateOrSet() throws Exception { // Should get "2" back Assert.assertEquals(2, INT_CODEC.decode(zkClient.getData(path).get().getData()).intValue()); - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } @Test public void testSetOrCreate() throws Exception { String path = "/parent/testSetOrCreate"; ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); // Create with "1" Assert.assertEquals(1, ZKExtOperations.setOrCreate(zkClient, path, @@ -177,7 +177,7 @@ public void testSetOrCreate() throws Exception { // Should get "2" back Assert.assertEquals(2, INT_CODEC.decode(zkClient.getData(path).get().getData()).intValue()); - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } @AfterClass diff --git a/cdap-common/src/test/java/io/cdap/cdap/common/zookeeper/coordination/ResourceCoordinatorTest.java b/cdap-common/src/test/java/io/cdap/cdap/common/zookeeper/coordination/ResourceCoordinatorTest.java index b7813fb679bc..2e1e78d74c80 100644 --- a/cdap-common/src/test/java/io/cdap/cdap/common/zookeeper/coordination/ResourceCoordinatorTest.java +++ b/cdap-common/src/test/java/io/cdap/cdap/common/zookeeper/coordination/ResourceCoordinatorTest.java @@ -71,18 +71,18 @@ public void testAssignment() throws InterruptedException, ExecutionException { new ZkClientModule(), new ZkDiscoveryModule()); ZKClientService zkClient = injector.getInstance(ZKClientService.class); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); DiscoveryService discoveryService = injector.getInstance(DiscoveryService.class); try { ResourceCoordinator coordinator = new ResourceCoordinator(zkClient, injector.getInstance(DiscoveryServiceClient.class), new BalancedAssignmentStrategy()); - coordinator.startAndWait(); + coordinator.startAsync().awaitRunning(); try { ResourceCoordinatorClient client = new ResourceCoordinatorClient(zkClient); - client.startAndWait(); + client.startAsync().awaitRunning(); try { // Create a requirement @@ -171,14 +171,14 @@ public void testAssignment() throws InterruptedException, ExecutionException { cancelDiscoverable2.cancel(); } finally { - client.stopAndWait(); + client.stopAsync().awaitTerminated(); } } finally { - coordinator.stopAndWait(); + coordinator.stopAsync().awaitTerminated(); } } finally { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } diff --git a/cdap-common/src/test/java/io/cdap/cdap/common/zookeeper/election/LeaderElectionInfoServiceTest.java b/cdap-common/src/test/java/io/cdap/cdap/common/zookeeper/election/LeaderElectionInfoServiceTest.java index 7ce62d05dbb2..e1c84f00bb59 100644 --- a/cdap-common/src/test/java/io/cdap/cdap/common/zookeeper/election/LeaderElectionInfoServiceTest.java +++ b/cdap-common/src/test/java/io/cdap/cdap/common/zookeeper/election/LeaderElectionInfoServiceTest.java @@ -71,12 +71,12 @@ public void testParticipants() throws Exception { List zkClients = new ArrayList<>(); ZKClientService infoZKClient = DefaultZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - infoZKClient.startAndWait(); + infoZKClient.startAsync().awaitRunning(); zkClients.add(infoZKClient); // Start the LeaderElectionInfoService LeaderElectionInfoService infoService = new LeaderElectionInfoService(infoZKClient, prefix); - infoService.startAndWait(); + infoService.startAsync().awaitRunning(); // This will timeout as there is no leader election node created yet try { @@ -90,7 +90,7 @@ public void testParticipants() throws Exception { List leaderElections = new ArrayList<>(); for (int i = 0; i < size; i++) { ZKClientService zkClient = DefaultZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); zkClients.add(zkClient); final int participantId = i; @@ -105,7 +105,7 @@ public void follower() { LOG.info("Follow: {}", participantId); } }); - leaderElection.start(); + leaderElection.startAsync(); leaderElections.add(leaderElection); } @@ -136,7 +136,7 @@ public boolean apply(LeaderElectionInfoService.Participant input) { int expectedSize = size; for (LeaderElection leaderElection : leaderElections) { - leaderElection.stopAndWait(); + leaderElection.stopAsync().awaitTerminated(); Tasks.waitFor(--expectedSize, new Callable() { @Override public Integer call() throws Exception { @@ -150,10 +150,10 @@ public Integer call() throws Exception { Assert.assertTrue(snapshot.isEmpty()); Assert.assertEquals(participants, snapshot); - infoService.stopAndWait(); + infoService.stopAsync().awaitTerminated(); for (ZKClientService zkClient : zkClients) { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } } diff --git a/pom.xml b/pom.xml index 01e064b96e36..1fff2f53790c 100644 --- a/pom.xml +++ b/pom.xml @@ -120,7 +120,7 @@ 2.0.0 0.1.0 2.3.1 - 13.0.1 + 32.0.0-jre 4.0 3.3.6 2.2.4