package net.catpad.infobus;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
/**
* This is the abstraction of a service thread.
* The service thread is a daemon, because it provides services for other threads. Thus,
* the JVM will exit when the only remaining threads are service threads (which means
* there are no more threads to provide services for).
*
* The lifecycle of a service thread is as follows:
* 1) ServiceLauncher will start a service using the startService(serviceId) method;
* 2) Service thread is blocked on the InfoBus's getItem() method until an item arrives;
* 3) When there is an item posted for this service, this item's execute() method will be
* called and executed in the context of this service thread;
* 4) If the item's task fails gracefully, the CannotExecuteCommQueueItemException will be thrown
* and the service will continue to wait for other items;
* 5) If an uncaught exception is thrown and the service will be unexpectedly terminated,
* ServiceLauncher will try to restart the service;
* 6) The service will stop when one of the following methods of ServiceLauncher are called:
* a) shutdownImmediately() will interrupt all services executing their curent tasks and
* ServiceLauncher will not try to restart them;
* b) shutdownGracefully() will post a ShutdownServiceThreadCommQueueItem to each one of
* the services making them to execute all pending items in their InfoBus queues and then
* interrupting their work.
*
* NOTE: clients of the service thread should never stop the service intentionally
* (and, accordingly, there is no public method to do so). The purpose of the service is
* to run indefinitely until the application exits. If there is no task for the service thread
* to perform, the thread will be blocked indefinitely on its InfoBus queue.
* Thus, the lack of tasks for a service is practically equal to the situation when the thread
* is not running.
*
* NOTE: clients should never (and have no means to) create service threads explicitly.
* The service can be started using the startService(serviceId) method of ServiceLauncher.
*
* @author Michael Gertelman
*/
final class ServiceThread extends Thread {
private InfoBus infoBus = null;
private ServiceId serviceId = null;
private Logger logger = null;
private AtomicBoolean doExit = new AtomicBoolean(false);
/**
* The constructor is package prive. Only ServiceLauncher can create service threads.
*
* @param serviceId
* @param infoBus
* @param logger (can be null)
*/
ServiceThread(ServiceId serviceId, InfoBus infoBus, Logger logger) {
this.serviceId = serviceId;
this.infoBus = infoBus;
this.logger = logger;
setDaemon(true);
}
private void logInfo(String msg) {
if (logger != null) {
logger.info(msg);
}
}
private void logSevere(String msg) {
if (logger != null) {
logger.severe(msg);
}
}
public void run() {
logInfo("Service thread " + serviceId.toString() + " has been started");
try {
// Run forever
while (true) {
CommQueueItem item = null;
if (doExit.get() == true) {
logInfo("Service thread " + serviceId.toString()
+ " has been terminated");
return;
}
// Pick a communication item from the InfoBus;
// Note: getItem() method blocks until a new item
// arrives or an InterruptedException is thrown
item = infoBus.getItem(serviceId);
// The new commmunication item was picked up from the InfoBus;
// Let's execute it: this is where the actual logic of the
// service is contained
try {
item.execute(serviceId);
} catch (CannotExecuteCommQueueItemException e) {
Throwable t = e.getCause();
if (t != null) {
logSevere(serviceId
+ ": Exception while executing communication queue item :"
+ e.getMessage() + "\nCaused by: "
+ t.getMessage() + "\n");
StackTraceElement[] ste = t.getStackTrace();
StringBuffer buf = new StringBuffer();
for (int i = 0; i < ste.length; i++) {
buf.append(ste[i].toString()).append("\n");
}
buf.append("\n\n");
logSevere(buf.toString());
} else {
logSevere(serviceId
+ ": Exception while executing communication queue item :"
+ e.getMessage());
}
}
} // while (true)
} catch (InterruptedException e) {
// The queue was interrupted - end work
logInfo("Service thread " + serviceId.toString()
+ " has been interrupted");
return;
}
} // run
public String toString() {
return serviceId.toString();
}
/**
* @return Service Id of this thread
*/
public ServiceId getServiceId() {
return serviceId;
}
/**
* This is the package private method. It is called by the ShutdownServiceThreadCommQueueItem
* when a user called the shutdownServiceThreadGracefully method of ServiceLauncher
*/
void doExit() {
doExit.set(true);
}
boolean shouldExit() {
return doExit.get();
}
}
|