Queues processing problem, any suggestions?

Problem description

Let's say we have N queues with packets waiting for processing. Queues are indexed from 0..N-1. Queue no 0 is the highest priority. Queue no N-1 is the lowest priority. Packets are being added to all queues asynchronously all the time and there is one thread reading packets from the queues.

Question

How to efficiently empty all the queues in a single thread and make sure packets from the highest priority queues are always processed as soon as possible.

Starting point

// Queues are kept in a map as below.
// EnumMap<Priority, LinkedBlockingQueue<Packet>> queues
while (! stopped) {
  try {
    // Your code starts here:
    ................
    // Your code ends here.
    // Let's make sure there was nothing added to any queue in the meantime
    synchronized (queues) {
      boolean added = false;
      for (LinkedBlockingQueue<Packet> tmpqueue : queues.values()) {
        if (added = (tmpqueue.size() > 0)) {
          break;
        }
      }
      if (!added) {
        queues.wait();
      }
    }
  } catch (Exception e) {
    log.log(Level.SEVERE, "[" + getName() +
      "] Exception during packet processing: " + packet.toString(), e);
  } // end of try-catch
} // end of while (! stopped)

Any suggestions are very welcomed.

 

Comments

 Current implementation

 Current implementation looks like this:

// First take the first - highest priority queue...
int queue_idx = 0;
do {
  LinkedBlockingQueue<Packet> queue = queues.get(pr_cache[queue_idx]);
  // Check higher priority queues first:
  for (int i = 0; i < queue_idx; i++) {
    if (queues.get(pr_cache[i]).size() > 0) {
      queue_idx = i;
      queue = queues.get(pr_cache[queue_idx]);
      break;
    }
  }
  // Now process next waiting packet
  if ((packet = queue.poll()) != null) {
    processPacket(packet);
  } else {
    ++queue_idx;
  }
} while (packet != null || queue_idx < pr_cache.length);

 

tramadol hcl >:-OOO accutane

Using no-arg poll in a loop

Using no-arg poll in a loop is inefficient, as it will make the loop spin very fast when all queues are empty. You'll want to have some place where the execution blocks once all queues are empty, waiting for new elements to process. Since those new elements can show up in any one queue, you can not use the blocking methods in a specific BlockingQueue but you need instead to use wait() on some object that gets a notify() when an element is added to any one queue. Feel free to have a look at my implementation, linked above :)

accutane online 434 accutane

Thank you for your comments.

Thank you for your comments. However if you look at the "Starting point" section you can see there is a 'wait' instruction. It is executed once ALL queues are found to be empty. The code from my comment goes to the place between "Your code starts here" and "Your code ends here". Therefore spinning quickly and eating up CPU is not an issue in this case. (I hope).

 Now I see, I did read the

 Now I see, I did read the code a bit carelessly before.

What an interesting problem.

What an interesting problem. I would suggest that you create a separate queue class that provides the functionality you need with put() and take() methods. That way the queue doesn't need to have knowledge about the specifics of the consumer. For example you might want to use multiple consumer threads that reads data off the queue at a later date.

My take on such a class, feel free to use it any way you want:

http://resare.com/noa/ref/MultiPrioQueue.java

It uses a single lock and is written with readability as the primary goal, but it preforms quite okay anyway, handling 10 million elements in about 20 secs from 100 different writer threads on my dual core MacBook Pro.

If you need any additional features, please let me know :)

 Thanks for this. This is

 Thanks for this. This is very interesting indeed. In fact the implementation is quite similar to my current implementation but it is encapsulated inside a separate class. There is one problem however with your code which I tried to avoid in my solution.

Let's say you have 2 priority queues: HIGH and NORMAL. You are now reading from the NORMAL queue - the variable lowestNonEmpty points to this queue and this queue is not empty. This queue is still filled up by some thread but in the meantime another thread adds something to the HIGH priority queue. It may happen that the HIGH priority queue may not be emptied for a long time, in theory never....

Since the take() method is

Since the take() method is exited once for each packet processed, some other thread can get scheduled with a put() invocation between two take() invocations. Each put() invocation can possibly update the lowestNonEmpty variable and point the next take() invocation to queue with higher priority.

One thing I found out when coding a test case for my class is that locking performance doesn't matter all that much. I wrote the simplest solution I could come up with with one global lock for both public methods, and it still performs in a way that would make me very surprised if it would ever become something of interest to optimize.

For the sake of readability however, I would suggest replacing LinkedBlockingQueue with a simple LinkedList, since the blocking or locking aspects of that class is not used.

Indeed, with the put()

Indeed, with the put() method updating the lowestNonEmpty variable it would be the most optimal solution I have seen so far. For sure it would work better than my current code. Thanks for that. I will replace my code with your suggestion if you have no objections.

Thanks a lot.

 

tramadol for dogs 5454

Here's a bit simpler code

Here's a bit simpler code that should do the same thing (I didn't test it)

do {
 for(int i = 0; i < pr_cache.length; i++) {
   packet = queues.get(pr_cache[i]).poll(); 
   if(packet != null) {
     processPacket(packet);
     break;
   }
 }
} while (packet != null)

poll does size check without synchronization, so this is essentially the same code..

Matti, thank you for your

Matti, thank you for your code. This is highly efficient loop and I am sure it would perform well. There are however 2 problems with this code I tried to avoid in my implementation. Let's say you have 3 queues with priorities: HIGH, NORMAL and LOW.

2 first queues have like 100 elements waiting for processing while the last one is empty. Your loop exists after checking the last queue even though there are more packets waiting in other queues. Of course this would be caught later on by the code which puts the thread in 'wait' state so your loop would be executed again but it all together could be less efficient.

The second problem is that your code offers fair processing policy. Which means all queues are emptied with exactly the same speed and there is really no priority for processing elements in HIGH priority queue over LOW priority queue assuming they both are full. The goal however is to always process higher priority elements first, then switch to lower priority queue.

Comment viewing options

Select your preferred way to display the comments and click "Save settings" to activate your changes.