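Overview
The Guava Service interface represents an object with an operational state, with methods such as start and stop. Web servers, RPC servers, and timers are typical candidates for implementing it. Managing the state of such services is not trivial: startup and shutdown have to be handled properly, and this gets especially complicated in a multithreaded environment. Guava provides skeleton classes that manage the state transition logic and synchronization details for you.
Using a Service
The normal lifecycle of a Service is: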
- Service.State.NEW
- Service.State.STARTING
- Service.State.RUNNING
- Service.State.STOPPING
- Service.State.TERMINATED
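A service that has been stopped cannot be restarted. If the service fails while starting, running, or stopping, it enters the Service.State.FAILED state. Calling startAsync() starts a service asynchronously and returns this, so calls can be chained. Note: startAsync() may only be called while the service is in the NEW state (otherwise it throws IllegalStateException), so it is best to have a single place in your application where services are initialized. Stopping a service works similarly, through the asynchronous stopAsync() method. Unlike startAsync(), however, it is safe to call stopAsync() multiple times, which makes it easier to handle the races that can occur when shutting services down.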
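The lifecycle rules above can be written down as a small transition table. The following is only a sketch in plain Java with no Guava dependency: LifecycleSketch and its Phase enum are hypothetical names that mirror Service.State, and the allowed moves are the direct transitions listed in the ServiceTest case in the appendix.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the Service lifecycle; this is not Guava code.
final class LifecycleSketch {
  enum Phase { NEW, STARTING, RUNNING, STOPPING, TERMINATED, FAILED }

  // Direct transitions, as enumerated in ServiceTest.testStateOrdering().
  private static final Map<Phase, Set<Phase>> NEXT = new EnumMap<>(Phase.class);
  static {
    NEXT.put(Phase.NEW, EnumSet.of(Phase.STARTING, Phase.TERMINATED));
    NEXT.put(Phase.STARTING, EnumSet.of(Phase.RUNNING, Phase.STOPPING, Phase.FAILED));
    NEXT.put(Phase.RUNNING, EnumSet.of(Phase.STOPPING, Phase.FAILED));
    NEXT.put(Phase.STOPPING, EnumSet.of(Phase.TERMINATED, Phase.FAILED));
    NEXT.put(Phase.TERMINATED, EnumSet.noneOf(Phase.class));
    NEXT.put(Phase.FAILED, EnumSet.noneOf(Phase.class));
  }

  // True if a service may move directly from one phase to the other.
  static boolean canMove(Phase from, Phase to) {
    return NEXT.get(from).contains(to);
  }

  // Terminal phases have no outgoing transitions.
  static boolean isTerminal(Phase phase) {
    return NEXT.get(phase).isEmpty();
  }
}
```

Because TERMINATED and FAILED have no outgoing transitions in this table, a service that has reached either state cannot be started again.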
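Service also provides several methods for waiting until a state transition completes:
- Asynchronously, with addListener(). This method lets you add a Service.Listener that is invoked on every state transition of the service. Note: it is best to add listeners before the service starts (while its state is NEW); otherwise transitions that have already happened will not be replayed on the newly added listener.
- Synchronously, with awaitRunning(). This method is uninterruptible, throws no checked exceptions, and returns once the service has finished starting. If the service fails to start, it throws IllegalStateException. Similarly, awaitTerminated() waits for the service to reach a terminal state (TERMINATED or FAILED). Both methods have overloads that accept a timeout.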
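To see why listeners should be registered while the service is still NEW, here is a minimal plain-Java sketch of a notifier that does not replay past events. TransitionNotifierSketch is a hypothetical class written for illustration; it does not use Guava and is not how Guava implements listeners.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical notifier that only illustrates "no replay"; not Guava code.
final class TransitionNotifierSketch {
  private final List<Consumer<String>> listeners = new ArrayList<>();

  void addListener(Consumer<String> listener) {
    listeners.add(listener);
  }

  // Delivers the transition only to listeners registered at this moment.
  void transitionTo(String state) {
    for (Consumer<String> listener : listeners) {
      listener.accept(state);
    }
  }

  // Registers one listener before any transition and one after STARTING,
  // then returns what each of them observed (early first, late second).
  static List<List<String>> demo() {
    TransitionNotifierSketch notifier = new TransitionNotifierSketch();
    List<String> early = new ArrayList<>();
    List<String> late = new ArrayList<>();
    notifier.addListener(early::add);
    notifier.transitionTo("STARTING");
    notifier.addListener(late::add);
    notifier.transitionTo("RUNNING");
    List<List<String>> seen = new ArrayList<>();
    seen.add(early);
    seen.add(late);
    return seen;
  }
}
```

In demo(), the early listener observes both STARTING and RUNNING, while the late listener observes only RUNNING.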
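Implementing the Service interface directly is subtle and complicated, and it is easy to run into hard-to-diagnose problems. We therefore do not recommend implementing it directly. Instead, extend one of the abstract base classes that Guava provides. Each base class supports a specific threading model.
Implementations
AbstractIdleService
AbstractIdleService is a skeleton implementation of Service that performs no action while in the running state, and therefore needs no thread while running, but still has startup and shutdown actions to perform. To implement such a service, extend AbstractIdleService and implement the startUp() and shutDown() methods.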
    // Register the servlet once at startup; requests are then served on existing threads.
    protected void startUp() {
      servlets.add(new GcStatsServlet());
    }

    // Nothing to release on shutdown.
    protected void shutDown() {}
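In the example above, any request to GcStatsServlet already has a thread to run in, so the service does not need to do anything on its own while it is running.
AbstractExecutionThreadService
AbstractExecutionThreadService performs startup, running, and shutdown actions in a single thread. You must override the run() method, and it must respond to stop requests. For example, you might do the work in a loop: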
    public void run() {
      // isRunning() becomes false once a stop has been requested.
      while (isRunning()) {
        // perform a unit of work
      }
    }
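The same loop shape can be tried out without Guava. The sketch below is an assumption-laden stand-in: WorkLoopSketch, requestStop(), and unitsDone() are hypothetical names, and an AtomicBoolean plays the role that isRunning() plays in AbstractExecutionThreadService.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a run() loop that polls a "running" flag; not Guava code.
final class WorkLoopSketch {
  private final AtomicBoolean running = new AtomicBoolean(true);
  private final AtomicInteger units = new AtomicInteger();

  // Keeps working until a stop has been requested.
  void run() {
    while (running.get()) {
      units.incrementAndGet(); // one unit of work
      Thread.yield();
    }
  }

  void requestStop() {
    running.set(false);
  }

  int unitsDone() {
    return units.get();
  }

  // Starts the loop on its own thread, waits for some work, then stops it.
  static int demo() {
    WorkLoopSketch loop = new WorkLoopSketch();
    Thread worker = new Thread(loop::run);
    worker.start();
    while (loop.unitsDone() == 0) {
      Thread.yield(); // wait until at least one unit has been done
    }
    loop.requestStop();
    try {
      worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return loop.unitsDone();
  }
}
```

The loop only notices the stop request between units of work, so each unit should be short if the service is expected to stop promptly.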
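Instead of polling isRunning(), you may also override triggerShutdown() in any way that causes run() to return.
Overriding startUp() and shutDown() is optional; the service state is managed for you either way: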
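    protected void startUp() {
      dispatcher.listenForConnections(port, queue);
    }

    protected void run() throws Exception {
      Connection connection;
      // Take connections until the POISON sentinel arrives.
      // The assignment is parenthesized so that its result, not a boolean, is compared.
      while ((connection = queue.take()) != POISON) {
        process(connection);
      }
    }

    protected void triggerShutdown() {
      dispatcher.stopListeningForConnections(queue);
      // Wake up run(). With a java.util.concurrent.BlockingQueue, put() can throw
      // InterruptedException, which would have to be handled here.
      queue.put(POISON);
    }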
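startAsync() calls your startUp() method, creates a thread for you, and invokes run() in that thread. stopAsync() calls triggerShutdown(); to wait for the thread to terminate, call awaitTerminated().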
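The poison-pill shutdown used in the example above can be exercised on its own with plain JDK classes. This is a sketch under stated assumptions, not Guava code: PoisonPillSketch, submit(), and the String payload are hypothetical, and the sentinel is compared by identity, as in the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical plain-JDK illustration of the poison-pill pattern; not Guava code.
final class PoisonPillSketch {
  // A distinct object, so that identity comparison cannot match a real item.
  private static final String POISON = new String("POISON");

  private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
  private final List<String> processed = new ArrayList<>();

  // Processes items until the sentinel is taken from the queue.
  void run() throws InterruptedException {
    String item;
    while ((item = queue.take()) != POISON) {
      processed.add(item);
    }
  }

  void submit(String item) {
    queue.add(item);
  }

  // Causes run() to return once earlier items have been processed.
  void triggerShutdown() {
    queue.add(POISON);
  }

  static List<String> demo() {
    PoisonPillSketch sketch = new PoisonPillSketch();
    Thread worker = new Thread(() -> {
      try {
        sketch.run();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    worker.start();
    sketch.submit("a");
    sketch.submit("b");
    sketch.triggerShutdown();
    try {
      worker.join(); // after join, the worker's writes to processed are visible
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return sketch.processed;
  }
}
```

Because the sentinel is queued behind the real items, everything submitted before triggerShutdown() is still processed before run() returns.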
AbstractScheduledService
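AbstractScheduledService performs a periodic task while it is running. Subclasses implement runOneIteration() to define one iteration of the task, along with the usual startUp() and shutDown() methods. To describe the execution schedule, you must implement the scheduler() method. Typically you use one of the two schedules provided by AbstractScheduledService.Scheduler, newFixedRateSchedule(initialDelay, period, TimeUnit) or newFixedDelaySchedule(initialDelay, delay, TimeUnit), which correspond to the two scheduling modes of ScheduledExecutorService in the JDK concurrency package. Custom schedules can be implemented with the CustomScheduler class; see the Javadoc for details.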
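Since the two provided schedules correspond to the JDK's own scheduling modes, the underlying behavior can be sketched directly on a ScheduledExecutorService. The class name PeriodicSketch, the 5 ms interval, and the 5 s safety timeout are assumptions made for this illustration; no Guava classes are involved.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical plain-JDK illustration of fixed-rate and fixed-delay scheduling.
final class PeriodicSketch {
  // Runs a counting task until at least `wanted` iterations have happened,
  // then cancels it and returns the number of iterations observed.
  static int runIterations(int wanted, boolean fixedRate) {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    AtomicInteger iterations = new AtomicInteger();
    CountDownLatch done = new CountDownLatch(wanted);
    Runnable oneIteration = () -> {
      iterations.incrementAndGet();
      done.countDown();
    };
    ScheduledFuture<?> future;
    if (fixedRate) {
      // The period is measured from the start of one run to the start of the next.
      future = executor.scheduleAtFixedRate(oneIteration, 0, 5, TimeUnit.MILLISECONDS);
    } else {
      // The delay is measured from the end of one run to the start of the next.
      future = executor.scheduleWithFixedDelay(oneIteration, 0, 5, TimeUnit.MILLISECONDS);
    }
    try {
      done.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      future.cancel(false);
      executor.shutdownNow();
    }
    return iterations.get();
  }
}
```

With a task this short the two modes behave almost identically; the difference matters when an iteration takes a noticeable fraction of the interval.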
AbstractService
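If you need to do your own thread management, extend AbstractService directly. The implementations above are usually sufficient, but extending AbstractService is recommended when the service has specific thread-handling requirements while it runs.
To extend AbstractService you must implement two methods:
- doStart(): called directly by the first call to startAsync(). doStart() should perform all initialization and then call notifyStarted() if startup succeeded, or notifyFailed() if startup failed.
- doStop(): called directly by the first call to stopAsync(). doStop() should shut the service down and then call notifyStopped() if shutdown succeeded, or notifyFailed() if shutdown failed.
Your doStart() and doStop() methods should be fast. If initialization is expensive, such as reading files, opening network connections, or any other operation that might block, consider moving that work to a separate thread.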
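The advice to keep doStart() fast can be illustrated with a small plain-Java model. AsyncStartSketch is hypothetical: its method names mirror AbstractService, but it is a simplified stand-in covering only startup, and it does not extend or reproduce the Guava class.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of a fast doStart(); names mirror AbstractService, but this is not Guava code.
final class AsyncStartSketch {
  enum Phase { NEW, STARTING, RUNNING, FAILED }

  private final AtomicReference<Phase> phase = new AtomicReference<>(Phase.NEW);
  private final CountDownLatch settled = new CountDownLatch(1);
  private final Runnable expensiveInit;

  AsyncStartSketch(Runnable expensiveInit) {
    this.expensiveInit = expensiveInit;
  }

  // Returns immediately; allowed only while the phase is NEW.
  AsyncStartSketch startAsync() {
    if (!phase.compareAndSet(Phase.NEW, Phase.STARTING)) {
      throw new IllegalStateException("already started: " + phase.get());
    }
    doStart();
    return this;
  }

  // Fast: hands the potentially blocking work to a separate thread.
  private void doStart() {
    Thread initThread = new Thread(() -> {
      try {
        expensiveInit.run();
        notifyStarted();
      } catch (RuntimeException e) {
        notifyFailed();
      }
    });
    initThread.start();
  }

  private void notifyStarted() {
    phase.set(Phase.RUNNING);
    settled.countDown();
  }

  private void notifyFailed() {
    phase.set(Phase.FAILED);
    settled.countDown();
  }

  // Blocks uninterruptibly until startup has either succeeded or failed.
  Phase awaitSettled() {
    boolean interrupted = false;
    while (true) {
      try {
        settled.await();
        break;
      } catch (InterruptedException e) {
        interrupted = true;
      }
    }
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
    return phase.get();
  }
}
```

The caller of startAsync() is never blocked by the initialization itself; only a caller that explicitly waits with awaitSettled() pays for it.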
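Using ServiceManager
In addition to the skeleton Service implementations, Guava provides the ServiceManager class, which makes operations involving several Service instances easier. Create a ServiceManager with a collection of services; you can then manage them with the following methods:
- startAsync(): starts all services under management. Much like Service#startAsync(), this method can be called only once, and only while all services are NEW.
- stopAsync(): stops all services under management.
- addListener(): adds a ServiceManager.Listener that is called on service state transitions.
- awaitHealthy(): waits for all services to reach the RUNNING state.
- awaitStopped(): waits for all services to reach a terminal state.
Inspection methods:
- isHealthy(): returns true if all services are RUNNING.
- servicesByState(): returns a snapshot of all current services, indexed by their state.
- startupTimes(): returns a Map from each service under management to how long it took to start, in milliseconds. The map is ordered by startup time.
We recommend managing the whole service lifecycle through ServiceManager, but state transitions triggered through other mechanisms do not affect the correctness of its methods. For example, if a service is started by some mechanism other than startAsync(), the listeners are still invoked when appropriate and awaitHealthy() still works as expected. The only requirement that ServiceManager enforces is that all services are NEW when the ServiceManager is constructed.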
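What the inspection methods report can be modeled with ordinary collections. The sketch below is purely illustrative: ManagerViewSketch, its Phase enum, and the sample service names are hypothetical, services are represented by plain strings, and nothing here is the ServiceManager implementation.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the "healthy" check and the by-state snapshot; not Guava code.
final class ManagerViewSketch {
  enum Phase { NEW, STARTING, RUNNING, STOPPING, TERMINATED, FAILED }

  // Healthy means that every tracked service is RUNNING.
  static boolean isHealthy(Map<String, Phase> services) {
    for (Phase phase : services.values()) {
      if (phase != Phase.RUNNING) {
        return false;
      }
    }
    return true;
  }

  // Groups service names by their current phase.
  static Map<Phase, List<String>> servicesByState(Map<String, Phase> services) {
    Map<Phase, List<String>> byState = new EnumMap<>(Phase.class);
    for (Map.Entry<String, Phase> entry : services.entrySet()) {
      byState.computeIfAbsent(entry.getValue(), k -> new ArrayList<>()).add(entry.getKey());
    }
    return byState;
  }

  // Sample data: two running services and one failed service.
  static Map<String, Phase> sample() {
    Map<String, Phase> services = new LinkedHashMap<>();
    services.put("web", Phase.RUNNING);
    services.put("rpc", Phase.RUNNING);
    services.put("timer", Phase.FAILED);
    return services;
  }
}
```

With the sample data, a single failed service is enough to make the whole group unhealthy, which is why a by-state view is useful for finding the culprit.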
Appendix: test cases, which can also be used as practice demos
ServiceTest
    /*
     * Copyright (C) 2013 The Guava Authors
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    package com.google.common.util.concurrent;

    import static com.google.common.util.concurrent.Service.State.FAILED;
    import static com.google.common.util.concurrent.Service.State.NEW;
    import static com.google.common.util.concurrent.Service.State.RUNNING;
    import static com.google.common.util.concurrent.Service.State.STARTING;
    import static com.google.common.util.concurrent.Service.State.STOPPING;
    import static com.google.common.util.concurrent.Service.State.TERMINATED;

    import junit.framework.TestCase;

    /**
     * Unit tests for {@link Service}
     */
    public class ServiceTest extends TestCase {

      /** Assert on the comparison ordering of the State enum since we guarantee it. */
      public void testStateOrdering() {
        // List every valid (direct) state transition.
        assertLessThan(NEW, STARTING);
        assertLessThan(NEW, TERMINATED);

        assertLessThan(STARTING, RUNNING);
        assertLessThan(STARTING, STOPPING);
        assertLessThan(STARTING, FAILED);

        assertLessThan(RUNNING, STOPPING);
        assertLessThan(RUNNING, FAILED);

        assertLessThan(STOPPING, FAILED);
        assertLessThan(STOPPING, TERMINATED);
      }

      private static <T extends Comparable<? super T>> void assertLessThan(T a, T b) {
        if (a.compareTo(b) >= 0) {
          fail(String.format("Expected %s to be less than %s", a, b));
        }
      }
    }
AbstractIdleServiceTest
    /*
     * Copyright (C) 2009 The Guava Authors
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    package com.google.common.util.concurrent;

    import static org.truth0.Truth.ASSERT;

    import com.google.common.collect.Lists;

    import junit.framework.TestCase;

    import java.util.List;
    import java.util.concurrent.Executor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    /**
     * Tests for {@link AbstractIdleService}.
     *
     * @author Chris Nokleberg
     * @author Ben Yu
     */
    public class AbstractIdleServiceTest extends TestCase {

      // Functional tests using real thread. We only verify publicly visible state.
      // Interaction assertions are done by the single-threaded unit tests.

      public static class FunctionalTest extends TestCase {

        private static class DefaultService extends AbstractIdleService {
          @Override protected void startUp() throws Exception {}
          @Override protected void shutDown() throws Exception {}
        }

        public void testServiceStartStop() throws Exception {
          AbstractIdleService service = new DefaultService();
          service.startAsync().awaitRunning();
          assertEquals(Service.State.RUNNING, service.state());
          service.stopAsync().awaitTerminated();
          assertEquals(Service.State.TERMINATED, service.state());
        }

        public void testStart_failed() throws Exception {
          final Exception exception = new Exception("deliberate");
          AbstractIdleService service = new DefaultService() {
            @Override protected void startUp() throws Exception {
              throw exception;
            }
          };
          try {
            service.startAsync().awaitRunning();
            fail();
          } catch (RuntimeException e) {
            assertSame(exception, e.getCause());
          }
          assertEquals(Service.State.FAILED, service.state());
        }

        public void testStop_failed() throws Exception {
          final Exception exception = new Exception("deliberate");
          AbstractIdleService service = new DefaultService() {
            @Override protected void shutDown() throws Exception {
              throw exception;
            }
          };
          service.startAsync().awaitRunning();
          try {
            service.stopAsync().awaitTerminated();
            fail();
          } catch (RuntimeException e) {
            assertSame(exception, e.getCause());
          }
          assertEquals(Service.State.FAILED, service.state());
        }
      }

      public void testStart() {
        TestService service = new TestService();
        assertEquals(0, service.startUpCalled);
        service.startAsync().awaitRunning();
        assertEquals(1, service.startUpCalled);
        assertEquals(Service.State.RUNNING, service.state());
        ASSERT.that(service.transitionStates).has().exactly(Service.State.STARTING).inOrder();
      }

      public void testStart_failed() {
        final Exception exception = new Exception("deliberate");
        TestService service = new TestService() {
          @Override protected void startUp() throws Exception {
            super.startUp();
            throw exception;
          }
        };
        assertEquals(0, service.startUpCalled);
        try {
          service.startAsync().awaitRunning();
          fail();
        } catch (RuntimeException e) {
          assertSame(exception, e.getCause());
        }
        assertEquals(1, service.startUpCalled);
        assertEquals(Service.State.FAILED, service.state());
        ASSERT.that(service.transitionStates).has().exactly(Service.State.STARTING).inOrder();
      }

      public void testStop_withoutStart() {
        TestService service = new TestService();
        service.stopAsync().awaitTerminated();
        assertEquals(0, service.startUpCalled);
        assertEquals(0, service.shutDownCalled);
        assertEquals(Service.State.TERMINATED, service.state());
        ASSERT.that(service.transitionStates).isEmpty();
      }

      public void testStop_afterStart() {
        TestService service = new TestService();
        service.startAsync().awaitRunning();
        assertEquals(1, service.startUpCalled);
        assertEquals(0, service.shutDownCalled);
        service.stopAsync().awaitTerminated();
        assertEquals(1, service.startUpCalled);
        assertEquals(1, service.shutDownCalled);
        assertEquals(Service.State.TERMINATED, service.state());
        ASSERT.that(service.transitionStates)
            .has().exactly(Service.State.STARTING, Service.State.STOPPING).inOrder();
      }

      public void testStop_failed() {
        final Exception exception = new Exception("deliberate");
        TestService service = new TestService() {
          @Override protected void shutDown() throws Exception {
            super.shutDown();
            throw exception;
          }
        };
        service.startAsync().awaitRunning();
        assertEquals(1, service.startUpCalled);
        assertEquals(0, service.shutDownCalled);
        try {
          service.stopAsync().awaitTerminated();
          fail();
        } catch (RuntimeException e) {
          assertSame(exception, e.getCause());
        }
        assertEquals(1, service.startUpCalled);
        assertEquals(1, service.shutDownCalled);
        assertEquals(Service.State.FAILED, service.state());
        ASSERT.that(service.transitionStates)
            .has().exactly(Service.State.STARTING, Service.State.STOPPING).inOrder();
      }

      public void testServiceToString() {
        AbstractIdleService service = new TestService();
        assertEquals("TestService [NEW]", service.toString());
        service.startAsync().awaitRunning();
        assertEquals("TestService [RUNNING]", service.toString());
        service.stopAsync().awaitTerminated();
        assertEquals("TestService [TERMINATED]", service.toString());
      }

      public void testTimeout() throws Exception {
        // Create a service whose executor will never run its commands
        Service service = new TestService() {
          @Override protected Executor executor() {
            return new Executor() {
              @Override public void execute(Runnable command) {}
            };
          }
        };
        try {
          service.startAsync().awaitRunning(1, TimeUnit.MILLISECONDS);
          fail("Expected timeout");
        } catch (TimeoutException e) {
          ASSERT.that(e.getMessage()).contains(Service.State.STARTING.toString());
        }
      }

      private static class TestService extends AbstractIdleService {
        int startUpCalled = 0;
        int shutDownCalled = 0;
        final List<State> transitionStates = Lists.newArrayList();

        @Override protected void startUp() throws Exception {
          assertEquals(0, startUpCalled);
          assertEquals(0, shutDownCalled);
          startUpCalled++;
          assertEquals(State.STARTING, state());
        }

        @Override protected void shutDown() throws Exception {
          assertEquals(1, startUpCalled);
          assertEquals(0, shutDownCalled);
          shutDownCalled++;
          assertEquals(State.STOPPING, state());
        }

        @Override protected Executor executor() {
          transitionStates.add(state());
          return MoreExecutors.sameThreadExecutor();
        }
      }
    }

AbstractScheduledServiceTest
    /*
     * Copyright (C) 2011 The Guava Authors
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    package com.google.common.util.concurrent;

    import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
    import com.google.common.util.concurrent.Service.State;

    import junit.framework.TestCase;

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicInteger;

    /**
     * Unit test for {@link AbstractScheduledService}.
     *
     * @author Luke Sandberg
     */

    public class AbstractScheduledServiceTest extends TestCase {

      volatile Scheduler configuration = Scheduler.newFixedDelaySchedule(0, 10, TimeUnit.MILLISECONDS);
      volatile ScheduledFuture<?> future = null;

      volatile boolean atFixedRateCalled = false;
      volatile boolean withFixedDelayCalled = false;
      volatile boolean scheduleCalled = false;

      final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(10) {
        @Override
        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
            long delay, TimeUnit unit) {
          return future = super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
        }
      };

      public void testServiceStartStop() throws Exception {
        NullService service = new NullService();
        service.startAsync().awaitRunning();
        assertFalse(future.isDone());
        service.stopAsync().awaitTerminated();
        assertTrue(future.isCancelled());
      }

      private class NullService extends AbstractScheduledService {
        @Override protected void runOneIteration() throws Exception {}
        @Override protected Scheduler scheduler() { return configuration; }
        @Override protected ScheduledExecutorService executor() { return executor; }
      }

      public void testFailOnExceptionFromRun() throws Exception {
        TestService service = new TestService();
        service.runException = new Exception();
        service.startAsync().awaitRunning();
        service.runFirstBarrier.await();
        service.runSecondBarrier.await();
        try {
          future.get();
          fail();
        } catch (ExecutionException e) {
          // An execution exception holds a runtime exception (from Throwables.propagate) that holds our
          // original exception.
          assertEquals(service.runException, e.getCause().getCause());
        }
        assertEquals(service.state(), Service.State.FAILED);
      }

      public void testFailOnExceptionFromStartUp() {
        TestService service = new TestService();
        service.startUpException = new Exception();
        try {
          service.startAsync().awaitRunning();
          fail();
        } catch (IllegalStateException e) {
          assertEquals(service.startUpException, e.getCause());
        }
        assertEquals(0, service.numberOfTimesRunCalled.get());
        assertEquals(Service.State.FAILED, service.state());
      }

      public void testFailOnExceptionFromShutDown() throws Exception {
        TestService service = new TestService();
        service.shutDownException = new Exception();
        service.startAsync().awaitRunning();
        service.runFirstBarrier.await();
        service.stopAsync();
        service.runSecondBarrier.await();
        try {
          service.awaitTerminated();
          fail();
        } catch (IllegalStateException e) {
          assertEquals(service.shutDownException, e.getCause());
        }
        assertEquals(Service.State.FAILED, service.state());
      }

      public void testRunOneIterationCalledMultipleTimes() throws Exception {
        TestService service = new TestService();
        service.startAsync().awaitRunning();
        for (int i = 1; i < 10; i++) {
          service.runFirstBarrier.await();
          assertEquals(i, service.numberOfTimesRunCalled.get());
          service.runSecondBarrier.await();
        }
        service.runFirstBarrier.await();
        service.stopAsync();
        service.runSecondBarrier.await();
        service.stopAsync().awaitTerminated();
      }

      public void testExecutorOnlyCalledOnce() throws Exception {
        TestService service = new TestService();
        service.startAsync().awaitRunning();
        // It should be called once during startup.
        assertEquals(1, service.numberOfTimesExecutorCalled.get());
        for (int i = 1; i < 10; i++) {
          service.runFirstBarrier.await();
          assertEquals(i, service.numberOfTimesRunCalled.get());
          service.runSecondBarrier.await();
        }
        service.runFirstBarrier.await();
        service.stopAsync();
        service.runSecondBarrier.await();
        service.stopAsync().awaitTerminated();
        // Only called once overall.
        assertEquals(1, service.numberOfTimesExecutorCalled.get());
      }

      public void testDefaultExecutorIsShutdownWhenServiceIsStopped() throws Exception {
        final CountDownLatch terminationLatch = new CountDownLatch(1);
        AbstractScheduledService service = new AbstractScheduledService() {
          volatile ScheduledExecutorService executorService;
          @Override protected void runOneIteration() throws Exception {}

          @Override protected ScheduledExecutorService executor() {
            if (executorService == null) {
              executorService = super.executor();
              // Add a listener that will be executed after the listener that shuts down the executor.
              addListener(new Listener() {
                @Override public void terminated(State from) {
                  terminationLatch.countDown();
                }
              }, MoreExecutors.sameThreadExecutor());
            }
            return executorService;
          }

          @Override protected Scheduler scheduler() {
            return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
          }};

        service.startAsync();
        assertFalse(service.executor().isShutdown());
        service.awaitRunning();
        service.stopAsync();
        terminationLatch.await();
        assertTrue(service.executor().isShutdown());
        assertTrue(service.executor().awaitTermination(100, TimeUnit.MILLISECONDS));
      }

      public void testDefaultExecutorIsShutdownWhenServiceFails() throws Exception {
        final CountDownLatch failureLatch = new CountDownLatch(1);
        AbstractScheduledService service = new AbstractScheduledService() {
          volatile ScheduledExecutorService executorService;
          @Override protected void runOneIteration() throws Exception {}

          @Override protected void startUp() throws Exception {
            throw new Exception("Failed");
          }

          @Override protected ScheduledExecutorService executor() {
            if (executorService == null) {
              executorService = super.executor();
              // Add a listener that will be executed after the listener that shuts down the executor.
              addListener(new Listener() {
                @Override public void failed(State from, Throwable failure) {
                  failureLatch.countDown();
                }
              }, MoreExecutors.sameThreadExecutor());
            }
            return executorService;
          }

          @Override protected Scheduler scheduler() {
            return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
          }};

        try {
          service.startAsync().awaitRunning();
          fail("Expected service to fail during startup");
        } catch (IllegalStateException expected) {}
        failureLatch.await();
        assertTrue(service.executor().isShutdown());
        assertTrue(service.executor().awaitTermination(100, TimeUnit.MILLISECONDS));
      }

      public void testSchedulerOnlyCalledOnce() throws Exception {
        TestService service = new TestService();
        service.startAsync().awaitRunning();
        // It should be called once during startup.
        assertEquals(1, service.numberOfTimesSchedulerCalled.get());
        for (int i = 1; i < 10; i++) {
          service.runFirstBarrier.await();
          assertEquals(i, service.numberOfTimesRunCalled.get());
          service.runSecondBarrier.await();
        }
        service.runFirstBarrier.await();
        service.stopAsync();
        service.runSecondBarrier.await();
        service.awaitTerminated();
        // Only called once overall.
        assertEquals(1, service.numberOfTimesSchedulerCalled.get());
      }

      private class TestService extends AbstractScheduledService {
        CyclicBarrier runFirstBarrier = new CyclicBarrier(2);
        CyclicBarrier runSecondBarrier = new CyclicBarrier(2);

        volatile boolean startUpCalled = false;
        volatile boolean shutDownCalled = false;
        AtomicInteger numberOfTimesRunCalled = new AtomicInteger(0);
        AtomicInteger numberOfTimesExecutorCalled = new AtomicInteger(0);
        AtomicInteger numberOfTimesSchedulerCalled = new AtomicInteger(0);
        volatile Exception runException = null;
        volatile Exception startUpException = null;
        volatile Exception shutDownException = null;

        @Override
        protected void runOneIteration() throws Exception {
          assertTrue(startUpCalled);
          assertFalse(shutDownCalled);
          numberOfTimesRunCalled.incrementAndGet();
          assertEquals(State.RUNNING, state());
          runFirstBarrier.await();
          runSecondBarrier.await();
          if (runException != null) {
            throw runException;
          }
        }

        @Override
        protected void startUp() throws Exception {
          assertFalse(startUpCalled);
          assertFalse(shutDownCalled);
          startUpCalled = true;
          assertEquals(State.STARTING, state());
          if (startUpException != null) {
            throw startUpException;
          }
        }

        @Override
        protected void shutDown() throws Exception {
          assertTrue(startUpCalled);
          assertFalse(shutDownCalled);
          shutDownCalled = true;
          if (shutDownException != null) {
            throw shutDownException;
          }
        }

        @Override
        protected ScheduledExecutorService executor() {
          numberOfTimesExecutorCalled.incrementAndGet();
          return executor;
        }

        @Override
        protected Scheduler scheduler() {
          numberOfTimesSchedulerCalled.incrementAndGet();
          return configuration;
        }
      }

298 |
public static class SchedulerTest extends TestCase { |
299 |
// These constants are arbitrary and just used to make sure that the correct method is called |
300 |
// with the correct parameters. |
301 |
private static final int initialDelay = 10 ; |
302 |
private static final int delay = 20 ; |
303 |
private static final TimeUnit unit = TimeUnit.MILLISECONDS; |
304 |
305 |
// Unique runnable object used for comparison. |
306 |
final Runnable testRunnable = new Runnable() { @Override public void run() {}}; |
307 |
boolean called = false ; |
308 |
309 |
private void assertSingleCallWithCorrectParameters(Runnable command, long initialDelay, |
310 |
long delay, TimeUnit unit) { |
311 |
assertFalse(called); // only called once. |
312 |
called = true ; |
313 |
assertEquals(SchedulerTest.initialDelay, initialDelay); |
314 |
assertEquals(SchedulerTest.delay, delay); |
315 |
assertEquals(SchedulerTest.unit, unit); |
316 |
assertEquals(testRunnable, command); |
317 |
} |
318 |
319 |
public void testFixedRateSchedule() { |
320 |
Scheduler schedule = Scheduler.newFixedRateSchedule(initialDelay, delay, unit); |
321 |
schedule.schedule( null , new ScheduledThreadPoolExecutor( 1 ) { |
322 |
@Override |
323 |
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, |
324 |
long period, TimeUnit unit) { |
325 |
assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit); |
326 |
return null ; |
327 |
} |
328 |
}, testRunnable); |
329 |
assertTrue(called); |
330 |
} |
331 |
332 |
public void testFixedDelaySchedule() { |
333 |
Scheduler schedule = Scheduler.newFixedDelaySchedule(initialDelay, delay, unit); |
334 |
schedule.schedule( null , new ScheduledThreadPoolExecutor( 10 ) { |
335 |
@Override |
336 |
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, |
337 |
long delay, TimeUnit unit) { |
338 |
assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit); |
339 |
return null ; |
340 |
} |
341 |
}, testRunnable); |
342 |
assertTrue(called); |
343 |
} |
344 |
345 |
private class TestCustomScheduler extends AbstractScheduledService.CustomScheduler { |
346 |
public AtomicInteger scheduleCounter = new AtomicInteger( 0 ); |
347 |
@Override |
348 |
protected Schedule getNextSchedule() throws Exception { |
349 |
scheduleCounter.incrementAndGet(); |
350 |
return new Schedule( 0 , TimeUnit.SECONDS); |
351 |
} |
352 |
} |
353 |
354 |
public void testCustomSchedule_startStop() throws Exception { |
355 |
final CyclicBarrier firstBarrier = new CyclicBarrier( 2 ); |
356 |
final CyclicBarrier secondBarrier = new CyclicBarrier( 2 ); |
357 |
final AtomicBoolean shouldWait = new AtomicBoolean( true ); |
358 |
Runnable task = new Runnable() { |
359 |
@Override public void run() { |
360 |
try { |
361 |
if (shouldWait.get()) { |
362 |
firstBarrier.await(); |
363 |
secondBarrier.await(); |
364 |
} |
365 |
} catch (Exception e) { |
366 |
throw new RuntimeException(e); |
367 |
} |
368 |
} |
369 |
}; |
370 |
TestCustomScheduler scheduler = new TestCustomScheduler(); |
371 |
Future<?> future = scheduler.schedule( null , Executors.newScheduledThreadPool( 10 ), task); |
372 |
firstBarrier.await(); |
373 |
assertEquals( 1 , scheduler.scheduleCounter.get()); |
374 |
secondBarrier.await(); |
375 |
firstBarrier.await(); |
376 |
assertEquals( 2 , scheduler.scheduleCounter.get()); |
377 |
shouldWait.set( false ); |
378 |
secondBarrier.await(); |
379 |
future.cancel( false ); |
380 |
} |
381 |
382 |
public void testCustomSchedulerServiceStop() throws Exception { |
383 |
TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService(); |
384 |
service.startAsync().awaitRunning(); |
385 |
service.firstBarrier.await(); |
386 |
assertEquals( 1 , service.numIterations.get()); |
387 |
service.stopAsync(); |
388 |
service.secondBarrier.await(); |
389 |
service.awaitTerminated(); |
390 |
// Sleep for a while just to ensure that our task wasn't called again. |
391 |
Thread.sleep(unit.toMillis( 3 * delay)); |
392 |
assertEquals( 1 , service.numIterations.get()); |
393 |
} |
394 |
395 |
public void testBig() throws Exception { |
396 |
TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService() { |
397 |
@Override protected Scheduler scheduler() { |
398 |
return new AbstractScheduledService.CustomScheduler() { |
399 |
@Override |
400 |
protected Schedule getNextSchedule() throws Exception { |
401 |
// Explicitly yield to increase the probability of a pathological scheduling. |
402 |
Thread.yield(); |
403 |
return new Schedule( 0 , TimeUnit.SECONDS); |
404 |
} |
405 |
}; |
406 |
} |
407 |
}; |
408 |
service.useBarriers = false ; |
409 |
service.startAsync().awaitRunning(); |
410 |
Thread.sleep( 50 ); |
411 |
service.useBarriers = true ; |
412 |
service.firstBarrier.await(); |
413 |
int numIterations = service.numIterations.get(); |
414 |
service.stopAsync(); |
415 |
service.secondBarrier.await(); |
416 |
service.awaitTerminated(); |
417 |
assertEquals(numIterations, service.numIterations.get()); |
418 |
} |

    private static class TestAbstractScheduledCustomService extends AbstractScheduledService {
      final AtomicInteger numIterations = new AtomicInteger(0);
      volatile boolean useBarriers = true;
      final CyclicBarrier firstBarrier = new CyclicBarrier(2);
      final CyclicBarrier secondBarrier = new CyclicBarrier(2);

      @Override protected void runOneIteration() throws Exception {
        numIterations.incrementAndGet();
        if (useBarriers) {
          firstBarrier.await();
          secondBarrier.await();
        }
      }

      @Override protected ScheduledExecutorService executor() {
        // use a bunch of threads so that weird overlapping schedules are more likely to happen.
        return Executors.newScheduledThreadPool(10);
      }

      @Override protected void startUp() throws Exception {}

      @Override protected void shutDown() throws Exception {}

      @Override protected Scheduler scheduler() {
        return new CustomScheduler() {
          @Override
          protected Schedule getNextSchedule() throws Exception {
            return new Schedule(delay, unit);
          }};
      }
    }

    public void testCustomSchedulerFailure() throws Exception {
      TestFailingCustomScheduledService service = new TestFailingCustomScheduledService();
      service.startAsync().awaitRunning();
      for (int i = 1; i < 4; i++) {
        service.firstBarrier.await();
        assertEquals(i, service.numIterations.get());
        service.secondBarrier.await();
      }
      Thread.sleep(1000);
      try {
        service.stopAsync().awaitTerminated(100, TimeUnit.SECONDS);
        fail();
      } catch (IllegalStateException e) {
        assertEquals(State.FAILED, service.state());
      }
    }

    private static class TestFailingCustomScheduledService extends AbstractScheduledService {
      final AtomicInteger numIterations = new AtomicInteger(0);
      final CyclicBarrier firstBarrier = new CyclicBarrier(2);
      final CyclicBarrier secondBarrier = new CyclicBarrier(2);

      @Override protected void runOneIteration() throws Exception {
        numIterations.incrementAndGet();
        firstBarrier.await();
        secondBarrier.await();
      }

      @Override protected ScheduledExecutorService executor() {
        // use a bunch of threads so that weird overlapping schedules are more likely to happen.
        return Executors.newScheduledThreadPool(10);
      }

      @Override protected Scheduler scheduler() {
        return new CustomScheduler() {
          @Override
          protected Schedule getNextSchedule() throws Exception {
            if (numIterations.get() > 2) {
              throw new IllegalStateException("Failed");
            }
            return new Schedule(delay, unit);
          }};
      }
    }
  }
}
AbstractServiceTest
/*
 * Copyright (C) 2009 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.google.common.util.concurrent;

import static java.lang.Thread.currentThread;
import static java.util.concurrent.TimeUnit.SECONDS;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Service.Listener;
import com.google.common.util.concurrent.Service.State;

import junit.framework.TestCase;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import javax.annotation.concurrent.GuardedBy;

/**
 * Unit test for {@link AbstractService}.
 *
 * @author Jesse Wilson
 */
public class AbstractServiceTest extends TestCase {

  private Thread executionThread;
  private Throwable thrownByExecutionThread;

  public void testNoOpServiceStartStop() throws Exception {
    NoOpService service = new NoOpService();
    RecordingListener listener = RecordingListener.record(service);

    assertEquals(State.NEW, service.state());
    assertFalse(service.isRunning());
    assertFalse(service.running);

    service.startAsync();
    assertEquals(State.RUNNING, service.state());
    assertTrue(service.isRunning());
    assertTrue(service.running);

    service.stopAsync();
    assertEquals(State.TERMINATED, service.state());
    assertFalse(service.isRunning());
    assertFalse(service.running);
    assertEquals(
        ImmutableList.of(
            State.STARTING,
            State.RUNNING,
            State.STOPPING,
            State.TERMINATED),
        listener.getStateHistory());
  }

  public void testNoOpServiceStartAndWaitStopAndWait() throws Exception {
    NoOpService service = new NoOpService();

    service.startAsync().awaitRunning();
    assertEquals(State.RUNNING, service.state());

    service.stopAsync().awaitTerminated();
    assertEquals(State.TERMINATED, service.state());
  }

  public void testNoOpServiceStartAsyncAndAwaitStopAsyncAndAwait() throws Exception {
    NoOpService service = new NoOpService();

    service.startAsync().awaitRunning();
    assertEquals(State.RUNNING, service.state());

    service.stopAsync().awaitTerminated();
    assertEquals(State.TERMINATED, service.state());
  }

  public void testNoOpServiceStopIdempotence() throws Exception {
    NoOpService service = new NoOpService();
    RecordingListener listener = RecordingListener.record(service);
    service.startAsync().awaitRunning();
    assertEquals(State.RUNNING, service.state());

    service.stopAsync();
    service.stopAsync();
    assertEquals(State.TERMINATED, service.state());
    assertEquals(
        ImmutableList.of(
            State.STARTING,
            State.RUNNING,
            State.STOPPING,
            State.TERMINATED),
        listener.getStateHistory());
  }

  public void testNoOpServiceStopIdempotenceAfterWait() throws Exception {
    NoOpService service = new NoOpService();

    service.startAsync().awaitRunning();

    service.stopAsync().awaitTerminated();
    service.stopAsync();
    assertEquals(State.TERMINATED, service.state());
  }

  public void testNoOpServiceStopIdempotenceDoubleWait() throws Exception {
    NoOpService service = new NoOpService();

    service.startAsync().awaitRunning();
    assertEquals(State.RUNNING, service.state());

    service.stopAsync().awaitTerminated();
    service.stopAsync().awaitTerminated();
    assertEquals(State.TERMINATED, service.state());
  }

  public void testNoOpServiceStartStopAndWaitUninterruptible()
      throws Exception {
    NoOpService service = new NoOpService();

    currentThread().interrupt();
    try {
      service.startAsync().awaitRunning();
      assertEquals(State.RUNNING, service.state());

      service.stopAsync().awaitTerminated();
      assertEquals(State.TERMINATED, service.state());

      assertTrue(currentThread().isInterrupted());
    } finally {
      Thread.interrupted(); // clear interrupt for future tests
    }
  }

  private static class NoOpService extends AbstractService {
    boolean running = false;

    @Override protected void doStart() {
      assertFalse(running);
      running = true;
      notifyStarted();
    }

    @Override protected void doStop() {
      assertTrue(running);
      running = false;
      notifyStopped();
    }
  }

  public void testManualServiceStartStop() throws Exception {
    ManualSwitchedService service = new ManualSwitchedService();
    RecordingListener listener = RecordingListener.record(service);

    service.startAsync();
    assertEquals(State.STARTING, service.state());
    assertFalse(service.isRunning());
    assertTrue(service.doStartCalled);

    service.notifyStarted(); // usually this would be invoked by another thread
    assertEquals(State.RUNNING, service.state());
    assertTrue(service.isRunning());

    service.stopAsync();
    assertEquals(State.STOPPING, service.state());
    assertFalse(service.isRunning());
    assertTrue(service.doStopCalled);

    service.notifyStopped(); // usually this would be invoked by another thread
    assertEquals(State.TERMINATED, service.state());
    assertFalse(service.isRunning());
    assertEquals(
        ImmutableList.of(
            State.STARTING,
            State.RUNNING,
            State.STOPPING,
            State.TERMINATED),
        listener.getStateHistory());

  }

  public void testManualServiceNotifyStoppedWhileRunning() throws Exception {
    ManualSwitchedService service = new ManualSwitchedService();
    RecordingListener listener = RecordingListener.record(service);

    service.startAsync();
    service.notifyStarted();
    service.notifyStopped();
    assertEquals(State.TERMINATED, service.state());
    assertFalse(service.isRunning());
    assertFalse(service.doStopCalled);

    assertEquals(
        ImmutableList.of(
            State.STARTING,
            State.RUNNING,
            State.TERMINATED),
        listener.getStateHistory());
  }

  public void testManualServiceStopWhileStarting() throws Exception {
    ManualSwitchedService service = new ManualSwitchedService();
    RecordingListener listener = RecordingListener.record(service);

    service.startAsync();
    assertEquals(State.STARTING, service.state());
    assertFalse(service.isRunning());
    assertTrue(service.doStartCalled);

    service.stopAsync();
    assertEquals(State.STOPPING, service.state());
    assertFalse(service.isRunning());
    assertFalse(service.doStopCalled);

    service.notifyStarted();
    assertEquals(State.STOPPING, service.state());
    assertFalse(service.isRunning());
    assertTrue(service.doStopCalled);

    service.notifyStopped();
    assertEquals(State.TERMINATED, service.state());
    assertFalse(service.isRunning());
    assertEquals(
        ImmutableList.of(
            State.STARTING,
            State.STOPPING,
            State.TERMINATED),
        listener.getStateHistory());
  }

  /**
   * This tests for a bug where if {@link Service#stopAsync()} was called while the service was
   * {@link State#STARTING} more than once, the {@link Listener#stopping(State)} callback would get
   * called multiple times.
   */
  public void testManualServiceStopMultipleTimesWhileStarting() throws Exception {
    ManualSwitchedService service = new ManualSwitchedService();
    final AtomicInteger stoppingCount = new AtomicInteger();
    service.addListener(new Listener() {
      @Override public void stopping(State from) {
        stoppingCount.incrementAndGet();
      }
    }, MoreExecutors.sameThreadExecutor());

    service.startAsync();
    service.stopAsync();
    assertEquals(1, stoppingCount.get());
    service.stopAsync();
    assertEquals(1, stoppingCount.get());
  }

  public void testManualServiceStopWhileNew() throws Exception {
    ManualSwitchedService service = new ManualSwitchedService();
    RecordingListener listener = RecordingListener.record(service);

    service.stopAsync();
    assertEquals(State.TERMINATED, service.state());
    assertFalse(service.isRunning());
    assertFalse(service.doStartCalled);
    assertFalse(service.doStopCalled);
    assertEquals(ImmutableList.of(State.TERMINATED), listener.getStateHistory());
  }

  public void testManualServiceFailWhileStarting() throws Exception {
    ManualSwitchedService service = new ManualSwitchedService();
    RecordingListener listener = RecordingListener.record(service);
    service.startAsync();
    service.notifyFailed(EXCEPTION);
    assertEquals(ImmutableList.of(State.STARTING, State.FAILED), listener.getStateHistory());
  }

  public void testManualServiceFailWhileRunning() throws Exception {
    ManualSwitchedService service = new ManualSwitchedService();
    RecordingListener listener = RecordingListener.record(service);
    service.startAsync();
    service.notifyStarted();
    service.notifyFailed(EXCEPTION);
    assertEquals(ImmutableList.of(State.STARTING, State.RUNNING, State.FAILED),
        listener.getStateHistory());
  }

  public void testManualServiceFailWhileStopping() throws Exception {
    ManualSwitchedService service = new ManualSwitchedService();
    RecordingListener listener = RecordingListener.record(service);
    service.startAsync();
    service.notifyStarted();
    service.stopAsync();
    service.notifyFailed(EXCEPTION);
    assertEquals(ImmutableList.of(State.STARTING, State.RUNNING, State.STOPPING, State.FAILED),
        listener.getStateHistory());
  }

  public void testManualServiceUnrequestedStop() {
    ManualSwitchedService service = new ManualSwitchedService();

    service.startAsync();

    service.notifyStarted();
    assertEquals(State.RUNNING, service.state());
    assertTrue(service.isRunning());
    assertFalse(service.doStopCalled);

    service.notifyStopped();
    assertEquals(State.TERMINATED, service.state());
    assertFalse(service.isRunning());
    assertFalse(service.doStopCalled);
  }

  /**
   * The user of this service should call {@link #notifyStarted} and {@link
   * #notifyStopped} after calling {@link #startAsync} and {@link #stopAsync}.
   */
  private static class ManualSwitchedService extends AbstractService {
    boolean doStartCalled = false;
    boolean doStopCalled = false;

    @Override protected void doStart() {
      assertFalse(doStartCalled);
      doStartCalled = true;
    }

    @Override protected void doStop() {
      assertFalse(doStopCalled);
      doStopCalled = true;
    }
  }

  public void testAwaitTerminated() throws Exception {
    final NoOpService service = new NoOpService();
    Thread waiter = new Thread() {
      @Override public void run() {
        service.awaitTerminated();
      }
    };
    waiter.start();
    service.startAsync().awaitRunning();
    assertEquals(State.RUNNING, service.state());
    service.stopAsync();
    waiter.join(100); // ensure that the await in the other thread is triggered
    assertFalse(waiter.isAlive());
  }

  public void testAwaitTerminated_FailedService() throws Exception {
    final ManualSwitchedService service = new ManualSwitchedService();
    final AtomicReference<Throwable> exception = Atomics.newReference();
    Thread waiter = new Thread() {
      @Override public void run() {
        try {
          service.awaitTerminated();
          fail("Expected an IllegalStateException");
        } catch (Throwable t) {
          exception.set(t);
        }
      }
    };
    waiter.start();
    service.startAsync();
    service.notifyStarted();
    assertEquals(State.RUNNING, service.state());
    service.notifyFailed(EXCEPTION);
    assertEquals(State.FAILED, service.state());
    waiter.join(100);
    assertFalse(waiter.isAlive());
    assertTrue(exception.get() instanceof IllegalStateException);
    assertEquals(EXCEPTION, exception.get().getCause());
  }

  public void testThreadedServiceStartAndWaitStopAndWait() throws Throwable {
    ThreadedService service = new ThreadedService();
    RecordingListener listener = RecordingListener.record(service);
    service.startAsync().awaitRunning();
    assertEquals(State.RUNNING, service.state());

    service.awaitRunChecks();

    service.stopAsync().awaitTerminated();
    assertEquals(State.TERMINATED, service.state());

    throwIfSet(thrownByExecutionThread);
    assertEquals(
        ImmutableList.of(
            State.STARTING,
            State.RUNNING,
            State.STOPPING,
            State.TERMINATED),
        listener.getStateHistory());
  }

  public void testThreadedServiceStopIdempotence() throws Throwable {
    ThreadedService service = new ThreadedService();

    service.startAsync().awaitRunning();
    assertEquals(State.RUNNING, service.state());

    service.awaitRunChecks();

    service.stopAsync();
    service.stopAsync().awaitTerminated();
    assertEquals(State.TERMINATED, service.state());

    throwIfSet(thrownByExecutionThread);
  }

  public void testThreadedServiceStopIdempotenceAfterWait()
      throws Throwable {
    ThreadedService service = new ThreadedService();

    service.startAsync().awaitRunning();
    assertEquals(State.RUNNING, service.state());

    service.awaitRunChecks();

    service.stopAsync().awaitTerminated();
    service.stopAsync();
    assertEquals(State.TERMINATED, service.state());

    executionThread.join();

    throwIfSet(thrownByExecutionThread);
  }

  public void testThreadedServiceStopIdempotenceDoubleWait()
      throws Throwable {
    ThreadedService service = new ThreadedService();

    service.startAsync().awaitRunning();
    assertEquals(State.RUNNING, service.state());

    service.awaitRunChecks();

    service.stopAsync().awaitTerminated();
    service.stopAsync().awaitTerminated();
    assertEquals(State.TERMINATED, service.state());

    throwIfSet(thrownByExecutionThread);
  }

  public void testManualServiceFailureIdempotence() {
    ManualSwitchedService service = new ManualSwitchedService();
    RecordingListener.record(service);
    service.startAsync();
    service.notifyFailed(new Exception("1"));
    service.notifyFailed(new Exception("2"));
    assertEquals("1", service.failureCause().getMessage());
    try {
      service.awaitRunning();
      fail();
    } catch (IllegalStateException e) {
      assertEquals("1", e.getCause().getMessage());
    }
  }

  private class ThreadedService extends AbstractService {
    final CountDownLatch hasConfirmedIsRunning = new CountDownLatch(1);

    /*
     * The main test thread tries to stop() the service shortly after
     * confirming that it is running. Meanwhile, the service itself is trying
     * to confirm that it is running. If the main thread's stop() call happens
     * before it has the chance, the test will fail. To avoid this, the main
     * thread calls this method, which waits until the service has performed
     * its own "running" check.
     */
    void awaitRunChecks() throws InterruptedException {
      assertTrue("Service thread hasn't finished its checks. "
          + "Exception status (possibly stale): " + thrownByExecutionThread,
          hasConfirmedIsRunning.await(10, SECONDS));
    }

    @Override protected void doStart() {
      assertEquals(State.STARTING, state());
      invokeOnExecutionThreadForTest(new Runnable() {
        @Override public void run() {
          assertEquals(State.STARTING, state());
          notifyStarted();
          assertEquals(State.RUNNING, state());
          hasConfirmedIsRunning.countDown();
        }
      });
    }

    @Override protected void doStop() {
      assertEquals(State.STOPPING, state());
      invokeOnExecutionThreadForTest(new Runnable() {
        @Override public void run() {
          assertEquals(State.STOPPING, state());
          notifyStopped();
          assertEquals(State.TERMINATED, state());
        }
      });
    }
  }

  private void invokeOnExecutionThreadForTest(Runnable runnable) {
    executionThread = new Thread(runnable);
    executionThread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
      @Override
      public void uncaughtException(Thread thread, Throwable e) {
        thrownByExecutionThread = e;
      }
    });
    executionThread.start();
  }

  private static void throwIfSet(Throwable t) throws Throwable {
    if (t != null) {
      throw t;
    }
  }

  public void testStopUnstartedService() throws Exception {
    NoOpService service = new NoOpService();
    RecordingListener listener = RecordingListener.record(service);

    service.stopAsync();
    assertEquals(State.TERMINATED, service.state());

    try {
      service.startAsync();
      fail();
    } catch (IllegalStateException expected) {}
    assertEquals(State.TERMINATED, Iterables.getOnlyElement(listener.getStateHistory()));
  }

  public void testFailingServiceStartAndWait() throws Exception {
    StartFailingService service = new StartFailingService();
    RecordingListener listener = RecordingListener.record(service);

    try {
      service.startAsync().awaitRunning();
      fail();
    } catch (IllegalStateException e) {
      assertEquals(EXCEPTION, service.failureCause());
      assertEquals(EXCEPTION, e.getCause());
    }
    assertEquals(
        ImmutableList.of(
            State.STARTING,
            State.FAILED),
        listener.getStateHistory());
  }

  public void testFailingServiceStopAndWait_stopFailing() throws Exception {
    StopFailingService service = new StopFailingService();
    RecordingListener listener = RecordingListener.record(service);

    service.startAsync().awaitRunning();
    try {
      service.stopAsync().awaitTerminated();
      fail();
    } catch (IllegalStateException e) {
      assertEquals(EXCEPTION, service.failureCause());
      assertEquals(EXCEPTION, e.getCause());
    }
    assertEquals(
        ImmutableList.of(
            State.STARTING,
            State.RUNNING,
            State.STOPPING,
            State.FAILED),
        listener.getStateHistory());
  }

  public void testFailingServiceStopAndWait_runFailing() throws Exception {
    RunFailingService service = new RunFailingService();
    RecordingListener listener = RecordingListener.record(service);

    service.startAsync();
    try {
      service.awaitRunning();
      fail();
    } catch (IllegalStateException e) {
      assertEquals(EXCEPTION, service.failureCause());
      assertEquals(EXCEPTION, e.getCause());
    }
    assertEquals(
        ImmutableList.of(
            State.STARTING,
            State.RUNNING,
            State.FAILED),
        listener.getStateHistory());
  }

  public void testThrowingServiceStartAndWait() throws Exception {
    StartThrowingService service = new StartThrowingService();
    RecordingListener listener = RecordingListener.record(service);

    try {
      service.startAsync().awaitRunning();
      fail();
    } catch (IllegalStateException e) {
      assertEquals(service.exception, service.failureCause());
      assertEquals(service.exception, e.getCause());
    }
    assertEquals(
        ImmutableList.of(
            State.STARTING,
            State.FAILED),
        listener.getStateHistory());
  }

  public void testThrowingServiceStopAndWait_stopThrowing() throws Exception {
    StopThrowingService service = new StopThrowingService();
    RecordingListener listener = RecordingListener.record(service);

    service.startAsync().awaitRunning();
    try {
      service.stopAsync().awaitTerminated();
      fail();
    } catch (IllegalStateException e) {
      assertEquals(service.exception, service.failureCause());
      assertEquals(service.exception, e.getCause());
    }
    assertEquals(
        ImmutableList.of(
            State.STARTING,
            State.RUNNING,
            State.STOPPING,
            State.FAILED),
        listener.getStateHistory());
  }

  public void testThrowingServiceStopAndWait_runThrowing() throws Exception {
    RunThrowingService service = new RunThrowingService();
    RecordingListener listener = RecordingListener.record(service);

    service.startAsync();
    try {
      service.awaitTerminated();
      fail();
    } catch (IllegalStateException e) {
      assertEquals(service.exception, service.failureCause());
      assertEquals(service.exception, e.getCause());
    }
    assertEquals(
        ImmutableList.of(
            State.STARTING,
            State.RUNNING,
            State.FAILED),
        listener.getStateHistory());
  }

  public void testFailureCause_throwsIfNotFailed() {
    StopFailingService service = new StopFailingService();
    try {
      service.failureCause();
      fail();
    } catch (IllegalStateException e) {
      // expected
    }
    service.startAsync().awaitRunning();
    try {
      service.failureCause();
      fail();
    } catch (IllegalStateException e) {
      // expected
    }
    try {
      service.stopAsync().awaitTerminated();
      fail();
    } catch (IllegalStateException e) {
      assertEquals(EXCEPTION, service.failureCause());
      assertEquals(EXCEPTION, e.getCause());
    }
  }

  public void testAddListenerAfterFailureDoesntCauseDeadlock() throws InterruptedException {
    final StartFailingService service = new StartFailingService();
    service.startAsync();
    assertEquals(State.FAILED, service.state());
    service.addListener(new RecordingListener(service), MoreExecutors.sameThreadExecutor());
    Thread thread = new Thread() {
      @Override public void run() {
        // Internally stopAsync() grabs a lock, this could be any such method on AbstractService.
        service.stopAsync();
      }
    };
    thread.start();
    thread.join(100);
    assertFalse(thread + " is deadlocked", thread.isAlive());
  }

  public void testListenerDoesntDeadlockOnStartAndWaitFromRunning() throws Exception {
    final NoOpThreadedService service = new NoOpThreadedService();
    service.addListener(new Listener() {
      @Override public void running() {
        service.awaitRunning();
      }
    }, MoreExecutors.sameThreadExecutor());
    service.startAsync().awaitRunning(10, TimeUnit.MILLISECONDS);
    service.stopAsync();
  }

  public void testListenerDoesntDeadlockOnStopAndWaitFromTerminated() throws Exception {
    final NoOpThreadedService service = new NoOpThreadedService();
    service.addListener(new Listener() {
      @Override public void terminated(State from) {
        service.stopAsync().awaitTerminated();
      }
    }, MoreExecutors.sameThreadExecutor());
    service.startAsync().awaitRunning();

    Thread thread = new Thread() {
      @Override public void run() {
        service.stopAsync().awaitTerminated();
      }
    };
    thread.start();
    thread.join(100);
    assertFalse(thread + " is deadlocked", thread.isAlive());
  }

  private static class NoOpThreadedService extends AbstractExecutionThreadService {
    final CountDownLatch latch = new CountDownLatch(1);
    @Override protected void run() throws Exception {
      latch.await();
    }
    @Override protected void triggerShutdown() {
      latch.countDown();
    }
  }

  private static class StartFailingService extends AbstractService {
    @Override protected void doStart() {
      notifyFailed(EXCEPTION);
    }

    @Override protected void doStop() {
      fail();
    }
  }

  private static class RunFailingService extends AbstractService {
    @Override protected void doStart() {
      notifyStarted();
      notifyFailed(EXCEPTION);
    }

    @Override protected void doStop() {
      fail();
    }
  }

  private static class StopFailingService extends AbstractService {
    @Override protected void doStart() {
      notifyStarted();
    }

    @Override protected void doStop() {
      notifyFailed(EXCEPTION);
    }
  }

  private static class StartThrowingService extends AbstractService {

    final RuntimeException exception = new RuntimeException("deliberate");

    @Override protected void doStart() {
      throw exception;
    }

    @Override protected void doStop() {
      fail();
    }
  }

  private static class RunThrowingService extends AbstractService {

    final RuntimeException exception = new RuntimeException("deliberate");

    @Override protected void doStart() {
      notifyStarted();
      throw exception;
    }

    @Override protected void doStop() {
      fail();
    }
  }

  private static class StopThrowingService extends AbstractService {

    final RuntimeException exception = new RuntimeException("deliberate");

    @Override protected void doStart() {
      notifyStarted();
    }

    @Override protected void doStop() {
      throw exception;
    }
  }
811 |
812 |
private static class RecordingListener extends Listener { |
813 |
static RecordingListener record(Service service) { |
814 |
RecordingListener listener = new RecordingListener(service); |
815 |
service.addListener(listener, MoreExecutors.sameThreadExecutor()); |
816 |
return listener; |
817 |
} |
818 |
819 |
final Service service; |
820 |
821 |
RecordingListener(Service service) { |
822 |
this .service = service; |
823 |
} |
824 |
825 |
@GuardedBy ( "this" ) |
826 |
final List<State> stateHistory = Lists.newArrayList(); |
827 |
final CountDownLatch completionLatch = new CountDownLatch( 1 ); |
828 |
829 |
ImmutableList<State> getStateHistory() throws Exception { |
830 |
completionLatch.await(); |
831 |
synchronized ( this ) { |
832 |
return ImmutableList.copyOf(stateHistory); |
833 |
} |
834 |
} |
835 |
836 |
@Override public synchronized void starting() { |
837 |
assertTrue(stateHistory.isEmpty()); |
838 |
assertNotSame(State.NEW, service.state()); |
839 |
stateHistory.add(State.STARTING); |
840 |
} |
841 |
842 |
@Override public synchronized void running() { |
843 |
assertEquals(State.STARTING, Iterables.getOnlyElement(stateHistory)); |
844 |
stateHistory.add(State.RUNNING); |
845 |
service.awaitRunning(); |
846 |
assertNotSame(State.STARTING, service.state()); |
847 |
} |
848 |
849 |
    @Override public synchronized void stopping(State from) {
      assertEquals(from, Iterables.getLast(stateHistory));
      stateHistory.add(State.STOPPING);
      if (from == State.STARTING) {
        try {
          service.awaitRunning();
          fail();
        } catch (IllegalStateException expected) {
          assertNull(expected.getCause());
          assertTrue(expected.getMessage().equals(
              "Expected the service to be RUNNING, but was STOPPING"));
        }
      }
      assertNotSame(from, service.state());
    }

    @Override public synchronized void terminated(State from) {
      assertEquals(from, Iterables.getLast(stateHistory, State.NEW));
      stateHistory.add(State.TERMINATED);
      assertEquals(State.TERMINATED, service.state());
      if (from == State.NEW) {
        try {
          service.awaitRunning();
          fail();
        } catch (IllegalStateException expected) {
          assertNull(expected.getCause());
          assertTrue(expected.getMessage().equals(
              "Expected the service to be RUNNING, but was TERMINATED"));
        }
      }
      completionLatch.countDown();
    }

    @Override public synchronized void failed(State from, Throwable failure) {
      assertEquals(from, Iterables.getLast(stateHistory));
      stateHistory.add(State.FAILED);
      assertEquals(State.FAILED, service.state());
      assertEquals(failure, service.failureCause());
      if (from == State.STARTING) {
        try {
          service.awaitRunning();
          fail();
        } catch (IllegalStateException e) {
          assertEquals(failure, e.getCause());
        }
      }
      try {
        service.awaitTerminated();
        fail();
      } catch (IllegalStateException e) {
        assertEquals(failure, e.getCause());
      }
      completionLatch.countDown();
    }
  }

  public void testNotifyStartedWhenNotStarting() {
    AbstractService service = new DefaultService();
    try {
      service.notifyStarted();
      fail();
    } catch (IllegalStateException expected) {}
  }

  public void testNotifyStoppedWhenNotRunning() {
    AbstractService service = new DefaultService();
    try {
      service.notifyStopped();
      fail();
    } catch (IllegalStateException expected) {}
  }

  public void testNotifyFailedWhenNotStarted() {
    AbstractService service = new DefaultService();
    try {
      service.notifyFailed(new Exception());
      fail();
    } catch (IllegalStateException expected) {}
  }

  public void testNotifyFailedWhenTerminated() {
    NoOpService service = new NoOpService();
    service.startAsync().awaitRunning();
    service.stopAsync().awaitTerminated();
    try {
      service.notifyFailed(new Exception());
      fail();
    } catch (IllegalStateException expected) {}
  }

  private static class DefaultService extends AbstractService {
    @Override protected void doStart() {}
    @Override protected void doStop() {}
  }

  private static final Exception EXCEPTION = new Exception();
}
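The last few tests in the listing above check that illegal transitions (for example calling notifyStarted() on a service that is still NEW) are rejected with an IllegalStateException. The sketch below illustrates that idea in plain Java without Guava. MiniLifecycle and all of its members are hypothetical names invented for this example; it is a simplified model under stated assumptions, not Guava's AbstractService, and it ignores listeners and the awaitRunning()/awaitTerminated() methods entirely.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified lifecycle for illustration; this is not Guava's AbstractService. */
final class MiniLifecycle {
  enum State { NEW, STARTING, RUNNING, STOPPING, TERMINATED, FAILED }

  private State state = State.NEW;
  private final List<State> history = new ArrayList<>();

  synchronized State state() {
    return state;
  }

  /** Returns a copy of every state entered so far, in order. */
  synchronized List<State> history() {
    return new ArrayList<>(history);
  }

  /** Only legal from NEW, mirroring the rule that a service is started at most once. */
  synchronized void startAsync() {
    check(state == State.NEW, "startAsync");
    enter(State.STARTING);
  }

  synchronized void notifyStarted() {
    check(state == State.STARTING, "notifyStarted");
    enter(State.RUNNING);
  }

  /** Safe to call repeatedly: calls made once the service is stopping or stopped are no-ops. */
  synchronized void stopAsync() {
    if (state == State.NEW) {
      enter(State.TERMINATED);
    } else if (state == State.STARTING || state == State.RUNNING) {
      // Simplification (assumption): a real implementation has to coordinate
      // with a startup that is still in flight.
      enter(State.STOPPING);
    }
  }

  synchronized void notifyStopped() {
    check(state == State.STOPPING, "notifyStopped");
    enter(State.TERMINATED);
  }

  synchronized void notifyFailed() {
    boolean active =
        state == State.STARTING || state == State.RUNNING || state == State.STOPPING;
    check(active, "notifyFailed");
    enter(State.FAILED);
  }

  private void check(boolean legal, String operation) {
    if (!legal) {
      throw new IllegalStateException("Cannot " + operation + " when the service is " + state);
    }
  }

  private void enter(State next) {
    state = next;
    history.add(next);
  }

  public static void main(String[] args) {
    MiniLifecycle service = new MiniLifecycle();
    service.startAsync();
    service.notifyStarted();
    service.stopAsync();
    service.stopAsync(); // second call is a no-op
    service.notifyStopped();
    System.out.println(service.history());
  }
}
```

Keeping every transition behind a single check() makes the legal-state table easy to audit, which is roughly what the notify* tests above exercise from the outside.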
ServiceManagerTest
/*
 * Copyright (C) 2012 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.google.common.util.concurrent;

import static java.util.Arrays.asList;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.testing.NullPointerTester;
import com.google.common.testing.TestLogHandler;
import com.google.common.util.concurrent.ServiceManager.Listener;

import junit.framework.TestCase;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Formatter;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

/**
 * Tests for {@link ServiceManager}.
 *
 * @author Luke Sandberg
 * @author Chris Nokleberg
 */
public class ServiceManagerTest extends TestCase {

  private static class NoOpService extends AbstractService {
    @Override protected void doStart() {
      notifyStarted();
    }

    @Override protected void doStop() {
      notifyStopped();
    }
  }

  /*
   * A NoOp service that will delay the startup and shutdown notification for a configurable amount
   * of time.
   */
  private static class NoOpDelayedSerivce extends NoOpService {
    private long delay;

    public NoOpDelayedSerivce(long delay) {
      this.delay = delay;
    }

    @Override protected void doStart() {
      new Thread() {
        @Override public void run() {
          Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
          notifyStarted();
        }
      }.start();
    }

    @Override protected void doStop() {
      new Thread() {
        @Override public void run() {
          Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
          notifyStopped();
        }
      }.start();
    }
  }

  private static class FailStartService extends NoOpService {
    @Override protected void doStart() {
      notifyFailed(new IllegalStateException("failed"));
    }
  }

  private static class FailRunService extends NoOpService {
    @Override protected void doStart() {
      super.doStart();
      notifyFailed(new IllegalStateException("failed"));
    }
  }

  private static class FailStopService extends NoOpService {
    @Override protected void doStop() {
      notifyFailed(new IllegalStateException("failed"));
    }
  }

  public void testServiceStartupTimes() {
    Service a = new NoOpDelayedSerivce(150);
    Service b = new NoOpDelayedSerivce(353);
    ServiceManager serviceManager = new ServiceManager(asList(a, b));
    serviceManager.startAsync().awaitHealthy();
    ImmutableMap<Service, Long> startupTimes = serviceManager.startupTimes();
    assertEquals(2, startupTimes.size());
    assertTrue(startupTimes.get(a) >= 150);
    assertTrue(startupTimes.get(b) >= 353);
  }

  public void testServiceStartStop() {
    Service a = new NoOpService();
    Service b = new NoOpService();
    ServiceManager manager = new ServiceManager(asList(a, b));
    RecordingListener listener = new RecordingListener();
    manager.addListener(listener);
    assertState(manager, Service.State.NEW, a, b);
    assertFalse(manager.isHealthy());
    manager.startAsync().awaitHealthy();
    assertState(manager, Service.State.RUNNING, a, b);
    assertTrue(manager.isHealthy());
    assertTrue(listener.healthyCalled);
    assertFalse(listener.stoppedCalled);
    assertTrue(listener.failedServices.isEmpty());
    manager.stopAsync().awaitStopped();
    assertState(manager, Service.State.TERMINATED, a, b);
    assertFalse(manager.isHealthy());
    assertTrue(listener.stoppedCalled);
    assertTrue(listener.failedServices.isEmpty());
  }

  public void testFailStart() throws Exception {
    Service a = new NoOpService();
    Service b = new FailStartService();
    Service c = new NoOpService();
    Service d = new FailStartService();
    Service e = new NoOpService();
    ServiceManager manager = new ServiceManager(asList(a, b, c, d, e));
    RecordingListener listener = new RecordingListener();
    manager.addListener(listener);
    assertState(manager, Service.State.NEW, a, b, c, d, e);
    try {
      manager.startAsync().awaitHealthy();
      fail();
    } catch (IllegalStateException expected) {
    }
    assertFalse(listener.healthyCalled);
    assertState(manager, Service.State.RUNNING, a, c, e);
    assertEquals(ImmutableSet.of(b, d), listener.failedServices);
    assertState(manager, Service.State.FAILED, b, d);
    assertFalse(manager.isHealthy());

    manager.stopAsync().awaitStopped();
    assertFalse(manager.isHealthy());
    assertFalse(listener.healthyCalled);
    assertTrue(listener.stoppedCalled);
  }

  public void testFailRun() throws Exception {
    Service a = new NoOpService();
    Service b = new FailRunService();
    ServiceManager manager = new ServiceManager(asList(a, b));
    RecordingListener listener = new RecordingListener();
    manager.addListener(listener);
    assertState(manager, Service.State.NEW, a, b);
    try {
      manager.startAsync().awaitHealthy();
      fail();
    } catch (IllegalStateException expected) {
    }
    assertTrue(listener.healthyCalled);
    assertEquals(ImmutableSet.of(b), listener.failedServices);

    manager.stopAsync().awaitStopped();
    assertState(manager, Service.State.FAILED, b);
    assertState(manager, Service.State.TERMINATED, a);

    assertTrue(listener.stoppedCalled);
  }

  public void testFailStop() throws Exception {
    Service a = new NoOpService();
    Service b = new FailStopService();
    Service c = new NoOpService();
    ServiceManager manager = new ServiceManager(asList(a, b, c));
    RecordingListener listener = new RecordingListener();
    manager.addListener(listener);

    manager.startAsync().awaitHealthy();
    assertTrue(listener.healthyCalled);
    assertFalse(listener.stoppedCalled);
    manager.stopAsync().awaitStopped();

    assertTrue(listener.stoppedCalled);
    assertEquals(ImmutableSet.of(b), listener.failedServices);
    assertState(manager, Service.State.FAILED, b);
    assertState(manager, Service.State.TERMINATED, a, c);
  }

  public void testToString() throws Exception {
    Service a = new NoOpService();
    Service b = new FailStartService();
    ServiceManager manager = new ServiceManager(asList(a, b));
    String toString = manager.toString();
    assertTrue(toString.contains("NoOpService"));
    assertTrue(toString.contains("FailStartService"));
  }

  public void testTimeouts() throws Exception {
    Service a = new NoOpDelayedSerivce(50);
    ServiceManager manager = new ServiceManager(asList(a));
    manager.startAsync();
    try {
      manager.awaitHealthy(1, TimeUnit.MILLISECONDS);
      fail();
    } catch (TimeoutException expected) {
    }
    manager.awaitHealthy(100, TimeUnit.MILLISECONDS); // no exception thrown

    manager.stopAsync();
    try {
      manager.awaitStopped(1, TimeUnit.MILLISECONDS);
      fail();
    } catch (TimeoutException expected) {
    }
    manager.awaitStopped(100, TimeUnit.MILLISECONDS); // no exception thrown
  }

  /**
   * This covers a case where if the last service to stop failed then the stopped callback would
   * never be called.
   */
  public void testSingleFailedServiceCallsStopped() {
    Service a = new FailStartService();
    ServiceManager manager = new ServiceManager(asList(a));
    RecordingListener listener = new RecordingListener();
    manager.addListener(listener);
    try {
      manager.startAsync().awaitHealthy();
      fail();
    } catch (IllegalStateException expected) {
    }
    assertTrue(listener.stoppedCalled);
  }

  /**
   * This covers a bug where listener.healthy would get called when a single service failed during
   * startup (it occurred in more complicated cases also).
   */
  public void testFailStart_singleServiceCallsHealthy() {
    Service a = new FailStartService();
    ServiceManager manager = new ServiceManager(asList(a));
    RecordingListener listener = new RecordingListener();
    manager.addListener(listener);
    try {
      manager.startAsync().awaitHealthy();
      fail();
    } catch (IllegalStateException expected) {
    }
    assertFalse(listener.healthyCalled);
  }

  /**
   * This covers a bug where if a listener was installed that would stop the manager if any service
   * fails and something failed during startup before service.start was called on all the services,
   * then awaitStopped would deadlock due to an IllegalStateException that was thrown when trying to
   * stop the timer(!).
   */
  public void testFailStart_stopOthers() throws TimeoutException {
    Service a = new FailStartService();
    Service b = new NoOpService();
    final ServiceManager manager = new ServiceManager(asList(a, b));
    manager.addListener(new Listener() {
      @Override public void failure(Service service) {
        manager.stopAsync();
      }});
    manager.startAsync();
    manager.awaitStopped(10, TimeUnit.MILLISECONDS);
  }

  private static void assertState(
      ServiceManager manager, Service.State state, Service... services) {
    Collection<Service> managerServices = manager.servicesByState().get(state);
    for (Service service : services) {
      assertEquals(service.toString(), state, service.state());
      assertEquals(service.toString(), service.isRunning(), state == Service.State.RUNNING);
      assertTrue(managerServices + " should contain " + service, managerServices.contains(service));
    }
  }

  /**
   * This is for covering a case where the ServiceManager would behave strangely if constructed
   * with no service under management. Listeners would never fire because the ServiceManager was
   * healthy and stopped at the same time. This test ensures that listeners fire and isHealthy
   * makes sense.
   */
  public void testEmptyServiceManager() {
    Logger logger = Logger.getLogger(ServiceManager.class.getName());
    logger.setLevel(Level.FINEST);
    TestLogHandler logHandler = new TestLogHandler();
    logger.addHandler(logHandler);
    ServiceManager manager = new ServiceManager(Arrays.<Service>asList());
    RecordingListener listener = new RecordingListener();
    manager.addListener(listener, MoreExecutors.sameThreadExecutor());
    manager.startAsync().awaitHealthy();
    assertTrue(manager.isHealthy());
    assertTrue(listener.healthyCalled);
    assertFalse(listener.stoppedCalled);
    assertTrue(listener.failedServices.isEmpty());
    manager.stopAsync().awaitStopped();
    assertFalse(manager.isHealthy());
    assertTrue(listener.stoppedCalled);
    assertTrue(listener.failedServices.isEmpty());
    // check that our NoOpService is not directly observable via any of the inspection methods or
    // via logging.
    assertEquals("ServiceManager{services=[]}", manager.toString());
    assertTrue(manager.servicesByState().isEmpty());
    assertTrue(manager.startupTimes().isEmpty());
    Formatter logFormatter = new Formatter() {
      @Override public String format(LogRecord record) {
        return formatMessage(record);
      }
    };
    for (LogRecord record : logHandler.getStoredLogRecords()) {
      assertFalse(logFormatter.format(record).contains("NoOpService"));
    }
  }

  /**
   * This is for a case where a long running Listener using the sameThreadListener could deadlock
   * another thread calling stopAsync().
   */

  public void testListenerDeadlock() throws InterruptedException {
    final CountDownLatch failEnter = new CountDownLatch(1);
    Service failRunService = new AbstractService() {
      @Override protected void doStart() {
        new Thread() {
          @Override public void run() {
            notifyStarted();
            notifyFailed(new Exception("boom"));
          }
        }.start();
      }
      @Override protected void doStop() {
        notifyStopped();
      }
    };
    final ServiceManager manager = new ServiceManager(
        Arrays.asList(failRunService, new NoOpService()));
    manager.addListener(new ServiceManager.Listener() {
      @Override public void failure(Service service) {
        failEnter.countDown();
        // block forever!
        Uninterruptibles.awaitUninterruptibly(new CountDownLatch(1));
      }
    }, MoreExecutors.sameThreadExecutor());
    // We do not call awaitHealthy because, due to races, that method may throw an exception. But
    // we really just want to wait for the thread to be in the failure callback so we wait for that
    // explicitly instead.
    manager.startAsync();
    failEnter.await();
    assertFalse("State should be updated before calling listeners", manager.isHealthy());
    // now we want to stop the services.
    Thread stoppingThread = new Thread() {
      @Override public void run() {
        manager.stopAsync().awaitStopped();
      }
    };
    stoppingThread.start();
    // this should be super fast since the only non stopped service is a NoOpService
    stoppingThread.join(1000);
    assertFalse("stopAsync has deadlocked!.", stoppingThread.isAlive());
  }

  /**
   * Catches a bug where when constructing a service manager failed, later interactions with the
   * service could cause IllegalStateExceptions inside the partially constructed ServiceManager.
   * This ISE wouldn't actually bubble up but would get logged by ExecutionQueue. This obfuscated
   * the original error (which was not constructing ServiceManager correctly).
   */
  public void testPartiallyConstructedManager() {
    Logger logger = Logger.getLogger("global");
    logger.setLevel(Level.FINEST);
    TestLogHandler logHandler = new TestLogHandler();
    logger.addHandler(logHandler);
    NoOpService service = new NoOpService();
    service.startAsync();
    try {
      new ServiceManager(Arrays.asList(service));
      fail();
    } catch (IllegalArgumentException expected) {}
    service.stopAsync();
    // Nothing was logged!
    assertEquals(0, logHandler.getStoredLogRecords().size());
  }

  public void testPartiallyConstructedManager_transitionAfterAddListenerBeforeStateIsReady() {
    // The implementation of this test is pretty sensitive to the implementation :( but we want to
    // ensure that if weird things happen during construction then we get exceptions.
    final NoOpService service1 = new NoOpService();
    // This service will start service1 when addListener is called. This simulates service1 being
    // started asynchronously.
    Service service2 = new Service() {
      final NoOpService delegate = new NoOpService();
      @Override public final void addListener(Listener listener, Executor executor) {
        service1.startAsync();
        delegate.addListener(listener, executor);
      }
      // Delegates from here on down
      @Override public final Service startAsync() {
        return delegate.startAsync();
      }

      @Override public final Service stopAsync() {
        return delegate.stopAsync();
      }

      @Override public final ListenableFuture<State> start() {
        return delegate.start();
      }

      @Override public final ListenableFuture<State> stop() {
        return delegate.stop();
      }

      @Override public State startAndWait() {
        return delegate.startAndWait();
      }

      @Override public State stopAndWait() {
        return delegate.stopAndWait();
      }

      @Override public final void awaitRunning() {
        delegate.awaitRunning();
      }

      @Override public final void awaitRunning(long timeout, TimeUnit unit)
          throws TimeoutException {
        delegate.awaitRunning(timeout, unit);
      }

      @Override public final void awaitTerminated() {
        delegate.awaitTerminated();
      }

      @Override public final void awaitTerminated(long timeout, TimeUnit unit)
          throws TimeoutException {
        delegate.awaitTerminated(timeout, unit);
      }

      @Override public final boolean isRunning() {
        return delegate.isRunning();
      }

      @Override public final State state() {
        return delegate.state();
      }

      @Override public final Throwable failureCause() {
        return delegate.failureCause();
      }
    };
    try {
      new ServiceManager(Arrays.asList(service1, service2));
      fail();
    } catch (IllegalArgumentException expected) {
      assertTrue(expected.getMessage().contains("started transitioning asynchronously"));
    }
  }

  /**
   * This test is for a case where two Service.Listener callbacks for the same service would call
   * transitionService in the wrong order due to a race. Due to the fact that it is a race this
   * test isn't guaranteed to expose the issue, but it is at least likely to become flaky if the
   * race sneaks back in, and in this case flaky means something is definitely wrong.
   *
   * <p>Before the bug was fixed this test would fail at least 30% of the time.
   */

  public void testTransitionRace() throws TimeoutException {
    for (int k = 0; k < 1000; k++) {
      List<Service> services = Lists.newArrayList();
      for (int i = 0; i < 5; i++) {
        services.add(new SnappyShutdownService(i));
      }
      ServiceManager manager = new ServiceManager(services);
      manager.startAsync().awaitHealthy();
      manager.stopAsync().awaitStopped(1, TimeUnit.SECONDS);
    }
  }

  /**
   * This service will shutdown very quickly after stopAsync is called and uses a background thread
   * so that we know that the stopping() listeners will execute on a different thread than the
   * terminated() listeners.
   */
  private static class SnappyShutdownService extends AbstractExecutionThreadService {
    final int index;
    final CountDownLatch latch = new CountDownLatch(1);

    SnappyShutdownService(int index) {
      this.index = index;
    }

    @Override protected void run() throws Exception {
      latch.await();
    }

    @Override protected void triggerShutdown() {
      latch.countDown();
    }

    @Override protected String serviceName() {
      return this.getClass().getSimpleName() + "[" + index + "]";
    }
  }

  public void testNulls() {
    ServiceManager manager = new ServiceManager(Arrays.<Service>asList());
    new NullPointerTester()
        .setDefault(ServiceManager.Listener.class, new RecordingListener())
        .testAllPublicInstanceMethods(manager);
  }

  private static final class RecordingListener extends ServiceManager.Listener {
    volatile boolean healthyCalled;
    volatile boolean stoppedCalled;
    final Set<Service> failedServices = Sets.newConcurrentHashSet();

    @Override public void healthy() {
      healthyCalled = true;
    }

    @Override public void stopped() {
      stoppedCalled = true;
    }

    @Override public void failure(Service service) {
      failedServices.add(service);
    }
  }
}
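testListenerDeadlock in the listing above detects a deadlock by running the suspect call on its own thread, joining with a timeout, and then asserting that the thread is no longer alive. The sketch below isolates that pattern in plain Java. JoinTimeoutCheck and finishesWithin are hypothetical names made up for this example and are not part of Guava or JUnit; the timeouts are arbitrary assumptions.

```java
import java.util.concurrent.CountDownLatch;

/** Hypothetical helper illustrating the join-with-timeout check; not part of Guava. */
final class JoinTimeoutCheck {

  /**
   * Runs the task on a new thread and reports whether it finished within timeoutMillis.
   * A false result means the task was still running, which a test may treat as a deadlock.
   */
  static boolean finishesWithin(Runnable task, long timeoutMillis) {
    Thread worker = new Thread(task);
    worker.setDaemon(true); // a stuck worker must not keep the JVM alive
    worker.start();
    try {
      worker.join(timeoutMillis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      return false;
    }
    return !worker.isAlive();
  }

  public static void main(String[] args) {
    // A task that returns immediately.
    boolean quick = finishesWithin(() -> {}, 1000);

    // A task that blocks on a latch nobody ever counts down.
    CountDownLatch never = new CountDownLatch(1);
    boolean blocked = finishesWithin(() -> {
      try {
        never.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }, 200);

    System.out.println("quick finished: " + quick + ", blocked finished: " + blocked);
  }
}
```

A timeout-based check like this can only suggest a deadlock, since a slow machine may also exceed the limit, so the timeout should be generous compared with the expected run time.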
<pre> |