Let's say we have N queues with packets waiting for processing. The queues are indexed from 0 to N-1; queue 0 has the highest priority and queue N-1 the lowest. Packets are added to all queues asynchronously all the time, and a single thread reads packets from the queues.
How can we efficiently empty all the queues in a single thread while making sure that packets from the highest-priority queues are always processed as soon as possible?
// Queues are kept in a map as below.
// EnumMap<Priority, LinkedBlockingQueue<Packet>> queues
while (! stopped) {
    try {
        // Your code starts here:
        ................
        // Your code ends here.
        // Let's make sure there was nothing added to any queue in the meantime
        synchronized (queues) {
            boolean added = false;
            for (LinkedBlockingQueue<Packet> tmpqueue : queues.values()) {
                if (added = (tmpqueue.size() > 0)) {
                    break;
                }
            }
            if (!added) {
                queues.wait();
            }
        }
    } catch (Exception e) {
        log.log(Level.SEVERE, "[" + getName() +
            "] Exception during packet processing: " + packet.toString(), e);
    } // end of try-catch
} // end of while (! stopped)
Any suggestions are very welcome.
Comments
Current implementation
Current implementation looks like this:
// First take the first - highest priority queue...
int queue_idx = 0;
do {
    LinkedBlockingQueue<Packet> queue = queues.get(pr_cache[queue_idx]);
    // Check higher priority queues first:
    for (int i = 0; i < queue_idx; i++) {
        if (queues.get(pr_cache[i]).size() > 0) {
            queue_idx = i;
            queue = queues.get(pr_cache[queue_idx]);
            break;
        }
    }
    // Now process next waiting packet
    if ((packet = queue.poll()) != null) {
        processPacket(packet);
    } else {
        ++queue_idx;
    }
} while (packet != null || queue_idx < pr_cache.length);
Using no-arg poll in a loop
Using no-arg poll in a loop is inefficient, as it makes the loop spin very fast when all queues are empty. You'll want some place where execution blocks once all queues are empty, waiting for new elements to process. Since those new elements can show up in any one queue, you cannot use the blocking methods of a specific BlockingQueue; instead you need to wait() on some object that gets a notify() whenever an element is added to any queue. Feel free to have a look at my implementation, linked above :)
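The wait()/notify() idea described above can be sketched roughly as follows. This is only a minimal illustration, not the implementation linked in this thread: the class name NotifyOnAdd, the three Priority levels, and the use of String as a stand-in for Packet are all assumptions made for the example.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

public class NotifyOnAdd {
    // Hypothetical priority levels; the thread does not list the real ones.
    public enum Priority { HIGH, NORMAL, LOW }

    // The map doubles as the shared monitor, as in the question's snippet.
    private final Map<Priority, LinkedBlockingQueue<String>> queues =
            new EnumMap<>(Priority.class);

    public NotifyOnAdd() {
        for (Priority p : Priority.values()) {
            queues.put(p, new LinkedBlockingQueue<>());
        }
    }

    // Producer side: add the packet, then wake any waiting consumer.
    public void add(Priority priority, String packet) {
        synchronized (queues) {
            queues.get(priority).add(packet);
            queues.notifyAll();
        }
    }

    // Consumer side: return a packet from the highest-priority non-empty
    // queue, blocking on the shared monitor while every queue is empty.
    public String takeNext() throws InterruptedException {
        synchronized (queues) {
            while (true) {
                // EnumMap iterates in enum declaration order: HIGH first.
                for (LinkedBlockingQueue<String> queue : queues.values()) {
                    String packet = queue.poll();
                    if (packet != null) {
                        return packet;
                    }
                }
                queues.wait(); // released by notifyAll() in add()
            }
        }
    }
}
```

Because the emptiness check and the wait() happen under the same monitor that add() takes before calling notifyAll(), a packet added between the check and the wait cannot be missed.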
Thank you for your comments.
Thank you for your comments. However, if you look at the "Starting point" section, you can see there is a 'wait' instruction. It is executed once ALL queues are found to be empty. The code from my comment goes in the place between "Your code starts here" and "Your code ends here". Therefore spinning quickly and eating up CPU is not an issue in this case (I hope).
Now I see, I did read the
Now I see, I did read the code a bit carelessly before.
What an interesting problem.
What an interesting problem. I would suggest that you create a separate queue class that provides the functionality you need with put() and take() methods. That way the queue doesn't need to know anything about the specifics of the consumer. For example, you might later want to use multiple consumer threads that read data off the queue.
My take on such a class, feel free to use it any way you want:
http://resare.com/noa/ref/MultiPrioQueue.java
It uses a single lock and is written with readability as the primary goal, but it performs quite okay anyway, handling 10 million elements in about 20 seconds from 100 different writer threads on my dual-core MacBook Pro.
If you need any additional features, please let me know :)
Thanks for this. This is
Thanks for this. This is very interesting indeed. In fact the implementation is quite similar to my current implementation but it is encapsulated inside a separate class. There is one problem however with your code which I tried to avoid in my solution.
Let's say you have 2 priority queues: HIGH and NORMAL. You are now reading from the NORMAL queue: the variable lowestNonEmpty points to this queue, and the queue is not empty. Some thread keeps filling this queue, but in the meantime another thread adds something to the HIGH priority queue. It may happen that the HIGH priority queue is not emptied for a long time, in theory never....
Since the take() method is
Since the take() method is exited once for each packet processed, some other thread can get scheduled with a put() invocation between two take() invocations. Each put() invocation can update the lowestNonEmpty variable and point the next take() invocation to a queue with higher priority.
One thing I found out when coding a test case for my class is that locking performance doesn't matter all that much. I wrote the simplest solution I could come up with, with one global lock for both public methods, and it still performs well enough that I would be very surprised if it ever became worth optimizing.
For the sake of readability, however, I would suggest replacing LinkedBlockingQueue with a simple LinkedList, since the blocking and locking aspects of that class are not used.
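To make the interplay between put() and take() concrete, here is a rough sketch of such a class. It is not the code behind the link above; only the method names put() and take() and the variable name lowestNonEmpty come from this thread, while the class name PriorityLevels, the integer priorities, and everything else are assumptions for illustration. It uses one global lock and plain LinkedList instances, as suggested.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

public class PriorityLevels<E> {
    // One plain list per priority; index 0 is the highest priority.
    private final List<LinkedList<E>> levels = new ArrayList<>();

    // Lowest index that may hold elements; equals levels.size() when
    // every level is known to be empty.
    private int lowestNonEmpty;

    public PriorityLevels(int count) {
        for (int i = 0; i < count; i++) {
            levels.add(new LinkedList<>());
        }
        lowestNonEmpty = count;
    }

    // Adds an element and, if it has higher priority than anything
    // currently tracked, redirects the next take() to its level.
    public synchronized void put(int priority, E element) {
        levels.get(priority).addLast(element);
        if (priority < lowestNonEmpty) {
            lowestNonEmpty = priority;
        }
        notifyAll();
    }

    // Removes the oldest element of the highest-priority non-empty level,
    // blocking while all levels are empty.
    public synchronized E take() throws InterruptedException {
        while (true) {
            // Skip levels that have been drained since the last put().
            while (lowestNonEmpty < levels.size()
                    && levels.get(lowestNonEmpty).isEmpty()) {
                lowestNonEmpty++;
            }
            if (lowestNonEmpty < levels.size()) {
                return levels.get(lowestNonEmpty).removeFirst();
            }
            wait(); // woken by notifyAll() in put()
        }
    }
}
```

In this sketch, a put(0, ...) that arrives while the consumer is working through level 1 lowers lowestNonEmpty to 0, so the very next take() returns the high-priority element before going back to level 1.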
Indeed, with the put()
Indeed, with the put() method updating the lowestNonEmpty variable, it would be the best solution I have seen so far. For sure it would work better than my current code. Thanks for that. I will replace my code with your suggestion if you have no objections.
Thanks a lot.
Here's a bit simpler code
Here's slightly simpler code that should do the same thing (I didn't test it):
do {
    for (int i = 0; i < pr_cache.length; i++) {
        packet = queues.get(pr_cache[i]).poll();
        if (packet != null) {
            processPacket(packet);
            break;
        }
    }
} while (packet != null);
poll does its size check without synchronization, so this is essentially the same code.
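For anyone who wants to try this loop outside the server, here is a self-contained variant. The class name PollLoopDemo and the method name drain are made up for the example; a plain list of queues, ordered from highest to lowest priority, stands in for queues.get(pr_cache[i]), and recording the packet in a list stands in for processPacket(packet).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

public class PollLoopDemo {
    // Runs the loop above over the given queues (highest priority first)
    // and returns the packets in the order they were processed.
    static List<String> drain(List<LinkedBlockingQueue<String>> queues) {
        List<String> processed = new ArrayList<>();
        String packet;
        do {
            packet = null;
            for (LinkedBlockingQueue<String> queue : queues) {
                packet = queue.poll();
                if (packet != null) {
                    processed.add(packet); // stands in for processPacket(packet)
                    break;                 // restart the scan at index 0
                }
            }
        } while (packet != null);
        return processed;
    }
}
```

With h1 and h2 waiting in the first queue, n1 in the second, and an empty third queue, drain returns [h1, h2, n1], because every pass of the outer loop starts scanning again at index 0.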
Matti, thank you for your
Matti, thank you for your code. This is a highly efficient loop and I am sure it would perform well. There are, however, 2 problems with this code that I tried to avoid in my implementation. Let's say you have 3 queues with priorities: HIGH, NORMAL and LOW.
The first 2 queues each have about 100 elements waiting for processing, while the last one is empty. Your loop exits after checking the last queue even though there are more packets waiting in the other queues. Of course this would be caught later on by the code which puts the thread in the 'wait' state, so your loop would be executed again, but altogether it could be less efficient.
The second problem is that your code offers a fair processing policy. That means all queues are emptied at exactly the same speed, and there is really no priority for processing elements in the HIGH priority queue over the LOW priority queue, assuming they are both full. The goal, however, is to always process higher priority elements first and only then switch to a lower priority queue.