diff options
author | Stuart Sierra <mail@stuartsierra.com> | 2009-08-14 22:44:31 -0400 |
---|---|---|
committer | Stuart Sierra <mail@stuartsierra.com> | 2009-08-14 22:44:31 -0400 |
commit | ed89b92ef38056520a1cd7cfb725a4d6cedf980b (patch) | |
tree | a72ded9c8dfb2122a1253caea33e5b503bc7043a /src/clojure/contrib/http/agent.clj | |
parent | 54ee3b6fc9d625bb476f38b096f80b791cfa9b65 (diff) |
http/agent.clj: allow handling of stream as it is received
This changes the meaning of the :on-success and :on-failure callback
functions; they are now called as soon as the response body stream
is ready, and they can consume the InputStream directly.
Diffstat (limited to 'src/clojure/contrib/http/agent.clj')
-rw-r--r-- | src/clojure/contrib/http/agent.clj | 133 |
1 files changed, 89 insertions, 44 deletions
diff --git a/src/clojure/contrib/http/agent.clj b/src/clojure/contrib/http/agent.clj index cbb4b725..698ea4a8 100644 --- a/src/clojure/contrib/http/agent.clj +++ b/src/clojure/contrib/http/agent.clj @@ -15,7 +15,8 @@ (ns #^{:doc "Agent-based asynchronous HTTP client."} clojure.contrib.http.agent (:require [clojure.contrib.http.connection :as c] - [clojure.contrib.duck-streams :as duck])) + [clojure.contrib.duck-streams :as duck]) + (:import (java.io ByteArrayOutputStream))) (defn- setup-http-connection [conn options] @@ -28,32 +29,64 @@ ;; Is the response in the 2xx range? (= 2 (unchecked-divide (.getResponseCode conn) 100))) -(defn- do-http-agent-request [state options] +(defn- start-request [state options] + (prn "start-request") (let [conn (::connection state)] (setup-http-connection conn options) (c/start-http-connection conn (:body options)) - (let [bytes (if (connection-success? conn) - (duck/to-byte-array (.getInputStream conn)) - (duck/to-byte-array (.getErrorStream conn)))] - (.disconnect conn) - (assoc state - ::response-body-bytes bytes - ::state ::completed)))) - -(def *http-request-defaults* + (assoc state ::state ::started))) + +(defn- open-response [state options] + (prn "open-response") + (let [conn (::connection state)] + (assoc state + ::response-stream (if (connection-success? conn) + (.getInputStream conn) + (.getErrorStream conn)) + ::state ::receiving))) + +(defn- handle-response [state success-handler failure-handler options] + (prn "handle-response") + (let [conn (::connection state)] + (assoc state + ::result (if (connection-success? conn) + (success-handler) + (failure-handler)) + ::state ::finished))) + +(defn- disconnect [state options] + (prn "disconnect") + (.close (::response-stream state)) + (.disconnect (::connection state)) + (assoc state + ::response-stream nil + ::state ::disconnected)) + +(defn response-body-stream + "Returns an InputStream of the HTTP response body." + [http-agnt] + (let [a @http-agnt] + (if (= (::state a) ::receiving) + (::response-stream a) + (throw (Exception. "response-body-stream may only be called from a callback function passed to http-agent"))))) + +(defn buffer-bytes + "The default HTTP agent result handler; it collects the response + body in a java.io.ByteArrayOutputStream." + [http-agnt] + (let [output (ByteArrayOutputStream.)] + (duck/copy (response-body-stream http-agnt) output) + output)) + +(def *http-agent-defaults* {:method "GET" :headers {} :body nil :connect-timeout 0 :read-timeout 0 - :follow-redirects true}) - -(defn- completed-watch [success-fn failure-fn key http-agnt old-state new-state] - (when (and (= (::state new-state) ::completed) - (not= (::state old-state) ::completed)) - (if (connection-success? (::connection new-state)) - (when success-fn (success-fn http-agnt)) - (when failure-fn (failure-fn http-agnt))))) + :follow-redirects true + :on-success buffer-bytes + :on-failure buffer-bytes}) (defn http-agent "Creates (and immediately returns) an Agent representing an HTTP @@ -93,37 +126,50 @@ :on-success f Function to be called when the request succeeds with a 2xx response - code. Default is nil, do nothing. The function will be called with - the HTTP agent as its argument. Any exceptions thrown by this - function will be added to the agent's error queue (see - agent-errors). + code. The function will be called with the HTTP agent as its + argument, and can use the response-body-stream function to read the + response body. The return value of this function will be stored in + the state of the agent and can be retrieved with the 'result' + function. Any exceptions thrown by this function will be added to + the agent's error queue (see agent-errors). The default function + collects the response stream into a byte array. :on-failure f - Function to be called when the request fails with a 4xx or 5xx - response code. Default is nil, do nothing. The function will be - called with the HTTP agent as its argument. Any exceptions thrown - by this function will become agent-errors. Any exceptions thrown by - this function will be added to the agent's error queue (see - agent-errors). + Like :on-success but this function will be called when the request + fails with a 4xx or 5xx response code. " ([url & options] - (let [opts (merge *http-request-defaults* (apply array-map options))] + (let [opts (merge *http-agent-defaults* (apply array-map options))] (let [a (agent {::connection (c/http-connection url) ::state ::created ::url url ::options opts})] - (when (or (:on-success opts) (:on-failure opts)) - (add-watch a ::completed-watch - (partial completed-watch - (:on-success opts) (:on-failure opts)))) - (send-off a do-http-agent-request opts))))) + (send-off a start-request opts) + (send-off a open-response opts) + (send-off a handle-response + (partial (:on-success opts) a) + (partial (:on-failure opts) a) + opts) + (send-off a disconnect opts))))) + +(defn result + "Returns the value returned by the :on-success or :on-failure + handler function of the HTTP agent; nil if the handler function has + not yet finished. The default handler function returns a + ByteArrayOutputStream." + [http-agnt] + (when (#{::disconnected ::finished} (::state @http-agnt)) + (::result @http-agnt))) (defn response-body-bytes - "Returns a Java byte array of the content returned by the server." - [a] - (when (= (::state @a) ::completed) - (::response-body-bytes @a))) + "Returns a Java byte array of the content returned by the server; + nil if the content is not yet available." + [http-agnt] + (when-let [buffer (result http-agnt)] + (if (instance? ByteArrayOutputStream buffer) + (.toByteArray buffer) + (throw (Exception. "Result was not a ByteArrayOutputStream"))))) (defn response-body-str "Returns the HTTP response body as a string, using the given @@ -137,11 +183,10 @@ (or (.getContentEncoding (::connection @http-agnt)) duck/*default-encoding*))) ([http-agnt encoding] - (let [a @http-agnt] - (when (= (::state a) ::completed) - (let [conn (::connection a) - bytes (::response-body-bytes a)] - (String. bytes encoding)))))) + (when-let [buffer (result http-agnt)] + (if (instance? ByteArrayOutputStream buffer) + (.toString buffer encoding) + (throw (Exception. "Result was not a ByteArrayOutputStream")))))) (defn response-status "Returns the Integer response status code (e.g. 200, 404) for this request." |