package net.catpad.infobus;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
/**
* This class maintains the service thread pool of the application.
*
* NOTE: none of this class public methods except getInfoBus() method
* should be called from classes inheriting from CommQueueItem, i.e.
* from the items (classes) containing the application logic and executing in the
* service thread contexts.
*
* The thread pool implements the failover architecture: when a service thread fails
* on an uncaught exception, the thread pool of ServiceLauncher will try to restart it.
*
* @see ServiceThread for much more details about service threads.
*
* @author Michael Gertelman
*
*/
public final class ServiceLauncher {
/**
* This inner class will be responsible for the creation of all the service threads
* and keeping them alive in the thread pool.
*/
class ServiceThreadPoolExecutor extends ThreadPoolExecutor {
ServiceThreadPoolExecutor() {
super(Integer.MAX_VALUE, // Core Pool Size - always create a new thread
Integer.MAX_VALUE, // Maximum Pool Size
Long.MAX_VALUE, // Keep Alive Time - run forever (never terminate anything)
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), // Work Queue - hands off tasks to threads without otherwise holding them
Executors.defaultThreadFactory(), // Thread Factory
new ThreadPoolExecutor.AbortPolicy()); // the handler throws a runtime RejectedExecutionException upon task rejection
}
/**
* Allow the logging before executing the new service.
* This method can be used for some additional pre-execution activities.
*/
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
logInfo("Thread [" + r.toString() + "] is going to start");
}
/**
* This method is crucual for keeping the application alive even
* after an uncaught exception.
* When a service thread throws an exception that is not treated
* and left uncaught, the thread is terminated and afterExecute() method
* is called. The ServiceThreadPoolExecutor will then try to restart the failed service.
*/
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (r != null) {
logSevere("Thread [" + r.toString() + "] has been terminated");
// Remove failed task from the executor's internal queue
// NOTE: this is important, otherwise we can have an indefinitely
// growing queue and OutOfMemory error if the failures are frequent
remove(r);
} else {
logSevere("Thread [" + r.toString() + " can't be restarted");
return;
}
if (t != null) {
logSevere("Termination caused by: " + t.getMessage() + "\n");
StackTraceElement[] ste = t.getStackTrace();
StringBuffer buf = new StringBuffer();
for (int i = 0; i < ste.length; i++) {
buf.append(ste[i].toString()).append("\n");
}
buf.append("\n\n");
logSevere(buf.toString());
}
// And restart the service
try {
ServiceThread serviceThread = (ServiceThread)r;
ServiceId id = serviceThread.getServiceId();
// Remove this service from the service thread map before
// trying to restart it again
serviceThreads.remove(id);
// If the thread was stopped intentionally - don't restart it
if (serviceThread.shouldExit()) {
logInfo(id.toString() + " was stopped intentionally; Exiting.");
return;
}
logInfo("Restarting " + id.toString() + " after an uncaught exception...");
try {
startService(id);
} catch (ServiceThreadAlreadyRunningException e) {
// This cannot happen because we have already deleted this service
// from the service thread map
}
} catch (ClassCastException e) { // This should never happen, but just in case
logSevere("Terminated thread is not of type ServiceThread - cannot restart");
}
}
protected void terminated() {
super.terminated();
logInfo("ServiceThreadPoolExecutor has been terminated");
}
} // ServiceThreadPoolExecutor class
private Logger logger;
/**
* Central InfoBus of the application
*/
private InfoBus infoBus;
/**
* All service threads created so far
*/
private Map<ServiceId,ServiceThread> serviceThreads;
/**
* Thread pool maintaining all service threads
*/
private ServiceThreadPoolExecutor threadPoolExecutor;
/**
* Create ServiceLauncher without a logger.
* No log information will be outputted.
*
*/
public ServiceLauncher() {
this(null);
}
/**
* Create ServiceLauncher with a logger.
* @param logger
*/
public ServiceLauncher(Logger logger) {
this.logger = logger;
serviceThreads = Collections.synchronizedMap(new HashMap<ServiceId,ServiceThread>());
infoBus = new InfoBus();
threadPoolExecutor = new ServiceThreadPoolExecutor();
}
/**
* Creates and starts a service thread given its unique id
*
* @param id
*/
public synchronized void startService(ServiceId id) throws ServiceThreadAlreadyRunningException {
ServiceThread serviceThread = serviceThreads.get(id);
if (serviceThread != null) {
throw new ServiceThreadAlreadyRunningException(id);
}
serviceThread = new ServiceThread(id, infoBus, logger);
threadPoolExecutor.execute(serviceThread);
serviceThreads.put(id, serviceThread);
}
/**
* This methods helps when one needs to start many services without having to name
* each one of them specifically. The services will get their ids as an array of
* consecutive integer starting from "from" and until the "to".
* All created services will be immediately started and the list of the created
* service ids will be returned.
* Overall the (to - from + 1) services will be created and started.
*
* @param from where to start giving ids to the new services
* @param to where to stop giving ids to the new services
* @param namePrefix the prefix wich will be added to the name of the service plus its index
* @return a list of the created service ids
* @throws ServiceThreadAlreadyRunningException
*/
public List<ServiceId> startServices(int from, int to, String namePrefix)
throws ServiceThreadAlreadyRunningException {
ArrayList<ServiceId> ids = new ArrayList<ServiceId>();
for (int i = from; i <= to; i++) {
ServiceId id = new ServiceId(i, namePrefix + i);
ids.add(id);
startService(id);
}
return ids;
}
/**
* Allows the thread calling shutdownImmediately() or shutdownGracefully()
* to wait until all service threads finish their execution.
* The method will block indefinitely until all service threads exit.
*
* @throws InterruptedException
*/
public void join() throws InterruptedException {
threadPoolExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
}
/**
* The service thread whose id is passed as a parameter will be
* shut down immediately (after executing its current item)
* regardless of how many items remain in its communication queue.
*
* This method is package private. No client should ever stop a service.
*
* @param service id
*/
synchronized void stopServiceImmediately(ServiceId id) {
ServiceThread serviceThread = serviceThreads.get(id);
if (serviceThread != null) {
serviceThread.doExit();
}
serviceThreads.remove(serviceThread);
serviceThread.interrupt();
}
/**
* The service thread whose id is passed as a parameter will be
* shut down only after it finishes executing all items remaining
* in its communication queue;
* Note that the service thread will be eventually removed from
* the service thread map when the ShutdownServiceThreadCommQueueItem
* executes the shutdownServiceThreadImmediately method.
*
* This method is package private. No client should ever stop a service.
*
* @param id
*/
void stopServiceGracefully(ServiceId id) {
infoBus.postItem(id, new ShutdownServiceThreadCommQueueItem(this));
}
/**
* Tries to shut down all service threads immediately, i.e. after
* each thread finishes executing its current item. It is guaranteed
* that all service threads will be shut down after this method is called,
* but there is absolutely no guarantee that all the currently performing
* tasks will be completed.
*/
public void shutdownImmediately() {
logInfo("Immediate shutdown process was initiated");
Set<ServiceId> keys = serviceThreads.keySet();
for (ServiceId id : keys) {
ServiceThread serviceThread = serviceThreads.get(id);
if (serviceThread == null) {
continue;
}
stopServiceImmediately(id);
}
threadPoolExecutor.shutdownNow();
}
/**
* Gracefully shuts down all service threads, i.e. each service thread
* will execute all items remaining in its communication queue
* before exiting.
* NOTE: it is not guaranteed that all the threads will be actually shut down
* after this method is called. If some of them are performing some indefinitely
* long tasks, they will never be stopped by this method. So, use it only when it is
* well known that all the tasks are relatively small, or use it together with
* shutdownImmediately() method, which will be called after some time is passed and
* this method produced no desired results.
*/
public void shutdownGracefully() {
logInfo("Graceful shutdown process was initiated");
Set<ServiceId> keys = serviceThreads.keySet();
// We have to clone the key set to avoid ConcurrentModificationException
// if the iterator fast-fails
Object[] keysClone = keys.toArray();
for (Object obj : keysClone) {
ServiceId id = (ServiceId)obj;
ServiceThread serviceThread = serviceThreads.get(id);
if (serviceThread == null) {
continue;
}
stopServiceGracefully(id);
}
threadPoolExecutor.shutdown();
}
/**
* Returns the central InfoBus of the application. Using this method
* all communication items may post items to one another
*
* @return InfoBus
*/
public InfoBus getInfoBus() {
return infoBus;
}
private void logInfo(String msg) {
if (logger != null) {
logger.info(msg);
}
}
private void logSevere(String msg) {
if (logger != null) {
logger.severe(msg);
}
}
}
|