/* * Copyright (c) 2006 - 2007 Israfil Consulting Services Corporation * Copyright (c) 2006 - 2007 Christian Edward Gruber * All Rights Reserved * * This software is licensed under the Berkeley Standard Distribution license, * (BSD license), as defined below: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, this * list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * 3. Neither the name of Israfil Consulting Services nor the names of its contributors * may be used to endorse or promote products derived from this software without * specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY * OF SUCH DAMAGE. 
*
* $Id: AllTests.java 13 2006-01-27 23:45:36Z cgruber $
*/
package net.israfil.foundation.concurrent;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import net.israfil.foundation.concurrent.timing.Timeout;

import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * TestNG tests that drive {@code SimplePump}, {@code CollectingPump} and
 * {@code DistributingPump} with single-slot mock producers and consumers.
 *
 * <p>Each mock holds at most one item in a public field. The tests place items
 * into producer slots and poll consumer slots until the pump has moved them.
 *
 * <p>NOTE(review): the pump variables below are declared without type
 * arguments, exactly as they appear in the reviewed source. If the pump
 * classes are generic, add the appropriate type arguments - TODO confirm.
 *
 * @author Christian Edward Gruber
 * @author Latest: $Author: cgruber $
 * @version $Revision: 130 $
 */
@Test
public class PumpTest {

    /** Upper bound, in milliseconds, handed to {@link Timeout} while polling. */
    private static final int WAIT_MILLIS = 20000;

    /** Interval, in milliseconds, between two polls of a mock's slot. */
    private static final int POLL_MILLIS = 5;

    /**
     * Polls the consumer's slot until it holds an item.
     *
     * <p>A fresh {@link Timeout} is created per call, as the tests did inline.
     * Presumably {@code Timeout.sleep} fails once the overall deadline has
     * passed, which is what would bound this loop - TODO confirm.
     *
     * @param consumer the mock consumer whose slot is polled
     * @throws InterruptedException if the polling thread is interrupted
     */
    private static void awaitItem(MockConsumer<String> consumer) throws InterruptedException {
        Timeout timeout = new Timeout(WAIT_MILLIS);
        while (consumer.item == null) {
            timeout.sleep(POLL_MILLIS);
        }
    }

    /**
     * Starts a simple pump with one producer and one consumer, and checks
     * that a single item placed in the producer arrives at the consumer.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    public void testSimplePump() throws InterruptedException {
        SimplePump pump = new SimplePump();
        pump.setTimeout(10000L);
        MockProducer<String> p = new MockProducer<String>();
        final MockConsumer<String> c = new MockConsumer<String>();
        pump.setConsumer(c);
        pump.setProducer(p);
        Assert.assertNotNull(pump.getConsumer());
        Assert.assertNotNull(pump.getProducer());
        Assert.assertTrue(pump.isStopped());

        pump.start();
        Timeout timeout = new Timeout(WAIT_MILLIS);
        while (!pump.isRunning()) {
            timeout.sleep(POLL_MILLIS);
        }
        Assert.assertNull(p.item);
        Assert.assertNull(c.item);

        String itemA = "a";
        p.item = itemA;
        awaitItem(c);
        Assert.assertEquals(itemA, c.item);

        pump.stop();
        Assert.assertTrue(pump.isStopped());
    }

    /**
     * Starts a collecting pump with three producers and one consumer, and
     * checks that the three items reach the consumer in producer order.
     * The consumer slot is cleared after each check so the next item can land.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    public void testCollectingPump() throws InterruptedException {
        CollectingPump pump = new CollectingPump();
        MockProducer<String> p1 = new MockProducer<String>();
        MockProducer<String> p2 = new MockProducer<String>();
        MockProducer<String> p3 = new MockProducer<String>();
        final MockConsumer<String> c = new MockConsumer<String>();
        pump.setConsumer(c);
        // NOTE(review): element type reconstructed from the interface the mock
        // implements; verify against CollectingPump.setProducers.
        List<ConcurrentProducer<String>> producers = new ArrayList<ConcurrentProducer<String>>();
        producers.add(p1);
        producers.add(p2);
        producers.add(p3);
        pump.setProducers(producers);
        Assert.assertNotNull(pump.getConsumer());
        Assert.assertNotNull(pump.getProducers());
        Assert.assertTrue(pump.isStopped());

        pump.start();
        Timeout timeout = new Timeout(WAIT_MILLIS);
        while (!pump.isRunning()) {
            timeout.sleep(POLL_MILLIS);
        }
        Assert.assertNull(p1.item);
        Assert.assertNull(c.item);

        String itemA = "a", itemB = "b", itemC = "c";
        p1.item = itemA;
        p2.item = itemB;
        p3.item = itemC;
        Assert.assertNotNull(p1.item);
        Assert.assertNotNull(p2.item);
        Assert.assertNotNull(p3.item);

        awaitItem(c);
        Assert.assertEquals(itemA, c.item);
        Assert.assertNull(p1.item);
        c.item = null;

        awaitItem(c);
        Assert.assertEquals(itemB, c.item);
        Assert.assertNull(p2.item);
        c.item = null;

        awaitItem(c);
        Assert.assertEquals(itemC, c.item);
        Assert.assertNull(p3.item);
        c.item = null;

        pump.stop();
        Assert.assertTrue(pump.isStopped());
    }

    /**
     * Disabled test for the distributing pump: the leading
     * {@code if (true) return;} makes the method a no-op that always passes.
     *
     * <p>NOTE(review) before re-enabling: the body never calls
     * {@code pump.start()}, so the wait for {@code isRunning()} looks like it
     * cannot succeed. {@code c1.item} is never cleared between items, so the
     * second and third waits fall through immediately. {@code c2} and
     * {@code c3} are never checked for deliveries.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    public void testDistributingPump() throws InterruptedException {
        if (true) return; // test intentionally switched off

        DistributingPump pump = new DistributingPump();
        MockProducer<String> p = new MockProducer<String>();
        MockConsumer<String> c1 = new MockConsumer<String>();
        MockConsumer<String> c2 = new MockConsumer<String>();
        MockConsumer<String> c3 = new MockConsumer<String>();
        // NOTE(review): element type reconstructed from the interface the mock
        // implements; verify against DistributingPump.setConsumers.
        List<ConcurrentConsumer<String>> consumers = new ArrayList<ConcurrentConsumer<String>>();
        consumers.add(c1);
        consumers.add(c2);
        consumers.add(c3);
        pump.setConsumers(consumers);
        pump.setProducer(p);
        Assert.assertNotNull(pump.getConsumers());
        Assert.assertNotNull(pump.getProducer());
        Assert.assertTrue(pump.isStopped());

        Timeout timeout = new Timeout(500);
        while (!pump.isRunning()) {
            timeout.sleep(POLL_MILLIS);
        }
        Assert.assertNull(p.item);
        Assert.assertNull(c1.item);
        Assert.assertNull(c2.item);
        Assert.assertNull(c3.item);

        String itemA = "a", itemB = "b", itemC = "c";
        p.item = itemA;
        awaitItem(c1);
        Assert.assertEquals(itemA, c1.item);
        Assert.assertNull(p.item);

        p.item = itemB;
        awaitItem(c1);
        Assert.assertEquals(itemB, c1.item);
        Assert.assertNull(p.item);

        p.item = itemC;
        awaitItem(c1);
        Assert.assertEquals(itemC, c1.item);
        Assert.assertNull(p.item);

        pump.stop();
        Assert.assertTrue(pump.isStopped());
    }

    /**
     * Producer mock backed by the single-item slot of
     * {@link MockStringPipeComponent}. A test thread fills the slot by
     * assigning {@code item}; {@code produce} hands the item out and empties
     * the slot.
     *
     * @param <T> type of the produced items
     */
    public static class MockProducer<T> extends MockStringPipeComponent<T> implements ConcurrentProducer<T> {

        /**
         * Produces without a timeout, polling until an item is available.
         *
         * @return the item taken from the slot
         * @throws RuntimeInterruptedException wrapping the interruption if the
         *         calling thread is interrupted while waiting
         */
        public T produce() {
            try {
                return produce(0, null);
            } catch (InterruptedException e) {
                throw new RuntimeInterruptedException(e);
            }
        }

        /**
         * Takes the item out of the slot after sleeping for {@code delay} ms.
         *
         * <p>The method polls for an item only when no usable timeout is
         * supplied ({@code timeoutMillis < 1} or {@code unit == null}).
         * With a positive timeout and a unit, the slot is checked once and
         * {@code null} is returned if it is empty. {@code unit} is used only
         * for that null check; no unit conversion is performed.
         *
         * @param timeoutMillis timeout value; values below 1 mean "poll until an item appears"
         * @param unit          time unit; {@code null} means "poll until an item appears"
         * @return the item that was in the slot, or {@code null} if the slot was empty
         * @throws InterruptedException if the calling thread is interrupted while sleeping
         */
        public T produce(long timeoutMillis, TimeUnit unit) throws InterruptedException {
            Thread.sleep(delay);
            Timeout timeout = new Timeout(timeoutMillis);
            while (this.item == null && (timeoutMillis < 1 || unit == null)) {
                timeout.sleep(POLL_MILLIS);
            }
            // Lock on the mock, not on the item: the item may be an interned
            // string shared with unrelated code, and locking on the field
            // would throw a NullPointerException if the slot had been emptied
            // since the check above.
            synchronized (this) {
                T taken = this.item;
                if (taken != null) {
                    this.item = null;
                }
                return taken; // null in case of timeouts
            }
        }
    }

    /**
     * Consumer mock backed by the single-item slot of
     * {@link MockStringPipeComponent}. {@code consume} stores the delivered
     * item in the slot; a test thread reads it and empties the slot by
     * assigning {@code null}.
     *
     * @param <T> type of the consumed items
     */
    public static class MockConsumer<T> extends MockStringPipeComponent<T> implements ConcurrentConsumer<T> {

        /**
         * Consumes without a timeout, polling until the slot is free.
         *
         * <p>NOTE(review): an interruption is wrapped in a plain
         * {@link RuntimeException} here, whereas {@code MockProducer.produce()}
         * uses {@code RuntimeInterruptedException}; confirm whether the
         * difference is intended.
         *
         * @param item the item to store
         */
        public void consume(T item) {
            try {
                consume(item, 0, null);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Stores {@code item} in the slot after sleeping for {@code delay} ms.
         *
         * <p>The method polls for the slot to become empty only when no usable
         * timeout is supplied ({@code timeoutMillis < 1} or
         * {@code unit == null}). With a positive timeout and a unit, any item
         * still in the slot is overwritten immediately.
         *
         * @param item          the item to store
         * @param timeoutMillis timeout value; values below 1 mean "poll until the slot is free"
         * @param unit          time unit; {@code null} means "poll until the slot is free"
         * @throws InterruptedException if the calling thread is interrupted while sleeping
         */
        public void consume(T item, long timeoutMillis, TimeUnit unit) throws InterruptedException {
            Thread.sleep(delay);
            Timeout timeout = new Timeout(timeoutMillis);
            while (this.item != null && (timeoutMillis < 1 || unit == null)) {
                timeout.sleep(POLL_MILLIS);
            }
            this.item = item;
        }
    }

    /**
     * Shared state for the mocks: a one-item slot and an artificial delay.
     *
     * @param <T> type of the item held in the slot
     */
    public static class MockStringPipeComponent<T> {

        /**
         * The single slot. It is volatile because the test thread and the
         * thread calling produce/consume read and write it without any
         * common lock.
         */
        public volatile T item = null;

        /** Milliseconds slept at the start of every produce/consume call. */
        public long delay = 0;

        /**
         * Reports whether fewer than {@code timeout} milliseconds have elapsed
         * since {@code start}.
         *
         * <p>NOTE(review): despite its name this returns {@code true} while
         * the period has NOT yet expired. Nothing in this file calls it.
         *
         * @param start   start instant in milliseconds, as returned by {@link System#currentTimeMillis()}
         * @param timeout length of the period in milliseconds
         * @return {@code true} if the elapsed time is still below {@code timeout}
         */
        public static boolean timeout(Long start, Long timeout) {
            return (System.currentTimeMillis() - start) < timeout;
        }
    }
}