package net.catpad.infobus;
import java.util.concurrent.LinkedBlockingQueue;
/**
* Asynchronous communication queue. All message passing in the system is
* performed by means of this queue.
* Note: this class is package private - no one is supposed to use it
* except the InfoBus itself.
*
* @author Michael Gertelman
*
*/
final class CommQueue<Item extends CommQueueItem> {
private final LinkedBlockingQueue<Item> queue;
CommQueue() {
queue = new LinkedBlockingQueue<Item>();
}
/**
* This method inserts a new item into the queue and returns immediately
*/
void putItem(Item item) {
try {
queue.put(item);
} catch (InterruptedException e1) {
// can never happen since the queue has unlimited capacity
} catch (NullPointerException e2) {
// null item is inserted - just skip it
}
}
/**
* This method will not return until:
* a) there is a new item in the queue;
* b) the waiting was interrupted by InterrupedException
*/
Item getItem() throws InterruptedException {
while (true) {
// Try to take an item from the queue;
// Wait indefinitely or until an InterruptedException
Item item = queue.take();
// If the item is null just continue waiting
if (item == null) {
continue;
}
// Otherwise, return the item
return item;
} // while
}
int size() {
return queue.size();
}
void clear() {
queue.clear();
}
}
|