package net.catpad.infobus;
import java.util.HashMap;
/**
* This class is the heart of the application messaging mechanism.
* All communication between various threads in the system works
* by means of the asynchronous communication queues (@see CommQueue).
* These queues are created and maintained inside the InfoBus.
*
* This way of communication guarantees that all messages sent from any
* thread to any other thread will be kept in the appropriate queue until
* they are taken when a thread listening to this queue becomes interested
* in the message. Moreover, the sender thread does not need to worry about
* its recipient's thread existence.
*
* Important note: the thread sending a message is never blocked.
* The thread receiving a message is always blocked until the message becomes
* available. In general, the fact that a thread is willing to receive a message,
* indicates that this thread has nothing more to do and is ready to be blocked
* indefinitely until the new message arrives.
*
* @author Michael Gertelman
*
*/
public final class InfoBus {
private HashMap<ServiceId, CommQueue<? extends CommQueueItem>> queues = null;
/**
* The constuctor of this class is package private.
* Only ServiceLauncher can create InfoBus.
*/
InfoBus() {
queues = new HashMap<ServiceId, CommQueue<? extends CommQueueItem>>(10);
}
/**
* This method returns immediately and it is guaranteed that the message
* will be inserted in the queue: even if the queue does not exist yet,
* it will be created.
*
* This method should NEVER be called by the RECIPIENT service itself,
* ONLY be the SENDER service.
*
* @param <Item>
* @param recipientService: the class of the service we want
* to send a message to
* @param item
*/
@SuppressWarnings("unchecked")
public <Item extends CommQueueItem> void postItem(
ServiceId recipientService, Item item) {
CommQueue<Item> queue = null;
synchronized (queues) {
queue = (CommQueue<Item>) queues.get(recipientService);
if (queue == null) {
queue = new CommQueue<Item>();
queues.put(recipientService, queue);
}
// Note: putItem() is INSIDE the synchronized block because
// 1) this function returns immediately and
// 2) there can be several simultaneous calls to this function
queue.putItem(item);
}
}
/**
* This method blocks until there is a message in the recipient's service queue.
* If the queue of this service does not exist, it will be created.
* If the function throws InterruptedException the service should finish its work.
*
* This method should ONLY be called by the RECIPIENT service itself,
* NEVER by the SENDER service.
*
* NOTE: this method is package private, i.e. it cannot be called by an outside client.
* It is called only from the ServiceThread class.
*
* @param <Item>
* @param recipientService: the class of the service which is interested
* in the message
* @return Communication Queue Item
* @throws InterrupedException when the queue is interrupted.
*/
@SuppressWarnings("unchecked")
<Item extends CommQueueItem> Item getItem(ServiceId recipientService)
throws InterruptedException {
CommQueue<Item> queue = null;
synchronized (queues) {
queue = (CommQueue<Item>) queues.get(recipientService);
if (queue == null) {
queue = new CommQueue<Item>();
queues.put(recipientService, queue);
}
}
// Note: getItem() is OUTSIDE of the synchronized block because
// 1) this function blocks indefinitely and
// 2) there can be only one recipient service
Item item = queue.getItem();
return item;
}
/**
* Returns the current size of the queue of a given service thread.
* @param <Item>
* @param serviceId
* @return queue size
*/
@SuppressWarnings("unchecked")
public <Item extends CommQueueItem> int getQueueSize(ServiceId serviceId) {
CommQueue<Item> queue = null;
synchronized (queues) {
queue = (CommQueue<Item>) queues.get(serviceId);
if (queue == null) {
return 0;
}
return queue.size();
}
}
/**
* Removes all pending items from a given service thread's queue.
* @param <Item>
* @param id
*/
@SuppressWarnings("unchecked")
public <Item extends CommQueueItem> void clearItems(ServiceId id) {
CommQueue<Item> queue = null;
synchronized (queues) {
queue = (CommQueue<Item>) queues.get(id);
if (queue == null) {
return; // no need to create a new queue
}
queue.clear();
}
}
}
|