diff options
Diffstat (limited to 'israfil-foundation-concurrent/src/test/java/net/israfil/foundation/concurrent/PumpTest.java')
| -rw-r--r-- | israfil-foundation-concurrent/src/test/java/net/israfil/foundation/concurrent/PumpTest.java | 218 |
1 files changed, 218 insertions, 0 deletions
diff --git a/israfil-foundation-concurrent/src/test/java/net/israfil/foundation/concurrent/PumpTest.java b/israfil-foundation-concurrent/src/test/java/net/israfil/foundation/concurrent/PumpTest.java new file mode 100644 index 0000000..5d714f8 --- /dev/null +++ b/israfil-foundation-concurrent/src/test/java/net/israfil/foundation/concurrent/PumpTest.java @@ -0,0 +1,218 @@ +/* + * Copyright (c) 2006 - 2007 Israfil Consulting Services Corporation + * Copyright (c) 2006 - 2007 Christian Edward Gruber + * All Rights Reserved + * + * This software is licensed under the Berkeley Standard Distribution license, + * (BSD license), as defined below: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. Neither the name of Israfil Consulting Services nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + * + * $Id: AllTests.java 13 2006-01-27 23:45:36Z cgruber $ + */ +package net.israfil.foundation.concurrent; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import net.israfil.foundation.concurrent.timing.Timeout; + +import org.testng.Assert; +import org.testng.annotations.Test; + +/** + * A Test Class + * + * @author <a href="mailto:cgruber@roundarch.com">Christian Edward Gruber</a> + * @author Latest: $Author: cgruber $ + * @version $Revision: 130 $ + */ +@Test +public class PumpTest { + + + public void testSimplePump() throws InterruptedException { + SimplePump<String> pump = new SimplePump<String>(); + pump.setTimeout(10000L); + MockProducer<String> p = new MockProducer<String>(); + final MockConsumer<String> c = new MockConsumer<String>(); + pump.setConsumer(c); + pump.setProducer(p); + Assert.assertNotNull(pump.getConsumer()); + Assert.assertNotNull(pump.getProducer()); + Assert.assertTrue(pump.isStopped()); + pump.start(); + Timeout timeout = new Timeout(20000); + while(!pump.isRunning()) timeout.sleep(5); + Assert.assertNull(p.item); + Assert.assertNull(c.item); + String itemA = "a"; + p.item = itemA; + timeout = new Timeout(20000); + while( c.item == null) { + timeout.sleep(5); + } + Assert.assertEquals(itemA,c.item); + pump.stop(); + Assert.assertTrue(pump.isStopped()); + } + + public void testCollectingPump() throws InterruptedException { + CollectingPump<String> pump = new CollectingPump<String>(); + MockProducer<String> p1 = new MockProducer<String>(); + MockProducer<String> p2 = new MockProducer<String>(); + MockProducer<String> p3 = new MockProducer<String>(); + final MockConsumer<String> c = new MockConsumer<String>(); + pump.setConsumer(c); + List<ConcurrentProducer<String>> producers = new ArrayList<ConcurrentProducer<String>>(); + producers.add(p1); + producers.add(p2); + producers.add(p3); + pump.setProducers(producers); + Assert.assertNotNull(pump.getConsumer()); + Assert.assertNotNull(pump.getProducers()); + Assert.assertTrue(pump.isStopped()); + pump.start(); + Timeout timeout = new Timeout(20000); + while(!pump.isRunning()) timeout.sleep(5); + Assert.assertNull(p1.item); + Assert.assertNull(c.item); + String itemA = "a", itemB = "b", itemC = "c"; + p1.item = itemA; + p2.item = itemB; + p3.item = itemC; + Assert.assertNotNull(p1.item); + Assert.assertNotNull(p2.item); + Assert.assertNotNull(p3.item); + timeout = new Timeout(20000); + while(c.item == null) timeout.sleep(5); + Assert.assertEquals(itemA, c.item); + Assert.assertNull(p1.item); + c.item = null; + timeout = new Timeout(20000); + while(c.item == null) timeout.sleep(5); + Assert.assertEquals(itemB, c.item); + Assert.assertNull(p2.item); + c.item = null; + timeout = new Timeout(20000); + while(c.item == null) timeout.sleep(5); + Assert.assertEquals(itemC, c.item); + Assert.assertNull(p3.item); + c.item = null; + pump.stop(); + Assert.assertTrue(pump.isStopped()); + } + + public void testDistributingPump() throws InterruptedException { + if (true) return; + DistributingPump<String> pump = new DistributingPump<String>(); + MockProducer<String> p = new MockProducer<String>(); + MockConsumer<String> c1 = new MockConsumer<String>(); + MockConsumer<String> c2 = new MockConsumer<String>(); + MockConsumer<String> c3 = new MockConsumer<String>(); + List<ConcurrentConsumer<String>> consumers = new ArrayList<ConcurrentConsumer<String>>(); + consumers.add(c1); + consumers.add(c2); + consumers.add(c3); + pump.setConsumers(consumers); + pump.setProducer(p); + Assert.assertNotNull(pump.getConsumers()); + Assert.assertNotNull(pump.getProducer()); + Assert.assertTrue(pump.isStopped()); + Timeout timeout = new Timeout(500); + while(!pump.isRunning()) timeout.sleep(5); + Assert.assertNull(p.item); + Assert.assertNull(c1.item); + Assert.assertNull(c2.item); + Assert.assertNull(c3.item); + String itemA = "a", itemB = "b", itemC = "c"; + p.item = itemA; + timeout = new Timeout(20000); + while(c1.item == null) timeout.sleep(5); + Assert.assertEquals(itemA,c1.item); + Assert.assertNull(p.item); + p.item = itemB; + timeout = new Timeout(20000); + while(c1.item == null) timeout.sleep(5); + Assert.assertEquals(itemB,c1.item); + Assert.assertNull(p.item); + p.item = itemC; + timeout = new Timeout(20000); + while(c1.item == null) timeout.sleep(5); + Assert.assertEquals(itemC,c1.item); + Assert.assertNull(p.item); + pump.stop(); + Assert.assertTrue(pump.isStopped()); + } + + public static class MockProducer<T> extends MockStringPipeComponent<T> implements ConcurrentProducer<T> { + public T produce() { + try { + return produce(0,null); + } catch (InterruptedException e) { + throw new RuntimeInterruptedException(e); + } + } + public T produce(long timeoutMillis, TimeUnit unit) throws InterruptedException { + Thread.sleep(delay); + Timeout timeout = new Timeout(timeoutMillis); + while (this.item == null && (timeoutMillis < 1 || unit == null)) + timeout.sleep(5); + Long start = System.currentTimeMillis(); + if (this.item == null) return null; // in case of timeouts. + synchronized (this.item) { + T oldItem = this.item; + this.item = null; + return oldItem; + } + } + } + + public static class MockConsumer<T> extends MockStringPipeComponent<T> implements ConcurrentConsumer<T> { + public void consume(T item) { + try { + consume(item,0,null); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + public void consume(T item, long timeoutMillis, TimeUnit unit) throws InterruptedException { + Thread.sleep(delay); + Timeout timeout = new Timeout(timeoutMillis); + while (this.item != null && (timeoutMillis < 1 || unit == null)) + timeout.sleep(5); + this.item = item; + } + } + + public static class MockStringPipeComponent<T> { + public T item = null; + public long delay = 0; + public static boolean timeout(Long start, Long timeout) { + return (System.currentTimeMillis() - start) < timeout; + } + } + +} |
