summaryrefslogtreecommitdiff
path: root/israfil-foundation-concurrent/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'israfil-foundation-concurrent/src/main')
-rw-r--r--israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractGated.java64
-rw-r--r--israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractPipe.java77
-rw-r--r--israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractPump.java82
-rw-r--r--israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractWorker.java212
-rw-r--r--israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/CollectingPump.java109
-rw-r--r--israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/ConcurrentConsumer.java70
-rw-r--r--israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/ConcurrentProducer.java62
-rw-r--r--israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/DistributingPump.java101
-rw-r--r--israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/Gated.java53
-rw-r--r--israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/OneShotWorker.java71
-rw-r--r--israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/Pipe.java50
-rw-r--r--israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/ProcessingPipe.java90
-rw-r--r--israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/Processor.java49
-rw-r--r--israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/RepeatingWorker.java97
-rw-r--r--israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/RuntimeInterruptedException.java64
-rw-r--r--israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/SimplePipe.java74
-rw-r--r--israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/SimplePump.java85
-rw-r--r--israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/Worker.java104
-rw-r--r--israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/consumers/LoggingStringConsumer.java61
-rw-r--r--israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/producers/ReaderStringProducer.java79
-rw-r--r--israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/timing/Timeout.java68
21 files changed, 1722 insertions, 0 deletions
diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractGated.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractGated.java
new file mode 100644
index 0000000..2389c95
--- /dev/null
+++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractGated.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2006, 2007 Israfil Consulting Services Corporation
+ * Copyright (c) 2006, 2007 Christian Edward Gruber
+ * All Rights Reserved
+ *
+ * This software is licensed under the Berkeley Standard Distribution license,
+ * (BSD license), as defined below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * 3. Neither the name of Israfil Consulting Services nor the names of its contributors
+ * may be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+ * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ *
+ * $Id: Source.java 129 2006-12-31 23:20:02Z cgruber $
+ */
+package net.israfil.foundation.concurrent;
+
+
+/**
+ * An object that consumes an object and processes it, then produces the result
+ * upon request. If the processor has no item in processing, it may consume
+ * an item. If the processor has an item it produces it when requested. In
+ * the converse cases the behaviour of the object depends on whether the
+ * processor is in its on state or its off state.
+ *
+ * @author <a href="mailto:cgruber@israfil.net">Christian Edward Gruber </a>
+ */
+public abstract class AbstractGated implements Gated {
+
+ private GateState state = GateState.Close;
+
+ public AbstractGated() {}
+
+ public void close() {
+ this.state = GateState.Close;
+ }
+
+ public GateState getState() {
+ return state;
+ }
+
+ public void open() {
+ this.state = GateState.Open;
+ }
+
+}
diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractPipe.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractPipe.java
new file mode 100644
index 0000000..4c5226a
--- /dev/null
+++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractPipe.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2006, 2007 Israfil Consulting Services Corporation
+ * Copyright (c) 2006, 2007 Christian Edward Gruber
+ * All Rights Reserved
+ *
+ * This software is licensed under the Berkeley Standard Distribution license,
+ * (BSD license), as defined below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * 3. Neither the name of Israfil Consulting Services nor the names of its contributors
+ * may be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+ * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ *
+ * $Id: Source.java 129 2006-12-31 23:20:02Z cgruber $
+ */
+package net.israfil.foundation.concurrent;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An object that consumes an object and processes it, then produces the result
+ * upon request. If the processor has no item in processing, it may consume
+ * an item. If the processor has an item it produces it when requested. In
+ * the converse cases the behaviour of the object depends on whether the
+ * processor is in its on state or its off state.
+ *
+ * @author <a href="mailto:cgruber@israfil.net">Christian Edward Gruber </a>
+ */
+public abstract class AbstractPipe<I,O> extends AbstractGated implements Pipe<I,O> {
+
+ protected BlockingQueue<I> queue;
+
+ public AbstractPipe() {}
+
+ public BlockingQueue<I> getQueue() { return queue; }
+ public void setQueue(BlockingQueue<I> queue) { this.queue = queue; }
+
+ public void consume(I item) {
+ try {
+ this.consume(item,0,null);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public O produce() {
+ try {
+ return this.produce(0,null);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected static void validateTimeout(long timeout, TimeUnit unit) {
+ if (timeout > 0 && unit == null)
+ throw new IllegalArgumentException("Cannot set a timeout with a null TimeUnit");
+ }
+}
diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractPump.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractPump.java
new file mode 100644
index 0000000..df1d757
--- /dev/null
+++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractPump.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2003 - 2007 Israfil Consulting Services Corporation
+ * Copyright (c) 2003 - 2007 Christian Edward Gruber
+ * All Rights Reserved
+ *
+ * This software is licensed under the Berkeley Standard Distribution license,
+ * (BSD license), as defined below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * 3. Neither the name of Israfil Consulting Services nor the names of its contributors
+ * may be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+ * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ *
+ * $Id: Pump.java 362 2007-01-21 06:37:33Z cgruber $
+ */
+package net.israfil.foundation.concurrent;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A worker that repeats and draws items from a producer and feeds it to a consumer
+ *
+ * @author <a href="mailto:cgruber@israfil.net">Christian Edward Gruber</a>
+ */
+public abstract class AbstractPump<T> extends RepeatingWorker {
+
+ protected Long timeout = 0L;
+ protected TimeUnit timeoutUnit = TimeUnit.MILLISECONDS;
+
+ public AbstractPump() {}
+
+ public AbstractPump(String name, long timeout, TimeUnit timeoutUnit) {
+ super(name);
+ this.timeout = timeout;
+ this.timeoutUnit = timeoutUnit;
+ }
+
+ @Override
+ protected void validate() {
+ super.validate();
+ if (this.timeout == null)
+ throw new IllegalStateException("Timeoutunit cannot be null.");
+ if (this.timeoutUnit == null)
+ throw new IllegalStateException("Timeoutunit cannot be null.");
+ }
+
+ @Override
+ public boolean continueProcessing() { return isRunning(); }
+
+ @Override
+ public long iterationDelay() { return 0; }
+
+ public long getTimeout() { return timeout; }
+
+ public void setTimeout(long timeout) { this.timeout = timeout; }
+
+ public TimeUnit getTimeoutUnit() { return timeoutUnit; }
+
+ public void setTimeoutUnit(TimeUnit timeoutUnit) {
+ this.timeoutUnit = timeoutUnit;
+ }
+
+
+}
diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractWorker.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractWorker.java
new file mode 100644
index 0000000..14e51a0
--- /dev/null
+++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractWorker.java
@@ -0,0 +1,212 @@
+/*
+ * Copyright (c) 2003 - 2007 Israfil Consulting Services Corporation
+ * Copyright (c) 2003 - 2007 Christian Edward Gruber
+ * All Rights Reserved
+ *
+ * This software is licensed under the Berkeley Standard Distribution license,
+ * (BSD license), as defined below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * 3. Neither the name of Israfil Consulting Services nor the names of its contributors
+ * may be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+ * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ *
+ * $Id: Worker.java 401 2007-01-26 02:05:20Z cgruber $
+ */
+package net.israfil.foundation.concurrent;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An object that wraps a Thread and performs units of work within that
+ * thread.
+ *
+ * @author <a href="mailto:cgruber@israfil.net">Christian Edward Gruber </a>
+ */
+public abstract class AbstractWorker implements Runnable, Worker {
+ private static Logger logger = Logger.getLogger(AbstractWorker.class.getName());
+
+ private Thread _thread = null;
+ private String _name = this.getClass().getName();
+ private Throwable _error = null;
+
+ private WorkerState state = WorkerState.Stopped;
+
+ public AbstractWorker() { }
+
+ public AbstractWorker(String name) {
+ this._name = name;
+ }
+
+ /**
+ * Runnable.run() implementation.
+ *
+ */
+ public void run() {
+ try {
+ //execute lifecycle
+ state = WorkerState.Starting;
+ _thread = Thread.currentThread();
+ _thread.setName(_name);
+ _error = null;
+ if (init()) {
+ state = WorkerState.Running;
+ executeLifecycle();
+ }
+ state = WorkerState.Stopping;
+ deinit();
+ state = WorkerState.Stopped;
+ } catch (InterruptedException e){
+ state = WorkerState.Stopped;
+ logger.log(Level.FINE,this.getName() + " interrupted, setting stopped state.",e);
+ _error = e;
+ } catch (RuntimeInterruptedException e){
+ state = WorkerState.Stopped;
+ logger.log(Level.FINE,this.getName() + " interrupted, setting stopped state.",e);
+ _error=e;
+ } catch (Throwable e){
+ state = WorkerState.Stopped;
+ logger.log(Level.SEVERE,e.getClass().getName() + " thrown within run() method of " +
+ this.getName() + ", setting stopped state.",e);
+ _error = e;
+ }
+ }
+
+ abstract void executeLifecycle() throws InterruptedException;
+
+ // Lifecycle phases
+
+ protected boolean init() throws InterruptedException {
+ validate();
+ return _thread != null && _thread.isAlive();
+ }
+
+ protected void preProcess() throws InterruptedException { }
+
+ protected abstract void process() throws InterruptedException;
+
+ protected void postProcess() throws InterruptedException { }
+
+ protected void deinit() throws InterruptedException { }
+
+ protected void validate() {
+ if (getName() == null || getName().equals(""))
+ throw new IllegalStateException(getClass().getName() + " cannot start without a name.");
+ }
+
+ //
+ // SIGNALING
+ //
+ // TODO: Validate signaling.
+ // TODO: Implement cleaner signaling.
+ //
+
+ public boolean isRunning() {
+ return _thread != null && _thread.isAlive() && state == WorkerState.Running;
+ }
+ public boolean isStopping(){
+ return state == WorkerState.Stopping;
+ }
+ public boolean isStarting(){
+ return state == WorkerState.Starting;
+ }
+ public boolean isStopped(){
+ return state == WorkerState.Stopped;
+ }
+
+ // Accessors
+
+ public Thread getThread() { return _thread; }
+
+ public String getName() { return _name; }
+
+ public void setName(String name) { this._name = name; }
+
+ /**
+ * If the worker has stopped due to a thrown error, it should be captured
+ * and available via this method. This method will throw an IllegalStateException
+ * if called when the worker is not in a Stopped state.
+ *
+ */
+ public Throwable getError() {
+ if (!isStopped()) throw new IllegalStateException("No error available when worker not stopped.");
+ else return this._error;
+ }
+ /**
+ * Start the worker
+ * @return boolean Did the Worker startup properly.
+ */
+ public boolean start() {
+ try {
+ if (!isStopped())
+ throw new IllegalStateException("Attempted to start a worker that wasn't stopped");
+ Thread t = new Thread(this);
+ t.start();
+ Thread.yield();
+ return true;
+ } catch (Exception e) {
+ logger.log(Level.INFO,this.getName() + "Thread failed to start",e);
+ return false;
+ }
+ }
+
+ /**
+ * Attempts to gracefully shutdown the worker, but interrupts the
+ * thread after 20ms if the worker does not stop on its own.
+ * @throws InterruptedException
+ */
+ public void stop() throws InterruptedException{
+ stop(20);
+ }
+
+ /**
+ * Attempts to gracefully shutdown the worker, but interrupts the
+ * thread after a given timeout if the worker does not stop on its own.
+ * @throws InterruptedException
+ */
+ public void stop(long timeout_miliseconds) throws InterruptedException{
+ if (state == WorkerState.Stopped) return; // succeed fast.
+ state = WorkerState.Stopping;
+ if (getThread() != null) getThread().join(timeout_miliseconds);
+ hardStop();
+ }
+
+ /**
+ * Attempts to gracefully shutdown the worker, but interrupts the
+ * thread after a given timeout if the worker does not stop on its own.
+ * @throws InterruptedException
+ */
+ public void stop(long timeout_miliseconds,int timeout_nanoseconds) throws InterruptedException{
+ if (state == WorkerState.Stopped) return; // succeed fast.
+ state = WorkerState.Stopping;
+ if (getThread() != null) getThread().join(timeout_miliseconds,timeout_nanoseconds);
+ hardStop();
+ }
+
+ protected void hardStop() {
+ if (isStopped() || getThread() == null) return;
+ getThread().interrupt();
+ this._thread = null;
+ this.state = WorkerState.Stopped;
+ }
+
+}
diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/CollectingPump.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/CollectingPump.java
new file mode 100644
index 0000000..3ed37af
--- /dev/null
+++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/CollectingPump.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright (c) 2003 - 2007 Israfil Consulting Services Corporation
+ * Copyright (c) 2003 - 2007 Christian Edward Gruber
+ * All Rights Reserved
+ *
+ * This software is licensed under the Berkeley Standard Distribution license,
+ * (BSD license), as defined below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * 3. Neither the name of Israfil Consulting Services nor the names of its contributors
+ * may be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+ * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ *
+ * $Id: Pump.java 362 2007-01-21 06:37:33Z cgruber $
+ */
+package net.israfil.foundation.concurrent;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A worker that repeats and draws items from a producer and feeds it to a consumer
+ *
+ * @author <a href="mailto:cgruber@israfil.net">Christian Edward Gruber</a>
+ */
+public class CollectingPump<T> extends AbstractPump<T> {
+
+ protected Collection<ConcurrentProducer<T>> producers = null;
+ protected ConcurrentConsumer<T> consumer = null;
+ private List<ConcurrentProducer<T>> producersArray = null;
+ private int currentItem;
+
+ public CollectingPump() {}
+
+ public CollectingPump(String name, Collection<ConcurrentProducer<T>> producers, ConcurrentConsumer<T> consumer,
+ long timeout, TimeUnit timeoutUnit) {
+ super(name,timeout,timeoutUnit);
+ setProducers(producers);
+ setConsumer(consumer);
+ }
+
+ @Override
+ protected void validate() {
+ super.validate();
+ if (getConsumer() == null)
+ throw new IllegalStateException("consumer cannot be null.");
+ if (getProducers() == null || getProducers().size() == 0)
+ throw new IllegalStateException("producers cannot be null.");
+ }
+
+ @Override
+ public boolean continueProcessing() { return isRunning(); }
+
+ @Override
+ public long iterationDelay() { return 0; }
+
+ public Collection<ConcurrentProducer<T>> getProducers() { return producers; }
+
+ public void setProducers(Collection<ConcurrentProducer<T>> producers) {
+ if (this.isRunning())
+ throw new IllegalStateException("Cannot set producers while this pump is not stopped.");
+ this.producers = producers;
+ }
+
+ public ConcurrentConsumer<T> getConsumer() { return consumer; }
+
+ public void setConsumer(ConcurrentConsumer<T> consumer) {
+ if (!this.isStopped())
+ throw new IllegalStateException("Cannot set consumer while this pump is not stopped.");
+ this.consumer = consumer;
+ }
+
+ @Override
+ protected boolean init() throws InterruptedException {
+ boolean init = super.init() && this.getConsumer() != null &&
+ this.getProducers() != null && this.getProducers().size() > 0;
+ if (!init) return init; // fail fast.
+ producersArray = new ArrayList<ConcurrentProducer<T>>(producers); // get consistent list.
+ return init;
+ }
+
+ @Override
+ protected void process() throws InterruptedException {
+ ConcurrentProducer<T> producer = producersArray.get(currentItem++);
+ if (currentItem >= producersArray.size()) currentItem = 0;
+ consumer.consume(producer.produce());
+ }
+
+}
diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/ConcurrentConsumer.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/ConcurrentConsumer.java
new file mode 100644
index 0000000..c3bf409
--- /dev/null
+++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/ConcurrentConsumer.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2006, 2007 Israfil Consulting Services Corporation
+ * Copyright (c) 2006, 2007 Christian Edward Gruber
+ * All Rights Reserved
+ *
+ * This software is licensed under the Berkeley Standard Distribution license,
+ * (BSD license), as defined below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * 3. Neither the name of Israfil Consulting Services nor the names of its contributors
+ * may be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+ * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ *
+ * $Id: Sink.java 356 2007-01-21 04:17:47Z cgruber $
+ */
+package net.israfil.foundation.concurrent;
+
+import java.util.concurrent.TimeUnit;
+
+import net.israfil.foundation.lifecycle.Consumer;
+
+/**
+ * A Consumer that overloads consume() with timeout values.
+ *
+ * @author <a href="mailto:cgruber@israfil.net">Christian Edward Gruber </a>
+ */
+public interface ConcurrentConsumer<T> extends Consumer<T> {
+
+ /**
+ * Consumes an item of type T. This version will block until
+ * interrupted. To keep conformity with Consumer<T>, if
+ * interrupted, a RuntimeInterruptedException should be thrown.
+ * This is an unchecked exception.
+ *
+ * @param item The item to be consumed.
+ */
+ public void consume(T item);
+
+
+ /**
+ * Consumes an item of type T. If the consumer is not ready, then it will
+ * block the call until the timeout has been reached or the method is
+ * interrupted.If the object is closed, then this method will return
+ * null. This method will never return null except when it is closed.
+ *
+ * @param item The item to be consumed.
+ * @param timeout The length of time consume should attempt to operate.
+ * @param timeoutUnit The unit by which timeout should be interpreted.
+ */
+ public void consume(T item, long timeout, TimeUnit timeoutUnit) throws InterruptedException;
+
+}
diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/ConcurrentProducer.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/ConcurrentProducer.java
new file mode 100644
index 0000000..2298b87
--- /dev/null
+++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/ConcurrentProducer.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2006, 2007 Israfil Consulting Services Corporation
+ * Copyright (c) 2006, 2007 Christian Edward Gruber
+ * All Rights Reserved
+ *
+ * This software is licensed under the Berkeley Standard Distribution license,
+ * (BSD license), as defined below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * 3. Neither the name of Israfil Consulting Services nor the names of its contributors
+ * may be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+ * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ *
+ * $Id: Source.java 355 2007-01-21 02:41:03Z cgruber $
+ */
+package net.israfil.foundation.concurrent;
+
+import java.util.concurrent.TimeUnit;
+
+import net.israfil.foundation.lifecycle.Producer;
+
+/**
+ * An object that is a source (producer) of objects
+ *
+ * @author <a href="mailto:cgruber@israfil.net">Christian Edward Gruber </a>
+ */
+public interface ConcurrentProducer<T> extends Producer<T> {
+
+ /**
+ * Produces an item of type T. If the producer is not ready, then it will
+ * block the call until the timeout is interrupted. To keep conformity with
+ * Producer<T>, if interrupted, a RuntimeInterruptedException should be thrown.
+ * This is an unchecked exception.
+ */
+ public T produce();
+
+ /**
+ * Produces an item of type T. If the producer is not ready, then it will
+ * block the call until the timeout is reached or the method is
+ * interrupted.
+ */
+ public T produce(long timeout, TimeUnit unit) throws InterruptedException;
+
+}
diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/DistributingPump.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/DistributingPump.java
new file mode 100644
index 0000000..ecce5db
--- /dev/null
+++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/DistributingPump.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright (c) 2003 - 2007 Israfil Consulting Services Corporation
+ * Copyright (c) 2003 - 2007 Christian Edward Gruber
+ * All Rights Reserved
+ *
+ * This software is licensed under the Berkeley Standard Distribution license,
+ * (BSD license), as defined below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * 3. Neither the name of Israfil Consulting Services nor the names of its contributors
+ * may be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+ * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ *
+ * $Id: Pump.java 362 2007-01-21 06:37:33Z cgruber $
+ */
+package net.israfil.foundation.concurrent;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A worker that repeats and draws items from a producer and feeds it to a consumer
+ *
+ * @author <a href="mailto:cgruber@israfil.net">Christian Edward Gruber</a>
+ */
+public class DistributingPump<T> extends AbstractPump<T> {
+
+ protected ConcurrentProducer<T> producer;
+ protected Collection<ConcurrentConsumer<T>> consumers;
+ private List<ConcurrentConsumer<T>> consumersArray = null;
+ private int currentItem;
+
+ public DistributingPump() {}
+
+ public DistributingPump(String name, ConcurrentProducer<T> producer, Collection<ConcurrentConsumer<T>> consumers,
+ long timeout, TimeUnit timeoutUnit) {
+ super(name,timeout,timeoutUnit);
+ setProducer(producer);
+ setConsumers(consumers);
+ }
+
+ @Override
+ protected void validate() {
+ super.validate();
+ if (getProducer() == null)
+ throw new IllegalStateException("consumer cannot be null.");
+ if (getConsumers() == null || getConsumers().size() == 0)
+ throw new IllegalStateException("producers cannot be null.");
+ }
+
+ @Override
+ public boolean continueProcessing() { return isRunning(); }
+
+ @Override
+ public long iterationDelay() { return 0; }
+
+ public ConcurrentProducer<T> getProducer() { return producer; }
+
+ public void setProducer(ConcurrentProducer<T> producer) { this.producer = producer; }
+
+ public Collection<ConcurrentConsumer<T>> getConsumers() { return consumers; }
+
+ public void setConsumers(Collection<ConcurrentConsumer<T>> consumers) { this.consumers = consumers; }
+
+ @Override
+ protected boolean init() throws InterruptedException {
+ boolean init = super.init() && this.getProducer() != null &&
+ this.getConsumers() != null && this.getConsumers().size() > 0;
+ if (!init) return init; // fail fast.
+ consumersArray = new ArrayList<ConcurrentConsumer<T>>(consumers); // get consistent list.
+ return init;
+ }
+
+ @Override
+ protected void process() throws InterruptedException {
+ ConcurrentConsumer<T> consumer = consumersArray.get(currentItem++);
+ if (currentItem >= consumersArray.size()) currentItem = 0;
+ consumer.consume(producer.produce());
+ }
+
+}
diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/Gated.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/Gated.java
new file mode 100644
index 0000000..18a2acf
--- /dev/null
+++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/Gated.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2007 Israfil Consulting Services Corporation
+ * Copyright (c) 2007 Christian Edward Gruber
+ * All Rights Reserved
+ *
+ * This software is licensed under the Berkeley Standard Distribution license,
+ * (BSD license), as defined below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * 3. Neither the name of Israfil Consulting Services nor the names of its contributors
+ * may be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+ * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ *
+ * $Id: Source.java 129 2006-12-31 23:20:02Z cgruber $
+ */
+package net.israfil.foundation.concurrent;
+
+
+/**
+ * A Gated object is one which can be in an open or closed state, and which
+ * provides operations to set the state.
+ *
+ * @author <a href="mailto:cgruber@israfil.net">Christian Edward Gruber </a>
+ */
+public interface Gated {
+
+ public void open();
+
+ public void close();
+
+ public GateState getState();
+
+ public static enum GateState { Open, Close }
+
+}
diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/OneShotWorker.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/OneShotWorker.java
new file mode 100644
index 0000000..3c6f196
--- /dev/null
+++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/OneShotWorker.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright © 2003-2009 Israfil Consulting Services Corporation
+ * Copyright © 2003-2009 Christian Edward Gruber
+ * All Rights Reserved
+ *
+ * This software is licensed under the Berkeley Standard Distribution license,
+ * (BSD license), as defined below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * 3. Neither the name of Israfil Consulting Services nor the names of its contributors
+ * may be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+ * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ *
+ * $Id: OneShotWorker.java 365 2007-01-22 04:36:55Z cgruber $
+ */
+package net.israfil.foundation.concurrent;
+
+import java.util.logging.Logger;
+
+/**
+ * A Worker sub-class that implements a one-shot lifecycle, performing any
+ * initialization, pre-actions, the desired work, any post-work actions, and
+ * any de-initialization.
+ *
+ * @author <a href="mailto:cgruber@israfil.net">Christian Edward Gruber </a>
+ */
+public abstract class OneShotWorker extends AbstractWorker {
+
+ private static Logger logger = Logger.getLogger(OneShotWorker.class.getName());
+
+ public OneShotWorker() { }
+
+ public OneShotWorker(String name) {
+ super(name);
+ }
+
+ public void executeLifecycle() throws InterruptedException {
+ logger.finer(getName() + " about to pre-process.");
+ preProcess();
+ logger.finer(getName() + " completed pre-processing.");
+ logger.finer(getName() + " about to process.");
+ process();
+ logger.finer(getName() + " completed processing.");
+ logger.finer(getName() + " about to post-process.");
+ postProcess();
+ logger.finer(getName() + " completed post-processing.");
+ }
+
+ public void preProcess() { }
+
+ public void postProcess() { }
+
+}
diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/Pipe.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/Pipe.java
new file mode 100644
index 0000000..9789f25
--- /dev/null
+++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/Pipe.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2006, 2007 Israfil Consulting Services Corporation
+ * Copyright (c) 2006, 2007 Christian Edward Gruber
+ * All Rights Reserved
+ *
+ * This software is licensed under the Berkeley Standard Distribution license,
+ * (BSD license), as defined below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * 3. Neither the name of Israfil Consulting Services nor the names of its contributors
+ * may be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+ * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ *
+ * $Id: Source.java 129 2006-12-31 23:20:02Z cgruber $
+ */
+package net.israfil.foundation.concurrent;
+
+
+/**
+ * An object that consumes an object and processes it, then produces the result
+ * upon request. If the processor has no item in processing, it may consume
+ * an item. If the processor has an item it produces it when requested. In
+ * the converse cases the behaviour of the object depends on whether the
+ * processor is in its on state or its off state.
+ *
+ * @author <a href="mailto:cgruber@israfil.net">Christian Edward Gruber </a>
+ */
+public interface Pipe<I,O> extends ConcurrentProducer<O>, ConcurrentConsumer<I>, Gated {
+
+
+
+}
diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/ProcessingPipe.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/ProcessingPipe.java
new file mode 100644
index 0000000..2e23c2a
--- /dev/null
+++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/ProcessingPipe.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright (c) 2006, 2007 Israfil Consulting Services Corporation
+ * Copyright (c) 2006, 2007 Christian Edward Gruber
+ * All Rights Reserved
+ *
+ * This software is licensed under the Berkeley Standard Distribution license,
+ * (BSD license), as defined below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * 3. Neither the name of Israfil Consulting Services nor the names of its contributors
+ * may be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+ * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ *
+ * $Id: Source.java 129 2006-12-31 23:20:02Z cgruber $
+ */
+package net.israfil.foundation.concurrent;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An object that consumes an object and processes it, then produces the result
+ * upon request. If the processor has no item in processing, it may consume
+ * an item. If the processor has an item it produces it when requested. In
+ * the converse cases the behaviour of the object depends on whether the
+ * processor is in its on state or its off state.
+ *
+ * @author <a href="mailto:cgruber@israfil.net">Christian Edward Gruber </a>
+ */
+public class ProcessingPipe<I,O> extends AbstractPipe<I,O> {
+
+ protected Processor<I,O> processor;
+ protected Integer timeout;
+ protected TimeUnit timeoutUnit;
+
+ public ProcessingPipe() {}
+
+ public Processor<I,O> getProcessor() { return processor; }
+ public void setProcessor(Processor<I,O> processor) {
+ this.processor = processor;
+ }
+ public Integer getTimeout() { return timeout; }
+ public void setTimeout(Integer timeout) { this.timeout = timeout; }
+ public TimeUnit getTimeoutUnit() { return timeoutUnit; }
+ public void setTimeoutUnit(TimeUnit timeoutUnit) {
+ this.timeoutUnit = timeoutUnit;
+ }
+
+ public void consume(I item, long timeout, TimeUnit unit) throws InterruptedException {
+ validateTimeout(timeout,unit);
+ try {
+ if (timeout > 0) getQueue().offer(item,timeout,unit);
+ else getQueue().put(item);
+ } catch (InterruptedException e) {
+ throw e;
+ }
+ }
+
+ public O produce(long timeout, TimeUnit unit) throws InterruptedException {
+ validateTimeout(timeout,unit);
+ O processedItem = null;
+ try {
+ if (timeout > 0)
+ processedItem = this.processor.process(getQueue().poll(timeout,unit),getTimeout(),getTimeoutUnit());
+ else
+ processedItem = this.processor.process(getQueue().poll(),getTimeout(),getTimeoutUnit());
+ } catch (InterruptedException e) {
+ throw e;
+ }
+ return processedItem;
+ }
+
+}
diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/Processor.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/Processor.java
new file mode 100644
index 0000000..90a5cc7
--- /dev/null
+++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/Processor.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2006, 2007 Israfil Consulting Services Corporation
+ * Copyright (c) 2006, 2007 Christian Edward Gruber
+ * All Rights Reserved
+ *
+ * This software is licensed under the Berkeley Standard Distribution license,
+ * (BSD license), as defined below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * 3. Neither the name of Israfil Consulting Services nor the names of its contributors
+ * may be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+ * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ *
+ * $Id: Source.java 129 2006-12-31 23:20:02Z cgruber $
+ */
+package net.israfil.foundation.concurrent;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An object that proceses an item and returns a result.
+ *
+ * @author <a href="mailto:cgruber@israfil.net">Christian Edward Gruber </a>
+ */
+public interface Processor<I,O> {
+
+ public O process(I item) throws InterruptedException;
+
+ public O process(I item, long timeout, TimeUnit timeoutUnit) throws InterruptedException;
+
+}
diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/RepeatingWorker.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/RepeatingWorker.java
new file mode 100644
index 0000000..a6588fc
--- /dev/null
+++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/RepeatingWorker.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) 2003 - 2007 Israfil Consulting Services Corporation
+ * Copyright (c) 2003 - 2007 Christian Edward Gruber
+ * All Rights Reserved
+ *
+ * This software is licensed under the Berkeley Standard Distribution license,
+ * (BSD license), as defined below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * 3. Neither the name of Israfil Consulting Services nor the names of its contributors
+ * may be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+ * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ *
+ * $Id: RepeatingWorker.java 367 2007-01-22 23:02:28Z cgruber $
+ */
+package net.israfil.foundation.concurrent;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A Worker sub-class that implements a slightly different lifecycle. In
+ * particular, it iterates based on a continuProcessing() signalling method.
+ * It also implements pre and post processing logic in case any per-iteration
+ * setup and tear-down is required.
+ *
+ *
+ * @author <a href="mailto:cgruber@israfil.net">Christian Edward Gruber </a>
+ */
+public abstract class RepeatingWorker extends AbstractWorker {
+
+ private static Logger logger = Logger.getLogger(RepeatingWorker.class.getName());
+
+ public RepeatingWorker() { }
+
+ public RepeatingWorker(String name) { super(name); }
+
+ public void executeLifecycle() throws InterruptedException {
+ do {
+ logger.finer(getName() + " about to pre-process.");
+ preProcess();
+ logger.finer(getName() + " completed pre-processing.");
+ logger.finer(getName() + " about to process.");
+ process();
+ logger.finer(getName() + " completed processing.");
+ logger.finer(getName() + " about to post-process.");
+ postProcess();
+ logger.finer(getName() + " completed post-processing.");
+ Thread.yield();
+ } while (continueProcessing() && delay());
+ }
+
+ public abstract boolean continueProcessing() throws InterruptedException;
+
+ /**
+ * @return long Number of milliseconds to delay between iterations
+ */
+ public abstract long iterationDelay();
+
+ /**
+ * Returns boolean only so it can be used in a while statement. delay()
+ * simply sleeps as long as iterationDelay() indicates, then returns true,
+ * unless interrupted in which case it returns false.
+ */
+ private boolean delay() {
+ try {
+ Thread.sleep(iterationDelay());
+ return true;
+ } catch (InterruptedException e) {
+ logger.log(Level.WARNING, getName() + " interrupted.",e);
+ return false;
+ }
+ }
+
+ public void preProcess() { }
+
+ public void postProcess() { }
+
+}
diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/RuntimeInterruptedException.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/RuntimeInterruptedException.java
new file mode 100644
index 0000000..ee05a44
--- /dev/null
+++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/RuntimeInterruptedException.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright © 2003-2009 Israfil Consulting Services Corporation
+ * Copyright © 2003-2009 Christian Edward Gruber
+ * All Rights Reserved
+ *
+ * This software is licensed under the Berkeley Standard Distribution license,
+ * (BSD license), as defined below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * 3. Neither the name of Israfil Consulting Services nor the names of its contributors
+ * may be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+ * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ *
+ * $Id: OneShotWorker.java 365 2007-01-22 04:36:55Z cgruber $
+ */
+package net.israfil.foundation.concurrent;
+
+
+/**
+ * A runtime (unchecked) equivalent to InterruptedException
+ *
+ * @author <a href="mailto:cgruber@israfil.net">Christian Edward Gruber </a>
+ */
+public class RuntimeInterruptedException extends RuntimeException {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = -667612054120346668L;
+
+ public RuntimeInterruptedException() {
+ super();
+ }
+
+ public RuntimeInterruptedException(String s) {
+ super(s);
+ }
+
+ public RuntimeInterruptedException(String s, Throwable cause) {
+ super(s,cause);
+ }
+
+ public RuntimeInterruptedException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/SimplePipe.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/SimplePipe.java
new file mode 100644
index 0000000..d1383fb
--- /dev/null
+++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/SimplePipe.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2006, 2007 Israfil Consulting Services Corporation
+ * Copyright (c) 2006, 2007 Christian Edward Gruber
+ * All Rights Reserved
+ *
+ * This software is licensed under the Berkeley Standard Distribution license,
+ * (BSD license), as defined below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * 3. Neither the name of Israfil Consulting Services nor the names of its contributors
+ * may be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+ * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ *
+ * $Id: Source.java 129 2006-12-31 23:20:02Z cgruber $
+ */
+package net.israfil.foundation.concurrent;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An object that consumes an object and queues it, then produces the object
+ * upon request. If the processor has an unfilled queue, it may consume
+ * an item. If the processor has an item it produces it when requested. In
+ * the converse cases the behaviour of the object depends on whether the
+ * processor is in its on state or its off state.
+ *
+ * @author <a href="mailto:cgruber@israfil.net">Christian Edward Gruber </a>
+ */
+public class SimplePipe<I> extends AbstractPipe<I,I> {
+
+ public SimplePipe() {}
+
+ public void consume(I item, long timeout, TimeUnit unit) throws InterruptedException {
+ validateTimeout(timeout,unit);
+ try {
+ if (timeout > 0) getQueue().offer(item,timeout,unit);
+ else getQueue().put(item);
+ } catch (InterruptedException e) {
+ throw e;
+ }
+ }
+
+ public I produce(long timeout, TimeUnit unit) throws InterruptedException {
+ validateTimeout(timeout,unit);
+ try {
+ if (timeout > 0)
+ return getQueue().poll(timeout,unit);
+ else
+ return getQueue().poll();
+ } catch (InterruptedException e) {
+ throw e;
+ }
+
+ }
+
+}
diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/SimplePump.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/SimplePump.java
new file mode 100644
index 0000000..4b9b11a
--- /dev/null
+++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/SimplePump.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright (c) 2003 - 2007 Israfil Consulting Services Corporation
+ * Copyright (c) 2003 - 2007 Christian Edward Gruber
+ * All Rights Reserved
+ *
+ * This software is licensed under the Berkeley Standard Distribution license,
+ * (BSD license), as defined below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * 3. Neither the name of Israfil Consulting Services nor the names of its contributors
+ * may be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+ * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ *
+ * $Id: SimplePump.java 367 2007-01-22 23:02:28Z cgruber $
+ */
+package net.israfil.foundation.concurrent;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A worker that repeats and draws items from a producer and feeds it to a consumer
+ *
+ * @author <a href="mailto:cgruber@israfil.net">Christian Edward Gruber</a>
+ */
+public class SimplePump<T> extends AbstractPump<T> {
+
+ protected ConcurrentProducer<T> producer;
+ protected ConcurrentConsumer<T> consumer;
+
+ public SimplePump() {}
+
+ public SimplePump(String name, ConcurrentProducer<T> producer, ConcurrentConsumer<T> consumer,
+ long timeout, TimeUnit timeoutUnit) {
+ super(name,timeout,timeoutUnit);
+ setProducer(producer);
+ setConsumer(consumer);
+ }
+
+ @Override
+ protected void validate() {
+ super.validate();
+ if (getConsumer() == null)
+ throw new IllegalStateException("consumer cannot be null.");
+ if (getProducer() == null )
+ throw new IllegalStateException("producer cannot be null.");
+ }
+
+ @Override
+ public boolean continueProcessing() { return isRunning(); }
+
+ @Override
+ public long iterationDelay() { return 0; }
+
+ public ConcurrentProducer<T> getProducer() { return producer; }
+
+ public void setProducer(ConcurrentProducer<T> producer) { this.producer = producer; }
+
+ public ConcurrentConsumer<T> getConsumer() { return consumer; }
+
+ public void setConsumer(ConcurrentConsumer<T> consumer) { this.consumer = consumer; }
+
+ @Override
+ protected void process() throws InterruptedException {
+ consumer.consume(producer.produce());
+ }
+
+}
diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/Worker.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/Worker.java
new file mode 100644
index 0000000..697d466
--- /dev/null
+++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/Worker.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) 2003 - 2009 Israfil Consulting Services Corporation
+ * Copyright (c) 2003 - 2009 Christian Edward Gruber
+ * All Rights Reserved
+ *
+ * This software is licensed under the Berkeley Standard Distribution license,
+ * (BSD license), as defined below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * 3. Neither the name of Israfil Consulting Services nor the names of its contributors
+ * may be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+ * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ *
+ * $Id: Worker.java 401 2007-01-26 02:05:20Z cgruber $
+ */
+package net.israfil.foundation.concurrent;
+
+
+/**
+ * An object that wraps a Thread and performs units of work within that
+ * thread. Very handy for concurrent development.
+ *
+ * @author <a href="mailto:cgruber@israfil.net">Christian Edward Gruber </a>
+ */
+public interface Worker {
+
+
+
+ //
+ // SIGNALING
+ //
+ // TODO: Validate signaling.
+ // TODO: Implement cleaner signaling.
+ //
+
+ public boolean isRunning();
+
+ public boolean isStopping();
+
+ public boolean isStarting();
+
+ public boolean isStopped();
+
+ // Accessors
+
+ public String getName();
+
+ /**
+ * If the worker has stopped due to a thrown error, it should be captured
+ * and available via this method. This method will throw an IllegalStateException
+ * if called when the worker is not in a Stopped state.
+ *
+ */
+ public Throwable getError();
+
+ /**
+ * Start the worker
+ * @return boolean Did the Worker startup properly.
+ */
+ public boolean start();
+
+ /**
+ * Attempts to gracefully shutdown the worker, but interrupts the
+ * thread after 20ms if the worker does not stop on its own.
+ * @throws InterruptedException
+ */
+ public void stop() throws InterruptedException;
+
+ /**
+ * Attempts to gracefully shutdown the worker, but interrupts the
+ * thread after a given timeout if the worker does not stop on its own.
+ * @throws InterruptedException
+ */
+ public void stop(long timeout_miliseconds) throws InterruptedException;
+
+ /**
+ * Attempts to gracefully shutdown the worker, but interrupts the
+ * thread after a given timeout if the worker does not stop on its own.
+ * @throws InterruptedException
+ */
+ public void stop(long timeout_miliseconds,int timeout_nanoseconds) throws InterruptedException;
+
+ public static enum WorkerState {
+ Stopped,Starting,Running,Stopping
+ }
+}
diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/consumers/LoggingStringConsumer.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/consumers/LoggingStringConsumer.java
new file mode 100644
index 0000000..7fd2121
--- /dev/null
+++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/consumers/LoggingStringConsumer.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2006 Israfil Consulting Services Corporation
+ * Copyright (c) 2006 Christian Edward Gruber
+ * All Rights Reserved
+ *
+ * This software is licensed under the Berkeley Standard Distribution license,
+ * (BSD license), as defined below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * 3. Neither the name of Israfil Consulting Services nor the names of its contributors
+ * may be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+ * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ *
+ * $Id: Dynamic.java 14 2006-01-27 23:50:37Z cgruber $
+ */
+package net.israfil.foundation.concurrent.consumers;
+
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import net.israfil.foundation.concurrent.ConcurrentConsumer;
+
+/**
+ * A consumer that consumes strings and sends them to a provided logger at a
+ * provided level.
+ *
+ * @author <a href="cgruber@israfil.net">Christian Edward Gruber</a>
+ */
+public class LoggingStringConsumer implements ConcurrentConsumer<String> {
+ private Logger logger;
+ private Level level;
+ public LoggingStringConsumer(Logger logger,Level level) {
+ this.logger = logger;
+ this.level = level;
+ }
+ public void consume(String item) {
+ logger.log(level,item);
+ }
+ public void consume(String item, long timeout, TimeUnit unit) throws InterruptedException {
+ throw new UnsupportedOperationException("Timeouts not supported. Use consume(String)");
+ }
+} \ No newline at end of file
diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/producers/ReaderStringProducer.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/producers/ReaderStringProducer.java
new file mode 100644
index 0000000..f232b00
--- /dev/null
+++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/producers/ReaderStringProducer.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2006 Israfil Consulting Services Corporation
+ * Copyright (c) 2006 Christian Edward Gruber
+ * All Rights Reserved
+ *
+ * This software is licensed under the Berkeley Standard Distribution license,
+ * (BSD license), as defined below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * 3. Neither the name of Israfil Consulting Services nor the names of its contributors
+ * may be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+ * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ *
+ * $Id: Dynamic.java 14 2006-01-27 23:50:37Z cgruber $
+ */
+package net.israfil.foundation.concurrent.producers;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.CharBuffer;
+import java.util.concurrent.TimeUnit;
+
+import net.israfil.foundation.concurrent.ConcurrentProducer;
+
+/**
+ * A provider that reads an input stream and creates strings out of lines
+ * of text.
+ *
+ * @author <a href="cgruber@israfil.net">Christian Edward Gruber</a>
+ */
+public class ReaderStringProducer implements ConcurrentProducer<String> {
+ Reader reader = null;
+ public ReaderStringProducer(Reader reader) {
+ this.reader = reader;
+ }
+ public String produce() {
+ CharBuffer cb = CharBuffer.allocate(1024);
+ int count = 0;
+ try {
+ int val = reader.read();
+ while (val != -1) {
+ cb.append( (char)val );
+ count++;
+ if ((char)val == '\n') break;
+ val = reader.read();
+ }
+ if (count == 0 && val == -1) return null;
+ } catch (IOException e) { return null; }
+ char[] finalarray = new char[count];
+ cb.position(0);
+ cb.get(finalarray);
+ return new String(finalarray);
+ }
+ public String produce(long timeout, TimeUnit unit) throws InterruptedException {
+ throw new UnsupportedOperationException("Timeouts not supported. Use produce()");
+ }
+
+
+
+ }
+ \ No newline at end of file
diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/timing/Timeout.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/timing/Timeout.java
new file mode 100644
index 0000000..0abe15b
--- /dev/null
+++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/timing/Timeout.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2003 - 2007 Israfil Consulting Services Corporation
+ * Copyright (c) 2003 - 2007 Christian Edward Gruber
+ * All Rights Reserved
+ *
+ * This software is licensed under the Berkeley Standard Distribution license,
+ * (BSD license), as defined below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * 3. Neither the name of Israfil Consulting Services nor the names of its contributors
+ * may be used to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+ * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
+ * OF SUCH DAMAGE.
+ *
+ * $Id: Pump.java 362 2007-01-21 06:37:33Z cgruber $
+ */
+package net.israfil.foundation.concurrent.timing;
+
+import java.util.concurrent.TimeoutException;
+
+/**
+ * An object that represents a timeout value;
+ *
+ * @author <a href="mailto:cgruber@israfil.net">Christian Edward Gruber</a>
+ */
+public class Timeout {
+
+ private final long startTime;
+ private final long timeout;
+
+ public Timeout(long timeout) {
+ this.timeout = timeout;
+ startTime = System.currentTimeMillis();
+ }
+ public void assertWithinTimeout() throws TimeoutException {
+ long now = System.currentTimeMillis();
+ if (now - startTime > timeout) {
+ throw new TimeoutException("Timeout of " + timeout + "ms exceeded: " + (now - startTime) + "ms.");
+ }
+ }
+ public long getStartTime() {
+ return startTime;
+ }
+ public long getTimeout() {
+ return timeout;
+ }
+ public void sleep(long interval) throws InterruptedException {
+ Thread.sleep(interval);
+ }
+
+}