package net.catpad.infobus;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;


/**
 * This class maintains the service thread pool of the application.
 
 * NOTE: none of this class public methods except getInfoBus() method
 * should be called from classes inheriting from CommQueueItem, i.e.
 * from the items (classes) containing the application logic and executing in the
 * service thread contexts.
 
 * The thread pool implements the failover architecture: when a service thread fails
 * on an uncaught exception, the thread pool of ServiceLauncher will try to restart it.
 
 @see ServiceThread for much more details about service threads.
 
 @author Michael Gertelman
 *
 */
public final class ServiceLauncher {

  /**
   * This inner class will be responsible for the creation of all the service threads
   * and keeping them alive in the thread pool.
   */
  class ServiceThreadPoolExecutor extends ThreadPoolExecutor {

    ServiceThreadPoolExecutor() {
    
       super(Integer.MAX_VALUE, // Core Pool Size - always create a new thread 
           Integer.MAX_VALUE, // Maximum Pool Size 
           Long.MAX_VALUE,    // Keep Alive Time - run forever (never terminate anything)
           TimeUnit.SECONDS,  
                 new SynchronousQueue<Runnable>(),       // Work Queue - hands off tasks to threads without otherwise holding them
                 Executors.defaultThreadFactory(),       // Thread Factory 
                 new ThreadPoolExecutor.AbortPolicy());  // the handler throws a runtime RejectedExecutionException upon task rejection 
 
    }

    /**
     * Allow the logging before executing the new service.
     * This method can be used for some additional pre-execution activities.
     */
    protected void beforeExecute(Thread t, Runnable r) {
      super.beforeExecute(t, r);
      logInfo("Thread [" + r.toString() "] is going to start");
    }

    /**
     * This method is crucual for keeping the application alive even
     * after an uncaught exception.
     * When a service thread throws an exception that is not treated
     * and left uncaught, the thread is terminated and afterExecute() method
     * is called. The ServiceThreadPoolExecutor will then try to restart the failed service.     
     */
  
    protected void afterExecute(Runnable r, Throwable t) {      
      super.afterExecute(r, t);
      
      if (r != null) {
        logSevere("Thread [" + r.toString() "] has been terminated");
        
        // Remove failed task from the executor's internal queue
        // NOTE: this is important, otherwise we can have an indefinitely
        // growing queue and OutOfMemory error if the failures are frequent
        remove(r);
        
      else {
        logSevere("Thread [" + r.toString() " can't be restarted");
        return;        
      }
      
      if (t != null) {
        logSevere("Termination caused by: " + t.getMessage() "\n");
        StackTraceElement[] ste = t.getStackTrace();
        StringBuffer buf = new StringBuffer();
        for (int i = 0; i < ste.length; i++) {
          buf.append(ste[i].toString()).append("\n");
        }
        buf.append("\n\n");
        logSevere(buf.toString());
      }
            
      // And restart the service      
      try {
                
        ServiceThread serviceThread = (ServiceThread)r;
        ServiceId id = serviceThread.getServiceId();
        
        // Remove this service from the service thread map before
        // trying to restart it again
        serviceThreads.remove(id);        
        
        // If the thread was stopped intentionally - don't restart it
        if (serviceThread.shouldExit()) {
          logInfo(id.toString() " was stopped intentionally; Exiting.");
          return;
        }
        
        logInfo("Restarting " + id.toString() " after an uncaught exception...");        
        try {
          startService(id);
        catch (ServiceThreadAlreadyRunningException e) {
          // This cannot happen because we have already deleted this service
          // from the service thread map
        }
        
      catch (ClassCastException e) { // This should never happen, but just in case
        logSevere("Terminated thread is not of type ServiceThread - cannot restart");
      }
    }
    
    protected void terminated() {
      super.terminated();
      logInfo("ServiceThreadPoolExecutor has been terminated");
    }  
  
  // ServiceThreadPoolExecutor class
  
    
  private Logger logger;
  
  /**
   * Central InfoBus of the application
   */
  private InfoBus infoBus;
  
  /**
   * All service threads created so far
   */
  private Map<ServiceId,ServiceThread> serviceThreads;
  
  /**
   * Thread pool maintaining all service threads
   */
  private ServiceThreadPoolExecutor threadPoolExecutor;
  
  /**
   * Create ServiceLauncher without a logger. 
   * No log information will be outputted.
   *
   */
  public ServiceLauncher() {
    this(null);
  }
  
  /**
   * Create ServiceLauncher with a logger.
   @param logger
   */
  public ServiceLauncher(Logger logger) {
    this.logger = logger;
    
    serviceThreads = Collections.synchronizedMap(new HashMap<ServiceId,ServiceThread>());
    infoBus = new InfoBus();
    threadPoolExecutor = new ServiceThreadPoolExecutor();
  }
  
  /**
   * Creates and starts a service thread given its unique id
   
   @param id
   */
  public synchronized void startService(ServiceId idthrows ServiceThreadAlreadyRunningException {
    ServiceThread serviceThread = serviceThreads.get(id);
    if (serviceThread != null) {
      throw new ServiceThreadAlreadyRunningException(id);
    }
    serviceThread = new ServiceThread(id, infoBus, logger);
    threadPoolExecutor.execute(serviceThread);
    serviceThreads.put(id, serviceThread);
  }    
    
  /**
   * This methods helps when one needs to start many services without having to name
   * each one of them specifically. The services will get their ids as an array of
   * consecutive integer starting from "from" and until the "to".
   * All created services will be immediately started and the list of the created
   * service ids will be returned.
   * Overall the (to - from + 1) services will be created and started.
   
   @param from where to start giving ids to the new services
   @param to where to stop giving ids to the new services 
   @param namePrefix the prefix wich will be added to the name of the service plus its index
   @return a list of the created service ids
   @throws ServiceThreadAlreadyRunningException
   */
  public List<ServiceId> startServices(int from, int to, String namePrefix
                        throws ServiceThreadAlreadyRunningException {
    
    ArrayList<ServiceId> ids = new ArrayList<ServiceId>();
    for (int i = from; i <= to; i++) {
      ServiceId id = new ServiceId(i, namePrefix + i);
      ids.add(id);
      startService(id);
    }
    
    return ids;
  }
  
  /**
   * Allows the thread calling shutdownImmediately() or shutdownGracefully()
   * to wait until all service threads finish their execution.
   * The method will block indefinitely until all service threads exit.
   
   @throws InterruptedException
   */
  public void join() throws InterruptedException {    
    threadPoolExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
  }
  
  /**
   * The service thread whose id is passed as a parameter will be
   * shut down immediately (after executing its current item)
   * regardless of how many items remain in its communication queue.
   
   * This method is package private. No client should ever stop a service.
   
   @param service id
   */
  synchronized void stopServiceImmediately(ServiceId id) {
    ServiceThread serviceThread = serviceThreads.get(id);
    if (serviceThread != null) {
      serviceThread.doExit();
    }
    serviceThreads.remove(serviceThread);
    
    serviceThread.interrupt();
  }

  /**
   * The service thread whose id is passed as a parameter will be
   * shut down only after it finishes executing all items remaining
   * in its communication queue;
   * Note that the service thread will be eventually removed from 
   * the service thread map when the ShutdownServiceThreadCommQueueItem
   * executes the shutdownServiceThreadImmediately method.
   
   * This method is package private. No client should ever stop a service.
   
   @param id
   */
  void stopServiceGracefully(ServiceId id) {
    infoBus.postItem(id, new ShutdownServiceThreadCommQueueItem(this));
  }
  
  /**
   * Tries to shut down all service threads immediately, i.e. after
   * each thread finishes executing its current item. It is guaranteed
   * that all service threads will be shut down after this method is called,
   * but there is absolutely no guarantee that all the currently performing
   * tasks will be completed.
   */
  public void shutdownImmediately() {
    
    logInfo("Immediate shutdown process was initiated");
    
    Set<ServiceId> keys = serviceThreads.keySet();
    for (ServiceId id : keys) {
      ServiceThread serviceThread = serviceThreads.get(id);
      if (serviceThread == null) {
        continue;
      }
      stopServiceImmediately(id);
    }    
    
    threadPoolExecutor.shutdownNow();
  }
  
  /**
   * Gracefully shuts down all service threads, i.e. each service thread
   * will execute all items remaining in its communication queue 
   * before exiting.
   * NOTE: it is not guaranteed that all the threads will be actually shut down
   * after this method is called. If some of them are performing some indefinitely
   * long tasks, they will never be stopped by this method. So, use it only when it is
   * well known that all the tasks are relatively small, or use it together with 
   * shutdownImmediately() method, which will be called after some time is passed and
   * this method produced no desired results.
   */
  public void shutdownGracefully() {
    
    logInfo("Graceful shutdown process was initiated");
    
    Set<ServiceId> keys = serviceThreads.keySet();    
    // We have to clone the key set to avoid ConcurrentModificationException
    // if the iterator fast-fails
    Object[] keysClone = keys.toArray();
    
    for (Object obj : keysClone) {
      ServiceId id = (ServiceId)obj;
      ServiceThread serviceThread = serviceThreads.get(id);
      if (serviceThread == null) {
        continue;
      }
      stopServiceGracefully(id);
    }
    
    threadPoolExecutor.shutdown();
  }
  
    /**
     * Returns the central InfoBus of the application. Using this method
     * all communication items may post items to one another
     
     @return InfoBus
     */
  public InfoBus getInfoBus() {
    return infoBus;
  }    

  private void logInfo(String msg) {
    if (logger != null) {
      logger.info(msg);
    }
  }

  private void logSevere(String msg) {
    if (logger != null) {
      logger.severe(msg);
    }
  }
  
}