From 28e17af5002b7da0b8376e266acee8198688698d Mon Sep 17 00:00:00 2001 From: Alexey Andreev Date: Sat, 20 Jan 2018 23:06:54 +0300 Subject: [PATCH] Implement ArrayBlockingQueue --- .../org/teavm/classlib/java/lang/TObject.java | 1 + .../org/teavm/classlib/java/lang/TThread.java | 2 +- .../teavm/classlib/java/util/TArrayDeque.java | 5 - .../org/teavm/classlib/java/util/TQueue.java | 5 - .../util/concurrent/TArrayBlockingQueue.java | 495 ++++++++++++++++++ .../java/util/concurrent/TBlockingQueue.java | 35 ++ .../java/util/concurrent/TTimeUnit.java | 82 +++ .../concurrent/ArrayBlockingQueueTest.java | 245 +++++++++ 8 files changed, 859 insertions(+), 11 deletions(-) create mode 100644 classlib/src/main/java/org/teavm/classlib/java/util/concurrent/TArrayBlockingQueue.java create mode 100644 classlib/src/main/java/org/teavm/classlib/java/util/concurrent/TBlockingQueue.java create mode 100644 classlib/src/main/java/org/teavm/classlib/java/util/concurrent/TTimeUnit.java create mode 100644 tests/src/test/java/org/teavm/classlib/java/util/concurrent/ArrayBlockingQueueTest.java diff --git a/classlib/src/main/java/org/teavm/classlib/java/lang/TObject.java b/classlib/src/main/java/org/teavm/classlib/java/lang/TObject.java index 8ac28cdeb..c5445b008 100644 --- a/classlib/src/main/java/org/teavm/classlib/java/lang/TObject.java +++ b/classlib/src/main/java/org/teavm/classlib/java/lang/TObject.java @@ -313,6 +313,7 @@ public class TObject { public final void waitImpl(long timeout, int nanos, final AsyncCallback callback) { final NotifyListenerImpl listener = new NotifyListenerImpl(this, callback, monitor.count); monitor.notifyListeners.add(listener); + TThread.currentThread().interruptHandler = listener; if (timeout > 0 || nanos > 0) { listener.timerId = Platform.schedule(listener, timeout >= Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) timeout); diff --git a/classlib/src/main/java/org/teavm/classlib/java/lang/TThread.java b/classlib/src/main/java/org/teavm/classlib/java/lang/TThread.java index 1e6cfdf4d..50d91f213 100644 --- a/classlib/src/main/java/org/teavm/classlib/java/lang/TThread.java +++ b/classlib/src/main/java/org/teavm/classlib/java/lang/TThread.java @@ -31,7 +31,7 @@ public class TThread extends TObject implements TRunnable { private int yieldCount; private final Object finishedLock = new Object(); private boolean interruptedFlag; - private TThreadInterruptHandler interruptHandler; + public TThreadInterruptHandler interruptHandler; private TString name; private boolean alive = true; diff --git a/classlib/src/main/java/org/teavm/classlib/java/util/TArrayDeque.java b/classlib/src/main/java/org/teavm/classlib/java/util/TArrayDeque.java index de871737a..591caddda 100644 --- a/classlib/src/main/java/org/teavm/classlib/java/util/TArrayDeque.java +++ b/classlib/src/main/java/org/teavm/classlib/java/util/TArrayDeque.java @@ -18,11 +18,6 @@ package org.teavm.classlib.java.util; import java.util.Arrays; import org.teavm.classlib.java.lang.*; -/** - * - * @author Alexey Andreev - * @param - */ public class TArrayDeque extends TAbstractCollection implements TDeque { private int version; private Object[] array; diff --git a/classlib/src/main/java/org/teavm/classlib/java/util/TQueue.java b/classlib/src/main/java/org/teavm/classlib/java/util/TQueue.java index 27b2805cf..b2c86fbf4 100644 --- a/classlib/src/main/java/org/teavm/classlib/java/util/TQueue.java +++ b/classlib/src/main/java/org/teavm/classlib/java/util/TQueue.java @@ -15,11 +15,6 @@ */ package org.teavm.classlib.java.util; -/** - * - * @author Alexey Andreev - * @param - */ public interface TQueue extends TCollection { boolean offer(E e); diff --git a/classlib/src/main/java/org/teavm/classlib/java/util/concurrent/TArrayBlockingQueue.java b/classlib/src/main/java/org/teavm/classlib/java/util/concurrent/TArrayBlockingQueue.java new file mode 100644 index 000000000..d75603955 --- /dev/null +++ b/classlib/src/main/java/org/teavm/classlib/java/util/concurrent/TArrayBlockingQueue.java @@ -0,0 +1,495 @@ +/* + * Copyright 2018 konsoletyper. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.teavm.classlib.java.util.concurrent; + +import java.lang.reflect.Array; +import java.util.Arrays; +import java.util.Collection; +import java.util.NoSuchElementException; +import java.util.Objects; +import org.teavm.classlib.java.lang.TInterruptedException; +import org.teavm.classlib.java.lang.TThread; +import org.teavm.classlib.java.lang.TThreadInterruptHandler; +import org.teavm.classlib.java.util.TAbstractQueue; +import org.teavm.classlib.java.util.TCollection; +import org.teavm.classlib.java.util.TIterator; +import org.teavm.interop.Async; +import org.teavm.interop.Sync; +import org.teavm.platform.Platform; +import org.teavm.platform.PlatformQueue; +import org.teavm.platform.PlatformRunnable; +import org.teavm.platform.async.AsyncCallback; + +public class TArrayBlockingQueue extends TAbstractQueue implements TBlockingQueue { + private Object[] array; + private int head; + private int tail; + private PlatformQueue waitHandlers; + + public TArrayBlockingQueue(int capacity) { + this(capacity, false); + } + + public TArrayBlockingQueue(int capacity, boolean fair) { + if (capacity < 1) { + throw new IllegalArgumentException("Capacity must be >= 1"); + } + array = new Object[capacity]; + } + + public TArrayBlockingQueue(int capacity, boolean fair, Collection c) { + if (capacity < 1 || capacity < Objects.requireNonNull(c).size()) { + throw new IllegalArgumentException("Capacity must be at least 1 or collection's size"); + } + if (c.size() > capacity) { + throw new IllegalArgumentException(); + } + array = new Object[capacity]; + + int index = 0; + for (E e : c) { + array[index++] = Objects.requireNonNull(e); + } + tail = c.size(); + } + + @Override + public boolean add(E e) { + Objects.requireNonNull(e); + if (isFull()) { + throw new IllegalStateException("This blocking queue is full"); + } + addImpl(e); + return true; + } + + @Override + public boolean offer(E e) { + Objects.requireNonNull(e); + if (isFull()) { + return false; + } + addImpl(e); + return true; + } + + @Override + public void put(E e) throws InterruptedException { + Objects.requireNonNull(e); + while (isFull()) { + waitForChange(0); + } + addImpl(e); + } + + @Override + public boolean offer(E e, long timeout, TTimeUnit unit) throws InterruptedException { + Objects.requireNonNull(e); + if (isFull()) { + long timeLimit = System.currentTimeMillis() + unit.toMillis(timeout); + while (isFull()) { + if (!waitForChange(timeLimit)) { + return false; + } + } + } + addImpl(e); + return true; + } + + @Override + public E poll() { + if (isEmpty()) { + return null; + } + return removeImpl(); + } + + @Override + public E take() throws InterruptedException { + while (isEmpty()) { + waitForChange(0); + } + return removeImpl(); + } + + @Override + public E poll(long timeout, TTimeUnit unit) throws InterruptedException { + if (isEmpty()) { + long timeLimit = System.currentTimeMillis() + unit.toMillis(timeout); + while (isEmpty()) { + if (!waitForChange(timeLimit)) { + return null; + } + } + } + return removeImpl(); + } + + @Override + @SuppressWarnings("unchecked") + public E peek() { + if (isEmpty()) { + return null; + } + return (E) array[head]; + } + + @Override + public int size() { + if (head < tail) { + return tail - head; + } else if (array[head] != null) { + return tail + array.length - head; + } else { + return 0; + } + } + + @Override + public boolean isEmpty() { + return head == tail && array[head] == null; + } + + @Override + public int remainingCapacity() { + return array.length - size(); + } + + @Override + public boolean remove(Object o) { + if (isEmpty()) { + return false; + } else if (head < tail) { + for (int i = head; i < tail; ++i) { + if (array[i].equals(o)) { + removeAt(i); + return true; + } + } + return false; + } else { + for (int i = head; i < array.length; ++i) { + if (array[i].equals(o)) { + removeAt(i); + return true; + } + } + + for (int i = 0; i < tail; ++i) { + if (array[i].equals(o)) { + removeAt(i); + return true; + } + } + + return false; + } + } + + @Override + public boolean contains(Object o) { + if (isEmpty()) { + return false; + } else if (head < tail) { + for (int i = head; i < tail; ++i) { + if (array[i].equals(o)) { + return true; + } + } + return false; + } else { + for (int i = head; i < array.length; ++i) { + if (array[i].equals(o)) { + return true; + } + } + for (int i = 0; i < tail; ++i) { + if (array[i].equals(o)) { + return true; + } + } + return false; + } + } + + @Override + public Object[] toArray() { + if (isEmpty()) { + return new Object[0]; + } else if (head < tail) { + return Arrays.copyOfRange(array, head, tail); + } else { + Object[] result = new Object[size()]; + System.arraycopy(array, head, result, 0, array.length - head); + System.arraycopy(array, 0, result, array.length - head, tail); + return result; + } + } + + @Override + @SuppressWarnings({ "unchecked", "SuspiciousSystemArraycopy" }) + public T[] toArray(T[] a) { + if (isEmpty()) { + if (a.length > 0) { + a[0] = null; + } + return a; + } + + int size = size(); + if (size > a.length) { + a = (T[]) Array.newInstance(a.getClass().getComponentType(), size); + } else if (size < a.length) { + a[size] = null; + } + + if (head < tail) { + System.arraycopy(array, head, a, 0, size); + } else { + System.arraycopy(array, head, a, 0, array.length - head); + System.arraycopy(array, 0, a, array.length - head, tail); + } + + return a; + } + + @Override + public void clear() { + if (isEmpty()) { + return; + } + if (head < tail) { + for (int i = head; i < tail; ++i) { + array[i] = null; + } + } else { + for (int i = head; i < array.length; ++i) { + array[i] = null; + } + for (int i = 0; i < tail; ++i) { + array[i] = null; + } + } + + head = 0; + tail = 0; + notifyChange(); + } + + @Override + public int drainTo(TCollection c) { + return drainTo(c, Integer.MAX_VALUE); + } + + @Override + public int drainTo(TCollection c, int maxElements) { + if (c == this) { + throw new IllegalArgumentException("The specified collection is this queue"); + } + if (isEmpty()) { + return 0; + } + Objects.requireNonNull(c); + maxElements = Math.min(maxElements, size()); + if (maxElements < tail) { + for (int i = 0; i < maxElements; ++i) { + drainSingleTo(c); + } + notifyChange(); + } else { + int remaining = maxElements - (array.length - head); + while (head < array.length) { + drainSingleTo(c); + } + head = 0; + while (remaining-- > 0) { + drainSingleTo(c); + } + notifyChange(); + } + return maxElements; + } + + @Override + public TIterator iterator() { + return new TIterator() { + int index = head; + int removeIndex = -1; + + @Override + public boolean hasNext() { + return array[index] != null; + } + + @Override + public E next() { + removeIndex = index; + @SuppressWarnings("unchecked") + E result = (E) array[index++]; + if (result == null) { + throw new NoSuchElementException(); + } + if (index == array.length) { + index = 0; + } + return result; + } + + @Override + public void remove() { + if (removeIndex < 0 || array[removeIndex] == null) { + throw new IllegalStateException(); + } + removeAt(removeIndex); + removeIndex = -1; + } + }; + } + + private void removeAt(int index) { + if (index < tail) { + shiftElements(index, tail); + --tail; + } else { + shiftElements(index, array.length); + array[array.length - 1] = array[0]; + shiftElements(0, tail); + if (--tail < 0) { + tail = array.length; + } + } + array[tail] = null; + notifyChange(); + } + + private void drainSingleTo(TCollection c) { + @SuppressWarnings("unchecked") + E e = (E) array[head]; + array[head++] = null; + c.add(e); + } + + private void shiftElements(int from, int to) { + int remaining = to - from - 1; + if (remaining > 0) { + System.arraycopy(array, from + 1, array, from, remaining); + } + } + + @Sync + private void addImpl(E e) { + array[tail++] = e; + if (tail == array.length) { + tail = 0; + } + notifyChange(); + } + + @Sync + private E removeImpl() { + @SuppressWarnings("unchecked") + E result = (E) array[head]; + array[head++] = null; + if (head == array.length) { + head = 0; + } + notifyChange(); + return result; + } + + private void notifyChange() { + if (waitHandlers == null) { + return; + } + if (waitHandlers != null) { + while (!waitHandlers.isEmpty()) { + Platform.postpone(() -> waitHandlers.remove().changed()); + } + waitHandlers = null; + } + } + + @Async + private native Boolean waitForChange(long timeLimit) throws InterruptedException; + + private void waitForChange(long timeLimit, AsyncCallback callback) { + if (waitHandlers == null) { + waitHandlers = Platform.createQueue(); + } + + WaitHandler handler = new WaitHandler(callback); + waitHandlers.add(handler); + if (timeLimit > 0) { + int timeout = Math.max(0, (int) (timeLimit - System.currentTimeMillis())); + handler.timerId = Platform.schedule(handler, timeout); + } else { + handler.timerId = -1; + } + + TThread.currentThread().interruptHandler = handler; + } + + class WaitHandler implements PlatformRunnable, TThreadInterruptHandler { + AsyncCallback callback; + boolean complete; + int timerId; + + WaitHandler(AsyncCallback callback) { + this.callback = callback; + } + + @Override + public void run() { + if (complete()) { + return; + } + callback.complete(false); + } + + @Override + public void interrupted() { + if (complete()) { + return; + } + callback.error(new TInterruptedException()); + } + + private boolean complete() { + if (complete) { + return true; + } + complete = true; + if (timerId >= 0) { + Platform.killSchedule(timerId); + timerId = -1; + } + TThread.currentThread().interruptHandler = null; + return false; + } + + void changed() { + if (complete()) { + return; + } + callback.complete(true); + } + } + + private boolean isFull() { + return head == tail && array[head] != null; + } +} diff --git a/classlib/src/main/java/org/teavm/classlib/java/util/concurrent/TBlockingQueue.java b/classlib/src/main/java/org/teavm/classlib/java/util/concurrent/TBlockingQueue.java new file mode 100644 index 000000000..34e5b9384 --- /dev/null +++ b/classlib/src/main/java/org/teavm/classlib/java/util/concurrent/TBlockingQueue.java @@ -0,0 +1,35 @@ +/* + * Copyright 2018 konsoletyper. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.teavm.classlib.java.util.concurrent; + +import org.teavm.classlib.java.util.TCollection; +import org.teavm.classlib.java.util.TQueue; + +public interface TBlockingQueue extends TQueue { + void put(E e) throws InterruptedException; + + boolean offer(E e, long timeout, TTimeUnit unit) throws InterruptedException; + + E take() throws InterruptedException; + + E poll(long timeout, TTimeUnit unit) throws InterruptedException; + + int remainingCapacity(); + + int drainTo(TCollection c); + + int drainTo(TCollection c, int maxElements); +} diff --git a/classlib/src/main/java/org/teavm/classlib/java/util/concurrent/TTimeUnit.java b/classlib/src/main/java/org/teavm/classlib/java/util/concurrent/TTimeUnit.java new file mode 100644 index 000000000..591204226 --- /dev/null +++ b/classlib/src/main/java/org/teavm/classlib/java/util/concurrent/TTimeUnit.java @@ -0,0 +1,82 @@ +/* + * Copyright 2018 konsoletyper. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.teavm.classlib.java.util.concurrent; + +public enum TTimeUnit { + NANOSECONDS(1), + MICROSECONDS(1_000), + MILLISECONDS(1_000_000), + SECONDS(1_000_000_000), + MINUTES(60_000_000_000L), + HOURS(3_600_000_000_000L), + DAYS(24 * 3_600_000_000_000L); + + private long nanoseconds; + + TTimeUnit(long nanoseconds) { + this.nanoseconds = nanoseconds; + } + + public long convert(long sourceDuration, TTimeUnit sourceUnit) { + long sourceNanos = sourceUnit.nanoseconds; + long targetNanos = nanoseconds; + if (sourceNanos < targetNanos) { + return sourceDuration * (targetNanos / sourceNanos); + } else { + return sourceDuration / (sourceNanos / targetNanos); + } + } + + public long toNanos(long duration) { + return duration * nanoseconds; + } + + public long toMicros(long duration) { + return MICROSECONDS.convert(duration, this); + } + + public long toMillis(long duration) { + return MILLISECONDS.convert(duration, this); + } + + public long toSeconds(long duration) { + return SECONDS.convert(duration, this); + } + + public long toMinutes(long duration) { + return MINUTES.convert(duration, this); + } + + public long toHours(long duration) { + return HOURS.convert(duration, this); + } + + public long toDays(long duration) { + return DAYS.convert(duration, this); + } + + public void timedWait(Object obj, long timeout) throws InterruptedException { + obj.wait(toMillis(timeout)); + } + + public void timedJoin(Thread thread, long timeout) throws InterruptedException { + thread.join(toMillis(timeout)); + } + + public void sleep(long timeout) throws InterruptedException { + Thread.sleep(toMillis(timeout)); + } +} diff --git a/tests/src/test/java/org/teavm/classlib/java/util/concurrent/ArrayBlockingQueueTest.java b/tests/src/test/java/org/teavm/classlib/java/util/concurrent/ArrayBlockingQueueTest.java new file mode 100644 index 000000000..e7eef8729 --- /dev/null +++ b/tests/src/test/java/org/teavm/classlib/java/util/concurrent/ArrayBlockingQueueTest.java @@ -0,0 +1,245 @@ +/* + * Copyright 2018 Alexey Andreev. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.teavm.classlib.java.util.concurrent; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.teavm.junit.TeaVMTestRunner; + +@RunWith(TeaVMTestRunner.class) +public class ArrayBlockingQueueTest { + @Test + public void constructed() { + List list = Arrays.asList(2, 3, 5); + + try { + new ArrayBlockingQueue<>(2, false, list); + fail("IAE expected"); + } catch (IllegalArgumentException e) { + // Expected + } + + try { + new ArrayBlockingQueue<>(0); + fail("IAE expected"); + } catch (IllegalArgumentException e) { + // Expected + } + + try { + new ArrayBlockingQueue<>(1, false, null); + fail("IAE expected"); + } catch (NullPointerException e) { + // Expected + } + + BlockingQueue queue = new ArrayBlockingQueue<>(5, false, list); + assertEquals(3, queue.size()); + assertEquals(2, queue.remainingCapacity()); + + assertEquals(2, queue.poll().intValue()); + assertEquals(3, queue.poll().intValue()); + assertEquals(5, queue.poll().intValue()); + assertNull(queue.poll()); + } + + @Test + public void singleThread() { + BlockingQueue queue = new ArrayBlockingQueue<>(10); + queue.add(1); + queue.add(2); + queue.add(3); + assertEquals(1, queue.poll().intValue()); + queue.add(4); + assertEquals(2, queue.poll().intValue()); + assertEquals(3, queue.poll().intValue()); + assertEquals(4, queue.poll().intValue()); + assertNull(queue.poll()); + + queue.add(5); + assertEquals(5, queue.poll().intValue()); + } + + @Test + public void blockingAddition() throws InterruptedException { + BlockingQueue queue = new ArrayBlockingQueue<>(1); + assertTrue(queue.offer(1)); + assertFalse(queue.offer(2)); + + new Thread(() -> { + try { + Thread.sleep(150); + } catch (InterruptedException e) { + // Do nothing + } + queue.poll(); + }).start(); + + long start = System.currentTimeMillis(); + queue.put(3); + long end = System.currentTimeMillis(); + assertTrue(start + 50 < end && start + 250 > end); + + assertEquals(3, queue.remove().intValue()); + } + + @Test + public void blockingRemoval() throws InterruptedException { + BlockingQueue queue = new ArrayBlockingQueue<>(10); + assertNull(queue.poll()); + + new Thread(() -> { + try { + Thread.sleep(300); + } catch (InterruptedException e) { + // Do nothing + } + queue.add(1); + queue.add(2); + }).start(); + + long start = System.currentTimeMillis(); + int a = queue.take(); + long end = System.currentTimeMillis(); + int b = queue.take(); + + assertTrue(start + 100 < end && start + 500 > end); + assertEquals(1, a); + assertEquals(2, b); + } + + @Test + public void shiftQueueSize() { + assertEquals(6, shiftQueue().size()); + } + + @Test + public void remove() { + BlockingQueue queue = simpleQueue(); + assertTrue(queue.remove(2)); + assertEquals(2, queue.size()); + assertEquals(1, queue.poll().intValue()); + assertEquals(3, queue.poll().intValue()); + assertNull(queue.poll()); + + queue = shiftQueue(); + assertTrue(queue.remove(1)); + assertEquals(5, queue.size()); + assertEquals(0, queue.poll().intValue()); + assertEquals(2, queue.poll().intValue()); + assertEquals(3, queue.poll().intValue()); + assertEquals(4, queue.poll().intValue()); + assertEquals(5, queue.poll().intValue()); + assertNull(queue.poll()); + + queue = shiftQueue(); + assertTrue(queue.remove(4)); + assertEquals(5, queue.size()); + assertEquals(0, queue.poll().intValue()); + assertEquals(1, queue.poll().intValue()); + assertEquals(2, queue.poll().intValue()); + assertEquals(3, queue.poll().intValue()); + assertEquals(5, queue.poll().intValue()); + assertNull(queue.poll()); + } + + @Test + public void dumpsToArray() { + BlockingQueue queue = simpleQueue(); + assertArrayEquals(new Integer[] { 1, 2, 3 }, queue.toArray()); + + queue = shiftQueue(); + assertArrayEquals(new Integer[] { 0, 1, 2, 3, 4, 5 }, queue.toArray()); + } + + @Test + public void dumpsToTypedArray() { + BlockingQueue queue = simpleQueue(); + assertArrayEquals(new Integer[] { 1, 2, 3 }, queue.toArray(new Integer[3])); + assertArrayEquals(new Integer[] { 1, 2, 3 }, queue.toArray(new Integer[1])); + + Integer[] array = new Integer[] { 10, 11, 12, 13, 14, 15 }; + assertArrayEquals(new Integer[] { 1, 2, 3, null, 14, 15 }, queue.toArray(array)); + } + + @Test + public void drains() { + BlockingQueue queue = simpleQueue(); + List target = new ArrayList<>(); + assertEquals(2, queue.drainTo(target, 2)); + assertEquals(Arrays.asList(1, 2), target); + + queue = simpleQueue(); + target.clear(); + assertEquals(3, queue.drainTo(target, 4)); + assertEquals(Arrays.asList(1, 2, 3), target); + + queue = shiftQueue(); + target.clear(); + assertEquals(2, queue.drainTo(target, 2)); + assertEquals(Arrays.asList(0, 1), target); + + queue = shiftQueue(); + target.clear(); + assertEquals(4, queue.drainTo(target, 4)); + assertEquals(Arrays.asList(0, 1, 2, 3), target); + } + + @Test + public void iterator() { + BlockingQueue queue = simpleQueue(); + assertEquals(Arrays.asList(1, 2, 3), new ArrayList<>(queue)); + + queue = shiftQueue(); + assertEquals(Arrays.asList(0, 1, 2, 3, 4, 5), new ArrayList<>(queue)); + } + + private BlockingQueue simpleQueue() { + BlockingQueue queue = new ArrayBlockingQueue<>(10); + queue.add(0); + queue.remove(); + queue.add(1); + queue.add(2); + queue.add(3); + return queue; + } + + private BlockingQueue shiftQueue() { + BlockingQueue queue = new ArrayBlockingQueue<>(10); + for (int i = 0; i < 7; ++i) { + queue.add(i); + } + while (!queue.isEmpty()) { + queue.remove(); + } + + for (int i = 0; i < 6; ++i) { + queue.add(i); + } + return queue; + } +}