Implement ArrayBlockingQueue

This commit is contained in:
Alexey Andreev 2018-01-20 23:06:54 +03:00
parent ec3724b2bc
commit 28e17af500
8 changed files with 859 additions and 11 deletions

View File

@ -313,6 +313,7 @@ public class TObject {
public final void waitImpl(long timeout, int nanos, final AsyncCallback<Void> callback) { public final void waitImpl(long timeout, int nanos, final AsyncCallback<Void> callback) {
final NotifyListenerImpl listener = new NotifyListenerImpl(this, callback, monitor.count); final NotifyListenerImpl listener = new NotifyListenerImpl(this, callback, monitor.count);
monitor.notifyListeners.add(listener); monitor.notifyListeners.add(listener);
TThread.currentThread().interruptHandler = listener;
if (timeout > 0 || nanos > 0) { if (timeout > 0 || nanos > 0) {
listener.timerId = Platform.schedule(listener, timeout >= Integer.MAX_VALUE ? Integer.MAX_VALUE listener.timerId = Platform.schedule(listener, timeout >= Integer.MAX_VALUE ? Integer.MAX_VALUE
: (int) timeout); : (int) timeout);

View File

@ -31,7 +31,7 @@ public class TThread extends TObject implements TRunnable {
private int yieldCount; private int yieldCount;
private final Object finishedLock = new Object(); private final Object finishedLock = new Object();
private boolean interruptedFlag; private boolean interruptedFlag;
private TThreadInterruptHandler interruptHandler; public TThreadInterruptHandler interruptHandler;
private TString name; private TString name;
private boolean alive = true; private boolean alive = true;

View File

@ -18,11 +18,6 @@ package org.teavm.classlib.java.util;
import java.util.Arrays; import java.util.Arrays;
import org.teavm.classlib.java.lang.*; import org.teavm.classlib.java.lang.*;
/**
*
* @author Alexey Andreev
* @param <E>
*/
public class TArrayDeque<E> extends TAbstractCollection<E> implements TDeque<E> { public class TArrayDeque<E> extends TAbstractCollection<E> implements TDeque<E> {
private int version; private int version;
private Object[] array; private Object[] array;

View File

@ -15,11 +15,6 @@
*/ */
package org.teavm.classlib.java.util; package org.teavm.classlib.java.util;
/**
*
* @author Alexey Andreev
* @param <E>
*/
public interface TQueue<E> extends TCollection<E> { public interface TQueue<E> extends TCollection<E> {
boolean offer(E e); boolean offer(E e);

View File

@ -0,0 +1,495 @@
/*
* Copyright 2018 konsoletyper.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.teavm.classlib.java.util.concurrent;
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.Collection;
import java.util.NoSuchElementException;
import java.util.Objects;
import org.teavm.classlib.java.lang.TInterruptedException;
import org.teavm.classlib.java.lang.TThread;
import org.teavm.classlib.java.lang.TThreadInterruptHandler;
import org.teavm.classlib.java.util.TAbstractQueue;
import org.teavm.classlib.java.util.TCollection;
import org.teavm.classlib.java.util.TIterator;
import org.teavm.interop.Async;
import org.teavm.interop.Sync;
import org.teavm.platform.Platform;
import org.teavm.platform.PlatformQueue;
import org.teavm.platform.PlatformRunnable;
import org.teavm.platform.async.AsyncCallback;
public class TArrayBlockingQueue<E> extends TAbstractQueue<E> implements TBlockingQueue<E> {
private Object[] array;
private int head;
private int tail;
private PlatformQueue<WaitHandler> waitHandlers;
public TArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public TArrayBlockingQueue(int capacity, boolean fair) {
if (capacity < 1) {
throw new IllegalArgumentException("Capacity must be >= 1");
}
array = new Object[capacity];
}
public TArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
if (capacity < 1 || capacity < Objects.requireNonNull(c).size()) {
throw new IllegalArgumentException("Capacity must be at least 1 or collection's size");
}
if (c.size() > capacity) {
throw new IllegalArgumentException();
}
array = new Object[capacity];
int index = 0;
for (E e : c) {
array[index++] = Objects.requireNonNull(e);
}
tail = c.size();
}
@Override
public boolean add(E e) {
Objects.requireNonNull(e);
if (isFull()) {
throw new IllegalStateException("This blocking queue is full");
}
addImpl(e);
return true;
}
@Override
public boolean offer(E e) {
Objects.requireNonNull(e);
if (isFull()) {
return false;
}
addImpl(e);
return true;
}
@Override
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
while (isFull()) {
waitForChange(0);
}
addImpl(e);
}
@Override
public boolean offer(E e, long timeout, TTimeUnit unit) throws InterruptedException {
Objects.requireNonNull(e);
if (isFull()) {
long timeLimit = System.currentTimeMillis() + unit.toMillis(timeout);
while (isFull()) {
if (!waitForChange(timeLimit)) {
return false;
}
}
}
addImpl(e);
return true;
}
@Override
public E poll() {
if (isEmpty()) {
return null;
}
return removeImpl();
}
@Override
public E take() throws InterruptedException {
while (isEmpty()) {
waitForChange(0);
}
return removeImpl();
}
@Override
public E poll(long timeout, TTimeUnit unit) throws InterruptedException {
if (isEmpty()) {
long timeLimit = System.currentTimeMillis() + unit.toMillis(timeout);
while (isEmpty()) {
if (!waitForChange(timeLimit)) {
return null;
}
}
}
return removeImpl();
}
@Override
@SuppressWarnings("unchecked")
public E peek() {
if (isEmpty()) {
return null;
}
return (E) array[head];
}
@Override
public int size() {
if (head < tail) {
return tail - head;
} else if (array[head] != null) {
return tail + array.length - head;
} else {
return 0;
}
}
@Override
public boolean isEmpty() {
return head == tail && array[head] == null;
}
@Override
public int remainingCapacity() {
return array.length - size();
}
@Override
public boolean remove(Object o) {
if (isEmpty()) {
return false;
} else if (head < tail) {
for (int i = head; i < tail; ++i) {
if (array[i].equals(o)) {
removeAt(i);
return true;
}
}
return false;
} else {
for (int i = head; i < array.length; ++i) {
if (array[i].equals(o)) {
removeAt(i);
return true;
}
}
for (int i = 0; i < tail; ++i) {
if (array[i].equals(o)) {
removeAt(i);
return true;
}
}
return false;
}
}
@Override
public boolean contains(Object o) {
if (isEmpty()) {
return false;
} else if (head < tail) {
for (int i = head; i < tail; ++i) {
if (array[i].equals(o)) {
return true;
}
}
return false;
} else {
for (int i = head; i < array.length; ++i) {
if (array[i].equals(o)) {
return true;
}
}
for (int i = 0; i < tail; ++i) {
if (array[i].equals(o)) {
return true;
}
}
return false;
}
}
@Override
public Object[] toArray() {
if (isEmpty()) {
return new Object[0];
} else if (head < tail) {
return Arrays.copyOfRange(array, head, tail);
} else {
Object[] result = new Object[size()];
System.arraycopy(array, head, result, 0, array.length - head);
System.arraycopy(array, 0, result, array.length - head, tail);
return result;
}
}
@Override
@SuppressWarnings({ "unchecked", "SuspiciousSystemArraycopy" })
public <T> T[] toArray(T[] a) {
if (isEmpty()) {
if (a.length > 0) {
a[0] = null;
}
return a;
}
int size = size();
if (size > a.length) {
a = (T[]) Array.newInstance(a.getClass().getComponentType(), size);
} else if (size < a.length) {
a[size] = null;
}
if (head < tail) {
System.arraycopy(array, head, a, 0, size);
} else {
System.arraycopy(array, head, a, 0, array.length - head);
System.arraycopy(array, 0, a, array.length - head, tail);
}
return a;
}
@Override
public void clear() {
if (isEmpty()) {
return;
}
if (head < tail) {
for (int i = head; i < tail; ++i) {
array[i] = null;
}
} else {
for (int i = head; i < array.length; ++i) {
array[i] = null;
}
for (int i = 0; i < tail; ++i) {
array[i] = null;
}
}
head = 0;
tail = 0;
notifyChange();
}
@Override
public int drainTo(TCollection<? super E> c) {
return drainTo(c, Integer.MAX_VALUE);
}
@Override
public int drainTo(TCollection<? super E> c, int maxElements) {
if (c == this) {
throw new IllegalArgumentException("The specified collection is this queue");
}
if (isEmpty()) {
return 0;
}
Objects.requireNonNull(c);
maxElements = Math.min(maxElements, size());
if (maxElements < tail) {
for (int i = 0; i < maxElements; ++i) {
drainSingleTo(c);
}
notifyChange();
} else {
int remaining = maxElements - (array.length - head);
while (head < array.length) {
drainSingleTo(c);
}
head = 0;
while (remaining-- > 0) {
drainSingleTo(c);
}
notifyChange();
}
return maxElements;
}
@Override
public TIterator<E> iterator() {
return new TIterator<E>() {
int index = head;
int removeIndex = -1;
@Override
public boolean hasNext() {
return array[index] != null;
}
@Override
public E next() {
removeIndex = index;
@SuppressWarnings("unchecked")
E result = (E) array[index++];
if (result == null) {
throw new NoSuchElementException();
}
if (index == array.length) {
index = 0;
}
return result;
}
@Override
public void remove() {
if (removeIndex < 0 || array[removeIndex] == null) {
throw new IllegalStateException();
}
removeAt(removeIndex);
removeIndex = -1;
}
};
}
private void removeAt(int index) {
if (index < tail) {
shiftElements(index, tail);
--tail;
} else {
shiftElements(index, array.length);
array[array.length - 1] = array[0];
shiftElements(0, tail);
if (--tail < 0) {
tail = array.length;
}
}
array[tail] = null;
notifyChange();
}
private void drainSingleTo(TCollection<? super E> c) {
@SuppressWarnings("unchecked")
E e = (E) array[head];
array[head++] = null;
c.add(e);
}
private void shiftElements(int from, int to) {
int remaining = to - from - 1;
if (remaining > 0) {
System.arraycopy(array, from + 1, array, from, remaining);
}
}
@Sync
private void addImpl(E e) {
array[tail++] = e;
if (tail == array.length) {
tail = 0;
}
notifyChange();
}
@Sync
private E removeImpl() {
@SuppressWarnings("unchecked")
E result = (E) array[head];
array[head++] = null;
if (head == array.length) {
head = 0;
}
notifyChange();
return result;
}
private void notifyChange() {
if (waitHandlers == null) {
return;
}
if (waitHandlers != null) {
while (!waitHandlers.isEmpty()) {
Platform.postpone(() -> waitHandlers.remove().changed());
}
waitHandlers = null;
}
}
@Async
private native Boolean waitForChange(long timeLimit) throws InterruptedException;
private void waitForChange(long timeLimit, AsyncCallback<Boolean> callback) {
if (waitHandlers == null) {
waitHandlers = Platform.createQueue();
}
WaitHandler handler = new WaitHandler(callback);
waitHandlers.add(handler);
if (timeLimit > 0) {
int timeout = Math.max(0, (int) (timeLimit - System.currentTimeMillis()));
handler.timerId = Platform.schedule(handler, timeout);
} else {
handler.timerId = -1;
}
TThread.currentThread().interruptHandler = handler;
}
class WaitHandler implements PlatformRunnable, TThreadInterruptHandler {
AsyncCallback<Boolean> callback;
boolean complete;
int timerId;
WaitHandler(AsyncCallback<Boolean> callback) {
this.callback = callback;
}
@Override
public void run() {
if (complete()) {
return;
}
callback.complete(false);
}
@Override
public void interrupted() {
if (complete()) {
return;
}
callback.error(new TInterruptedException());
}
private boolean complete() {
if (complete) {
return true;
}
complete = true;
if (timerId >= 0) {
Platform.killSchedule(timerId);
timerId = -1;
}
TThread.currentThread().interruptHandler = null;
return false;
}
void changed() {
if (complete()) {
return;
}
callback.complete(true);
}
}
private boolean isFull() {
return head == tail && array[head] != null;
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright 2018 konsoletyper.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.teavm.classlib.java.util.concurrent;
import org.teavm.classlib.java.util.TCollection;
import org.teavm.classlib.java.util.TQueue;
public interface TBlockingQueue<E> extends TQueue<E> {
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TTimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TTimeUnit unit) throws InterruptedException;
int remainingCapacity();
int drainTo(TCollection<? super E> c);
int drainTo(TCollection<? super E> c, int maxElements);
}

View File

@ -0,0 +1,82 @@
/*
* Copyright 2018 konsoletyper.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.teavm.classlib.java.util.concurrent;
public enum TTimeUnit {
NANOSECONDS(1),
MICROSECONDS(1_000),
MILLISECONDS(1_000_000),
SECONDS(1_000_000_000),
MINUTES(60_000_000_000L),
HOURS(3_600_000_000_000L),
DAYS(24 * 3_600_000_000_000L);
private long nanoseconds;
TTimeUnit(long nanoseconds) {
this.nanoseconds = nanoseconds;
}
public long convert(long sourceDuration, TTimeUnit sourceUnit) {
long sourceNanos = sourceUnit.nanoseconds;
long targetNanos = nanoseconds;
if (sourceNanos < targetNanos) {
return sourceDuration * (targetNanos / sourceNanos);
} else {
return sourceDuration / (sourceNanos / targetNanos);
}
}
public long toNanos(long duration) {
return duration * nanoseconds;
}
public long toMicros(long duration) {
return MICROSECONDS.convert(duration, this);
}
public long toMillis(long duration) {
return MILLISECONDS.convert(duration, this);
}
public long toSeconds(long duration) {
return SECONDS.convert(duration, this);
}
public long toMinutes(long duration) {
return MINUTES.convert(duration, this);
}
public long toHours(long duration) {
return HOURS.convert(duration, this);
}
public long toDays(long duration) {
return DAYS.convert(duration, this);
}
public void timedWait(Object obj, long timeout) throws InterruptedException {
obj.wait(toMillis(timeout));
}
public void timedJoin(Thread thread, long timeout) throws InterruptedException {
thread.join(toMillis(timeout));
}
public void sleep(long timeout) throws InterruptedException {
Thread.sleep(toMillis(timeout));
}
}

View File

@ -0,0 +1,245 @@
/*
* Copyright 2018 Alexey Andreev.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.teavm.classlib.java.util.concurrent;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.teavm.junit.TeaVMTestRunner;
@RunWith(TeaVMTestRunner.class)
public class ArrayBlockingQueueTest {
@Test
public void constructed() {
List<Integer> list = Arrays.asList(2, 3, 5);
try {
new ArrayBlockingQueue<>(2, false, list);
fail("IAE expected");
} catch (IllegalArgumentException e) {
// Expected
}
try {
new ArrayBlockingQueue<>(0);
fail("IAE expected");
} catch (IllegalArgumentException e) {
// Expected
}
try {
new ArrayBlockingQueue<>(1, false, null);
fail("IAE expected");
} catch (NullPointerException e) {
// Expected
}
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5, false, list);
assertEquals(3, queue.size());
assertEquals(2, queue.remainingCapacity());
assertEquals(2, queue.poll().intValue());
assertEquals(3, queue.poll().intValue());
assertEquals(5, queue.poll().intValue());
assertNull(queue.poll());
}
@Test
public void singleThread() {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
queue.add(1);
queue.add(2);
queue.add(3);
assertEquals(1, queue.poll().intValue());
queue.add(4);
assertEquals(2, queue.poll().intValue());
assertEquals(3, queue.poll().intValue());
assertEquals(4, queue.poll().intValue());
assertNull(queue.poll());
queue.add(5);
assertEquals(5, queue.poll().intValue());
}
@Test
public void blockingAddition() throws InterruptedException {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
assertTrue(queue.offer(1));
assertFalse(queue.offer(2));
new Thread(() -> {
try {
Thread.sleep(150);
} catch (InterruptedException e) {
// Do nothing
}
queue.poll();
}).start();
long start = System.currentTimeMillis();
queue.put(3);
long end = System.currentTimeMillis();
assertTrue(start + 50 < end && start + 250 > end);
assertEquals(3, queue.remove().intValue());
}
@Test
public void blockingRemoval() throws InterruptedException {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
assertNull(queue.poll());
new Thread(() -> {
try {
Thread.sleep(300);
} catch (InterruptedException e) {
// Do nothing
}
queue.add(1);
queue.add(2);
}).start();
long start = System.currentTimeMillis();
int a = queue.take();
long end = System.currentTimeMillis();
int b = queue.take();
assertTrue(start + 100 < end && start + 500 > end);
assertEquals(1, a);
assertEquals(2, b);
}
@Test
public void shiftQueueSize() {
assertEquals(6, shiftQueue().size());
}
@Test
public void remove() {
BlockingQueue<Integer> queue = simpleQueue();
assertTrue(queue.remove(2));
assertEquals(2, queue.size());
assertEquals(1, queue.poll().intValue());
assertEquals(3, queue.poll().intValue());
assertNull(queue.poll());
queue = shiftQueue();
assertTrue(queue.remove(1));
assertEquals(5, queue.size());
assertEquals(0, queue.poll().intValue());
assertEquals(2, queue.poll().intValue());
assertEquals(3, queue.poll().intValue());
assertEquals(4, queue.poll().intValue());
assertEquals(5, queue.poll().intValue());
assertNull(queue.poll());
queue = shiftQueue();
assertTrue(queue.remove(4));
assertEquals(5, queue.size());
assertEquals(0, queue.poll().intValue());
assertEquals(1, queue.poll().intValue());
assertEquals(2, queue.poll().intValue());
assertEquals(3, queue.poll().intValue());
assertEquals(5, queue.poll().intValue());
assertNull(queue.poll());
}
@Test
public void dumpsToArray() {
BlockingQueue<Integer> queue = simpleQueue();
assertArrayEquals(new Integer[] { 1, 2, 3 }, queue.toArray());
queue = shiftQueue();
assertArrayEquals(new Integer[] { 0, 1, 2, 3, 4, 5 }, queue.toArray());
}
@Test
public void dumpsToTypedArray() {
BlockingQueue<Integer> queue = simpleQueue();
assertArrayEquals(new Integer[] { 1, 2, 3 }, queue.toArray(new Integer[3]));
assertArrayEquals(new Integer[] { 1, 2, 3 }, queue.toArray(new Integer[1]));
Integer[] array = new Integer[] { 10, 11, 12, 13, 14, 15 };
assertArrayEquals(new Integer[] { 1, 2, 3, null, 14, 15 }, queue.toArray(array));
}
@Test
public void drains() {
BlockingQueue<Integer> queue = simpleQueue();
List<Integer> target = new ArrayList<>();
assertEquals(2, queue.drainTo(target, 2));
assertEquals(Arrays.asList(1, 2), target);
queue = simpleQueue();
target.clear();
assertEquals(3, queue.drainTo(target, 4));
assertEquals(Arrays.asList(1, 2, 3), target);
queue = shiftQueue();
target.clear();
assertEquals(2, queue.drainTo(target, 2));
assertEquals(Arrays.asList(0, 1), target);
queue = shiftQueue();
target.clear();
assertEquals(4, queue.drainTo(target, 4));
assertEquals(Arrays.asList(0, 1, 2, 3), target);
}
@Test
public void iterator() {
BlockingQueue<Integer> queue = simpleQueue();
assertEquals(Arrays.asList(1, 2, 3), new ArrayList<>(queue));
queue = shiftQueue();
assertEquals(Arrays.asList(0, 1, 2, 3, 4, 5), new ArrayList<>(queue));
}
private BlockingQueue<Integer> simpleQueue() {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
queue.add(0);
queue.remove();
queue.add(1);
queue.add(2);
queue.add(3);
return queue;
}
private BlockingQueue<Integer> shiftQueue() {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
for (int i = 0; i < 7; ++i) {
queue.add(i);
}
while (!queue.isEmpty()) {
queue.remove();
}
for (int i = 0; i < 6; ++i) {
queue.add(i);
}
return queue;
}
}