summaryrefslogtreecommitdiff
path: root/src/jvm/clojure/lang/Agent.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/jvm/clojure/lang/Agent.java')
-rw-r--r--src/jvm/clojure/lang/Agent.java148
1 files changed, 114 insertions, 34 deletions
diff --git a/src/jvm/clojure/lang/Agent.java b/src/jvm/clojure/lang/Agent.java
index 310c826b..7d40ce77 100644
--- a/src/jvm/clojure/lang/Agent.java
+++ b/src/jvm/clojure/lang/Agent.java
@@ -17,10 +17,27 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.Map;
public class Agent extends ARef {
+
+static class ActionQueue {
+ public final IPersistentStack q;
+ public final Throwable error; // non-null indicates fail state
+ static final ActionQueue EMPTY = new ActionQueue(PersistentQueue.EMPTY, null);
+
+ public ActionQueue( IPersistentStack q, Throwable error )
+ {
+ this.q = q;
+ this.error = error;
+ }
+}
+
+static final Keyword CONTINUE = Keyword.intern(null, "continue");
+static final Keyword FAIL = Keyword.intern(null, "fail");
+
volatile Object state;
- AtomicReference<IPersistentStack> q = new AtomicReference(PersistentQueue.EMPTY);
+ AtomicReference<ActionQueue> aq = new AtomicReference(ActionQueue.EMPTY);
- volatile ISeq errors = null;
+ volatile Keyword errorMode = CONTINUE;
+ volatile IFn errorHandler = null;
final public static ExecutorService pooledExecutor =
Executors.newFixedThreadPool(2 + Runtime.getRuntime().availableProcessors());
@@ -50,10 +67,24 @@ static class Action implements Runnable{
}
void execute(){
- if(solo)
- soloExecutor.execute(this);
- else
- pooledExecutor.execute(this);
+ try
+ {
+ if(solo)
+ soloExecutor.execute(this);
+ else
+ pooledExecutor.execute(this);
+ }
+ catch(Throwable error)
+ {
+ if(agent.errorHandler != null)
+ {
+ try
+ {
+ agent.errorHandler.invoke(agent, error);
+ }
+ catch(Throwable e) {} // ignore errorHandler errors
+ }
+ }
}
static void doRun(Action action){
@@ -62,7 +93,7 @@ static class Action implements Runnable{
Var.pushThreadBindings(RT.map(RT.AGENT, action.agent));
nested.set(PersistentVector.EMPTY);
- boolean hadError = false;
+ Throwable error = null;
try
{
Object oldval = action.agent.state;
@@ -72,28 +103,41 @@ static class Action implements Runnable{
}
catch(Throwable e)
{
- //todo report/callback
- action.agent.errors = RT.cons(e, action.agent.errors);
- hadError = true;
+ error = e;
}
- if(!hadError)
+ if(error == null)
{
releasePendingSends();
}
+ else
+ {
+ nested.set(PersistentVector.EMPTY);
+ if(action.agent.errorHandler != null)
+ {
+ try
+ {
+ action.agent.errorHandler.invoke(action.agent, error);
+ }
+ catch(Throwable e) {} // ignore errorHandler errors
+ }
+ if(action.agent.errorMode == CONTINUE)
+ {
+ error = null;
+ }
+ }
boolean popped = false;
- IPersistentStack next = null;
+ ActionQueue next = null;
while(!popped)
{
- IPersistentStack prior = action.agent.q.get();
- next = prior.pop();
- popped = action.agent.q.compareAndSet(prior, next);
+ ActionQueue prior = action.agent.aq.get();
+ next = new ActionQueue(prior.q.pop(), error);
+ popped = action.agent.aq.compareAndSet(prior, next);
}
- if(next.count() > 0)
- ((Action) next.peek()).execute();
-
+ if(error == null && next.q.count() > 0)
+ ((Action) next.q.peek()).execute();
}
finally
{
@@ -124,25 +168,61 @@ boolean setState(Object newState) throws Exception{
}
public Object deref() throws Exception{
- if(errors != null)
- {
- throw new Exception("Agent has errors", (Exception) RT.first(errors));
- }
return state;
}
- public ISeq getErrors(){
- return errors;
+public Throwable getError(){
+ return aq.get().error;
}
-public void clearErrors(){
- errors = null;
+public void setErrorMode(Keyword k){
+ errorMode = k;
+}
+
+public Keyword getErrorMode(){
+ return errorMode;
+}
+
+public void setErrorHandler(IFn f){
+ errorHandler = f;
+}
+
+public IFn getErrorHandler(){
+ return errorHandler;
+}
+
+synchronized public Object restart(Object newState, boolean clearActions){
+ if(getError() == null)
+ {
+ throw new RuntimeException("Agent does not need a restart");
+ }
+ validate(newState);
+ state = newState;
+
+ if(clearActions)
+ aq.set(ActionQueue.EMPTY);
+ else
+ {
+ boolean restarted = false;
+ ActionQueue prior = null;
+ while(!restarted)
+ {
+ prior = aq.get();
+ restarted = aq.compareAndSet(prior, new ActionQueue(prior.q, null));
+ }
+
+ if(prior.q.count() > 0)
+ ((Action) prior.q.peek()).execute();
+ }
+
+ return newState;
}
public Object dispatch(IFn fn, ISeq args, boolean solo) {
- if(errors != null)
+ Throwable error = getError();
+ if(error != null)
{
- throw new RuntimeException("Agent has errors", (Exception) RT.first(errors));
+ throw new RuntimeException("Agent is failed, needs restart", error);
}
Action action = new Action(this, fn, args, solo);
dispatchAction(action);
@@ -164,22 +244,22 @@ static void dispatchAction(Action action){
void enqueue(Action action){
boolean queued = false;
- IPersistentStack prior = null;
+ ActionQueue prior = null;
while(!queued)
{
- prior = q.get();
- queued = q.compareAndSet(prior, (IPersistentStack) prior.cons(action));
+ prior = aq.get();
+ queued = aq.compareAndSet(prior, new ActionQueue((IPersistentStack)prior.q.cons(action), prior.error));
}
- if(prior.count() == 0)
+ if(prior.q.count() == 0 && prior.error == null)
action.execute();
}
public int getQueueCount(){
- return q.get().count();
+ return aq.get().q.count();
}
- static public int releasePendingSends(){
+static public int releasePendingSends(){
IPersistentVector sends = nested.get();
if(sends == null)
return 0;