Concurrent Set Queue

Steve Skrla picture Steve Skrla · Jun 25, 2010 · Viewed 14.9k times · Source

Maybe this is a silly question, but I cannot seem to find an obvious answer.

I need a concurrent FIFO queue that contains only unique values. Attempting to add a value that already exists in the queue simply ignores that value. Which, if not for the thread safety would be trivial. Is there a data structure in Java or maybe a code snipit on the interwebs that exhibits this behavior?

Answer

Kevin Bourrillion picture Kevin Bourrillion · Jun 25, 2010

If you want better concurrency than full synchronization, there is one way I know of to do it, using a ConcurrentHashMap as the backing map. The following is a sketch only.

public final class ConcurrentHashSet<E> extends ForwardingSet<E>
    implements Set<E>, Queue<E> {
  private enum Dummy { VALUE }

  private final ConcurrentMap<E, Dummy> map;

  ConcurrentHashSet(ConcurrentMap<E, Dummy> map) {
    super(map.keySet());
    this.map = Preconditions.checkNotNull(map);
  }

  @Override public boolean add(E element) {
    return map.put(element, Dummy.VALUE) == null;
  }

  @Override public boolean addAll(Collection<? extends E> newElements) {
    // just the standard implementation
    boolean modified = false;
    for (E element : newElements) {
      modified |= add(element);
    }
    return modified;
  }

  @Override public boolean offer(E element) {
    return add(element);
  }

  @Override public E remove() {
    E polled = poll();
    if (polled == null) {
      throw new NoSuchElementException();
    }
    return polled;
  }

  @Override public E poll() {
    for (E element : this) {
      // Not convinced that removing via iterator is viable (check this?)
      if (map.remove(element) != null) {
        return element;
      }
    }
    return null;
  }

  @Override public E element() {
    return iterator().next();
  }

  @Override public E peek() {
    Iterator<E> iterator = iterator();
    return iterator.hasNext() ? iterator.next() : null;
  }
}

All is not sunshine with this approach. We have no decent way to select a head element other than using the backing map's entrySet().iterator().next(), the result being that the map gets more and more unbalanced as time goes on. This unbalancing is a problem both due to greater bucket collisions and greater segment contention.

Note: this code uses Guava in a few places.