| 1 | /* __copyright_begin__ |
| 2 | Copyright 2011 Dan Hagberg |
| 3 | |
| 4 | Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | you may not use this file except in compliance with the License. |
| 6 | You may obtain a copy of the License at |
| 7 | |
| 8 | http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | |
| 10 | Unless required by applicable law or agreed to in writing, software |
| 11 | distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | See the License for the specific language governing permissions and |
| 14 | limitations under the License. |
| 15 | __copyright_end__ */ |
| 16 | package net.digitaltsunami.tmeter.action; |
| 17 | |
| 18 | import java.util.HashSet; |
| 19 | import java.util.Set; |
| 20 | import java.util.concurrent.ExecutorService; |
| 21 | import java.util.concurrent.Executors; |
| 22 | import java.util.concurrent.LinkedBlockingQueue; |
| 23 | import java.util.concurrent.ThreadFactory; |
| 24 | |
| 25 | import net.digitaltsunami.tmeter.Timer; |
| 26 | import net.digitaltsunami.tmeter.TimerShell; |
| 27 | |
| 28 | /** |
| 29 | * Maintains and controls chain of {@link TimerAction} instances used to perform |
| 30 | * post completion processing of {@link Timer} instances. |
| 31 | * <p> |
| 32 | * All processing is handled on a separate thread to minimize the impact to the |
| 33 | * timed processing thread. |
| 34 | * <p> |
| 35 | * Each timer is processed by each {@link TimerAction} in a chain, but the order |
| 36 | * is not guaranteed. |
| 37 | * <p> |
| 38 | * Processing of {@link TimerAction}s can be stopped by clearing the action list |
| 39 | * using {@link #clearActions()}. This will remove the chain of |
| 40 | * {@link ActionChain} instances, but elements currently being processed will |
| 41 | * continue. |
| 42 | * <p> |
| 43 | * If any of the {@link TimerAction}s in the chain maintain state (e.g., |
| 44 | * counts), these can be reset by invoking {@link ActionChain#reset()}. This |
| 45 | * will cause each action in the chain to invoke {@link TimerAction#reset()} |
| 46 | * <p> |
| 47 | * <strong>Shutdown Processing</strong> |
| 48 | * <p> |
| 49 | * As the actions are handled on a separate thread, the timers may not have been |
| 50 | * processed when the timed application is shutdown. The default processing |
| 51 | * requires that the user manually shut down the thread. Until this occurs, the |
| 52 | * virtual machine will not shut down. Alternatively, a |
| 53 | * {@link ActionChainShutdownType} can be provided that will instruct the action |
| 54 | * chain to terminate on shutdown. |
| 55 | * <p> |
| 56 | verify that this is correct with both return from main and system.exit |
| 57 | * Shutdown types and corresponding actions are: |
| 58 | * <li> {@link ActionChainShutdownType#TERMINATE_AFTER_COMPLETION}: The thread |
| 59 | * will attempt to clear its queue prior to exit. |
| 60 | * <li> {@link ActionChainShutdownType#TERMINATE_MANUALLY}: The action chain will |
| 61 | * block shutdown of the virtual machine until instructed to shutdown. Use |
| 62 | * {@link #shutdown()} or {@link #clearActions()} to manually terminate chain |
| 63 | * processing. At this point, the thread will attempt to clear its queue prior |
| 64 | * to terminating. If the queue contents should be discarded and the thread |
| 65 | * terminated immediately, invoke {@link #shutdownNow()} with a value of true to |
| 66 | * force immediate termination. |
| 67 | * <li> {@link ActionChainShutdownType#TERMINATE_IMMEDIATELY}: The thread will |
| 68 | * terminate immediately and will not complete processing. This includes actions |
| 69 | * in mid processing. |
| 70 | * <p> |
| 71 | * |
| 72 | * @author dhagberg |
| 73 | * |
| 74 | */ |
| 75 | public class ActionChain { |
| 76 | |
| 77 | /** |
| 78 | * Queued timer instances to be processed by TimerAction list. |
| 79 | */ |
| 80 | private LinkedBlockingQueue<Timer> actionQueue; |
| 81 | |
| 82 | /** |
| 83 | * Executor to process actionQueue |
| 84 | */ |
| 85 | private ExecutorService queueProcessor; |
| 86 | |
| 87 | /** |
| 88 | * Root action in potential chain of actions. |
| 89 | */ |
| 90 | private TimerAction rootAction; |
| 91 | |
| 92 | private final ActionChainShutdownType shutdownType; |
| 93 | |
| 94 | /** |
| 95 | * Create an instance of an action chain with the default shutdown behavior, |
| 96 | * which to attempt to clear all currently queued timers prior to shutting |
| 97 | * down. |
| 98 | */ |
| 99 | public ActionChain() { |
| 100 | this(ActionChainShutdownType.TERMINATE_AFTER_COMPLETION); |
| 101 | } |
| 102 | |
| 103 | /** |
| 104 | * Create an instance of an action chain with an override of the shutdown |
| 105 | * behavior. |
| 106 | * |
| 107 | * @param shutdownType |
| 108 | * type of processing to complete when shutting down the action |
| 109 | * chain. |
| 110 | */ |
| 111 | public ActionChain(ActionChainShutdownType shutdownType) { |
| 112 | this.shutdownType = shutdownType; |
| 113 | } |
| 114 | |
| 115 | /** |
| 116 | * Create an instance of an action chain with the default shutdown behavior, |
| 117 | * which to attempt to clear all currently queued timers prior to shutting |
| 118 | * down. |
| 119 | * |
| 120 | * @param action |
| 121 | * initial action in the chain. |
| 122 | */ |
| 123 | public ActionChain(TimerAction action) { |
| 124 | this(action, ActionChainShutdownType.TERMINATE_AFTER_COMPLETION); |
| 125 | } |
| 126 | |
| 127 | /** |
| 128 | * Create an instance of an action chain with an override of the shutdown |
| 129 | * behavior. |
| 130 | * |
| 131 | * @param action |
| 132 | * initial action in the chain. |
| 133 | * @param shutdownType |
| 134 | * type of processing to complete when shutting down the action |
| 135 | * chain. |
| 136 | */ |
| 137 | public ActionChain(TimerAction action, ActionChainShutdownType shutdownType) { |
| 138 | this.shutdownType = shutdownType; |
| 139 | this.rootAction = action; |
| 140 | createQueueProcessor(); |
| 141 | } |
| 142 | |
| 143 | /** |
| 144 | * Submit a completed timer for post processing. If the timer has not yet |
| 145 | * completed, it will not be submitted. |
| 146 | */ |
| 147 | public void submitCompletedTimer(Timer completedTimer) { |
| 148 | if (hasActionList() && completedTimer.isStopped()) { |
| 149 | actionQueue.add(completedTimer); |
| 150 | } |
| 151 | } |
| 152 | |
| 153 | /** |
| 154 | * Invokes {@link TimerAction#reset()} on each action within the chain. |
| 155 | * <p> |
| 156 | * Action chain processing will continue. To avoid this, shutdown the |
| 157 | * queue prior to reset. |
| 158 | */ |
| 159 | public void reset() { |
| 160 | TimerAction tempRoot = rootAction; |
| 161 | if (tempRoot != null) { |
| 162 | tempRoot.resetState(); |
| 163 | } |
| 164 | } |
| 165 | |
| 166 | /** |
| 167 | * Shutdown processing of the action chain, discarding all timer currently |
| 168 | * on the queue. |
| 169 | */ |
| 170 | public void shutdownNow() { |
| 171 | if (queueProcessor != null) { |
| 172 | queueProcessor.shutdownNow(); |
| 173 | } |
| 174 | } |
| 175 | |
| 176 | /** |
| 177 | * Shutdown processing of the action chain with the option of finishing |
| 178 | * current tasks. |
| 179 | * <p> |
| 180 | * All timers currently on the queue may be processed depending on the |
| 181 | * {@link ActionChainShutdownType} provided when creating the action chain. |
| 182 | * See the class comments for more information regarding shutdown |
| 183 | * processing. and the virtual machine is shutdown. |
| 184 | */ |
| 185 | public void shutdown() { |
| 186 | Timer t = new TimerShell("EndProcesing"); |
| 187 | t.stop(); |
| 188 | submitCompletedTimer(t); |
| 189 | } |
| 190 | |
| 191 | /** |
| 192 | * Clear the timer action chain. This will clear shutdown the action chain |
| 193 | * prior to clearing the actions. As the actions will be cleared, timers |
| 194 | * currently in the queue will not be processed. |
| 195 | */ |
| 196 | public void clearActions() { |
| 197 | shutdownNow(); |
| 198 | rootAction = null; |
| 199 | } |
| 200 | |
| 201 | /** |
| 202 | * Add the provided {@link TimerAction} to the current chain of |
| 203 | * {@link TimerAction} instances. |
| 204 | * <p> |
| 205 | * This method is synchronized as it may create the root node, but other |
| 206 | * methods that check the root are not synchronized to reduce context |
| 207 | * switching; therefore, if a timer completes before a timer action is |
| 208 | * added, then it may be missed. This should not happen in the normal |
| 209 | * execution as the setup should be completed prior to starting tasks. |
| 210 | */ |
| 211 | public synchronized TimerAction addAction(TimerAction action) { |
| 212 | if (rootAction == null) { |
| 213 | rootAction = action; |
| 214 | createQueueProcessor(); |
| 215 | return action; |
| 216 | } |
| 217 | return rootAction.addAction(action); |
| 218 | } |
| 219 | |
| 220 | /** |
| 221 | * Returns true if an action list is defined to process completed timers. |
| 222 | * |
| 223 | * @return |
| 224 | */ |
| 225 | public boolean hasActionList() { |
| 226 | return rootAction != null; |
| 227 | } |
| 228 | |
| 229 | /** |
| 230 | * Create the queue and queue processor thread. The thread will continue |
| 231 | * until an instance of {@link TimerShell} is found on the queue at which |
| 232 | * point it will terminate. |
| 233 | */ |
| 234 | private void createQueueProcessor() { |
| 235 | actionQueue = new LinkedBlockingQueue<Timer>(); |
| 236 | queueProcessor = Executors.newSingleThreadExecutor(new ThreadFactory() { |
| 237 | @Override |
| 238 | public Thread newThread(Runnable target) { |
| 239 | final Thread thread = new Thread(target); |
| 240 | switch (shutdownType) { |
| 241 | case TERMINATE_IMMEDIATELY: |
| 242 | thread.setDaemon(true); |
| 243 | break; |
| 244 | |
| 245 | case TERMINATE_AFTER_COMPLETION: |
| 246 | thread.setDaemon(true); |
| 247 | Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { |
| 248 | |
| 249 | @Override |
| 250 | public void run() { |
| 251 | System.out.printf("In shutdown hook. %d remaining in queue\n", actionQueue.size()); |
| 252 | ActionChain.this.shutdown(); |
| 253 | try { thread.join(); } catch (InterruptedException ignore) { } |
| 254 | } |
| 255 | })); |
| 256 | break; |
| 257 | |
| 258 | case TERMINATE_MANUALLY: // Included for completeness. |
| 259 | default: |
| 260 | break; |
| 261 | } |
| 262 | return thread; |
| 263 | } |
| 264 | }); |
| 265 | queueProcessor.execute(new Runnable() { |
| 266 | |
| 267 | @Override |
| 268 | public void run() { |
| 269 | while (true) { |
| 270 | Timer timer; |
| 271 | try { |
| 272 | timer = actionQueue.take(); |
| 273 | if (timer instanceof TimerShell) { |
| 274 | // Shut down queue processor if TimerShell is placed |
| 275 | // on queue. |
| 276 | queueProcessor.shutdownNow(); |
| 277 | return; |
| 278 | } |
| 279 | } catch (InterruptedException e) { |
| 280 | Thread.interrupted(); |
| 281 | return; |
| 282 | } |
| 283 | // Place the instance in another variable to prevent it |
| 284 | // being cleared in between checking and using. |
| 285 | TimerAction currentRoot = rootAction; |
| 286 | if (currentRoot != null) { |
| 287 | currentRoot.timerComplete(timer); |
| 288 | } |
| 289 | } |
| 290 | } |
| 291 | }); |
| 292 | } |
| 293 | |
| 294 | /** |
| 295 | * Return a set of all actions currently in the chain. |
| 296 | * |
| 297 | * @return |
| 298 | */ |
| 299 | public Set<TimerAction> getActions() { |
| 300 | Set<TimerAction> actions = new HashSet<TimerAction>(); |
| 301 | TimerAction currentAction = rootAction; |
| 302 | while (currentAction != null) { |
| 303 | actions.add(currentAction); |
| 304 | currentAction = currentAction.nextAction; |
| 305 | } |
| 306 | return actions; |
| 307 | } |
| 308 | |
| 309 | } |