package net.catpad.infobus;

import java.util.HashMap;


/**
 * This class is the heart of the application messaging mechanism.
 * All communication between various threads in the system works
 * by means of the asynchronous communication queues (@see CommQueue).
 * These queues are created and maintained inside the InfoBus.
 
 * This way of communication guarantees that all messages sent from any 
 * thread to any other thread will be kept in the appropriate queue until
 * they are taken when a thread listening to this queue becomes interested
 * in the message. Moreover, the sender thread does not need to worry about
 * its recipient's thread existence.
 
 * Important note: the thread sending a message is never blocked.
 * The thread receiving a message is always blocked until the message becomes
 * available. In general, the fact that a thread is willing to receive a message,
 * indicates that this thread has nothing more to do and is ready to be blocked
 * indefinitely until the new message arrives.
 
 @author Michael Gertelman
 *
 */
public final class InfoBus {
  
  private HashMap<ServiceId, CommQueue<? extends CommQueueItem>> queues = null;

  /**
   * The constuctor of this class is package private. 
   * Only ServiceLauncher can create InfoBus.
   */
  InfoBus() {
    queues = new HashMap<ServiceId, CommQueue<? extends CommQueueItem>>(10);
  }
  

  /**
   * This method returns immediately and it is guaranteed that the message
   * will be inserted in the queue: even if the queue does not exist yet,
   * it will be created.
   
   * This method should NEVER be called by the RECIPIENT service itself,
   * ONLY be the SENDER service.
   
   @param <Item>
   @param recipientService: the class of the service we want
   * to send a message to
   @param item
   */
  @SuppressWarnings("unchecked")
  public <Item extends CommQueueItem> void postItem(
      ServiceId recipientService, Item item) {

    CommQueue<Item> queue = null;

    synchronized (queues) {
      queue = (CommQueue<Item>queues.get(recipientService);
      if (queue == null) {
        queue = new CommQueue<Item>();
        queues.put(recipientService, queue);
      }
      // Note: putItem() is INSIDE the synchronized block because
      // 1) this function returns immediately and
      // 2) there can be several simultaneous calls to this function
      queue.putItem(item);      
    }
  }

  /**
   * This method blocks until there is a message in the recipient's service queue.
   * If the queue of this service does not exist, it will be created.
   * If the function throws InterruptedException the service should finish its work.
   
   * This method should ONLY be called by the RECIPIENT service itself,
   * NEVER by the SENDER service.
   
   * NOTE: this method is package private, i.e. it cannot be called by an outside client.
   * It is called only from the ServiceThread class.
   
   @param <Item>
   @param recipientService: the class of the service which is interested
   * in the message
   @return Communication Queue Item
   @throws InterrupedException when the queue is interrupted.
   */
  @SuppressWarnings("unchecked")
  <Item extends CommQueueItem> Item getItem(ServiceId recipientService)
      throws InterruptedException {

    CommQueue<Item> queue = null;

    synchronized (queues) {
      queue = (CommQueue<Item>queues.get(recipientService);
      if (queue == null) {
        queue = new CommQueue<Item>();
        queues.put(recipientService, queue);
      }
    }
    // Note: getItem() is OUTSIDE of the synchronized block because
    // 1) this function blocks indefinitely and
    // 2) there can be only one recipient service
    Item item = queue.getItem();

    return item;
  }

  /**
   * Returns the current size of the queue of a given service thread.
   @param <Item>
   @param serviceId
   @return queue size
   */
  @SuppressWarnings("unchecked")
  public <Item extends CommQueueItem> int getQueueSize(ServiceId serviceId) {
    
    CommQueue<Item> queue = null;
    
    synchronized (queues) {
      queue = (CommQueue<Item>queues.get(serviceId);
      if (queue == null) {
        return 0;
      }
      return queue.size();
    }
  }

  /**
   * Removes all pending items from a given service thread's queue.
   @param <Item>
   @param id
   */
  @SuppressWarnings("unchecked")
  public <Item extends CommQueueItem> void clearItems(ServiceId id) {
    
    CommQueue<Item> queue = null;
    
    synchronized (queues) {
      queue = (CommQueue<Item>queues.get(id);
      if (queue == null) {
        return// no need to create a new queue
      }
      queue.clear();
    }    
  }
}