diff options
Diffstat (limited to 'src/jvm/clojure/lang/Agent.java')
-rw-r--r-- | src/jvm/clojure/lang/Agent.java | 148 |
1 files changed, 114 insertions, 34 deletions
diff --git a/src/jvm/clojure/lang/Agent.java b/src/jvm/clojure/lang/Agent.java index 310c826b..7d40ce77 100644 --- a/src/jvm/clojure/lang/Agent.java +++ b/src/jvm/clojure/lang/Agent.java @@ -17,10 +17,27 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.Map; public class Agent extends ARef { + +static class ActionQueue { + public final IPersistentStack q; + public final Throwable error; // non-null indicates fail state + static final ActionQueue EMPTY = new ActionQueue(PersistentQueue.EMPTY, null); + + public ActionQueue( IPersistentStack q, Throwable error ) + { + this.q = q; + this.error = error; + } +} + +static final Keyword CONTINUE = Keyword.intern(null, "continue"); +static final Keyword FAIL = Keyword.intern(null, "fail"); + volatile Object state; - AtomicReference<IPersistentStack> q = new AtomicReference(PersistentQueue.EMPTY); + AtomicReference<ActionQueue> aq = new AtomicReference(ActionQueue.EMPTY); - volatile ISeq errors = null; + volatile Keyword errorMode = CONTINUE; + volatile IFn errorHandler = null; final public static ExecutorService pooledExecutor = Executors.newFixedThreadPool(2 + Runtime.getRuntime().availableProcessors()); @@ -50,10 +67,24 @@ static class Action implements Runnable{ } void execute(){ - if(solo) - soloExecutor.execute(this); - else - pooledExecutor.execute(this); + try + { + if(solo) + soloExecutor.execute(this); + else + pooledExecutor.execute(this); + } + catch(Throwable error) + { + if(agent.errorHandler != null) + { + try + { + agent.errorHandler.invoke(agent, error); + } + catch(Throwable e) {} // ignore errorHandler errors + } + } } static void doRun(Action action){ @@ -62,7 +93,7 @@ static class Action implements Runnable{ Var.pushThreadBindings(RT.map(RT.AGENT, action.agent)); nested.set(PersistentVector.EMPTY); - boolean hadError = false; + Throwable error = null; try { Object oldval = action.agent.state; @@ -72,28 +103,41 @@ static class Action implements Runnable{ } catch(Throwable e) { - //todo report/callback - action.agent.errors = RT.cons(e, action.agent.errors); - hadError = true; + error = e; } - if(!hadError) + if(error == null) { releasePendingSends(); } + else + { + nested.set(PersistentVector.EMPTY); + if(action.agent.errorHandler != null) + { + try + { + action.agent.errorHandler.invoke(action.agent, error); + } + catch(Throwable e) {} // ignore errorHandler errors + } + if(action.agent.errorMode == CONTINUE) + { + error = null; + } + } boolean popped = false; - IPersistentStack next = null; + ActionQueue next = null; while(!popped) { - IPersistentStack prior = action.agent.q.get(); - next = prior.pop(); - popped = action.agent.q.compareAndSet(prior, next); + ActionQueue prior = action.agent.aq.get(); + next = new ActionQueue(prior.q.pop(), error); + popped = action.agent.aq.compareAndSet(prior, next); } - if(next.count() > 0) - ((Action) next.peek()).execute(); - + if(error == null && next.q.count() > 0) + ((Action) next.q.peek()).execute(); } finally { @@ -124,25 +168,61 @@ boolean setState(Object newState) throws Exception{ } public Object deref() throws Exception{ - if(errors != null) - { - throw new Exception("Agent has errors", (Exception) RT.first(errors)); - } return state; } - public ISeq getErrors(){ - return errors; +public Throwable getError(){ + return aq.get().error; } -public void clearErrors(){ - errors = null; +public void setErrorMode(Keyword k){ + errorMode = k; +} + +public Keyword getErrorMode(){ + return errorMode; +} + +public void setErrorHandler(IFn f){ + errorHandler = f; +} + +public IFn getErrorHandler(){ + return errorHandler; +} + +synchronized public Object restart(Object newState, boolean clearActions){ + if(getError() == null) + { + throw new RuntimeException("Agent does not need a restart"); + } + validate(newState); + state = newState; + + if(clearActions) + aq.set(ActionQueue.EMPTY); + else + { + boolean restarted = false; + ActionQueue prior = null; + while(!restarted) + { + prior = aq.get(); + restarted = aq.compareAndSet(prior, new ActionQueue(prior.q, null)); + } + + if(prior.q.count() > 0) + ((Action) prior.q.peek()).execute(); + } + + return newState; } public Object dispatch(IFn fn, ISeq args, boolean solo) { - if(errors != null) + Throwable error = getError(); + if(error != null) { - throw new RuntimeException("Agent has errors", (Exception) RT.first(errors)); + throw new RuntimeException("Agent is failed, needs restart", error); } Action action = new Action(this, fn, args, solo); dispatchAction(action); @@ -164,22 +244,22 @@ static void dispatchAction(Action action){ void enqueue(Action action){ boolean queued = false; - IPersistentStack prior = null; + ActionQueue prior = null; while(!queued) { - prior = q.get(); - queued = q.compareAndSet(prior, (IPersistentStack) prior.cons(action)); + prior = aq.get(); + queued = aq.compareAndSet(prior, new ActionQueue((IPersistentStack)prior.q.cons(action), prior.error)); } - if(prior.count() == 0) + if(prior.q.count() == 0 && prior.error == null) action.execute(); } public int getQueueCount(){ - return q.get().count(); + return aq.get().q.count(); } - static public int releasePendingSends(){ +static public int releasePendingSends(){ IPersistentVector sends = nested.get(); if(sends == null) return 0; |