From 7c279747beb43c7e88633a6228a155a30e6834f7 Mon Sep 17 00:00:00 2001 From: Benjamin Culkin Date: Mon, 27 May 2024 11:38:33 -0400 Subject: Initial import --- .../foundation/concurrent/AbstractWorker.java | 212 +++++++++++++++++++++ 1 file changed, 212 insertions(+) create mode 100644 israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractWorker.java (limited to 'israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractWorker.java') diff --git a/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractWorker.java b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractWorker.java new file mode 100644 index 0000000..14e51a0 --- /dev/null +++ b/israfil-foundation-concurrent/src/main/java/net/israfil/foundation/concurrent/AbstractWorker.java @@ -0,0 +1,212 @@ +/* + * Copyright (c) 2003 - 2007 Israfil Consulting Services Corporation + * Copyright (c) 2003 - 2007 Christian Edward Gruber + * All Rights Reserved + * + * This software is licensed under the Berkeley Standard Distribution license, + * (BSD license), as defined below: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * 3. Neither the name of Israfil Consulting Services nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY + * OF SUCH DAMAGE. + * + * $Id: Worker.java 401 2007-01-26 02:05:20Z cgruber $ + */ +package net.israfil.foundation.concurrent; + +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * An object that wraps a Thread and performs units of work within that + * thread. + * + * @author Christian Edward Gruber + */ +public abstract class AbstractWorker implements Runnable, Worker { + private static Logger logger = Logger.getLogger(AbstractWorker.class.getName()); + + private Thread _thread = null; + private String _name = this.getClass().getName(); + private Throwable _error = null; + + private WorkerState state = WorkerState.Stopped; + + public AbstractWorker() { } + + public AbstractWorker(String name) { + this._name = name; + } + + /** + * Runnable.run() implementation. + * + */ + public void run() { + try { + //execute lifecycle + state = WorkerState.Starting; + _thread = Thread.currentThread(); + _thread.setName(_name); + _error = null; + if (init()) { + state = WorkerState.Running; + executeLifecycle(); + } + state = WorkerState.Stopping; + deinit(); + state = WorkerState.Stopped; + } catch (InterruptedException e){ + state = WorkerState.Stopped; + logger.log(Level.FINE,this.getName() + " interrupted, setting stopped state.",e); + _error = e; + } catch (RuntimeInterruptedException e){ + state = WorkerState.Stopped; + logger.log(Level.FINE,this.getName() + " interrupted, setting stopped state.",e); + _error=e; + } catch (Throwable e){ + state = WorkerState.Stopped; + logger.log(Level.SEVERE,e.getClass().getName() + " thrown within run() method of " + + this.getName() + ", setting stopped state.",e); + _error = e; + } + } + + abstract void executeLifecycle() throws InterruptedException; + + // Lifecycle phases + + protected boolean init() throws InterruptedException { + validate(); + return _thread != null && _thread.isAlive(); + } + + protected void preProcess() throws InterruptedException { } + + protected abstract void process() throws InterruptedException; + + protected void postProcess() throws InterruptedException { } + + protected void deinit() throws InterruptedException { } + + protected void validate() { + if (getName() == null || getName().equals("")) + throw new IllegalStateException(getClass().getName() + " cannot start without a name."); + } + + // + // SIGNALING + // + // TODO: Validate signaling. + // TODO: Implement cleaner signaling. + // + + public boolean isRunning() { + return _thread != null && _thread.isAlive() && state == WorkerState.Running; + } + public boolean isStopping(){ + return state == WorkerState.Stopping; + } + public boolean isStarting(){ + return state == WorkerState.Starting; + } + public boolean isStopped(){ + return state == WorkerState.Stopped; + } + + // Accessors + + public Thread getThread() { return _thread; } + + public String getName() { return _name; } + + public void setName(String name) { this._name = name; } + + /** + * If the worker has stopped due to a thrown error, it should be captured + * and available via this method. This method will throw an IllegalStateException + * if called when the worker is not in a Stopped state. + * + */ + public Throwable getError() { + if (!isStopped()) throw new IllegalStateException("No error available when worker not stopped."); + else return this._error; + } + /** + * Start the worker + * @return boolean Did the Worker startup properly. + */ + public boolean start() { + try { + if (!isStopped()) + throw new IllegalStateException("Attempted to start a worker that wasn't stopped"); + Thread t = new Thread(this); + t.start(); + Thread.yield(); + return true; + } catch (Exception e) { + logger.log(Level.INFO,this.getName() + "Thread failed to start",e); + return false; + } + } + + /** + * Attempts to gracefully shutdown the worker, but interrupts the + * thread after 20ms if the worker does not stop on its own. + * @throws InterruptedException + */ + public void stop() throws InterruptedException{ + stop(20); + } + + /** + * Attempts to gracefully shutdown the worker, but interrupts the + * thread after a given timeout if the worker does not stop on its own. + * @throws InterruptedException + */ + public void stop(long timeout_miliseconds) throws InterruptedException{ + if (state == WorkerState.Stopped) return; // succeed fast. + state = WorkerState.Stopping; + if (getThread() != null) getThread().join(timeout_miliseconds); + hardStop(); + } + + /** + * Attempts to gracefully shutdown the worker, but interrupts the + * thread after a given timeout if the worker does not stop on its own. + * @throws InterruptedException + */ + public void stop(long timeout_miliseconds,int timeout_nanoseconds) throws InterruptedException{ + if (state == WorkerState.Stopped) return; // succeed fast. + state = WorkerState.Stopping; + if (getThread() != null) getThread().join(timeout_miliseconds,timeout_nanoseconds); + hardStop(); + } + + protected void hardStop() { + if (isStopped() || getThread() == null) return; + getThread().interrupt(); + this._thread = null; + this.state = WorkerState.Stopped; + } + +} -- cgit v1.2.3