summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/clj/clojure/core.clj89
-rw-r--r--src/clj/clojure/core_print.clj8
-rw-r--r--src/jvm/clojure/lang/Agent.java148
-rw-r--r--test/clojure/test_clojure/agents.clj81
4 files changed, 280 insertions, 46 deletions
diff --git a/src/clj/clojure/core.clj b/src/clj/clojure/core.clj
index 245c3643..c253fc1b 100644
--- a/src/clj/clojure/core.clj
+++ b/src/clj/clojure/core.clj
@@ -1329,14 +1329,29 @@
:validator validate-fn
+ :error-handler handler-fn
+
+ :error-mode mode-keyword
+
If metadata-map is supplied, it will be come the metadata on the
agent. validate-fn must be nil or a side-effect-free fn of one
argument, which will be passed the intended new state on any state
change. If the new state is unacceptable, the validate-fn should
- return false or throw an exception."
- ([state] (new clojure.lang.Agent state))
+ return false or throw an exception. handler-fn is called if an
+ action throws an exception or if validate-fn rejects a new state --
+ see set-error-handler! for details. The mode-keyword may be either
+ :continue (the default if an error-handler is given) or :fail (the
+ default if no error-handler is given) -- see set-error-mode! for
+ details."
([state & options]
- (setup-reference (agent state) options)))
+ (let [a (new clojure.lang.Agent state)
+ opts (apply hash-map options)]
+ (setup-reference a options)
+ (when (:error-handler opts)
+ (.setErrorHandler a (:error-handler opts)))
+ (.setErrorMode a (or (:error-mode opts)
+ (if (:error-handler opts) :continue :fail)))
+ a)))
(defn send
"Dispatch an action to an agent. Returns the agent immediately.
@@ -1388,16 +1403,73 @@
[#^clojure.lang.IRef reference key]
(.removeWatch reference key))
+(defn agent-error
+ "Returns the exception thrown during an asynchronous action of the
+ agent if the agent is failed. Returns nil if the agent is not
+ failed."
+ [#^clojure.lang.Agent a] (.getError a))
+
+(defn restart-agent
+ "When an agent is failed, changes the agent state to new-state and
+ then un-fails the agent so that sends are allowed again. If
+ a :clear-actions true option is given, any actions queued on the
+ agent that were being held while it was failed will be discarded,
+ otherwise those held actions will proceed. The new-state must pass
+ the validator if any, or restart will throw an exception and the
+ agent will remain failed with its old state and error. Watchers, if
+ any, will NOT be notified of the new state. Throws an exception if
+ the agent is not failed."
+ [#^clojure.lang.Agent a, new-state & options]
+ (let [opts (apply hash-map options)]
+ (.restart a new-state (if (:clear-actions opts) true false))))
+
+(defn set-error-handler!
+ "Sets the error-handler of agent a to handler-fn. If an action
+ being run by the agent throws an exception or doesn't pass the
+ validator fn, handler-fn will be called with two arguments: the
+ agent and the exception."
+ [#^clojure.lang.Agent a, handler-fn]
+ (.setErrorHandler a handler-fn))
+
+(defn error-handler
+ "Returns the error-handler of agent a, or nil if there is none.
+ See set-error-handler!"
+ [#^clojure.lang.Agent a]
+ (.getErrorHandler a))
+
+(defn set-error-mode!
+ "Sets the error-mode of agent a to mode-keyword, which must be
+ either :fail or :continue. If an action being run by the agent
+ throws an exception or doesn't pass the validator fn, an
+ error-handler may be called (see set-error-handler!), after which,
+ if the mode is :continue, the agent will continue as if neither the
+ action that caused the error nor the error itself ever happened.
+
+ If the mode is :fail, the agent will become failed and will stop
+ accepting new 'send' and 'send-off' actions, and any previously
+ queued actions will be held until a 'restart-agent'. Deref will
+ still work, returning the state of the agent before the error."
+ [#^clojure.lang.Agent a, mode-keyword]
+ (.setErrorMode a mode-keyword))
+
+(defn error-mode
+ "Returns the error-mode of agent a. See set-error-mode!"
+ [#^clojure.lang.Agent a]
+ (.getErrorMode a))
(defn agent-errors
- "Returns a sequence of the exceptions thrown during asynchronous
+ "DEPRECATED: Use 'agent-error' instead.
+ Returns a sequence of the exceptions thrown during asynchronous
actions of the agent."
- [#^clojure.lang.Agent a] (. a (getErrors)))
+ [a]
+ (when-let [e (agent-error a)]
+ (list e)))
(defn clear-agent-errors
- "Clears any exceptions thrown during asynchronous actions of the
+ "DEPRECATED: Use 'restart-agent' instead.
+ Clears any exceptions thrown during asynchronous actions of the
agent, allowing subsequent actions to occur."
- [#^clojure.lang.Agent a] (. a (clearErrors)))
+ [#^clojure.lang.Agent a] (restart-agent a (.deref a)))
(defn shutdown-agents
"Initiates a shutdown of the thread pools that back the agent
@@ -2104,7 +2176,8 @@
(defn await
"Blocks the current thread (indefinitely!) until all actions
dispatched thus far, from this thread or agent, to the agent(s) have
- occurred."
+ occurred. Will block on failed agents. Will never return if
+ a failed agent is restarted with :clear-actions true."
[& agents]
(io! "await in transaction"
(when *agent*
diff --git a/src/clj/clojure/core_print.clj b/src/clj/clojure/core_print.clj
index d2b1612e..0ab1ee52 100644
--- a/src/clj/clojure/core_print.clj
+++ b/src/clj/clojure/core_print.clj
@@ -309,9 +309,13 @@
(.write w ")"))
(defmethod print-method clojure.lang.IDeref [o #^Writer w]
- (print-sequential (format "#<%s@%x: "
+ (print-sequential (format "#<%s@%x%s: "
(.getSimpleName (class o))
- (System/identityHashCode o))
+ (System/identityHashCode o)
+ (if (and (instance? clojure.lang.Agent o)
+ (agent-error o))
+ " FAILED"
+ ""))
pr-on, "", ">", (list (if (and (future? o) (not (future-done? o))) :pending @o)), w))
(def #^{:private true} print-initialized true)
diff --git a/src/jvm/clojure/lang/Agent.java b/src/jvm/clojure/lang/Agent.java
index 310c826b..7d40ce77 100644
--- a/src/jvm/clojure/lang/Agent.java
+++ b/src/jvm/clojure/lang/Agent.java
@@ -17,10 +17,27 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.Map;
public class Agent extends ARef {
+
+static class ActionQueue {
+ public final IPersistentStack q;
+ public final Throwable error; // non-null indicates fail state
+ static final ActionQueue EMPTY = new ActionQueue(PersistentQueue.EMPTY, null);
+
+ public ActionQueue( IPersistentStack q, Throwable error )
+ {
+ this.q = q;
+ this.error = error;
+ }
+}
+
+static final Keyword CONTINUE = Keyword.intern(null, "continue");
+static final Keyword FAIL = Keyword.intern(null, "fail");
+
volatile Object state;
- AtomicReference<IPersistentStack> q = new AtomicReference(PersistentQueue.EMPTY);
+ AtomicReference<ActionQueue> aq = new AtomicReference(ActionQueue.EMPTY);
- volatile ISeq errors = null;
+ volatile Keyword errorMode = CONTINUE;
+ volatile IFn errorHandler = null;
final public static ExecutorService pooledExecutor =
Executors.newFixedThreadPool(2 + Runtime.getRuntime().availableProcessors());
@@ -50,10 +67,24 @@ static class Action implements Runnable{
}
void execute(){
- if(solo)
- soloExecutor.execute(this);
- else
- pooledExecutor.execute(this);
+ try
+ {
+ if(solo)
+ soloExecutor.execute(this);
+ else
+ pooledExecutor.execute(this);
+ }
+ catch(Throwable error)
+ {
+ if(agent.errorHandler != null)
+ {
+ try
+ {
+ agent.errorHandler.invoke(agent, error);
+ }
+ catch(Throwable e) {} // ignore errorHandler errors
+ }
+ }
}
static void doRun(Action action){
@@ -62,7 +93,7 @@ static class Action implements Runnable{
Var.pushThreadBindings(RT.map(RT.AGENT, action.agent));
nested.set(PersistentVector.EMPTY);
- boolean hadError = false;
+ Throwable error = null;
try
{
Object oldval = action.agent.state;
@@ -72,28 +103,41 @@ static class Action implements Runnable{
}
catch(Throwable e)
{
- //todo report/callback
- action.agent.errors = RT.cons(e, action.agent.errors);
- hadError = true;
+ error = e;
}
- if(!hadError)
+ if(error == null)
{
releasePendingSends();
}
+ else
+ {
+ nested.set(PersistentVector.EMPTY);
+ if(action.agent.errorHandler != null)
+ {
+ try
+ {
+ action.agent.errorHandler.invoke(action.agent, error);
+ }
+ catch(Throwable e) {} // ignore errorHandler errors
+ }
+ if(action.agent.errorMode == CONTINUE)
+ {
+ error = null;
+ }
+ }
boolean popped = false;
- IPersistentStack next = null;
+ ActionQueue next = null;
while(!popped)
{
- IPersistentStack prior = action.agent.q.get();
- next = prior.pop();
- popped = action.agent.q.compareAndSet(prior, next);
+ ActionQueue prior = action.agent.aq.get();
+ next = new ActionQueue(prior.q.pop(), error);
+ popped = action.agent.aq.compareAndSet(prior, next);
}
- if(next.count() > 0)
- ((Action) next.peek()).execute();
-
+ if(error == null && next.q.count() > 0)
+ ((Action) next.q.peek()).execute();
}
finally
{
@@ -124,25 +168,61 @@ boolean setState(Object newState) throws Exception{
}
public Object deref() throws Exception{
- if(errors != null)
- {
- throw new Exception("Agent has errors", (Exception) RT.first(errors));
- }
return state;
}
- public ISeq getErrors(){
- return errors;
+public Throwable getError(){
+ return aq.get().error;
}
-public void clearErrors(){
- errors = null;
+public void setErrorMode(Keyword k){
+ errorMode = k;
+}
+
+public Keyword getErrorMode(){
+ return errorMode;
+}
+
+public void setErrorHandler(IFn f){
+ errorHandler = f;
+}
+
+public IFn getErrorHandler(){
+ return errorHandler;
+}
+
+synchronized public Object restart(Object newState, boolean clearActions){
+ if(getError() == null)
+ {
+ throw new RuntimeException("Agent does not need a restart");
+ }
+ validate(newState);
+ state = newState;
+
+ if(clearActions)
+ aq.set(ActionQueue.EMPTY);
+ else
+ {
+ boolean restarted = false;
+ ActionQueue prior = null;
+ while(!restarted)
+ {
+ prior = aq.get();
+ restarted = aq.compareAndSet(prior, new ActionQueue(prior.q, null));
+ }
+
+ if(prior.q.count() > 0)
+ ((Action) prior.q.peek()).execute();
+ }
+
+ return newState;
}
public Object dispatch(IFn fn, ISeq args, boolean solo) {
- if(errors != null)
+ Throwable error = getError();
+ if(error != null)
{
- throw new RuntimeException("Agent has errors", (Exception) RT.first(errors));
+ throw new RuntimeException("Agent is failed, needs restart", error);
}
Action action = new Action(this, fn, args, solo);
dispatchAction(action);
@@ -164,22 +244,22 @@ static void dispatchAction(Action action){
void enqueue(Action action){
boolean queued = false;
- IPersistentStack prior = null;
+ ActionQueue prior = null;
while(!queued)
{
- prior = q.get();
- queued = q.compareAndSet(prior, (IPersistentStack) prior.cons(action));
+ prior = aq.get();
+ queued = aq.compareAndSet(prior, new ActionQueue((IPersistentStack)prior.q.cons(action), prior.error));
}
- if(prior.count() == 0)
+ if(prior.q.count() == 0 && prior.error == null)
action.execute();
}
public int getQueueCount(){
- return q.get().count();
+ return aq.get().q.count();
}
- static public int releasePendingSends(){
+static public int releasePendingSends(){
IPersistentVector sends = nested.get();
if(sends == null)
return 0;
diff --git a/test/clojure/test_clojure/agents.clj b/test/clojure/test_clojure/agents.clj
index 63b6c8cc..6dc35790 100644
--- a/test/clojure/test_clojure/agents.clj
+++ b/test/clojure/test_clojure/agents.clj
@@ -18,7 +18,7 @@
(send agt (fn [state] (throw (Throwable. "just testing Throwables"))))
(try
;; Let the action finish; eat the "agent has errors" error that bubbles up
- (await agt)
+ (await-for 100 agt)
(catch RuntimeException _))
(is (instance? Throwable (first (agent-errors agt))))
(is (= 1 (count (agent-errors agt))))
@@ -27,9 +27,86 @@
(clear-agent-errors agt)
(is (= nil @agt))
(send agt nil?)
- (await agt)
+ (is (true? (await-for 100 agt)))
(is (true? @agt))))
+(deftest default-modes
+ (is (= :fail (error-mode (agent nil))))
+ (is (= :continue (error-mode (agent nil :error-handler println)))))
+
+(deftest continue-handler
+ (let [err (atom nil)
+ agt (agent 0 :error-mode :continue :error-handler #(reset! err %&))]
+ (send agt /)
+ (is (true? (await-for 100 agt)))
+ (is (= 0 @agt))
+ (is (nil? (agent-error agt)))
+ (is (= agt (first @err)))
+ (is (true? (instance? ArithmeticException (second @err))))))
+
+(deftest fail-handler
+ (let [err (atom nil)
+ agt (agent 0 :error-mode :fail :error-handler #(reset! err %&))]
+ (send agt /)
+ (Thread/sleep 100)
+ (is (true? (instance? ArithmeticException (agent-error agt))))
+ (is (= 0 @agt))
+ (is (= agt (first @err)))
+ (is (true? (instance? ArithmeticException (second @err))))
+ (is (thrown? RuntimeException (send agt inc)))))
+
+(deftest restart-no-clear
+ (let [p (promise)
+ agt (agent 1 :error-mode :fail)]
+ (send agt (fn [v] @p))
+ (send agt /)
+ (send agt inc)
+ (send agt inc)
+ (deliver p 0)
+ (Thread/sleep 100)
+ (is (= 0 @agt))
+ (is (= ArithmeticException (class (agent-error agt))))
+ (restart-agent agt 10)
+ (is (true? (await-for 100 agt)))
+ (is (= 12 @agt))
+ (is (nil? (agent-error agt)))))
+
+(deftest restart-clear
+ (let [p (promise)
+ agt (agent 1 :error-mode :fail)]
+ (send agt (fn [v] @p))
+ (send agt /)
+ (send agt inc)
+ (send agt inc)
+ (deliver p 0)
+ (Thread/sleep 100)
+ (is (= 0 @agt))
+ (is (= ArithmeticException (class (agent-error agt))))
+ (restart-agent agt 10 :clear-actions true)
+ (is (true? (await-for 100 agt)))
+ (is (= 10 @agt))
+ (is (nil? (agent-error agt)))
+ (send agt inc)
+ (is (true? (await-for 100 agt)))
+ (is (= 11 @agt))
+ (is (nil? (agent-error agt)))))
+
+(deftest invalid-restart
+ (let [p (promise)
+ agt (agent 2 :error-mode :fail :validator even?)]
+ (is (thrown? RuntimeException (restart-agent agt 4)))
+ (send agt (fn [v] @p))
+ (send agt (partial + 2))
+ (send agt (partial + 2))
+ (deliver p 3)
+ (Thread/sleep 100)
+ (is (= 2 @agt))
+ (is (= IllegalStateException (class (agent-error agt))))
+ (is (thrown? RuntimeException (restart-agent agt 5)))
+ (restart-agent agt 6)
+ (is (true? (await-for 100 agt)))
+ (is (= 10 @agt))
+ (is (nil? (agent-error agt)))))
; http://clojure.org/agents