aboutsummaryrefslogtreecommitdiff
path: root/src/clojure/contrib/http/agent.clj
diff options
context:
space:
mode:
authorStuart Sierra <mail@stuartsierra.com>2009-08-14 22:44:31 -0400
committerStuart Sierra <mail@stuartsierra.com>2009-08-14 22:44:31 -0400
commited89b92ef38056520a1cd7cfb725a4d6cedf980b (patch)
treea72ded9c8dfb2122a1253caea33e5b503bc7043a /src/clojure/contrib/http/agent.clj
parent54ee3b6fc9d625bb476f38b096f80b791cfa9b65 (diff)
http/agent.clj: allow handling of stream as it is received
This changes the meaning of the :on-success and :on-failure callback functions; they are now called as soon as the response body stream is ready, and they can consume the InputStream directly.
Diffstat (limited to 'src/clojure/contrib/http/agent.clj')
-rw-r--r--src/clojure/contrib/http/agent.clj133
1 files changed, 89 insertions, 44 deletions
diff --git a/src/clojure/contrib/http/agent.clj b/src/clojure/contrib/http/agent.clj
index cbb4b725..698ea4a8 100644
--- a/src/clojure/contrib/http/agent.clj
+++ b/src/clojure/contrib/http/agent.clj
@@ -15,7 +15,8 @@
(ns #^{:doc "Agent-based asynchronous HTTP client."}
clojure.contrib.http.agent
(:require [clojure.contrib.http.connection :as c]
- [clojure.contrib.duck-streams :as duck]))
+ [clojure.contrib.duck-streams :as duck])
+ (:import (java.io ByteArrayOutputStream)))
(defn- setup-http-connection
[conn options]
@@ -28,32 +29,64 @@
;; Is the response in the 2xx range?
(= 2 (unchecked-divide (.getResponseCode conn) 100)))
-(defn- do-http-agent-request [state options]
+(defn- start-request [state options]
+ (prn "start-request")
(let [conn (::connection state)]
(setup-http-connection conn options)
(c/start-http-connection conn (:body options))
- (let [bytes (if (connection-success? conn)
- (duck/to-byte-array (.getInputStream conn))
- (duck/to-byte-array (.getErrorStream conn)))]
- (.disconnect conn)
- (assoc state
- ::response-body-bytes bytes
- ::state ::completed))))
-
-(def *http-request-defaults*
+ (assoc state ::state ::started)))
+
+(defn- open-response [state options]
+ (prn "open-response")
+ (let [conn (::connection state)]
+ (assoc state
+ ::response-stream (if (connection-success? conn)
+ (.getInputStream conn)
+ (.getErrorStream conn))
+ ::state ::receiving)))
+
+(defn- handle-response [state success-handler failure-handler options]
+ (prn "handle-response")
+ (let [conn (::connection state)]
+ (assoc state
+ ::result (if (connection-success? conn)
+ (success-handler)
+ (failure-handler))
+ ::state ::finished)))
+
+(defn- disconnect [state options]
+ (prn "disconnect")
+ (.close (::response-stream state))
+ (.disconnect (::connection state))
+ (assoc state
+ ::response-stream nil
+ ::state ::disconnected))
+
+(defn response-body-stream
+ "Returns an InputStream of the HTTP response body."
+ [http-agnt]
+ (let [a @http-agnt]
+ (if (= (::state a) ::receiving)
+ (::response-stream a)
+ (throw (Exception. "response-body-stream may only be called from a callback function passed to http-agent")))))
+
+(defn buffer-bytes
+ "The default HTTP agent result handler; it collects the response
+ body in a java.io.ByteArrayOutputStream."
+ [http-agnt]
+ (let [output (ByteArrayOutputStream.)]
+ (duck/copy (response-body-stream http-agnt) output)
+ output))
+
+(def *http-agent-defaults*
{:method "GET"
:headers {}
:body nil
:connect-timeout 0
:read-timeout 0
- :follow-redirects true})
-
-(defn- completed-watch [success-fn failure-fn key http-agnt old-state new-state]
- (when (and (= (::state new-state) ::completed)
- (not= (::state old-state) ::completed))
- (if (connection-success? (::connection new-state))
- (when success-fn (success-fn http-agnt))
- (when failure-fn (failure-fn http-agnt)))))
+ :follow-redirects true
+ :on-success buffer-bytes
+ :on-failure buffer-bytes})
(defn http-agent
"Creates (and immediately returns) an Agent representing an HTTP
@@ -93,37 +126,50 @@
:on-success f
Function to be called when the request succeeds with a 2xx response
- code. Default is nil, do nothing. The function will be called with
- the HTTP agent as its argument. Any exceptions thrown by this
- function will be added to the agent's error queue (see
- agent-errors).
+ code. The function will be called with the HTTP agent as its
+ argument, and can use the response-body-stream function to read the
+ response body. The return value of this function will be stored in
+ the state of the agent and can be retrieved with the 'result'
+ function. Any exceptions thrown by this function will be added to
+ the agent's error queue (see agent-errors). The default function
+ collects the response stream into a byte array.
:on-failure f
- Function to be called when the request fails with a 4xx or 5xx
- response code. Default is nil, do nothing. The function will be
- called with the HTTP agent as its argument. Any exceptions thrown
- by this function will become agent-errors. Any exceptions thrown by
- this function will be added to the agent's error queue (see
- agent-errors).
+ Like :on-success but this function will be called when the request
+ fails with a 4xx or 5xx response code.
"
([url & options]
- (let [opts (merge *http-request-defaults* (apply array-map options))]
+ (let [opts (merge *http-agent-defaults* (apply array-map options))]
(let [a (agent {::connection (c/http-connection url)
::state ::created
::url url
::options opts})]
- (when (or (:on-success opts) (:on-failure opts))
- (add-watch a ::completed-watch
- (partial completed-watch
- (:on-success opts) (:on-failure opts))))
- (send-off a do-http-agent-request opts)))))
+ (send-off a start-request opts)
+ (send-off a open-response opts)
+ (send-off a handle-response
+ (partial (:on-success opts) a)
+ (partial (:on-failure opts) a)
+ opts)
+ (send-off a disconnect opts)))))
+
+(defn result
+ "Returns the value returned by the :on-success or :on-failure
+ handler function of the HTTP agent; nil if the handler function has
+ not yet finished. The default handler function returns a
+ ByteArrayOutputStream."
+ [http-agnt]
+ (when (#{::disconnected ::finished} (::state @http-agnt))
+ (::result @http-agnt)))
(defn response-body-bytes
- "Returns a Java byte array of the content returned by the server."
- [a]
- (when (= (::state @a) ::completed)
- (::response-body-bytes @a)))
+ "Returns a Java byte array of the content returned by the server;
+ nil if the content is not yet available."
+ [http-agnt]
+ (when-let [buffer (result http-agnt)]
+ (if (instance? ByteArrayOutputStream buffer)
+ (.toByteArray buffer)
+ (throw (Exception. "Result was not a ByteArrayOutputStream")))))
(defn response-body-str
"Returns the HTTP response body as a string, using the given
@@ -137,11 +183,10 @@
(or (.getContentEncoding (::connection @http-agnt))
duck/*default-encoding*)))
([http-agnt encoding]
- (let [a @http-agnt]
- (when (= (::state a) ::completed)
- (let [conn (::connection a)
- bytes (::response-body-bytes a)]
- (String. bytes encoding))))))
+ (when-let [buffer (result http-agnt)]
+ (if (instance? ByteArrayOutputStream buffer)
+ (.toString buffer encoding)
+ (throw (Exception. "Result was not a ByteArrayOutputStream"))))))
(defn response-status
"Returns the Integer response status code (e.g. 200, 404) for this request."