Google-Guava Concurrent包里的Service框架浅析

简介:

原文地址  译文地址 译者:何一昕 校对:方腾飞

概述

Guava包里的Service接口用于封装一个服务对象的运行状态、包括start和stop等方法。例如web服务器,RPC服务器、计时器等可以实现这个接口。对此类服务的状态管理并不轻松、需要对服务的开启/关闭进行妥善管理、特别是在多线程环境下尤为复杂。Guava包提供了一些基础类帮助你管理复杂的状态转换逻辑和同步细节。

使用一个服务

一个服务正常生命周期有:

服务一旦被停止就无法再重新启动了。如果服务在starting、running、stopping状态出现问题、会进入Service.State.FAILED.状态。调用 startAsync()方法可以异步开启一个服务,同时返回this对象形成方法调用链。注意:只有在当前服务的状态是NEW时才能调用startAsync()方法,因此最好在应用中有一个统一的地方初始化相关服务。停止一个服务也是类似的、使用异步方法stopAsync() 。但是不像startAsync(),多次调用这个方法是安全的。这是为了方便处理关闭服务时候的锁竞争问题。

Service也提供了一些方法用于等待服务状态转换的完成:

通过 addListener()方法异步添加监听器。此方法允许你添加一个 Service.Listener 、它会在每次服务状态转换的时候被调用。注意:最好在服务启动之前添加Listener(这时的状态是NEW)、否则之前已发生的状态转换事件是无法在新添加的Listener上被重新触发的。

同步使用awaitRunning()。这个方法不能被打断、不强制捕获异常、一旦服务启动就会返回。如果服务没有成功启动,会抛出IllegalStateException异常。同样的, awaitTerminated() 方法会等待服务达到终止状态(TERMINATED 或者 FAILED)。两个方法都有重载方法允许传入超时时间。

Service 接口本身实现起来会比较复杂、且容易碰到一些捉摸不透的问题。因此我们不推荐直接实现这个接口。而是请继承Guava包里已经封装好的基础抽象类。每个基础类支持一种特定的线程模型。

基础实现类

AbstractIdleService

 AbstractIdleService 类简单实现了Service接口、其在running状态时不会执行任何动作–因此在running时也不需要启动线程–但需要处理开启/关闭动作。要实现一个此类的服务,只需继承AbstractIdleService类,然后自己实现startUp() 和shutDown()方法就可以了。


1 protected void startUp() {
2 servlets.add(new GcStatsServlet());
3 }
4 protected void shutDown() {}

如上面的例子、由于任何请求到GcStatsServlet时已经会有现成线程处理了,所以在服务运行时就不需要做什么额外动作了。

AbstractExecutionThreadService

AbstractExecutionThreadService 通过单线程处理启动、运行、和关闭等操作。你必须重载run()方法,同时需要能响应停止服务的请求。具体的实现可以在一个循环内做处理:


1 public void run() {
2   while (isRunning()) {
3     // perform a unit of work
4   }
5 }

另外,你还可以重载triggerShutdown()方法让run()方法结束返回。

重载startUp()和shutDown()方法是可选的,不影响服务本身状态的管理


01 protected void startUp() {
02 dispatcher.listenForConnections(port, queue);
03  }
04  protected void run() {
05    Connection connection;
06    while ((connection = queue.take() != POISON)) {
07      process(connection);
08    }
09  }
10  protected void triggerShutdown() {
11    dispatcher.stopListeningForConnections(queue);
12    queue.put(POISON);
13  }

start()内部会调用startUp()方法,创建一个线程、然后在线程内调用run()方法。stop()会调用 triggerShutdown()方法并且等待线程终止。

AbstractScheduledService

AbstractScheduledService类用于在运行时处理一些周期性的任务。子类可以实现 runOneIteration()方法定义一个周期执行的任务,以及相应的startUp()和shutDown()方法。为了能够描述执行周期,你需要实现scheduler()方法。通常情况下,你可以使用AbstractScheduledService.Scheduler类提供的两种调度器:newFixedRateSchedule(initialDelay, delay, TimeUnit)  和newFixedDelaySchedule(initialDelay, delay, TimeUnit),类似于JDK并发包中ScheduledExecutorService类提供的两种调度方式。如要自定义schedules则可以使用 CustomScheduler类来辅助实现;具体用法见javadoc。

AbstractService

如需要自定义的线程管理、可以通过扩展 AbstractService类来实现。一般情况下、使用上面的几个实现类就已经满足需求了,但如果在服务执行过程中有一些特定的线程处理需求、则建议继承AbstractService类。

继承AbstractService方法必须实现两个方法.

  • doStart():  首次调用startAsync()时会同时调用doStart(),doStart()内部需要处理所有的初始化工作、如果启动成功则调用notifyStarted()方法;启动失败则调用notifyFailed()
  • doStop() 首次调用stopAsync()会同时调用doStop(),doStop()要做的事情就是停止服务,如果停止成功则调用 notifyStopped()方法;停止失败则调用 notifyFailed()方法。

doStart和doStop方法的实现需要考虑下性能,尽可能的低延迟。如果初始化的开销较大,如读文件,打开网络连接,或者其他任何可能引起阻塞的操作,建议移到另外一个单独的线程去处理。

使用ServiceManager

除了对Service接口提供基础的实现类,Guava还提供了 ServiceManager类使得涉及到多个Service集合的操作更加容易。通过实例化ServiceManager类来创建一个Service集合,你可以通过以下方法来管理它们:

  • startAsync()  : 将启动所有被管理的服务。如果当前服务的状态都是NEW的话、那么你只能调用该方法一次、这跟 Service#startAsync()是一样的。
  • stopAsync() 将停止所有被管理的服务。
  • addListener 会添加一个ServiceManager.Listener,在服务状态转换中会调用该Listener
  • awaitHealthy() 会等待所有的服务达到Running状态
  • awaitStopped()会等待所有服务达到终止状态

检测类的方法有:

  • isHealthy()  如果所有的服务处于Running状态、会返回True
  • servicesByState()以状态为索引返回当前所有服务的快照
  • startupTimes() :返回一个Map对象,记录被管理的服务启动的耗时、以毫秒为单位,同时Map默认按启动时间排序。

我们建议整个服务的生命周期都能通过ServiceManager来管理,不过即使状态转换是通过其他机制触发的、也不影响ServiceManager方法的正确执行。例如:当一个服务不是通过startAsync()、而是其他机制启动时,listeners 仍然可以被正常调用、awaitHealthy()也能够正常工作。ServiceManager 唯一强制的要求是当其被创建时所有的服务必须处于New状态。

附:TestCase、也可以作为练习Demo

ServiceTest


01 </pre>
02 /*
03  * Copyright (C) 2013 The Guava Authors
04  *
05  * Licensed under the Apache License, Version 2.0 (the "License");
06  * you may not use this file except in compliance with the License.
07  * You may obtain a copy of the License at
08  *
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17  
18 package com.google.common.util.concurrent;
19  
20 import static com.google.common.util.concurrent.Service.State.FAILED;
21 import static com.google.common.util.concurrent.Service.State.NEW;
22 import static com.google.common.util.concurrent.Service.State.RUNNING;
23 import static com.google.common.util.concurrent.Service.State.STARTING;
24 import static com.google.common.util.concurrent.Service.State.STOPPING;
25 import static com.google.common.util.concurrent.Service.State.TERMINATED;
26  
27 import junit.framework.TestCase;
28  
29 /**
30  * Unit tests for {@link Service}
31  */
32 public class ServiceTest extends TestCase {
33  
34 /** Assert on the comparison ordering of the State enum since we guarantee it. */
35  public void testStateOrdering() {
36  // List every valid (direct) state transition.
37  assertLessThan(NEW, STARTING);
38  assertLessThan(NEW, TERMINATED);
39  
40  assertLessThan(STARTING, RUNNING);
41  assertLessThan(STARTING, STOPPING);
42  assertLessThan(STARTING, FAILED);
43  
44  assertLessThan(RUNNING, STOPPING);
45  assertLessThan(RUNNING, FAILED);
46  
47  assertLessThan(STOPPING, FAILED);
48  assertLessThan(STOPPING, TERMINATED);
49  }
50  
51  private static <T extends Comparable<? super T>> void assertLessThan(T a, T b) {
52  if (a.compareTo(b) >= 0) {
53  fail(String.format("Expected %s to be less than %s", a, b));
54  }
55  }
56 }
57 <pre>

AbstractIdleServiceTest


001 /*
002  * Copyright (C) 2009 The Guava Authors
003  *
004  * Licensed under the Apache License, Version 2.0 (the "License");
005  * you may not use this file except in compliance with the License.
006  * You may obtain a copy of the License at
007  *
009  *
010  * Unless required by applicable law or agreed to in writing, software
011  * distributed under the License is distributed on an "AS IS" BASIS,
012  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013  * See the License for the specific language governing permissions and
014  * limitations under the License.
015  */
016  
017 package com.google.common.util.concurrent;
018  
019 import static org.truth0.Truth.ASSERT;
020  
021 import com.google.common.collect.Lists;
022  
023 import junit.framework.TestCase;
024  
025 import java.util.List;
026 import java.util.concurrent.Executor;
027 import java.util.concurrent.TimeUnit;
028 import java.util.concurrent.TimeoutException;
029  
030 /**
031  * Tests for {@link AbstractIdleService}.
032  *
033  * @author Chris Nokleberg
034  * @author Ben Yu
035  */
036 public class AbstractIdleServiceTest extends TestCase {
037  
038 // Functional tests using real thread. We only verify publicly visible state.
039  // Interaction assertions are done by the single-threaded unit tests.
040  
041 public static class FunctionalTest extends TestCase {
042  
043 private static class DefaultService extends AbstractIdleService {
044  @Override protected void startUp() throws Exception {}
045  @Override protected void shutDown() throws Exception {}
046  }
047  
048 public void testServiceStartStop() throws Exception {
049  AbstractIdleService service = new DefaultService();
050  service.startAsync().awaitRunning();
051  assertEquals(Service.State.RUNNING, service.state());
052  service.stopAsync().awaitTerminated();
053  assertEquals(Service.State.TERMINATED, service.state());
054  }
055  
056 public void testStart_failed() throws Exception {
057  final Exception exception = new Exception("deliberate");
058  AbstractIdleService service = new DefaultService() {
059  @Override protected void startUp() throws Exception {
060  throw exception;
061  }
062  };
063  try {
064  service.startAsync().awaitRunning();
065  fail();
066  } catch (RuntimeException e) {
067  assertSame(exception, e.getCause());
068  }
069  assertEquals(Service.State.FAILED, service.state());
070  }
071  
072 public void testStop_failed() throws Exception {
073  final Exception exception = new Exception("deliberate");
074  AbstractIdleService service = new DefaultService() {
075  @Override protected void shutDown() throws Exception {
076  throw exception;
077  }
078  };
079  service.startAsync().awaitRunning();
080  try {
081  service.stopAsync().awaitTerminated();
082  fail();
083  } catch (RuntimeException e) {
084  assertSame(exception, e.getCause());
085  }
086  assertEquals(Service.State.FAILED, service.state());
087  }
088  }
089  
090 public void testStart() {
091  TestService service = new TestService();
092  assertEquals(0, service.startUpCalled);
093  service.startAsync().awaitRunning();
094  assertEquals(1, service.startUpCalled);
095  assertEquals(Service.State.RUNNING, service.state());
096  ASSERT.that(service.transitionStates).has().exactly(Service.State.STARTING).inOrder();
097  }
098  
099 public void testStart_failed() {
100  final Exception exception = new Exception("deliberate");
101  TestService service = new TestService() {
102  @Override protected void startUp() throws Exception {
103  super.startUp();
104  throw exception;
105  }
106  };
107  assertEquals(0, service.startUpCalled);
108  try {
109  service.startAsync().awaitRunning();
110  fail();
111  } catch (RuntimeException e) {
112  assertSame(exception, e.getCause());
113  }
114  assertEquals(1, service.startUpCalled);
115  assertEquals(Service.State.FAILED, service.state());
116  ASSERT.that(service.transitionStates).has().exactly(Service.State.STARTING).inOrder();
117  }
118  
119 public void testStop_withoutStart() {
120  TestService service = new TestService();
121  service.stopAsync().awaitTerminated();
122  assertEquals(0, service.startUpCalled);
123  assertEquals(0, service.shutDownCalled);
124  assertEquals(Service.State.TERMINATED, service.state());
125  ASSERT.that(service.transitionStates).isEmpty();
126  }
127  
128 public void testStop_afterStart() {
129  TestService service = new TestService();
130  service.startAsync().awaitRunning();
131  assertEquals(1, service.startUpCalled);
132  assertEquals(0, service.shutDownCalled);
133  service.stopAsync().awaitTerminated();
134  assertEquals(1, service.startUpCalled);
135  assertEquals(1, service.shutDownCalled);
136  assertEquals(Service.State.TERMINATED, service.state());
137  ASSERT.that(service.transitionStates)
138  .has().exactly(Service.State.STARTING, Service.State.STOPPING).inOrder();
139  }
140  
141 public void testStop_failed() {
142  final Exception exception = new Exception("deliberate");
143  TestService service = new TestService() {
144  @Override protected void shutDown() throws Exception {
145  super.shutDown();
146  throw exception;
147  }
148  };
149  service.startAsync().awaitRunning();
150  assertEquals(1, service.startUpCalled);
151  assertEquals(0, service.shutDownCalled);
152  try {
153  service.stopAsync().awaitTerminated();
154  fail();
155  } catch (RuntimeException e) {
156  assertSame(exception, e.getCause());
157  }
158  assertEquals(1, service.startUpCalled);
159  assertEquals(1, service.shutDownCalled);
160  assertEquals(Service.State.FAILED, service.state());
161  ASSERT.that(service.transitionStates)
162  .has().exactly(Service.State.STARTING, Service.State.STOPPING).inOrder();
163  }
164  
165 public void testServiceToString() {
166  AbstractIdleService service = new TestService();
167  assertEquals("TestService [NEW]", service.toString());
168  service.startAsync().awaitRunning();
169  assertEquals("TestService [RUNNING]", service.toString());
170  service.stopAsync().awaitTerminated();
171  assertEquals("TestService [TERMINATED]", service.toString());
172  }
173  
174 public void testTimeout() throws Exception {
175  // Create a service whose executor will never run its commands
176  Service service = new TestService() {
177  @Override protected Executor executor() {
178  return new Executor() {
179  @Override public void execute(Runnable command) {}
180  };
181  }
182  };
183  try {
184  service.startAsync().awaitRunning(1, TimeUnit.MILLISECONDS);
185  fail("Expected timeout");
186  } catch (TimeoutException e) {
187  ASSERT.that(e.getMessage()).contains(Service.State.STARTING.toString());
188  }
189  }
190  
191 private static class TestService extends AbstractIdleService {
192  int startUpCalled = 0;
193  int shutDownCalled = 0;
194  final List<State> transitionStates = Lists.newArrayList();
195  
196 @Override protected void startUp() throws Exception {
197  assertEquals(0, startUpCalled);
198  assertEquals(0, shutDownCalled);
199  startUpCalled++;
200  assertEquals(State.STARTING, state());
201  }
202  
203 @Override protected void shutDown() throws Exception {
204  assertEquals(1, startUpCalled);
205  assertEquals(0, shutDownCalled);
206  shutDownCalled++;
207  assertEquals(State.STOPPING, state());
208  }
209  
210 @Override protected Executor executor() {
211  transitionStates.add(state());
212  return MoreExecutors.sameThreadExecutor();
213  }
214  }
215 }
216  
217 <pre>

AbstractScheduledServiceTest


001 </pre>
002 /*
003  * Copyright (C) 2011 The Guava Authors
004  *
005  * Licensed under the Apache License, Version 2.0 (the "License");
006  * you may not use this file except in compliance with the License.
007  * You may obtain a copy of the License at
008  *
010  *
011  * Unless required by applicable law or agreed to in writing, software
012  * distributed under the License is distributed on an "AS IS" BASIS,
013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014  * See the License for the specific language governing permissions and
015  * limitations under the License.
016  */
017  
018 package com.google.common.util.concurrent;
019  
020 import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
021 import com.google.common.util.concurrent.Service.State;
022  
023 import junit.framework.TestCase;
024  
025 import java.util.concurrent.CountDownLatch;
026 import java.util.concurrent.CyclicBarrier;
027 import java.util.concurrent.ExecutionException;
028 import java.util.concurrent.Executors;
029 import java.util.concurrent.Future;
030 import java.util.concurrent.ScheduledExecutorService;
031 import java.util.concurrent.ScheduledFuture;
032 import java.util.concurrent.ScheduledThreadPoolExecutor;
033 import java.util.concurrent.TimeUnit;
034 import java.util.concurrent.atomic.AtomicBoolean;
035 import java.util.concurrent.atomic.AtomicInteger;
036  
037 /**
038  * Unit test for {@link AbstractScheduledService}.
039  *
040  * @author Luke Sandberg
041  */
042  
043 public class AbstractScheduledServiceTest extends TestCase {
044  
045 volatile Scheduler configuration = Scheduler.newFixedDelaySchedule(0, 10, TimeUnit.MILLISECONDS);
046  volatile ScheduledFuture<?> future = null;
047  
048 volatile boolean atFixedRateCalled = false;
049  volatile boolean withFixedDelayCalled = false;
050  volatile boolean scheduleCalled = false;
051  
052 final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(10) {
053  @Override
054  public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
055  long delay, TimeUnit unit) {
056  return future = super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
057  }
058  };
059  
060 public void testServiceStartStop() throws Exception {
061  NullService service = new NullService();
062  service.startAsync().awaitRunning();
063  assertFalse(future.isDone());
064  service.stopAsync().awaitTerminated();
065  assertTrue(future.isCancelled());
066  }
067  
068 private class NullService extends AbstractScheduledService {
069  @Override protected void runOneIteration() throws Exception {}
070  @Override protected Scheduler scheduler() { return configuration; }
071  @Override protected ScheduledExecutorService executor() { return executor; }
072  }
073  
074 public void testFailOnExceptionFromRun() throws Exception {
075  TestService service = new TestService();
076  service.runException = new Exception();
077  service.startAsync().awaitRunning();
078  service.runFirstBarrier.await();
079  service.runSecondBarrier.await();
080  try {
081  future.get();
082  fail();
083  } catch (ExecutionException e) {
084  // An execution exception holds a runtime exception (from throwables.propogate) that holds our
085  // original exception.
086  assertEquals(service.runException, e.getCause().getCause());
087  }
088  assertEquals(service.state(), Service.State.FAILED);
089  }
090  
091 public void testFailOnExceptionFromStartUp() {
092  TestService service = new TestService();
093  service.startUpException = new Exception();
094  try {
095  service.startAsync().awaitRunning();
096  fail();
097  } catch (IllegalStateException e) {
098  assertEquals(service.startUpException, e.getCause());
099  }
100  assertEquals(0, service.numberOfTimesRunCalled.get());
101  assertEquals(Service.State.FAILED, service.state());
102  }
103  
104 public void testFailOnExceptionFromShutDown() throws Exception {
105  TestService service = new TestService();
106  service.shutDownException = new Exception();
107  service.startAsync().awaitRunning();
108  service.runFirstBarrier.await();
109  service.stopAsync();
110  service.runSecondBarrier.await();
111  try {
112  service.awaitTerminated();
113  fail();
114  } catch (IllegalStateException e) {
115  assertEquals(service.shutDownException, e.getCause());
116  }
117  assertEquals(Service.State.FAILED, service.state());
118  }
119  
120 public void testRunOneIterationCalledMultipleTimes() throws Exception {
121  TestService service = new TestService();
122  service.startAsync().awaitRunning();
123  for (int i = 1; i < 10; i++) {
124  service.runFirstBarrier.await();
125  assertEquals(i, service.numberOfTimesRunCalled.get());
126  service.runSecondBarrier.await();
127  }
128  service.runFirstBarrier.await();
129  service.stopAsync();
130  service.runSecondBarrier.await();
131  service.stopAsync().awaitTerminated();
132  }
133  
134 public void testExecutorOnlyCalledOnce() throws Exception {
135  TestService service = new TestService();
136  service.startAsync().awaitRunning();
137  // It should be called once during startup.
138  assertEquals(1, service.numberOfTimesExecutorCalled.get());
139  for (int i = 1; i < 10; i++) {
140  service.runFirstBarrier.await();
141  assertEquals(i, service.numberOfTimesRunCalled.get());
142  service.runSecondBarrier.await();
143  }
144  service.runFirstBarrier.await();
145  service.stopAsync();
146  service.runSecondBarrier.await();
147  service.stopAsync().awaitTerminated();
148  // Only called once overall.
149  assertEquals(1, service.numberOfTimesExecutorCalled.get());
150  }
151  
152 public void testDefaultExecutorIsShutdownWhenServiceIsStopped() throws Exception {
153  final CountDownLatch terminationLatch = new CountDownLatch(1);
154  AbstractScheduledService service = new AbstractScheduledService() {
155  volatile ScheduledExecutorService executorService;
156  @Override protected void runOneIteration() throws Exception {}
157  
158 @Override protected ScheduledExecutorService executor() {
159  if (executorService == null) {
160  executorService = super.executor();
161  // Add a listener that will be executed after the listener that shuts down the executor.
162  addListener(new Listener() {
163  @Override public void terminated(State from) {
164  terminationLatch.countDown();
165  }
166  }, MoreExecutors.sameThreadExecutor());
167  }
168  return executorService;
169  }
170  
171 @Override protected Scheduler scheduler() {
172  return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
173  }};
174  
175 service.startAsync();
176  assertFalse(service.executor().isShutdown());
177  service.awaitRunning();
178  service.stopAsync();
179  terminationLatch.await();
180  assertTrue(service.executor().isShutdown());
181  assertTrue(service.executor().awaitTermination(100, TimeUnit.MILLISECONDS));
182  }
183  
184 public void testDefaultExecutorIsShutdownWhenServiceFails() throws Exception {
185  final CountDownLatch failureLatch = new CountDownLatch(1);
186  AbstractScheduledService service = new AbstractScheduledService() {
187  volatile ScheduledExecutorService executorService;
188  @Override protected void runOneIteration() throws Exception {}
189  
190 @Override protected void startUp() throws Exception {
191  throw new Exception("Failed");
192  }
193  
194 @Override protected ScheduledExecutorService executor() {
195  if (executorService == null) {
196  executorService = super.executor();
197  // Add a listener that will be executed after the listener that shuts down the executor.
198  addListener(new Listener() {
199  @Override public void failed(State from, Throwable failure) {
200  failureLatch.countDown();
201  }
202  }, MoreExecutors.sameThreadExecutor());
203  }
204  return executorService;
205  }
206  
207 @Override protected Scheduler scheduler() {
208  return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
209  }};
210  
211 try {
212  service.startAsync().awaitRunning();
213  fail("Expected service to fail during startup");
214  } catch (IllegalStateException expected) {}
215  failureLatch.await();
216  assertTrue(service.executor().isShutdown());
217  assertTrue(service.executor().awaitTermination(100, TimeUnit.MILLISECONDS));
218  }
219  
220 public void testSchedulerOnlyCalledOnce() throws Exception {
221  TestService service = new TestService();
222  service.startAsync().awaitRunning();
223  // It should be called once during startup.
224  assertEquals(1, service.numberOfTimesSchedulerCalled.get());
225  for (int i = 1; i < 10; i++) {
226  service.runFirstBarrier.await();
227  assertEquals(i, service.numberOfTimesRunCalled.get());
228  service.runSecondBarrier.await();
229  }
230  service.runFirstBarrier.await();
231  service.stopAsync();
232  service.runSecondBarrier.await();
233  service.awaitTerminated();
234  // Only called once overall.
235  assertEquals(1, service.numberOfTimesSchedulerCalled.get());
236  }
237  
238 private class TestService extends AbstractScheduledService {
239  CyclicBarrier runFirstBarrier = new CyclicBarrier(2);
240  CyclicBarrier runSecondBarrier = new CyclicBarrier(2);
241  
242 volatile boolean startUpCalled = false;
243  volatile boolean shutDownCalled = false;
244  AtomicInteger numberOfTimesRunCalled = new AtomicInteger(0);
245  AtomicInteger numberOfTimesExecutorCalled = new AtomicInteger(0);
246  AtomicInteger numberOfTimesSchedulerCalled = new AtomicInteger(0);
247  volatile Exception runException = null;
248  volatile Exception startUpException = null;
249  volatile Exception shutDownException = null;
250  
251 @Override
252  protected void runOneIteration() throws Exception {
253  assertTrue(startUpCalled);
254  assertFalse(shutDownCalled);
255  numberOfTimesRunCalled.incrementAndGet();
256  assertEquals(State.RUNNING, state());
257  runFirstBarrier.await();
258  runSecondBarrier.await();
259  if (runException != null) {
260  throw runException;
261  }
262  }
263  
264 @Override
265  protected void startUp() throws Exception {
266  assertFalse(startUpCalled);
267  assertFalse(shutDownCalled);
268  startUpCalled = true;
269  assertEquals(State.STARTING, state());
270  if (startUpException != null) {
271  throw startUpException;
272  }
273  }
274  
275 @Override
276  protected void shutDown() throws Exception {
277  assertTrue(startUpCalled);
278  assertFalse(shutDownCalled);
279  shutDownCalled = true;
280  if (shutDownException != null) {
281  throw shutDownException;
282  }
283  }
284  
285 @Override
286  protected ScheduledExecutorService executor() {
287  numberOfTimesExecutorCalled.incrementAndGet();
288  return executor;
289  }
290  
291 @Override
292  protected Scheduler scheduler() {
293  numberOfTimesSchedulerCalled.incrementAndGet();
294  return configuration;
295  }
296  }
297  
298 public static class SchedulerTest extends TestCase {
299  // These constants are arbitrary and just used to make sure that the correct method is called
300  // with the correct parameters.
301  private static final int initialDelay = 10;
302  private static final int delay = 20;
303  private static final TimeUnit unit = TimeUnit.MILLISECONDS;
304  
305 // Unique runnable object used for comparison.
306  final Runnable testRunnable = new Runnable() {@Override public void run() {}};
307  boolean called = false;
308  
309 private void assertSingleCallWithCorrectParameters(Runnable command, long initialDelay,
310  long delay, TimeUnit unit) {
311  assertFalse(called); // only called once.
312  called = true;
313  assertEquals(SchedulerTest.initialDelay, initialDelay);
314  assertEquals(SchedulerTest.delay, delay);
315  assertEquals(SchedulerTest.unit, unit);
316  assertEquals(testRunnable, command);
317  }
318  
319 public void testFixedRateSchedule() {
320  Scheduler schedule = Scheduler.newFixedRateSchedule(initialDelay, delay, unit);
321  schedule.schedule(null, new ScheduledThreadPoolExecutor(1) {
322  @Override
323  public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
324  long period, TimeUnit unit) {
325  assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit);
326  return null;
327  }
328  }, testRunnable);
329  assertTrue(called);
330  }
331  
332 public void testFixedDelaySchedule() {
333  Scheduler schedule = Scheduler.newFixedDelaySchedule(initialDelay, delay, unit);
334  schedule.schedule(null, new ScheduledThreadPoolExecutor(10) {
335  @Override
336  public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
337  long delay, TimeUnit unit) {
338  assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit);
339  return null;
340  }
341  }, testRunnable);
342  assertTrue(called);
343  }
344  
345 private class TestCustomScheduler extends AbstractScheduledService.CustomScheduler {
346  public AtomicInteger scheduleCounter = new AtomicInteger(0);
347  @Override
348  protected Schedule getNextSchedule() throws Exception {
349  scheduleCounter.incrementAndGet();
350  return new Schedule(0, TimeUnit.SECONDS);
351  }
352  }
353  
354 public void testCustomSchedule_startStop() throws Exception {
355  final CyclicBarrier firstBarrier = new CyclicBarrier(2);
356  final CyclicBarrier secondBarrier = new CyclicBarrier(2);
357  final AtomicBoolean shouldWait = new AtomicBoolean(true);
358  Runnable task = new Runnable() {
359  @Override public void run() {
360  try {
361  if (shouldWait.get()) {
362  firstBarrier.await();
363  secondBarrier.await();
364  }
365  } catch (Exception e) {
366  throw new RuntimeException(e);
367  }
368  }
369  };
370  TestCustomScheduler scheduler = new TestCustomScheduler();
371  Future<?> future = scheduler.schedule(null, Executors.newScheduledThreadPool(10), task);
372  firstBarrier.await();
373  assertEquals(1, scheduler.scheduleCounter.get());
374  secondBarrier.await();
375  firstBarrier.await();
376  assertEquals(2, scheduler.scheduleCounter.get());
377  shouldWait.set(false);
378  secondBarrier.await();
379  future.cancel(false);
380  }
381  
382 public void testCustomSchedulerServiceStop() throws Exception {
383  TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService();
384  service.startAsync().awaitRunning();
385  service.firstBarrier.await();
386  assertEquals(1, service.numIterations.get());
387  service.stopAsync();
388  service.secondBarrier.await();
389  service.awaitTerminated();
390  // Sleep for a while just to ensure that our task wasn't called again.
391  Thread.sleep(unit.toMillis(3 * delay));
392  assertEquals(1, service.numIterations.get());
393  }
394  
395 public void testBig() throws Exception {
396  TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService() {
397  @Override protected Scheduler scheduler() {
398  return new AbstractScheduledService.CustomScheduler() {
399  @Override
400  protected Schedule getNextSchedule() throws Exception {
401  // Explicitly yield to increase the probability of a pathological scheduling.
402  Thread.yield();
403  return new Schedule(0, TimeUnit.SECONDS);
404  }
405  };
406  }
407  };
408  service.useBarriers = false;
409  service.startAsync().awaitRunning();
410  Thread.sleep(50);
411  service.useBarriers = true;
412  service.firstBarrier.await();
413  int numIterations = service.numIterations.get();
414  service.stopAsync();
415  service.secondBarrier.await();
416  service.awaitTerminated();
417  assertEquals(numIterations, service.numIterations.get());
418  }
419  
420 private static class TestAbstractScheduledCustomService extends AbstractScheduledService {
421  final AtomicInteger numIterations = new AtomicInteger(0);
422  volatile boolean useBarriers = true;
423  final CyclicBarrier firstBarrier = new CyclicBarrier(2);
424  final CyclicBarrier secondBarrier = new CyclicBarrier(2);
425  
426 @Override protected void runOneIteration() throws Exception {
427  numIterations.incrementAndGet();
428  if (useBarriers) {
429  firstBarrier.await();
430  secondBarrier.await();
431  }
432  }
433  
434 @Override protected ScheduledExecutorService executor() {
435  // use a bunch of threads so that weird overlapping schedules are more likely to happen.
436  return Executors.newScheduledThreadPool(10);
437  }
438  
439 @Override protected void startUp() throws Exception {}
440  
441 @Override protected void shutDown() throws Exception {}
442  
443 @Override protected Scheduler scheduler() {
444  return new CustomScheduler() {
445  @Override
446  protected Schedule getNextSchedule() throws Exception {
447  return new Schedule(delay, unit);
448  }};
449  }
450  }
451  
452 public void testCustomSchedulerFailure() throws Exception {
453  TestFailingCustomScheduledService service = new TestFailingCustomScheduledService();
454  service.startAsync().awaitRunning();
455  for (int i = 1; i < 4; i++) {
456  service.firstBarrier.await();
457  assertEquals(i, service.numIterations.get());
458  service.secondBarrier.await();
459  }
460  Thread.sleep(1000);
461  try {
462  service.stopAsync().awaitTerminated(100, TimeUnit.SECONDS);
463  fail();
464  } catch (IllegalStateException e) {
465  assertEquals(State.FAILED, service.state());
466  }
467  }
468  
469 private static class TestFailingCustomScheduledService extends AbstractScheduledService {
470  final AtomicInteger numIterations = new AtomicInteger(0);
471  final CyclicBarrier firstBarrier = new CyclicBarrier(2);
472  final CyclicBarrier secondBarrier = new CyclicBarrier(2);
473  
474 @Override protected void runOneIteration() throws Exception {
475  numIterations.incrementAndGet();
476  firstBarrier.await();
477  secondBarrier.await();
478  }
479  
480 @Override protected ScheduledExecutorService executor() {
481  // use a bunch of threads so that weird overlapping schedules are more likely to happen.
482  return Executors.newScheduledThreadPool(10);
483  }
484  
485 @Override protected Scheduler scheduler() {
486  return new CustomScheduler() {
487  @Override
488  protected Schedule getNextSchedule() throws Exception {
489  if (numIterations.get() > 2) {
490  throw new IllegalStateException("Failed");
491  }
492  return new Schedule(delay, unit);
493  }};
494  }
495  }
496  }
497 }
498 <pre>

AbstractServiceTest


001 </pre>
002 /*
003  * Copyright (C) 2009 The Guava Authors
004  *
005  * Licensed under the Apache License, Version 2.0 (the "License");
006  * you may not use this file except in compliance with the License.
007  * You may obtain a copy of the License at
008  *
010  *
011  * Unless required by applicable law or agreed to in writing, software
012  * distributed under the License is distributed on an "AS IS" BASIS,
013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014  * See the License for the specific language governing permissions and
015  * limitations under the License.
016  */
017  
018 package com.google.common.util.concurrent;
019  
020 import static java.lang.Thread.currentThread;
021 import static java.util.concurrent.TimeUnit.SECONDS;
022  
023 import com.google.common.collect.ImmutableList;
024 import com.google.common.collect.Iterables;
025 import com.google.common.collect.Lists;
026 import com.google.common.util.concurrent.Service.Listener;
027 import com.google.common.util.concurrent.Service.State;
028  
029 import junit.framework.TestCase;
030  
031 import java.lang.Thread.UncaughtExceptionHandler;
032 import java.util.List;
033 import java.util.concurrent.CountDownLatch;
034 import java.util.concurrent.TimeUnit;
035 import java.util.concurrent.atomic.AtomicInteger;
036 import java.util.concurrent.atomic.AtomicReference;
037  
038 import javax.annotation.concurrent.GuardedBy;
039  
040 /**
041  * Unit test for {@link AbstractService}.
042  *
043  * @author Jesse Wilson
044  */
045 public class AbstractServiceTest extends TestCase {
046  
047 private Thread executionThread;
048  private Throwable thrownByExecutionThread;
049  
050 public void testNoOpServiceStartStop() throws Exception {
051  NoOpService service = new NoOpService();
052  RecordingListener listener = RecordingListener.record(service);
053  
054 assertEquals(State.NEW, service.state());
055  assertFalse(service.isRunning());
056  assertFalse(service.running);
057  
058 service.startAsync();
059  assertEquals(State.RUNNING, service.state());
060  assertTrue(service.isRunning());
061  assertTrue(service.running);
062  
063 service.stopAsync();
064  assertEquals(State.TERMINATED, service.state());
065  assertFalse(service.isRunning());
066  assertFalse(service.running);
067  assertEquals(
068  ImmutableList.of(
069  State.STARTING,
070  State.RUNNING,
071  State.STOPPING,
072  State.TERMINATED),
073  listener.getStateHistory());
074  }
075  
076 public void testNoOpServiceStartAndWaitStopAndWait() throws Exception {
077  NoOpService service = new NoOpService();
078  
079 service.startAsync().awaitRunning();
080  assertEquals(State.RUNNING, service.state());
081  
082 service.stopAsync().awaitTerminated();
083  assertEquals(State.TERMINATED, service.state());
084  }
085  
086 public void testNoOpServiceStartAsyncAndAwaitStopAsyncAndAwait() throws Exception {
087  NoOpService service = new NoOpService();
088  
089 service.startAsync().awaitRunning();
090  assertEquals(State.RUNNING, service.state());
091  
092 service.stopAsync().awaitTerminated();
093  assertEquals(State.TERMINATED, service.state());
094  }
095  
096 public void testNoOpServiceStopIdempotence() throws Exception {
097  NoOpService service = new NoOpService();
098  RecordingListener listener = RecordingListener.record(service);
099  service.startAsync().awaitRunning();
100  assertEquals(State.RUNNING, service.state());
101  
102 service.stopAsync();
103  service.stopAsync();
104  assertEquals(State.TERMINATED, service.state());
105  assertEquals(
106  ImmutableList.of(
107  State.STARTING,
108  State.RUNNING,
109  State.STOPPING,
110  State.TERMINATED),
111  listener.getStateHistory());
112  }
113  
114 public void testNoOpServiceStopIdempotenceAfterWait() throws Exception {
115  NoOpService service = new NoOpService();
116  
117 service.startAsync().awaitRunning();
118  
119 service.stopAsync().awaitTerminated();
120  service.stopAsync();
121  assertEquals(State.TERMINATED, service.state());
122  }
123  
124 public void testNoOpServiceStopIdempotenceDoubleWait() throws Exception {
125  NoOpService service = new NoOpService();
126  
127 service.startAsync().awaitRunning();
128  assertEquals(State.RUNNING, service.state());
129  
130 service.stopAsync().awaitTerminated();
131  service.stopAsync().awaitTerminated();
132  assertEquals(State.TERMINATED, service.state());
133  }
134  
135 public void testNoOpServiceStartStopAndWaitUninterruptible()
136  throws Exception {
137  NoOpService service = new NoOpService();
138  
139 currentThread().interrupt();
140  try {
141  service.startAsync().awaitRunning();
142  assertEquals(State.RUNNING, service.state());
143  
144 service.stopAsync().awaitTerminated();
145  assertEquals(State.TERMINATED, service.state());
146  
147 assertTrue(currentThread().isInterrupted());
148  } finally {
149  Thread.interrupted(); // clear interrupt for future tests
150  }
151  }
152  
153 private static class NoOpService extends AbstractService {
154  boolean running = false;
155  
156 @Override protected void doStart() {
157  assertFalse(running);
158  running = true;
159  notifyStarted();
160  }
161  
162 @Override protected void doStop() {
163  assertTrue(running);
164  running = false;
165  notifyStopped();
166  }
167  }
168  
169 public void testManualServiceStartStop() throws Exception {
170  ManualSwitchedService service = new ManualSwitchedService();
171  RecordingListener listener = RecordingListener.record(service);
172  
173 service.startAsync();
174  assertEquals(State.STARTING, service.state());
175  assertFalse(service.isRunning());
176  assertTrue(service.doStartCalled);
177  
178 service.notifyStarted(); // usually this would be invoked by another thread
179  assertEquals(State.RUNNING, service.state());
180  assertTrue(service.isRunning());
181  
182 service.stopAsync();
183  assertEquals(State.STOPPING, service.state());
184  assertFalse(service.isRunning());
185  assertTrue(service.doStopCalled);
186  
187 service.notifyStopped(); // usually this would be invoked by another thread
188  assertEquals(State.TERMINATED, service.state());
189  assertFalse(service.isRunning());
190  assertEquals(
191  ImmutableList.of(
192  State.STARTING,
193  State.RUNNING,
194  State.STOPPING,
195  State.TERMINATED),
196  listener.getStateHistory());
197  
198 }
199  
200 public void testManualServiceNotifyStoppedWhileRunning() throws Exception {
201  ManualSwitchedService service = new ManualSwitchedService();
202  RecordingListener listener = RecordingListener.record(service);
203  
204 service.startAsync();
205  service.notifyStarted();
206  service.notifyStopped();
207  assertEquals(State.TERMINATED, service.state());
208  assertFalse(service.isRunning());
209  assertFalse(service.doStopCalled);
210  
211 assertEquals(
212  ImmutableList.of(
213  State.STARTING,
214  State.RUNNING,
215  State.TERMINATED),
216  listener.getStateHistory());
217  }
218  
219 public void testManualServiceStopWhileStarting() throws Exception {
220  ManualSwitchedService service = new ManualSwitchedService();
221  RecordingListener listener = RecordingListener.record(service);
222  
223 service.startAsync();
224  assertEquals(State.STARTING, service.state());
225  assertFalse(service.isRunning());
226  assertTrue(service.doStartCalled);
227  
228 service.stopAsync();
229  assertEquals(State.STOPPING, service.state());
230  assertFalse(service.isRunning());
231  assertFalse(service.doStopCalled);
232  
233 service.notifyStarted();
234  assertEquals(State.STOPPING, service.state());
235  assertFalse(service.isRunning());
236  assertTrue(service.doStopCalled);
237  
238 service.notifyStopped();
239  assertEquals(State.TERMINATED, service.state());
240  assertFalse(service.isRunning());
241  assertEquals(
242  ImmutableList.of(
243  State.STARTING,
244  State.STOPPING,
245  State.TERMINATED),
246  listener.getStateHistory());
247  }
248  
249 /**
250  * This tests for a bug where if {@link Service#stopAsync()} was called while the service was
251  * {@link State#STARTING} more than once, the {@link Listener#stopping(State)} callback would get
252  * called multiple times.
253  */
254  public void testManualServiceStopMultipleTimesWhileStarting() throws Exception {
255  ManualSwitchedService service = new ManualSwitchedService();
256  final AtomicInteger stopppingCount = new AtomicInteger();
257  service.addListener(new Listener() {
258  @Override public void stopping(State from) {
259  stopppingCount.incrementAndGet();
260  }
261  }, MoreExecutors.sameThreadExecutor());
262  
263 service.startAsync();
264  service.stopAsync();
265  assertEquals(1, stopppingCount.get());
266  service.stopAsync();
267  assertEquals(1, stopppingCount.get());
268  }
269  
270 public void testManualServiceStopWhileNew() throws Exception {
271  ManualSwitchedService service = new ManualSwitchedService();
272  RecordingListener listener = RecordingListener.record(service);
273  
274 service.stopAsync();
275  assertEquals(State.TERMINATED, service.state());
276  assertFalse(service.isRunning());
277  assertFalse(service.doStartCalled);
278  assertFalse(service.doStopCalled);
279  assertEquals(ImmutableList.of(State.TERMINATED), listener.getStateHistory());
280  }
281  
282 public void testManualServiceFailWhileStarting() throws Exception {
283  ManualSwitchedService service = new ManualSwitchedService();
284  RecordingListener listener = RecordingListener.record(service);
285  service.startAsync();
286  service.notifyFailed(EXCEPTION);
287  assertEquals(ImmutableList.of(State.STARTING, State.FAILED), listener.getStateHistory());
288  }
289  
290 public void testManualServiceFailWhileRunning() throws Exception {
291  ManualSwitchedService service = new ManualSwitchedService();
292  RecordingListener listener = RecordingListener.record(service);
293  service.startAsync();
294  service.notifyStarted();
295  service.notifyFailed(EXCEPTION);
296  assertEquals(ImmutableList.of(State.STARTING, State.RUNNING, State.FAILED),
297  listener.getStateHistory());
298  }
299  
300 public void testManualServiceFailWhileStopping() throws Exception {
301  ManualSwitchedService service = new ManualSwitchedService();
302  RecordingListener listener = RecordingListener.record(service);
303  service.startAsync();
304  service.notifyStarted();
305  service.stopAsync();
306  service.notifyFailed(EXCEPTION);
307  assertEquals(ImmutableList.of(State.STARTING, State.RUNNING, State.STOPPING, State.FAILED),
308  listener.getStateHistory());
309  }
310  
311 public void testManualServiceUnrequestedStop() {
312  ManualSwitchedService service = new ManualSwitchedService();
313  
314 service.startAsync();
315  
316 service.notifyStarted();
317  assertEquals(State.RUNNING, service.state());
318  assertTrue(service.isRunning());
319  assertFalse(service.doStopCalled);
320  
321 service.notifyStopped();
322  assertEquals(State.TERMINATED, service.state());
323  assertFalse(service.isRunning());
324  assertFalse(service.doStopCalled);
325  }
326  
327 /**
328  * The user of this service should call {@link #notifyStarted} and {@link
329  * #notifyStopped} after calling {@link #startAsync} and {@link #stopAsync}.
330  */
331  private static class ManualSwitchedService extends AbstractService {
332  boolean doStartCalled = false;
333  boolean doStopCalled = false;
334  
335 @Override protected void doStart() {
336  assertFalse(doStartCalled);
337  doStartCalled = true;
338  }
339  
340 @Override protected void doStop() {
341  assertFalse(doStopCalled);
342  doStopCalled = true;
343  }
344  }
345  
346 public void testAwaitTerminated() throws Exception {
347  final NoOpService service = new NoOpService();
348  Thread waiter = new Thread() {
349  @Override public void run() {
350  service.awaitTerminated();
351  }
352  };
353  waiter.start();
354  service.startAsync().awaitRunning();
355  assertEquals(State.RUNNING, service.state());
356  service.stopAsync();
357  waiter.join(100); // ensure that the await in the other thread is triggered
358  assertFalse(waiter.isAlive());
359  }
360  
361 public void testAwaitTerminated_FailedService() throws Exception {
362  final ManualSwitchedService service = new ManualSwitchedService();
363  final AtomicReference<Throwable> exception = Atomics.newReference();
364  Thread waiter = new Thread() {
365  @Override public void run() {
366  try {
367  service.awaitTerminated();
368  fail("Expected an IllegalStateException");
369  } catch (Throwable t) {
370  exception.set(t);
371  }
372  }
373  };
374  waiter.start();
375  service.startAsync();
376  service.notifyStarted();
377  assertEquals(State.RUNNING, service.state());
378  service.notifyFailed(EXCEPTION);
379  assertEquals(State.FAILED, service.state());
380  waiter.join(100);
381  assertFalse(waiter.isAlive());
382  assertTrue(exception.get() instanceof IllegalStateException);
383  assertEquals(EXCEPTION, exception.get().getCause());
384  }
385  
386 public void testThreadedServiceStartAndWaitStopAndWait() throws Throwable {
387  ThreadedService service = new ThreadedService();
388  RecordingListener listener = RecordingListener.record(service);
389  service.startAsync().awaitRunning();
390  assertEquals(State.RUNNING, service.state());
391  
392 service.awaitRunChecks();
393  
394 service.stopAsync().awaitTerminated();
395  assertEquals(State.TERMINATED, service.state());
396  
397 throwIfSet(thrownByExecutionThread);
398  assertEquals(
399  ImmutableList.of(
400  State.STARTING,
401  State.RUNNING,
402  State.STOPPING,
403  State.TERMINATED),
404  listener.getStateHistory());
405  }
406  
407 public void testThreadedServiceStopIdempotence() throws Throwable {
408  ThreadedService service = new ThreadedService();
409  
410 service.startAsync().awaitRunning();
411  assertEquals(State.RUNNING, service.state());
412  
413 service.awaitRunChecks();
414  
415 service.stopAsync();
416  service.stopAsync().awaitTerminated();
417  assertEquals(State.TERMINATED, service.state());
418  
419 throwIfSet(thrownByExecutionThread);
420  }
421  
422 public void testThreadedServiceStopIdempotenceAfterWait()
423  throws Throwable {
424  ThreadedService service = new ThreadedService();
425  
426 service.startAsync().awaitRunning();
427  assertEquals(State.RUNNING, service.state());
428  
429 service.awaitRunChecks();
430  
431 service.stopAsync().awaitTerminated();
432  service.stopAsync();
433  assertEquals(State.TERMINATED, service.state());
434  
435 executionThread.join();
436  
437 throwIfSet(thrownByExecutionThread);
438  }
439  
440 public void testThreadedServiceStopIdempotenceDoubleWait()
441  throws Throwable {
442  ThreadedService service = new ThreadedService();
443  
444 service.startAsync().awaitRunning();
445  assertEquals(State.RUNNING, service.state());
446  
447 service.awaitRunChecks();
448  
449 service.stopAsync().awaitTerminated();
450  service.stopAsync().awaitTerminated();
451  assertEquals(State.TERMINATED, service.state());
452  
453 throwIfSet(thrownByExecutionThread);
454  }
455  
456 public void testManualServiceFailureIdempotence() {
457  ManualSwitchedService service = new ManualSwitchedService();
458  RecordingListener.record(service);
459  service.startAsync();
460  service.notifyFailed(new Exception("1"));
461  service.notifyFailed(new Exception("2"));
462  assertEquals("1", service.failureCause().getMessage());
463  try {
464  service.awaitRunning();
465  fail();
466  } catch (IllegalStateException e) {
467  assertEquals("1", e.getCause().getMessage());
468  }
469  }
470  
471 private class ThreadedService extends AbstractService {
472  final CountDownLatch hasConfirmedIsRunning = new CountDownLatch(1);
473  
474 /*
475  * The main test thread tries to stop() the service shortly after
476  * confirming that it is running. Meanwhile, the service itself is trying
477  * to confirm that it is running. If the main thread's stop() call happens
478  * before it has the chance, the test will fail. To avoid this, the main
479  * thread calls this method, which waits until the service has performed
480  * its own "running" check.
481  */
482  void awaitRunChecks() throws InterruptedException {
483  assertTrue("Service thread hasn't finished its checks. "
484  + "Exception status (possibly stale): " + thrownByExecutionThread,
485  hasConfirmedIsRunning.await(10, SECONDS));
486  }
487  
488 @Override protected void doStart() {
489  assertEquals(State.STARTING, state());
490  invokeOnExecutionThreadForTest(new Runnable() {
491  @Override public void run() {
492  assertEquals(State.STARTING, state());
493  notifyStarted();
494  assertEquals(State.RUNNING, state());
495  hasConfirmedIsRunning.countDown();
496  }
497  });
498  }
499  
500 @Override protected void doStop() {
501  assertEquals(State.STOPPING, state());
502  invokeOnExecutionThreadForTest(new Runnable() {
503  @Override public void run() {
504  assertEquals(State.STOPPING, state());
505  notifyStopped();
506  assertEquals(State.TERMINATED, state());
507  }
508  });
509  }
510  }
511  
512 private void invokeOnExecutionThreadForTest(Runnable runnable) {
513  executionThread = new Thread(runnable);
514  executionThread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
515  @Override
516  public void uncaughtException(Thread thread, Throwable e) {
517  thrownByExecutionThread = e;
518  }
519  });
520  executionThread.start();
521  }
522  
523 private static void throwIfSet(Throwable t) throws Throwable {
524  if (t != null) {
525  throw t;
526  }
527  }
528  
529 public void testStopUnstartedService() throws Exception {
530  NoOpService service = new NoOpService();
531  RecordingListener listener = RecordingListener.record(service);
532  
533 service.stopAsync();
534  assertEquals(State.TERMINATED, service.state());
535  
536 try {
537  service.startAsync();
538  fail();
539  } catch (IllegalStateException expected) {}
540  assertEquals(State.TERMINATED, Iterables.getOnlyElement(listener.getStateHistory()));
541  }
542  
543 public void testFailingServiceStartAndWait() throws Exception {
544  StartFailingService service = new StartFailingService();
545  RecordingListener listener = RecordingListener.record(service);
546  
547 try {
548  service.startAsync().awaitRunning();
549  fail();
550  } catch (IllegalStateException e) {
551  assertEquals(EXCEPTION, service.failureCause());
552  assertEquals(EXCEPTION, e.getCause());
553  }
554  assertEquals(
555  ImmutableList.of(
556  State.STARTING,
557  State.FAILED),
558  listener.getStateHistory());
559  }
560  
561 public void testFailingServiceStopAndWait_stopFailing() throws Exception {
562  StopFailingService service = new StopFailingService();
563  RecordingListener listener = RecordingListener.record(service);
564  
565 service.startAsync().awaitRunning();
566  try {
567  service.stopAsync().awaitTerminated();
568  fail();
569  } catch (IllegalStateException e) {
570  assertEquals(EXCEPTION, service.failureCause());
571  assertEquals(EXCEPTION, e.getCause());
572  }
573  assertEquals(
574  ImmutableList.of(
575  State.STARTING,
576  State.RUNNING,
577  State.STOPPING,
578  State.FAILED),
579  listener.getStateHistory());
580  }
581  
582 public void testFailingServiceStopAndWait_runFailing() throws Exception {
583  RunFailingService service = new RunFailingService();
584  RecordingListener listener = RecordingListener.record(service);
585  
586 service.startAsync();
587  try {
588  service.awaitRunning();
589  fail();
590  } catch (IllegalStateException e) {
591  assertEquals(EXCEPTION, service.failureCause());
592  assertEquals(EXCEPTION, e.getCause());
593  }
594  assertEquals(
595  ImmutableList.of(
596  State.STARTING,
597  State.RUNNING,
598  State.FAILED),
599  listener.getStateHistory());
600  }
601  
602 public void testThrowingServiceStartAndWait() throws Exception {
603  StartThrowingService service = new StartThrowingService();
604  RecordingListener listener = RecordingListener.record(service);
605  
606 try {
607  service.startAsync().awaitRunning();
608  fail();
609  } catch (IllegalStateException e) {
610  assertEquals(service.exception, service.failureCause());
611  assertEquals(service.exception, e.getCause());
612  }
613  assertEquals(
614  ImmutableList.of(
615  State.STARTING,
616  State.FAILED),
617  listener.getStateHistory());
618  }
619  
620 public void testThrowingServiceStopAndWait_stopThrowing() throws Exception {
621  StopThrowingService service = new StopThrowingService();
622  RecordingListener listener = RecordingListener.record(service);
623  
624 service.startAsync().awaitRunning();
625  try {
626  service.stopAsync().awaitTerminated();
627  fail();
628  } catch (IllegalStateException e) {
629  assertEquals(service.exception, service.failureCause());
630  assertEquals(service.exception, e.getCause());
631  }
632  assertEquals(
633  ImmutableList.of(
634  State.STARTING,
635  State.RUNNING,
636  State.STOPPING,
637  State.FAILED),
638  listener.getStateHistory());
639  }
640  
641 public void testThrowingServiceStopAndWait_runThrowing() throws Exception {
642  RunThrowingService service = new RunThrowingService();
643  RecordingListener listener = RecordingListener.record(service);
644  
645 service.startAsync();
646  try {
647  service.awaitTerminated();
648  fail();
649  } catch (IllegalStateException e) {
650  assertEquals(service.exception, service.failureCause());
651  assertEquals(service.exception, e.getCause());
652  }
653  assertEquals(
654  ImmutableList.of(
655  State.STARTING,
656  State.RUNNING,
657  State.FAILED),
658  listener.getStateHistory());
659  }
660  
661 public void testFailureCause_throwsIfNotFailed() {
662  StopFailingService service = new StopFailingService();
663  try {
664  service.failureCause();
665  fail();
666  } catch (IllegalStateException e) {
667  // expected
668  }
669  service.startAsync().awaitRunning();
670  try {
671  service.failureCause();
672  fail();
673  } catch (IllegalStateException e) {
674  // expected
675  }
676  try {
677  service.stopAsync().awaitTerminated();
678  fail();
679  } catch (IllegalStateException e) {
680  assertEquals(EXCEPTION, service.failureCause());
681  assertEquals(EXCEPTION, e.getCause());
682  }
683  }
684  
685 public void testAddListenerAfterFailureDoesntCauseDeadlock() throws InterruptedException {
686  final StartFailingService service = new StartFailingService();
687  service.startAsync();
688  assertEquals(State.FAILED, service.state());
689  service.addListener(new RecordingListener(service), MoreExecutors.sameThreadExecutor());
690  Thread thread = new Thread() {
691  @Override public void run() {
692  // Internally stopAsync() grabs a lock, this could be any such method on AbstractService.
693  service.stopAsync();
694  }
695  };
696  thread.start();
697  thread.join(100);
698  assertFalse(thread + " is deadlocked", thread.isAlive());
699  }
700  
701 public void testListenerDoesntDeadlockOnStartAndWaitFromRunning() throws Exception {
702  final NoOpThreadedService service = new NoOpThreadedService();
703  service.addListener(new Listener() {
704  @Override public void running() {
705  service.awaitRunning();
706  }
707  }, MoreExecutors.sameThreadExecutor());
708  service.startAsync().awaitRunning(10, TimeUnit.MILLISECONDS);
709  service.stopAsync();
710  }
711  
712 public void testListenerDoesntDeadlockOnStopAndWaitFromTerminated() throws Exception {
713  final NoOpThreadedService service = new NoOpThreadedService();
714  service.addListener(new Listener() {
715  @Override public void terminated(State from) {
716  service.stopAsync().awaitTerminated();
717  }
718  }, MoreExecutors.sameThreadExecutor());
719  service.startAsync().awaitRunning();
720  
721 Thread thread = new Thread() {
722  @Override public void run() {
723  service.stopAsync().awaitTerminated();
724  }
725  };
726  thread.start();
727  thread.join(100);
728  assertFalse(thread + " is deadlocked", thread.isAlive());
729  }
730  
731 private static class NoOpThreadedService extends AbstractExecutionThreadService {
732  final CountDownLatch latch = new CountDownLatch(1);
733  @Override protected void run() throws Exception {
734  latch.await();
735  }
736  @Override protected void triggerShutdown() {
737  latch.countDown();
738  }
739  }
740  
741 private static class StartFailingService extends AbstractService {
742  @Override protected void doStart() {
743  notifyFailed(EXCEPTION);
744  }
745  
746 @Override protected void doStop() {
747  fail();
748  }
749  }
750  
751 private static class RunFailingService extends AbstractService {
752  @Override protected void doStart() {
753  notifyStarted();
754  notifyFailed(EXCEPTION);
755  }
756  
757 @Override protected void doStop() {
758  fail();
759  }
760  }
761  
762 private static class StopFailingService extends AbstractService {
763  @Override protected void doStart() {
764  notifyStarted();
765  }
766  
767 @Override protected void doStop() {
768  notifyFailed(EXCEPTION);
769  }
770  }
771  
772 private static class StartThrowingService extends AbstractService {
773  
774 final RuntimeException exception = new RuntimeException("deliberate");
775  
776 @Override protected void doStart() {
777  throw exception;
778  }
779  
780 @Override protected void doStop() {
781  fail();
782  }
783  }
784  
785 private static class RunThrowingService extends AbstractService {
786  
787 final RuntimeException exception = new RuntimeException("deliberate");
788  
789 @Override protected void doStart() {
790  notifyStarted();
791  throw exception;
792  }
793  
794 @Override protected void doStop() {
795  fail();
796  }
797  }
798  
799 private static class StopThrowingService extends AbstractService {
800  
801 final RuntimeException exception = new RuntimeException("deliberate");
802  
803 @Override protected void doStart() {
804  notifyStarted();
805  }
806  
807 @Override protected void doStop() {
808  throw exception;
809  }
810  }
811  
812 private static class RecordingListener extends Listener {
813  static RecordingListener record(Service service) {
814  RecordingListener listener = new RecordingListener(service);
815  service.addListener(listener, MoreExecutors.sameThreadExecutor());
816  return listener;
817  }
818  
819 final Service service;
820  
821 RecordingListener(Service service) {
822  this.service = service;
823  }
824  
825 @GuardedBy("this")
826  final List<State> stateHistory = Lists.newArrayList();
827  final CountDownLatch completionLatch = new CountDownLatch(1);
828  
829 ImmutableList<State> getStateHistory() throws Exception {
830  completionLatch.await();
831  synchronized (this) {
832  return ImmutableList.copyOf(stateHistory);
833  }
834  }
835  
836 @Override public synchronized void starting() {
837  assertTrue(stateHistory.isEmpty());
838  assertNotSame(State.NEW, service.state());
839  stateHistory.add(State.STARTING);
840  }
841  
842 @Override public synchronized void running() {
843  assertEquals(State.STARTING, Iterables.getOnlyElement(stateHistory));
844  stateHistory.add(State.RUNNING);
845  service.awaitRunning();
846  assertNotSame(State.STARTING, service.state());
847  }
848  
849 @Override public synchronized void stopping(State from) {
850  assertEquals(from, Iterables.getLast(stateHistory));
851  stateHistory.add(State.STOPPING);
852  if (from == State.STARTING) {
853  try {
854  service.awaitRunning();
855  fail();
856  } catch (IllegalStateException expected) {
857  assertNull(expected.getCause());
858  assertTrue(expected.getMessage().equals(
859  "Expected the service to be RUNNING, but was STOPPING"));
860  }
861  }
862  assertNotSame(from, service.state());
863  }
864  
865 @Override public synchronized void terminated(State from) {
866  assertEquals(from, Iterables.getLast(stateHistory, State.NEW));
867  stateHistory.add(State.TERMINATED);
868  assertEquals(State.TERMINATED, service.state());
869  if (from == State.NEW) {
870  try {
871  service.awaitRunning();
872  fail();
873  } catch (IllegalStateException expected) {
874  assertNull(expected.getCause());
875  assertTrue(expected.getMessage().equals(
876  "Expected the service to be RUNNING, but was TERMINATED"));
877  }
878  }
879  completionLatch.countDown();
880  }
881  
882 @Override public synchronized void failed(State from, Throwable failure) {
883  assertEquals(from, Iterables.getLast(stateHistory));
884  stateHistory.add(State.FAILED);
885  assertEquals(State.FAILED, service.state());
886  assertEquals(failure, service.failureCause());
887  if (from == State.STARTING) {
888  try {
889  service.awaitRunning();
890  fail();
891  } catch (IllegalStateException e) {
892  assertEquals(failure, e.getCause());
893  }
894  }
895  try {
896  service.awaitTerminated();
897  fail();
898  } catch (IllegalStateException e) {
899  assertEquals(failure, e.getCause());
900  }
901  completionLatch.countDown();
902  }
903  }
904  
905 public void testNotifyStartedWhenNotStarting() {
906  AbstractService service = new DefaultService();
907  try {
908  service.notifyStarted();
909  fail();
910  } catch (IllegalStateException expected) {}
911  }
912  
913 public void testNotifyStoppedWhenNotRunning() {
914  AbstractService service = new DefaultService();
915  try {
916  service.notifyStopped();
917  fail();
918  } catch (IllegalStateException expected) {}
919  }
920  
921 public void testNotifyFailedWhenNotStarted() {
922  AbstractService service = new DefaultService();
923  try {
924  service.notifyFailed(new Exception());
925  fail();
926  } catch (IllegalStateException expected) {}
927  }
928  
929 public void testNotifyFailedWhenTerminated() {
930  NoOpService service = new NoOpService();
931  service.startAsync().awaitRunning();
932  service.stopAsync().awaitTerminated();
933  try {
934  service.notifyFailed(new Exception());
935  fail();
936  } catch (IllegalStateException expected) {}
937  }
938  
939 private static class DefaultService extends AbstractService {
940  @Override protected void doStart() {}
941  @Override protected void doStop() {}
942  }
943  
944 private static final Exception EXCEPTION = new Exception();
945 }
946 <pre>

ServiceManagerTest


001 </pre>
002 /*
003  * Copyright (C) 2012 The Guava Authors
004  *
005  * Licensed under the Apache License, Version 2.0 (the "License");
006  * you may not use this file except in compliance with the License.
007  * You may obtain a copy of the License at
008  *
010  *
011  * Unless required by applicable law or agreed to in writing, software
012  * distributed under the License is distributed on an "AS IS" BASIS,
013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014  * See the License for the specific language governing permissions and
015  * limitations under the License.
016  */
017 package com.google.common.util.concurrent;
018  
019 import static java.util.Arrays.asList;
020  
021 import com.google.common.collect.ImmutableMap;
022 import com.google.common.collect.ImmutableSet;
023 import com.google.common.collect.Lists;
024 import com.google.common.collect.Sets;
025 import com.google.common.testing.NullPointerTester;
026 import com.google.common.testing.TestLogHandler;
027 import com.google.common.util.concurrent.ServiceManager.Listener;
028  
029 import junit.framework.TestCase;
030  
031 import java.util.Arrays;
032 import java.util.Collection;
033 import java.util.List;
034 import java.util.Set;
035 import java.util.concurrent.CountDownLatch;
036 import java.util.concurrent.Executor;
037 import java.util.concurrent.TimeUnit;
038 import java.util.concurrent.TimeoutException;
039 import java.util.logging.Formatter;
040 import java.util.logging.Level;
041 import java.util.logging.LogRecord;
042 import java.util.logging.Logger;
043  
044 /**
045  * Tests for {@link ServiceManager}.
046  *
047  * @author Luke Sandberg
048  * @author Chris Nokleberg
049  */
050 public class ServiceManagerTest extends TestCase {
051  
052 private static class NoOpService extends AbstractService {
053  @Override protected void doStart() {
054  notifyStarted();
055  }
056  
057 @Override protected void doStop() {
058  notifyStopped();
059  }
060  }
061  
062 /*
063  * A NoOp service that will delay the startup and shutdown notification for a configurable amount
064  * of time.
065  */
066  private static class NoOpDelayedSerivce extends NoOpService {
067  private long delay;
068  
069 public NoOpDelayedSerivce(long delay) {
070  this.delay = delay;
071  }
072  
073 @Override protected void doStart() {
074  new Thread() {
075  @Override public void run() {
076  Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
077  notifyStarted();
078  }
079  }.start();
080  }
081  
082 @Override protected void doStop() {
083  new Thread() {
084  @Override public void run() {
085  Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
086  notifyStopped();
087  }
088  }.start();
089  }
090  }
091  
092 private static class FailStartService extends NoOpService {
093  @Override protected void doStart() {
094  notifyFailed(new IllegalStateException("failed"));
095  }
096  }
097  
098 private static class FailRunService extends NoOpService {
099  @Override protected void doStart() {
100  super.doStart();
101  notifyFailed(new IllegalStateException("failed"));
102  }
103  }
104  
105 private static class FailStopService extends NoOpService {
106  @Override protected void doStop() {
107  notifyFailed(new IllegalStateException("failed"));
108  }
109  }
110  
111 public void testServiceStartupTimes() {
112  Service a = new NoOpDelayedSerivce(150);
113  Service b = new NoOpDelayedSerivce(353);
114  ServiceManager serviceManager = new ServiceManager(asList(a, b));
115  serviceManager.startAsync().awaitHealthy();
116  ImmutableMap<Service, Long> startupTimes = serviceManager.startupTimes();
117  assertEquals(2, startupTimes.size());
118  assertTrue(startupTimes.get(a) >= 150);
119  assertTrue(startupTimes.get(b) >= 353);
120  }
121  
122 public void testServiceStartStop() {
123  Service a = new NoOpService();
124  Service b = new NoOpService();
125  ServiceManager manager = new ServiceManager(asList(a, b));
126  RecordingListener listener = new RecordingListener();
127  manager.addListener(listener);
128  assertState(manager, Service.State.NEW, a, b);
129  assertFalse(manager.isHealthy());
130  manager.startAsync().awaitHealthy();
131  assertState(manager, Service.State.RUNNING, a, b);
132  assertTrue(manager.isHealthy());
133  assertTrue(listener.healthyCalled);
134  assertFalse(listener.stoppedCalled);
135  assertTrue(listener.failedServices.isEmpty());
136  manager.stopAsync().awaitStopped();
137  assertState(manager, Service.State.TERMINATED, a, b);
138  assertFalse(manager.isHealthy());
139  assertTrue(listener.stoppedCalled);
140  assertTrue(listener.failedServices.isEmpty());
141  }
142  
143 public void testFailStart() throws Exception {
144  Service a = new NoOpService();
145  Service b = new FailStartService();
146  Service c = new NoOpService();
147  Service d = new FailStartService();
148  Service e = new NoOpService();
149  ServiceManager manager = new ServiceManager(asList(a, b, c, d, e));
150  RecordingListener listener = new RecordingListener();
151  manager.addListener(listener);
152  assertState(manager, Service.State.NEW, a, b, c, d, e);
153  try {
154  manager.startAsync().awaitHealthy();
155  fail();
156  } catch (IllegalStateException expected) {
157  }
158  assertFalse(listener.healthyCalled);
159  assertState(manager, Service.State.RUNNING, a, c, e);
160  assertEquals(ImmutableSet.of(b, d), listener.failedServices);
161  assertState(manager, Service.State.FAILED, b, d);
162  assertFalse(manager.isHealthy());
163  
164 manager.stopAsync().awaitStopped();
165  assertFalse(manager.isHealthy());
166  assertFalse(listener.healthyCalled);
167  assertTrue(listener.stoppedCalled);
168  }
169  
170 public void testFailRun() throws Exception {
171  Service a = new NoOpService();
172  Service b = new FailRunService();
173  ServiceManager manager = new ServiceManager(asList(a, b));
174  RecordingListener listener = new RecordingListener();
175  manager.addListener(listener);
176  assertState(manager, Service.State.NEW, a, b);
177  try {
178  manager.startAsync().awaitHealthy();
179  fail();
180  } catch (IllegalStateException expected) {
181  }
182  assertTrue(listener.healthyCalled);
183  assertEquals(ImmutableSet.of(b), listener.failedServices);
184  
185 manager.stopAsync().awaitStopped();
186  assertState(manager, Service.State.FAILED, b);
187  assertState(manager, Service.State.TERMINATED, a);
188  
189 assertTrue(listener.stoppedCalled);
190  }
191  
192 public void testFailStop() throws Exception {
193  Service a = new NoOpService();
194  Service b = new FailStopService();
195  Service c = new NoOpService();
196  ServiceManager manager = new ServiceManager(asList(a, b, c));
197  RecordingListener listener = new RecordingListener();
198  manager.addListener(listener);
199  
200 manager.startAsync().awaitHealthy();
201  assertTrue(listener.healthyCalled);
202  assertFalse(listener.stoppedCalled);
203  manager.stopAsync().awaitStopped();
204  
205 assertTrue(listener.stoppedCalled);
206  assertEquals(ImmutableSet.of(b), listener.failedServices);
207  assertState(manager, Service.State.FAILED, b);
208  assertState(manager, Service.State.TERMINATED, a, c);
209  }
210  
211 public void testToString() throws Exception {
212  Service a = new NoOpService();
213  Service b = new FailStartService();
214  ServiceManager manager = new ServiceManager(asList(a, b));
215  String toString = manager.toString();
216  assertTrue(toString.contains("NoOpService"));
217  assertTrue(toString.contains("FailStartService"));
218  }
219  
220 public void testTimeouts() throws Exception {
221  Service a = new NoOpDelayedSerivce(50);
222  ServiceManager manager = new ServiceManager(asList(a));
223  manager.startAsync();
224  try {
225  manager.awaitHealthy(1, TimeUnit.MILLISECONDS);
226  fail();
227  } catch (TimeoutException expected) {
228  }
229  manager.awaitHealthy(100, TimeUnit.MILLISECONDS); // no exception thrown
230  
231 manager.stopAsync();
232  try {
233  manager.awaitStopped(1, TimeUnit.MILLISECONDS);
234  fail();
235  } catch (TimeoutException expected) {
236  }
237  manager.awaitStopped(100, TimeUnit.MILLISECONDS); // no exception thrown
238  }
239  
240 /**
241  * This covers a case where if the last service to stop failed then the stopped callback would
242  * never be called.
243  */
244  public void testSingleFailedServiceCallsStopped() {
245  Service a = new FailStartService();
246  ServiceManager manager = new ServiceManager(asList(a));
247  RecordingListener listener = new RecordingListener();
248  manager.addListener(listener);
249  try {
250  manager.startAsync().awaitHealthy();
251  fail();
252  } catch (IllegalStateException expected) {
253  }
254  assertTrue(listener.stoppedCalled);
255  }
256  
257 /**
258  * This covers a bug where listener.healthy would get called when a single service failed during
259  * startup (it occurred in more complicated cases also).
260  */
261  public void testFailStart_singleServiceCallsHealthy() {
262  Service a = new FailStartService();
263  ServiceManager manager = new ServiceManager(asList(a));
264  RecordingListener listener = new RecordingListener();
265  manager.addListener(listener);
266  try {
267  manager.startAsync().awaitHealthy();
268  fail();
269  } catch (IllegalStateException expected) {
270  }
271  assertFalse(listener.healthyCalled);
272  }
273  
274 /**
275  * This covers a bug where if a listener was installed that would stop the manager if any service
276  * fails and something failed during startup before service.start was called on all the services,
277  * then awaitStopped would deadlock due to an IllegalStateException that was thrown when trying to
278  * stop the timer(!).
279  */
280  public void testFailStart_stopOthers() throws TimeoutException {
281  Service a = new FailStartService();
282  Service b = new NoOpService();
283  final ServiceManager manager = new ServiceManager(asList(a, b));
284  manager.addListener(new Listener() {
285  @Override public void failure(Service service) {
286  manager.stopAsync();
287  }});
288  manager.startAsync();
289  manager.awaitStopped(10, TimeUnit.MILLISECONDS);
290  }
291  
292 private static void assertState(
293  ServiceManager manager, Service.State state, Service... services) {
294  Collection<Service> managerServices = manager.servicesByState().get(state);
295  for (Service service : services) {
296  assertEquals(service.toString(), state, service.state());
297  assertEquals(service.toString(), service.isRunning(), state == Service.State.RUNNING);
298  assertTrue(managerServices + " should contain " + service, managerServices.contains(service));
299  }
300  }
301  
302 /**
303  * This is for covering a case where the ServiceManager would behave strangely if constructed
304  * with no service under management. Listeners would never fire because the ServiceManager was
305  * healthy and stopped at the same time. This test ensures that listeners fire and isHealthy
306  * makes sense.
307  */
308  public void testEmptyServiceManager() {
309  Logger logger = Logger.getLogger(ServiceManager.class.getName());
310  logger.setLevel(Level.FINEST);
311  TestLogHandler logHandler = new TestLogHandler();
312  logger.addHandler(logHandler);
313  ServiceManager manager = new ServiceManager(Arrays.<Service>asList());
314  RecordingListener listener = new RecordingListener();
315  manager.addListener(listener, MoreExecutors.sameThreadExecutor());
316  manager.startAsync().awaitHealthy();
317  assertTrue(manager.isHealthy());
318  assertTrue(listener.healthyCalled);
319  assertFalse(listener.stoppedCalled);
320  assertTrue(listener.failedServices.isEmpty());
321  manager.stopAsync().awaitStopped();
322  assertFalse(manager.isHealthy());
323  assertTrue(listener.stoppedCalled);
324  assertTrue(listener.failedServices.isEmpty());
325  // check that our NoOpService is not directly observable via any of the inspection methods or
326  // via logging.
327  assertEquals("ServiceManager{services=[]}", manager.toString());
328  assertTrue(manager.servicesByState().isEmpty());
329  assertTrue(manager.startupTimes().isEmpty());
330  Formatter logFormatter = new Formatter() {
331  @Override public String format(LogRecord record) {
332  return formatMessage(record);
333  }
334  };
335  for (LogRecord record : logHandler.getStoredLogRecords()) {
336  assertFalse(logFormatter.format(record).contains("NoOpService"));
337  }
338  }
339  
340 /**
341  * This is for a case where a long running Listener using the sameThreadListener could deadlock
342  * another thread calling stopAsync().
343  */
344  
345 public void testListenerDeadlock() throws InterruptedException {
346  final CountDownLatch failEnter = new CountDownLatch(1);
347  Service failRunService = new AbstractService() {
348  @Override protected void doStart() {
349  new Thread() {
350  @Override public void run() {
351  notifyStarted();
352  notifyFailed(new Exception("boom"));
353  }
354  }.start();
355  }
356  @Override protected void doStop() {
357  notifyStopped();
358  }
359  };
360  final ServiceManager manager = new ServiceManager(
361  Arrays.asList(failRunService, new NoOpService()));
362  manager.addListener(new ServiceManager.Listener() {
363  @Override public void failure(Service service) {
364  failEnter.countDown();
365  // block forever!
366  Uninterruptibles.awaitUninterruptibly(new CountDownLatch(1));
367  }
368  }, MoreExecutors.sameThreadExecutor());
369  // We do not call awaitHealthy because, due to races, that method may throw an exception. But
370  // we really just want to wait for the thread to be in the failure callback so we wait for that
371  // explicitly instead.
372  manager.startAsync();
373  failEnter.await();
374  assertFalse("State should be updated before calling listeners", manager.isHealthy());
375  // now we want to stop the services.
376  Thread stoppingThread = new Thread() {
377  @Override public void run() {
378  manager.stopAsync().awaitStopped();
379  }
380  };
381  stoppingThread.start();
382  // this should be super fast since the only non stopped service is a NoOpService
383  stoppingThread.join(1000);
384  assertFalse("stopAsync has deadlocked!.", stoppingThread.isAlive());
385  }
386  
387 /**
388  * Catches a bug where when constructing a service manager failed, later interactions with the
389  * service could cause IllegalStateExceptions inside the partially constructed ServiceManager.
390  * This ISE wouldn't actually bubble up but would get logged by ExecutionQueue. This obfuscated
391  * the original error (which was not constructing ServiceManager correctly).
392  */
393  public void testPartiallyConstructedManager() {
394  Logger logger = Logger.getLogger("global");
395  logger.setLevel(Level.FINEST);
396  TestLogHandler logHandler = new TestLogHandler();
397  logger.addHandler(logHandler);
398  NoOpService service = new NoOpService();
399  service.startAsync();
400  try {
401  new ServiceManager(Arrays.asList(service));
402  fail();
403  } catch (IllegalArgumentException expected) {}
404  service.stopAsync();
405  // Nothing was logged!
406  assertEquals(0, logHandler.getStoredLogRecords().size());
407  }
408  
409 public void testPartiallyConstructedManager_transitionAfterAddListenerBeforeStateIsReady() {
410  // The implementation of this test is pretty sensitive to the implementation <img class="wp-smiley" style="height: 1em; max-height: 1em;" alt=":(" src="http://ifeve.com/wp-includes/images/smilies/frownie.png"> but we want to
411  // ensure that if weird things happen during construction then we get exceptions.
412  final NoOpService service1 = new NoOpService();
413  // This service will start service1 when addListener is called. This simulates service1 being
414  // started asynchronously.
415  Service service2 = new Service() {
416  final NoOpService delegate = new NoOpService();
417  @Override public final void addListener(Listener listener, Executor executor) {
418  service1.startAsync();
419  delegate.addListener(listener, executor);
420  }
421  // Delegates from here on down
422  @Override public final Service startAsync() {
423  return delegate.startAsync();
424  }
425  
426 @Override public final Service stopAsync() {
427  return delegate.stopAsync();
428  }
429  
430 @Override public final ListenableFuture<State> start() {
431  return delegate.start();
432  }
433  
434 @Override public final ListenableFuture<State> stop() {
435  return delegate.stop();
436  }
437  
438 @Override public State startAndWait() {
439  return delegate.startAndWait();
440  }
441  
442 @Override public State stopAndWait() {
443  return delegate.stopAndWait();
444  }
445  
446 @Override public final void awaitRunning() {
447  delegate.awaitRunning();
448  }
449  
450 @Override public final void awaitRunning(long timeout, TimeUnit unit)
451  throws TimeoutException {
452  delegate.awaitRunning(timeout, unit);
453  }
454  
455 @Override public final void awaitTerminated() {
456  delegate.awaitTerminated();
457  }
458  
459 @Override public final void awaitTerminated(long timeout, TimeUnit unit)
460  throws TimeoutException {
461  delegate.awaitTerminated(timeout, unit);
462  }
463  
464 @Override public final boolean isRunning() {
465  return delegate.isRunning();
466  }
467  
468 @Override public final State state() {
469  return delegate.state();
470  }
471  
472 @Override public final Throwable failureCause() {
473  return delegate.failureCause();
474  }
475  };
476  try {
477  new ServiceManager(Arrays.asList(service1, service2));
478  fail();
479  } catch (IllegalArgumentException expected) {
480  assertTrue(expected.getMessage().contains("started transitioning asynchronously"));
481  }
482  }
483  
484 /**
485  * This test is for a case where two Service.Listener callbacks for the same service would call
486  * transitionService in the wrong order due to a race. Due to the fact that it is a race this
487  * test isn't guaranteed to expose the issue, but it is at least likely to become flaky if the
488  * race sneaks back in, and in this case flaky means something is definitely wrong.
489  *
490  * <p>Before the bug was fixed this test would fail at least 30% of the time.
491  */
492  
493 public void testTransitionRace() throws TimeoutException {
494  for (int k = 0; k < 1000; k++) {
495  List<Service> services = Lists.newArrayList();
496  for (int i = 0; i < 5; i++) {
497  services.add(new SnappyShutdownService(i));
498  }
499  ServiceManager manager = new ServiceManager(services);
500  manager.startAsync().awaitHealthy();
501  manager.stopAsync().awaitStopped(1, TimeUnit.SECONDS);
502  }
503  }
504  
505 /**
506  * This service will shutdown very quickly after stopAsync is called and uses a background thread
507  * so that we know that the stopping() listeners will execute on a different thread than the
508  * terminated() listeners.
509  */
510  private static class SnappyShutdownService extends AbstractExecutionThreadService {
511  final int index;
512  final CountDownLatch latch = new CountDownLatch(1);
513  
514 SnappyShutdownService(int index) {
515  this.index = index;
516  }
517  
518 @Override protected void run() throws Exception {
519  latch.await();
520  }
521  
522 @Override protected void triggerShutdown() {
523  latch.countDown();
524  }
525  
526 @Override protected String serviceName() {
527  return this.getClass().getSimpleName() + "[" + index + "]";
528  }
529  }
530  
531 public void testNulls() {
532  ServiceManager manager = new ServiceManager(Arrays.<Service>asList());
533  new NullPointerTester()
534  .setDefault(ServiceManager.Listener.class, new RecordingListener())
535  .testAllPublicInstanceMethods(manager);
536  }
537  
538 private static final class RecordingListener extends ServiceManager.Listener {
539  volatile boolean healthyCalled;
540  volatile boolean stoppedCalled;
541  final Set<Service> failedServices = Sets.newConcurrentHashSet();
542  
543 @Override public void healthy() {
544  healthyCalled = true;
545  }
546  
547 @Override public void stopped() {
548  stoppedCalled = true;
549  }
550  
551 @Override public void failure(Service service) {
552  failedServices.add(service);
553  }
554  }
555 }
556 <pre>
目录
相关文章
|
6月前
|
缓存 Java Maven
深入解析Google Guava库与Spring Retry重试框架
深入解析Google Guava库与Spring Retry重试框架
|
数据可视化 JavaScript 前端开发
Google开源了可视化编程框架Visual Blocks for ML
Visual Blocks for ML是一个由Google开发的开源可视化编程框架。它使你能够在易于使用的无代码图形编辑器中创建ML管道。
238 0
|
7月前
|
JSON Android开发 数据格式
Android框架-Google官方Gson解析,android开发实验报告总结
Android框架-Google官方Gson解析,android开发实验报告总结
|
7月前
|
设计模式 前端开发 JavaScript
AngularJS是一款由Google收购的JavaScript结构框架
【5月更文挑战第2天】AngularJS是Google收购的JavaScript框架,用于构建动态Web应用,基于MVC模式,强调模块化和双向数据绑定。它简化了视图与模型的同步,通过语义化标签和依赖注入提升开发效率。适用于复杂单页面应用(SPA),但不适合DOM操作密集型或性能要求极高的场景。
73 0
|
7月前
google测试框架
google测试框架
44 0
|
关系型数据库 MySQL API
Go语言微服务框架 - 6.用Google风格的API接口打通MySQL操作
随着RPC与MySQL的打通,整个框架已经开始打通了数据的出入口。 接下来,我们就尝试着实现通过RPC请求操作MySQL数据库,打通整个链路,真正地让这个平台实现可用。
51 0
|
JSON Cloud Native 网络协议
gRPC简介: Google的高性能RPC框架
gRPC简介: Google的高性能RPC框架
266 0
|
XML 存储 SQL
Google Architecture Components应用框架初探
Google Architecture Components应用框架初探
328 0
Google Architecture Components应用框架初探
|
Web App开发 存储 前端开发
如何使用 Google CrUX 分析和比较 JS 框架的性能
在美国本土流量前 100 万的站点中(按流量统计),Vue 的性能追平了 React。
176 0
如何使用 Google CrUX 分析和比较 JS 框架的性能
|
XML JSON API
Google官方Fragment页面框架Navigation和XPage开源框架的使用对比
Google官方Fragment页面框架Navigation和XPage开源框架的使用对比
490 0