From 7c279747beb43c7e88633a6228a155a30e6834f7 Mon Sep 17 00:00:00 2001 From: Benjamin Culkin Date: Mon, 27 May 2024 11:38:33 -0400 Subject: Initial import --- .../foundation/concurrent/AbstractGated.java | 64 +++++++ .../foundation/concurrent/AbstractPipe.java | 77 ++++++++ .../foundation/concurrent/AbstractPump.java | 82 ++++++++ .../foundation/concurrent/AbstractWorker.java | 212 +++++++++++++++++++++ .../foundation/concurrent/CollectingPump.java | 109 +++++++++++ .../foundation/concurrent/ConcurrentConsumer.java | 70 +++++++ .../foundation/concurrent/ConcurrentProducer.java | 62 ++++++ .../foundation/concurrent/DistributingPump.java | 101 ++++++++++ .../net/israfil/foundation/concurrent/Gated.java | 53 ++++++ .../foundation/concurrent/OneShotWorker.java | 71 +++++++ .../net/israfil/foundation/concurrent/Pipe.java | 50 +++++ .../foundation/concurrent/ProcessingPipe.java | 90 +++++++++ .../israfil/foundation/concurrent/Processor.java | 49 +++++ .../foundation/concurrent/RepeatingWorker.java | 97 ++++++++++ .../concurrent/RuntimeInterruptedException.java | 64 +++++++ .../israfil/foundation/concurrent/SimplePipe.java | 74 +++++++ .../israfil/foundation/concurrent/SimplePump.java | 85 +++++++++ .../net/israfil/foundation/concurrent/Worker.java | 104 ++++++++++ .../consumers/LoggingStringConsumer.java | 61 ++++++ .../concurrent/producers/ReaderStringProducer.java | 79 ++++++++ .../foundation/concurrent/timing/Timeout.java | 68 +++++++ 21 files changed, 1722 insertions(+) create mode 100644 israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractGated.java create mode 100644 israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractPipe.java create mode 100644 israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractPump.java create mode 100644 israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractWorker.java create mode 100644 israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/CollectingPump.java create mode 100644 israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/ConcurrentConsumer.java create mode 100644 israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/ConcurrentProducer.java create mode 100644 israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/DistributingPump.java create mode 100644 israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/Gated.java create mode 100644 israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/OneShotWorker.java create mode 100644 israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/Pipe.java create mode 100644 israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/ProcessingPipe.java create mode 100644 israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/Processor.java create mode 100644 israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/RepeatingWorker.java create mode 100644 israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/RuntimeInterruptedException.java create mode 100644 israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/SimplePipe.java create mode 100644 israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/SimplePump.java create mode 100644 israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/Worker.java create mode 100644 israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/consumers/LoggingStringConsumer.java create mode 100644 israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/producers/ReaderStringProducer.java create mode 100644 israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/timing/Timeout.java (limited to 'israfil-foundation-concurrent/src/main/java') diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractGated.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractGated.java new file mode 100644 index 0000000..2389c95 --- /dev/null +++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractGated.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2006, 2007 Israfil Consulting Services Corporation + * Copyright (c) 2006, 2007 Christian Edward Gruber + * All Rights Reserved + * + * This software is licensed under the Berkeley Standard Distribution license, + * (BSD license), as defined below: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. Neither the name of Israfil Consulting Services nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + * + * $Id: Source.java 129 2006-12-31 23:20:02Z cgruber $ + */ +package net.israfil.foundation.concurrent; + + +/** + * An object that consumes an object and processes it, then produces the result + * upon request. If the processor has no item in processing, it may consume + * an item. If the processor has an item it produces it when requested. In + * the converse cases the behaviour of the object depends on whether the + * processor is in its on state or its off state. + * + * @author Christian Edward Gruber + */ +public abstract class AbstractGated implements Gated { + + private GateState state = GateState.Close; + + public AbstractGated() {} + + public void close() { + this.state = GateState.Close; + } + + public GateState getState() { + return state; + } + + public void open() { + this.state = GateState.Open; + } + +} diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractPipe.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractPipe.java new file mode 100644 index 0000000..4c5226a --- /dev/null +++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractPipe.java @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2006, 2007 Israfil Consulting Services Corporation + * Copyright (c) 2006, 2007 Christian Edward Gruber + * All Rights Reserved + * + * This software is licensed under the Berkeley Standard Distribution license, + * (BSD license), as defined below: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. Neither the name of Israfil Consulting Services nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + * + * $Id: Source.java 129 2006-12-31 23:20:02Z cgruber $ + */ +package net.israfil.foundation.concurrent; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * An object that consumes an object and processes it, then produces the result + * upon request. If the processor has no item in processing, it may consume + * an item. If the processor has an item it produces it when requested. In + * the converse cases the behaviour of the object depends on whether the + * processor is in its on state or its off state. + * + * @author Christian Edward Gruber + */ +public abstract class AbstractPipe extends AbstractGated implements Pipe { + + protected BlockingQueue queue; + + public AbstractPipe() {} + + public BlockingQueue getQueue() { return queue; } + public void setQueue(BlockingQueue queue) { this.queue = queue; } + + public void consume(I item) { + try { + this.consume(item,0,null); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + public O produce() { + try { + return this.produce(0,null); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + protected static void validateTimeout(long timeout, TimeUnit unit) { + if (timeout > 0 && unit == null) + throw new IllegalArgumentException("Cannot set a timeout with a null TimeUnit"); + } +} diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractPump.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractPump.java new file mode 100644 index 0000000..df1d757 --- /dev/null +++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractPump.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2003 - 2007 Israfil Consulting Services Corporation + * Copyright (c) 2003 - 2007 Christian Edward Gruber + * All Rights Reserved + * + * This software is licensed under the Berkeley Standard Distribution license, + * (BSD license), as defined below: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. Neither the name of Israfil Consulting Services nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + * + * $Id: Pump.java 362 2007-01-21 06:37:33Z cgruber $ + */ +package net.israfil.foundation.concurrent; + +import java.util.concurrent.TimeUnit; + +/** + * A worker that repeats and draws items from a producer and feeds it to a consumer + * + * @author Christian Edward Gruber + */ +public abstract class AbstractPump extends RepeatingWorker { + + protected Long timeout = 0L; + protected TimeUnit timeoutUnit = TimeUnit.MILLISECONDS; + + public AbstractPump() {} + + public AbstractPump(String name, long timeout, TimeUnit timeoutUnit) { + super(name); + this.timeout = timeout; + this.timeoutUnit = timeoutUnit; + } + + @Override + protected void validate() { + super.validate(); + if (this.timeout == null) + throw new IllegalStateException("Timeoutunit cannot be null."); + if (this.timeoutUnit == null) + throw new IllegalStateException("Timeoutunit cannot be null."); + } + + @Override + public boolean continueProcessing() { return isRunning(); } + + @Override + public long iterationDelay() { return 0; } + + public long getTimeout() { return timeout; } + + public void setTimeout(long timeout) { this.timeout = timeout; } + + public TimeUnit getTimeoutUnit() { return timeoutUnit; } + + public void setTimeoutUnit(TimeUnit timeoutUnit) { + this.timeoutUnit = timeoutUnit; + } + + +} diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractWorker.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractWorker.java new file mode 100644 index 0000000..14e51a0 --- /dev/null +++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractWorker.java @@ -0,0 +1,212 @@ +/* + * Copyright (c) 2003 - 2007 Israfil Consulting Services Corporation + * Copyright (c) 2003 - 2007 Christian Edward Gruber + * All Rights Reserved + * + * This software is licensed under the Berkeley Standard Distribution license, + * (BSD license), as defined below: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. Neither the name of Israfil Consulting Services nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + * + * $Id: Worker.java 401 2007-01-26 02:05:20Z cgruber $ + */ +package net.israfil.foundation.concurrent; + +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * An object that wraps a Thread and performs units of work within that + * thread. + * + * @author Christian Edward Gruber + */ +public abstract class AbstractWorker implements Runnable, Worker { + private static Logger logger = Logger.getLogger(AbstractWorker.class.getName()); + + private Thread _thread = null; + private String _name = this.getClass().getName(); + private Throwable _error = null; + + private WorkerState state = WorkerState.Stopped; + + public AbstractWorker() { } + + public AbstractWorker(String name) { + this._name = name; + } + + /** + * Runnable.run() implementation. + * + */ + public void run() { + try { + //execute lifecycle + state = WorkerState.Starting; + _thread = Thread.currentThread(); + _thread.setName(_name); + _error = null; + if (init()) { + state = WorkerState.Running; + executeLifecycle(); + } + state = WorkerState.Stopping; + deinit(); + state = WorkerState.Stopped; + } catch (InterruptedException e){ + state = WorkerState.Stopped; + logger.log(Level.FINE,this.getName() + " interrupted, setting stopped state.",e); + _error = e; + } catch (RuntimeInterruptedException e){ + state = WorkerState.Stopped; + logger.log(Level.FINE,this.getName() + " interrupted, setting stopped state.",e); + _error=e; + } catch (Throwable e){ + state = WorkerState.Stopped; + logger.log(Level.SEVERE,e.getClass().getName() + " thrown within run() method of " + + this.getName() + ", setting stopped state.",e); + _error = e; + } + } + + abstract void executeLifecycle() throws InterruptedException; + + // Lifecycle phases + + protected boolean init() throws InterruptedException { + validate(); + return _thread != null && _thread.isAlive(); + } + + protected void preProcess() throws InterruptedException { } + + protected abstract void process() throws InterruptedException; + + protected void postProcess() throws InterruptedException { } + + protected void deinit() throws InterruptedException { } + + protected void validate() { + if (getName() == null || getName().equals("")) + throw new IllegalStateException(getClass().getName() + " cannot start without a name."); + } + + // + // SIGNALING + // + // TODO: Validate signaling. + // TODO: Implement cleaner signaling. + // + + public boolean isRunning() { + return _thread != null && _thread.isAlive() && state == WorkerState.Running; + } + public boolean isStopping(){ + return state == WorkerState.Stopping; + } + public boolean isStarting(){ + return state == WorkerState.Starting; + } + public boolean isStopped(){ + return state == WorkerState.Stopped; + } + + // Accessors + + public Thread getThread() { return _thread; } + + public String getName() { return _name; } + + public void setName(String name) { this._name = name; } + + /** + * If the worker has stopped due to a thrown error, it should be captured + * and available via this method. This method will throw an IllegalStateException + * if called when the worker is not in a Stopped state. + * + */ + public Throwable getError() { + if (!isStopped()) throw new IllegalStateException("No error available when worker not stopped."); + else return this._error; + } + /** + * Start the worker + * @return boolean Did the Worker startup properly. + */ + public boolean start() { + try { + if (!isStopped()) + throw new IllegalStateException("Attempted to start a worker that wasn't stopped"); + Thread t = new Thread(this); + t.start(); + Thread.yield(); + return true; + } catch (Exception e) { + logger.log(Level.INFO,this.getName() + "Thread failed to start",e); + return false; + } + } + + /** + * Attempts to gracefully shutdown the worker, but interrupts the + * thread after 20ms if the worker does not stop on its own. + * @throws InterruptedException + */ + public void stop() throws InterruptedException{ + stop(20); + } + + /** + * Attempts to gracefully shutdown the worker, but interrupts the + * thread after a given timeout if the worker does not stop on its own. + * @throws InterruptedException + */ + public void stop(long timeout_miliseconds) throws InterruptedException{ + if (state == WorkerState.Stopped) return; // succeed fast. + state = WorkerState.Stopping; + if (getThread() != null) getThread().join(timeout_miliseconds); + hardStop(); + } + + /** + * Attempts to gracefully shutdown the worker, but interrupts the + * thread after a given timeout if the worker does not stop on its own. + * @throws InterruptedException + */ + public void stop(long timeout_miliseconds,int timeout_nanoseconds) throws InterruptedException{ + if (state == WorkerState.Stopped) return; // succeed fast. + state = WorkerState.Stopping; + if (getThread() != null) getThread().join(timeout_miliseconds,timeout_nanoseconds); + hardStop(); + } + + protected void hardStop() { + if (isStopped() || getThread() == null) return; + getThread().interrupt(); + this._thread = null; + this.state = WorkerState.Stopped; + } + +} diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/CollectingPump.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/CollectingPump.java new file mode 100644 index 0000000..3ed37af --- /dev/null +++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/CollectingPump.java @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2003 - 2007 Israfil Consulting Services Corporation + * Copyright (c) 2003 - 2007 Christian Edward Gruber + * All Rights Reserved + * + * This software is licensed under the Berkeley Standard Distribution license, + * (BSD license), as defined below: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. Neither the name of Israfil Consulting Services nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + * + * $Id: Pump.java 362 2007-01-21 06:37:33Z cgruber $ + */ +package net.israfil.foundation.concurrent; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * A worker that repeats and draws items from a producer and feeds it to a consumer + * + * @author Christian Edward Gruber + */ +public class CollectingPump extends AbstractPump { + + protected Collection> producers = null; + protected ConcurrentConsumer consumer = null; + private List> producersArray = null; + private int currentItem; + + public CollectingPump() {} + + public CollectingPump(String name, Collection> producers, ConcurrentConsumer consumer, + long timeout, TimeUnit timeoutUnit) { + super(name,timeout,timeoutUnit); + setProducers(producers); + setConsumer(consumer); + } + + @Override + protected void validate() { + super.validate(); + if (getConsumer() == null) + throw new IllegalStateException("consumer cannot be null."); + if (getProducers() == null || getProducers().size() == 0) + throw new IllegalStateException("producers cannot be null."); + } + + @Override + public boolean continueProcessing() { return isRunning(); } + + @Override + public long iterationDelay() { return 0; } + + public Collection> getProducers() { return producers; } + + public void setProducers(Collection> producers) { + if (this.isRunning()) + throw new IllegalStateException("Cannot set producers while this pump is not stopped."); + this.producers = producers; + } + + public ConcurrentConsumer getConsumer() { return consumer; } + + public void setConsumer(ConcurrentConsumer consumer) { + if (!this.isStopped()) + throw new IllegalStateException("Cannot set consumer while this pump is not stopped."); + this.consumer = consumer; + } + + @Override + protected boolean init() throws InterruptedException { + boolean init = super.init() && this.getConsumer() != null && + this.getProducers() != null && this.getProducers().size() > 0; + if (!init) return init; // fail fast. + producersArray = new ArrayList>(producers); // get consistent list. + return init; + } + + @Override + protected void process() throws InterruptedException { + ConcurrentProducer producer = producersArray.get(currentItem++); + if (currentItem >= producersArray.size()) currentItem = 0; + consumer.consume(producer.produce()); + } + +} diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/ConcurrentConsumer.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/ConcurrentConsumer.java new file mode 100644 index 0000000..c3bf409 --- /dev/null +++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/ConcurrentConsumer.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2006, 2007 Israfil Consulting Services Corporation + * Copyright (c) 2006, 2007 Christian Edward Gruber + * All Rights Reserved + * + * This software is licensed under the Berkeley Standard Distribution license, + * (BSD license), as defined below: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. Neither the name of Israfil Consulting Services nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + * + * $Id: Sink.java 356 2007-01-21 04:17:47Z cgruber $ + */ +package net.israfil.foundation.concurrent; + +import java.util.concurrent.TimeUnit; + +import net.israfil.foundation.lifecycle.Consumer; + +/** + * A Consumer that overloads consume() with timeout values. + * + * @author Christian Edward Gruber + */ +public interface ConcurrentConsumer extends Consumer { + + /** + * Consumes an item of type T. This version will block until + * interrupted. To keep conformity with Consumer, if + * interrupted, a RuntimeInterruptedException should be thrown. + * This is an unchecked exception. + * + * @param item The item to be consumed. + */ + public void consume(T item); + + + /** + * Consumes an item of type T. If the consumer is not ready, then it will + * block the call until the timeout has been reached or the method is + * interrupted.If the object is closed, then this method will return + * null. This method will never return null except when it is closed. + * + * @param item The item to be consumed. + * @param timeout The length of time consume should attempt to operate. + * @param timeoutUnit The unit by which timeout should be interpreted. + */ + public void consume(T item, long timeout, TimeUnit timeoutUnit) throws InterruptedException; + +} diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/ConcurrentProducer.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/ConcurrentProducer.java new file mode 100644 index 0000000..2298b87 --- /dev/null +++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/ConcurrentProducer.java @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2006, 2007 Israfil Consulting Services Corporation + * Copyright (c) 2006, 2007 Christian Edward Gruber + * All Rights Reserved + * + * This software is licensed under the Berkeley Standard Distribution license, + * (BSD license), as defined below: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. Neither the name of Israfil Consulting Services nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + * + * $Id: Source.java 355 2007-01-21 02:41:03Z cgruber $ + */ +package net.israfil.foundation.concurrent; + +import java.util.concurrent.TimeUnit; + +import net.israfil.foundation.lifecycle.Producer; + +/** + * An object that is a source (producer) of objects + * + * @author Christian Edward Gruber + */ +public interface ConcurrentProducer extends Producer { + + /** + * Produces an item of type T. If the producer is not ready, then it will + * block the call until the timeout is interrupted. To keep conformity with + * Producer, if interrupted, a RuntimeInterruptedException should be thrown. + * This is an unchecked exception. + */ + public T produce(); + + /** + * Produces an item of type T. If the producer is not ready, then it will + * block the call until the timeout is reached or the method is + * interrupted. + */ + public T produce(long timeout, TimeUnit unit) throws InterruptedException; + +} diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/DistributingPump.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/DistributingPump.java new file mode 100644 index 0000000..ecce5db --- /dev/null +++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/DistributingPump.java @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2003 - 2007 Israfil Consulting Services Corporation + * Copyright (c) 2003 - 2007 Christian Edward Gruber + * All Rights Reserved + * + * This software is licensed under the Berkeley Standard Distribution license, + * (BSD license), as defined below: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. Neither the name of Israfil Consulting Services nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + * + * $Id: Pump.java 362 2007-01-21 06:37:33Z cgruber $ + */ +package net.israfil.foundation.concurrent; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * A worker that repeats and draws items from a producer and feeds it to a consumer + * + * @author Christian Edward Gruber + */ +public class DistributingPump extends AbstractPump { + + protected ConcurrentProducer producer; + protected Collection> consumers; + private List> consumersArray = null; + private int currentItem; + + public DistributingPump() {} + + public DistributingPump(String name, ConcurrentProducer producer, Collection> consumers, + long timeout, TimeUnit timeoutUnit) { + super(name,timeout,timeoutUnit); + setProducer(producer); + setConsumers(consumers); + } + + @Override + protected void validate() { + super.validate(); + if (getProducer() == null) + throw new IllegalStateException("consumer cannot be null."); + if (getConsumers() == null || getConsumers().size() == 0) + throw new IllegalStateException("producers cannot be null."); + } + + @Override + public boolean continueProcessing() { return isRunning(); } + + @Override + public long iterationDelay() { return 0; } + + public ConcurrentProducer getProducer() { return producer; } + + public void setProducer(ConcurrentProducer producer) { this.producer = producer; } + + public Collection> getConsumers() { return consumers; } + + public void setConsumers(Collection> consumers) { this.consumers = consumers; } + + @Override + protected boolean init() throws InterruptedException { + boolean init = super.init() && this.getProducer() != null && + this.getConsumers() != null && this.getConsumers().size() > 0; + if (!init) return init; // fail fast. + consumersArray = new ArrayList>(consumers); // get consistent list. + return init; + } + + @Override + protected void process() throws InterruptedException { + ConcurrentConsumer consumer = consumersArray.get(currentItem++); + if (currentItem >= consumersArray.size()) currentItem = 0; + consumer.consume(producer.produce()); + } + +} diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/Gated.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/Gated.java new file mode 100644 index 0000000..18a2acf --- /dev/null +++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/Gated.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2007 Israfil Consulting Services Corporation + * Copyright (c) 2007 Christian Edward Gruber + * All Rights Reserved + * + * This software is licensed under the Berkeley Standard Distribution license, + * (BSD license), as defined below: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. Neither the name of Israfil Consulting Services nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + * + * $Id: Source.java 129 2006-12-31 23:20:02Z cgruber $ + */ +package net.israfil.foundation.concurrent; + + +/** + * A Gated object is one which can be in an open or closed state, and which + * provides operations to set the state. + * + * @author Christian Edward Gruber + */ +public interface Gated { + + public void open(); + + public void close(); + + public GateState getState(); + + public static enum GateState { Open, Close } + +} diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/OneShotWorker.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/OneShotWorker.java new file mode 100644 index 0000000..3c6f196 --- /dev/null +++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/OneShotWorker.java @@ -0,0 +1,71 @@ +/* + * Copyright © 2003-2009 Israfil Consulting Services Corporation + * Copyright © 2003-2009 Christian Edward Gruber + * All Rights Reserved + * + * This software is licensed under the Berkeley Standard Distribution license, + * (BSD license), as defined below: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. Neither the name of Israfil Consulting Services nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + * + * $Id: OneShotWorker.java 365 2007-01-22 04:36:55Z cgruber $ + */ +package net.israfil.foundation.concurrent; + +import java.util.logging.Logger; + +/** + * A Worker sub-class that implements a one-shot lifecycle, performing any + * initialization, pre-actions, the desired work, any post-work actions, and + * any de-initialization. + * + * @author Christian Edward Gruber + */ +public abstract class OneShotWorker extends AbstractWorker { + + private static Logger logger = Logger.getLogger(OneShotWorker.class.getName()); + + public OneShotWorker() { } + + public OneShotWorker(String name) { + super(name); + } + + public void executeLifecycle() throws InterruptedException { + logger.finer(getName() + " about to pre-process."); + preProcess(); + logger.finer(getName() + " completed pre-processing."); + logger.finer(getName() + " about to process."); + process(); + logger.finer(getName() + " completed processing."); + logger.finer(getName() + " about to post-process."); + postProcess(); + logger.finer(getName() + " completed post-processing."); + } + + public void preProcess() { } + + public void postProcess() { } + +} diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/Pipe.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/Pipe.java new file mode 100644 index 0000000..9789f25 --- /dev/null +++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/Pipe.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2006, 2007 Israfil Consulting Services Corporation + * Copyright (c) 2006, 2007 Christian Edward Gruber + * All Rights Reserved + * + * This software is licensed under the Berkeley Standard Distribution license, + * (BSD license), as defined below: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. Neither the name of Israfil Consulting Services nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + * + * $Id: Source.java 129 2006-12-31 23:20:02Z cgruber $ + */ +package net.israfil.foundation.concurrent; + + +/** + * An object that consumes an object and processes it, then produces the result + * upon request. If the processor has no item in processing, it may consume + * an item. If the processor has an item it produces it when requested. In + * the converse cases the behaviour of the object depends on whether the + * processor is in its on state or its off state. + * + * @author Christian Edward Gruber + */ +public interface Pipe extends ConcurrentProducer, ConcurrentConsumer, Gated { + + + +} diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/ProcessingPipe.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/ProcessingPipe.java new file mode 100644 index 0000000..2e23c2a --- /dev/null +++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/ProcessingPipe.java @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2006, 2007 Israfil Consulting Services Corporation + * Copyright (c) 2006, 2007 Christian Edward Gruber + * All Rights Reserved + * + * This software is licensed under the Berkeley Standard Distribution license, + * (BSD license), as defined below: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. Neither the name of Israfil Consulting Services nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + * + * $Id: Source.java 129 2006-12-31 23:20:02Z cgruber $ + */ +package net.israfil.foundation.concurrent; + +import java.util.concurrent.TimeUnit; + +/** + * An object that consumes an object and processes it, then produces the result + * upon request. If the processor has no item in processing, it may consume + * an item. If the processor has an item it produces it when requested. In + * the converse cases the behaviour of the object depends on whether the + * processor is in its on state or its off state. + * + * @author Christian Edward Gruber + */ +public class ProcessingPipe extends AbstractPipe { + + protected Processor processor; + protected Integer timeout; + protected TimeUnit timeoutUnit; + + public ProcessingPipe() {} + + public Processor getProcessor() { return processor; } + public void setProcessor(Processor processor) { + this.processor = processor; + } + public Integer getTimeout() { return timeout; } + public void setTimeout(Integer timeout) { this.timeout = timeout; } + public TimeUnit getTimeoutUnit() { return timeoutUnit; } + public void setTimeoutUnit(TimeUnit timeoutUnit) { + this.timeoutUnit = timeoutUnit; + } + + public void consume(I item, long timeout, TimeUnit unit) throws InterruptedException { + validateTimeout(timeout,unit); + try { + if (timeout > 0) getQueue().offer(item,timeout,unit); + else getQueue().put(item); + } catch (InterruptedException e) { + throw e; + } + } + + public O produce(long timeout, TimeUnit unit) throws InterruptedException { + validateTimeout(timeout,unit); + O processedItem = null; + try { + if (timeout > 0) + processedItem = this.processor.process(getQueue().poll(timeout,unit),getTimeout(),getTimeoutUnit()); + else + processedItem = this.processor.process(getQueue().poll(),getTimeout(),getTimeoutUnit()); + } catch (InterruptedException e) { + throw e; + } + return processedItem; + } + +} diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/Processor.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/Processor.java new file mode 100644 index 0000000..90a5cc7 --- /dev/null +++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/Processor.java @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2006, 2007 Israfil Consulting Services Corporation + * Copyright (c) 2006, 2007 Christian Edward Gruber + * All Rights Reserved + * + * This software is licensed under the Berkeley Standard Distribution license, + * (BSD license), as defined below: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. Neither the name of Israfil Consulting Services nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + * + * $Id: Source.java 129 2006-12-31 23:20:02Z cgruber $ + */ +package net.israfil.foundation.concurrent; + +import java.util.concurrent.TimeUnit; + +/** + * An object that proceses an item and returns a result. + * + * @author Christian Edward Gruber + */ +public interface Processor { + + public O process(I item) throws InterruptedException; + + public O process(I item, long timeout, TimeUnit timeoutUnit) throws InterruptedException; + +} diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/RepeatingWorker.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/RepeatingWorker.java new file mode 100644 index 0000000..a6588fc --- /dev/null +++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/RepeatingWorker.java @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2003 - 2007 Israfil Consulting Services Corporation + * Copyright (c) 2003 - 2007 Christian Edward Gruber + * All Rights Reserved + * + * This software is licensed under the Berkeley Standard Distribution license, + * (BSD license), as defined below: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. Neither the name of Israfil Consulting Services nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + * + * $Id: RepeatingWorker.java 367 2007-01-22 23:02:28Z cgruber $ + */ +package net.israfil.foundation.concurrent; + +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * A Worker sub-class that implements a slightly different lifecycle. In + * particular, it iterates based on a continuProcessing() signalling method. + * It also implements pre and post processing logic in case any per-iteration + * setup and tear-down is required. + * + * + * @author Christian Edward Gruber + */ +public abstract class RepeatingWorker extends AbstractWorker { + + private static Logger logger = Logger.getLogger(RepeatingWorker.class.getName()); + + public RepeatingWorker() { } + + public RepeatingWorker(String name) { super(name); } + + public void executeLifecycle() throws InterruptedException { + do { + logger.finer(getName() + " about to pre-process."); + preProcess(); + logger.finer(getName() + " completed pre-processing."); + logger.finer(getName() + " about to process."); + process(); + logger.finer(getName() + " completed processing."); + logger.finer(getName() + " about to post-process."); + postProcess(); + logger.finer(getName() + " completed post-processing."); + Thread.yield(); + } while (continueProcessing() && delay()); + } + + public abstract boolean continueProcessing() throws InterruptedException; + + /** + * @return long Number of milliseconds to delay between iterations + */ + public abstract long iterationDelay(); + + /** + * Returns boolean only so it can be used in a while statement. delay() + * simply sleeps as long as iterationDelay() indicates, then returns true, + * unless interrupted in which case it returns false. + */ + private boolean delay() { + try { + Thread.sleep(iterationDelay()); + return true; + } catch (InterruptedException e) { + logger.log(Level.WARNING, getName() + " interrupted.",e); + return false; + } + } + + public void preProcess() { } + + public void postProcess() { } + +} diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/RuntimeInterruptedException.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/RuntimeInterruptedException.java new file mode 100644 index 0000000..ee05a44 --- /dev/null +++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/RuntimeInterruptedException.java @@ -0,0 +1,64 @@ +/* + * Copyright © 2003-2009 Israfil Consulting Services Corporation + * Copyright © 2003-2009 Christian Edward Gruber + * All Rights Reserved + * + * This software is licensed under the Berkeley Standard Distribution license, + * (BSD license), as defined below: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. Neither the name of Israfil Consulting Services nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + * + * $Id: OneShotWorker.java 365 2007-01-22 04:36:55Z cgruber $ + */ +package net.israfil.foundation.concurrent; + + +/** + * A runtime (unchecked) equivalent to InterruptedException + * + * @author Christian Edward Gruber + */ +public class RuntimeInterruptedException extends RuntimeException { + + /** + * + */ + private static final long serialVersionUID = -667612054120346668L; + + public RuntimeInterruptedException() { + super(); + } + + public RuntimeInterruptedException(String s) { + super(s); + } + + public RuntimeInterruptedException(String s, Throwable cause) { + super(s,cause); + } + + public RuntimeInterruptedException(Throwable cause) { + super(cause); + } +} diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/SimplePipe.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/SimplePipe.java new file mode 100644 index 0000000..d1383fb --- /dev/null +++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/SimplePipe.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2006, 2007 Israfil Consulting Services Corporation + * Copyright (c) 2006, 2007 Christian Edward Gruber + * All Rights Reserved + * + * This software is licensed under the Berkeley Standard Distribution license, + * (BSD license), as defined below: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. Neither the name of Israfil Consulting Services nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + * + * $Id: Source.java 129 2006-12-31 23:20:02Z cgruber $ + */ +package net.israfil.foundation.concurrent; + +import java.util.concurrent.TimeUnit; + +/** + * An object that consumes an object and queues it, then produces the object + * upon request. If the processor has an unfilled queue, it may consume + * an item. If the processor has an item it produces it when requested. In + * the converse cases the behaviour of the object depends on whether the + * processor is in its on state or its off state. + * + * @author Christian Edward Gruber + */ +public class SimplePipe extends AbstractPipe { + + public SimplePipe() {} + + public void consume(I item, long timeout, TimeUnit unit) throws InterruptedException { + validateTimeout(timeout,unit); + try { + if (timeout > 0) getQueue().offer(item,timeout,unit); + else getQueue().put(item); + } catch (InterruptedException e) { + throw e; + } + } + + public I produce(long timeout, TimeUnit unit) throws InterruptedException { + validateTimeout(timeout,unit); + try { + if (timeout > 0) + return getQueue().poll(timeout,unit); + else + return getQueue().poll(); + } catch (InterruptedException e) { + throw e; + } + + } + +} diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/SimplePump.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/SimplePump.java new file mode 100644 index 0000000..4b9b11a --- /dev/null +++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/SimplePump.java @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2003 - 2007 Israfil Consulting Services Corporation + * Copyright (c) 2003 - 2007 Christian Edward Gruber + * All Rights Reserved + * + * This software is licensed under the Berkeley Standard Distribution license, + * (BSD license), as defined below: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. Neither the name of Israfil Consulting Services nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + * + * $Id: SimplePump.java 367 2007-01-22 23:02:28Z cgruber $ + */ +package net.israfil.foundation.concurrent; + +import java.util.concurrent.TimeUnit; + +/** + * A worker that repeats and draws items from a producer and feeds it to a consumer + * + * @author Christian Edward Gruber + */ +public class SimplePump extends AbstractPump { + + protected ConcurrentProducer producer; + protected ConcurrentConsumer consumer; + + public SimplePump() {} + + public SimplePump(String name, ConcurrentProducer producer, ConcurrentConsumer consumer, + long timeout, TimeUnit timeoutUnit) { + super(name,timeout,timeoutUnit); + setProducer(producer); + setConsumer(consumer); + } + + @Override + protected void validate() { + super.validate(); + if (getConsumer() == null) + throw new IllegalStateException("consumer cannot be null."); + if (getProducer() == null ) + throw new IllegalStateException("producer cannot be null."); + } + + @Override + public boolean continueProcessing() { return isRunning(); } + + @Override + public long iterationDelay() { return 0; } + + public ConcurrentProducer getProducer() { return producer; } + + public void setProducer(ConcurrentProducer producer) { this.producer = producer; } + + public ConcurrentConsumer getConsumer() { return consumer; } + + public void setConsumer(ConcurrentConsumer consumer) { this.consumer = consumer; } + + @Override + protected void process() throws InterruptedException { + consumer.consume(producer.produce()); + } + +} diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/Worker.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/Worker.java new file mode 100644 index 0000000..697d466 --- /dev/null +++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/Worker.java @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2003 - 2009 Israfil Consulting Services Corporation + * Copyright (c) 2003 - 2009 Christian Edward Gruber + * All Rights Reserved + * + * This software is licensed under the Berkeley Standard Distribution license, + * (BSD license), as defined below: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. Neither the name of Israfil Consulting Services nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + * + * $Id: Worker.java 401 2007-01-26 02:05:20Z cgruber $ + */ +package net.israfil.foundation.concurrent; + + +/** + * An object that wraps a Thread and performs units of work within that + * thread. Very handy for concurrent development. + * + * @author Christian Edward Gruber + */ +public interface Worker { + + + + // + // SIGNALING + // + // TODO: Validate signaling. + // TODO: Implement cleaner signaling. + // + + public boolean isRunning(); + + public boolean isStopping(); + + public boolean isStarting(); + + public boolean isStopped(); + + // Accessors + + public String getName(); + + /** + * If the worker has stopped due to a thrown error, it should be captured + * and available via this method. This method will throw an IllegalStateException + * if called when the worker is not in a Stopped state. + * + */ + public Throwable getError(); + + /** + * Start the worker + * @return boolean Did the Worker startup properly. + */ + public boolean start(); + + /** + * Attempts to gracefully shutdown the worker, but interrupts the + * thread after 20ms if the worker does not stop on its own. + * @throws InterruptedException + */ + public void stop() throws InterruptedException; + + /** + * Attempts to gracefully shutdown the worker, but interrupts the + * thread after a given timeout if the worker does not stop on its own. + * @throws InterruptedException + */ + public void stop(long timeout_miliseconds) throws InterruptedException; + + /** + * Attempts to gracefully shutdown the worker, but interrupts the + * thread after a given timeout if the worker does not stop on its own. + * @throws InterruptedException + */ + public void stop(long timeout_miliseconds,int timeout_nanoseconds) throws InterruptedException; + + public static enum WorkerState { + Stopped,Starting,Running,Stopping + } +} diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/consumers/LoggingStringConsumer.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/consumers/LoggingStringConsumer.java new file mode 100644 index 0000000..7fd2121 --- /dev/null +++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/consumers/LoggingStringConsumer.java @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2006 Israfil Consulting Services Corporation + * Copyright (c) 2006 Christian Edward Gruber + * All Rights Reserved + * + * This software is licensed under the Berkeley Standard Distribution license, + * (BSD license), as defined below: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. Neither the name of Israfil Consulting Services nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + * + * $Id: Dynamic.java 14 2006-01-27 23:50:37Z cgruber $ + */ +package net.israfil.foundation.concurrent.consumers; + +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +import net.israfil.foundation.concurrent.ConcurrentConsumer; + +/** + * A consumer that consumes strings and sends them to a provided logger at a + * provided level. + * + * @author Christian Edward Gruber + */ +public class LoggingStringConsumer implements ConcurrentConsumer { + private Logger logger; + private Level level; + public LoggingStringConsumer(Logger logger,Level level) { + this.logger = logger; + this.level = level; + } + public void consume(String item) { + logger.log(level,item); + } + public void consume(String item, long timeout, TimeUnit unit) throws InterruptedException { + throw new UnsupportedOperationException("Timeouts not supported. Use consume(String)"); + } +} \ No newline at end of file diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/producers/ReaderStringProducer.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/producers/ReaderStringProducer.java new file mode 100644 index 0000000..f232b00 --- /dev/null +++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/producers/ReaderStringProducer.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2006 Israfil Consulting Services Corporation + * Copyright (c) 2006 Christian Edward Gruber + * All Rights Reserved + * + * This software is licensed under the Berkeley Standard Distribution license, + * (BSD license), as defined below: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. Neither the name of Israfil Consulting Services nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + * + * $Id: Dynamic.java 14 2006-01-27 23:50:37Z cgruber $ + */ +package net.israfil.foundation.concurrent.producers; + +import java.io.IOException; +import java.io.Reader; +import java.nio.CharBuffer; +import java.util.concurrent.TimeUnit; + +import net.israfil.foundation.concurrent.ConcurrentProducer; + +/** + * A provider that reads an input stream and creates strings out of lines + * of text. + * + * @author Christian Edward Gruber + */ +public class ReaderStringProducer implements ConcurrentProducer { + Reader reader = null; + public ReaderStringProducer(Reader reader) { + this.reader = reader; + } + public String produce() { + CharBuffer cb = CharBuffer.allocate(1024); + int count = 0; + try { + int val = reader.read(); + while (val != -1) { + cb.append( (char)val ); + count++; + if ((char)val == '\n') break; + val = reader.read(); + } + if (count == 0 && val == -1) return null; + } catch (IOException e) { return null; } + char[] finalarray = new char[count]; + cb.position(0); + cb.get(finalarray); + return new String(finalarray); + } + public String produce(long timeout, TimeUnit unit) throws InterruptedException { + throw new UnsupportedOperationException("Timeouts not supported. Use produce()"); + } + + + + } + \ No newline at end of file diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/timing/Timeout.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/timing/Timeout.java new file mode 100644 index 0000000..0abe15b --- /dev/null +++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/timing/Timeout.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2003 - 2007 Israfil Consulting Services Corporation + * Copyright (c) 2003 - 2007 Christian Edward Gruber + * All Rights Reserved + * + * This software is licensed under the Berkeley Standard Distribution license, + * (BSD license), as defined below: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. Neither the name of Israfil Consulting Services nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + * + * $Id: Pump.java 362 2007-01-21 06:37:33Z cgruber $ + */ +package net.israfil.foundation.concurrent.timing; + +import java.util.concurrent.TimeoutException; + +/** + * An object that represents a timeout value; + * + * @author Christian Edward Gruber + */ +public class Timeout { + + private final long startTime; + private final long timeout; + + public Timeout(long timeout) { + this.timeout = timeout; + startTime = System.currentTimeMillis(); + } + public void assertWithinTimeout() throws TimeoutException { + long now = System.currentTimeMillis(); + if (now - startTime > timeout) { + throw new TimeoutException("Timeout of " + timeout + "ms exceeded: " + (now - startTime) + "ms."); + } + } + public long getStartTime() { + return startTime; + } + public long getTimeout() { + return timeout; + } + public void sleep(long interval) throws InterruptedException { + Thread.sleep(interval); + } + +} -- cgit v1.2.3