dream_http_client/client
Type-safe HTTP client with streaming support
Gleam doesn’t have a built-in HTTPS client, so this module wraps Erlang’s battle-hardened
httpc. Use this for calling external APIs, downloading files, streaming AI responses,
or building OTP-compatible services with concurrent HTTP streams.
Quick Example - Blocking Request
import dream_http_client/client.{host, path, add_header, send}
pub fn call_api() {
let result = client.new
|> host("api.example.com")
|> path("/users/123")
|> add_header("Authorization", "Bearer " <> token)
|> send()
case result {
Ok(body) -> decode_json(body)
Error(msg) -> handle_error(msg)
}
}
Execution Modes
This module provides three ways to execute HTTP requests:
1. Blocking - client.send()
Get the complete response at once. Perfect for:
- JSON API calls
- Small files or documents
- Any case where you need the full response before processing
2. Yielder Streaming - client.stream_yielder()
Get a yielder.Yielder that produces chunks sequentially. Perfect for:
- AI/LLM inference endpoints (streaming tokens)
- Simple file downloads
- Scripts or one-off operations
Note: This is a pull-based synchronous API. It blocks the calling process while waiting for chunks, making it unsuitable for OTP actors that need to handle multiple concurrent operations.
import dream_http_client/client.{host, path, stream_yielder}
import gleam/bit_array
import gleam/bytes_tree
import gleam/io.{print, println_error}
import gleam/yielder.{each}
client.new
|> host("api.openai.com")
|> path("/v1/chat/completions")
|> stream_yielder()
|> each(fn(result) {
  case result {
    Ok(chunk) -> {
      // Chunks are raw bytes; decode them as UTF-8 before printing
      case bit_array.to_string(bytes_tree.to_bit_array(chunk)) {
        Ok(text) -> print(text)
        Error(_) -> println_error("Chunk was not valid UTF-8")
      }
    }
    Error(reason) -> println_error("Stream error: " <> reason)
  }
})
3. Message-Based Streaming - client.stream_messages()
Get messages sent to your process mailbox. Perfect for:
- OTP actors handling multiple concurrent streams
- Long-lived connections that need cancellation
- Integration with OTP supervisors and selectors
This is a push-based asynchronous API fully compatible with OTP patterns.
import dream_http_client/client.{
  type StreamMessage, Chunk, DecodeError, StreamEnd, StreamError, StreamStart,
  select_stream_messages,
}
import gleam/otp/actor.{continue}
import gleam/erlang/process.{new_selector}
pub type Message {
HttpStream(StreamMessage)
}
fn init_selector() {
new_selector()
|> select_stream_messages(HttpStream)
}
fn handle_message(msg: Message, state: State) {
case msg {
HttpStream(Chunk(req_id, data)) -> process_chunk(data, state)
HttpStream(StreamEnd(req_id, _)) -> cleanup(req_id, state)
HttpStream(StreamError(req_id, reason)) -> handle_error(req_id, reason, state)
HttpStream(StreamStart(_, _)) -> continue(state)
HttpStream(DecodeError(reason)) -> {
// FFI corruption - report as bug
log_critical_error("DecodeError: " <> reason)
continue(state)
}
}
}
Configuration
All execution modes support the same builder pattern for configuration:
- Timeouts: Use timeout() to set request timeout (default: 30 seconds)
- Headers: Use add_header() for incremental or headers() for batch
- Method/Path/Query: Standard HTTP request components
Example with timeout:
import dream_http_client/client.{host, timeout, send}
client.new
|> host("slow-api.example.com")
|> timeout(60_000) // 60 second timeout
|> send()
Inspecting Requests
The ClientRequest type is opaque to ensure API stability. Use getter functions
to inspect request properties for logging, testing, or middleware:
import dream_http_client/client
import gleam/io
let req = client.new
|> client.host("api.example.com")
|> client.path("/users/123")
// Inspect the request before sending
io.println("Calling: " <> client.get_host(req) <> client.get_path(req))
// Prints: "Calling: api.example.com/users/123"
let result = client.send(req)
Available getters: get_method, get_scheme, get_host, get_port, get_path,
get_query, get_headers, get_body, get_timeout, get_recorder
Types
HTTP client request configuration
Represents a complete HTTP request with all its components. Use the builder
pattern with functions like host(), path(), method(), etc. to configure
the request, then execute it with send/1, stream_yielder/1, or
stream_messages/1 depending on whether you want a blocking, yielder-based,
or message-based streaming API.
Fields
- method: The HTTP method (GET, POST, etc.)
- scheme: The protocol (HTTP or HTTPS)
- host: The server hostname
- port: Optional port number (defaults to 80 for HTTP, 443 for HTTPS)
- path: The request path (e.g., “/api/users”)
- query: Optional query string without the leading “?” (e.g., “page=1&limit=10”)
- headers: List of header name-value pairs
- body: The request body as a string
- timeout: Optional timeout in milliseconds (defaults to 30000ms)
- recorder: Optional recorder for request/response recording and playback
The type is opaque to ensure API stability. Use new with builder functions
to construct requests, and the getter functions to inspect request properties.
pub opaque type ClientRequest
Opaque request identifier for message-based streaming
Returned by stream_messages() and used to identify which stream a message
belongs to when handling multiple concurrent streams.
pub opaque type RequestId
Stream message types sent to your process mailbox
When using stream_messages(), httpc sends these messages directly to your
process. Use select_stream_messages() to integrate with OTP selectors.
Message Flow
- StreamStart - Headers received, body chunks coming
- Chunk - Zero or more data chunks
- StreamEnd or StreamError - Stream completed normally or failed
- DecodeError - FFI layer corruption (rare, should be reported as a bug)
DecodeError
DecodeError indicates the Erlang→Gleam FFI boundary received a malformed
message from httpc. This is not a normal HTTP error - it means either:
- Erlang/OTP version incompatibility with this library
- Memory corruption or other serious runtime issue
- A bug in this library’s FFI code
What to do: If you see a DecodeError, please report it as a bug at
https://github.com/maxdeviant/dream/issues with the full error message.
The error message includes debug information to help diagnose the issue.
Unlike StreamError which has a RequestId, DecodeError does not because
the request ID itself could not be decoded from the corrupted message.
pub type StreamMessage {
StreamStart(
request_id: RequestId,
headers: List(#(String, String)),
)
Chunk(request_id: RequestId, data: BitArray)
StreamEnd(
request_id: RequestId,
headers: List(#(String, String)),
)
StreamError(request_id: RequestId, reason: String)
DecodeError(reason: String)
}
Constructors
- StreamStart(request_id: RequestId, headers: List(#(String, String)))
  Stream started, headers received
- Chunk(request_id: RequestId, data: BitArray)
  Data chunk received
- StreamEnd(request_id: RequestId, headers: List(#(String, String)))
  Stream completed successfully
- StreamError(request_id: RequestId, reason: String)
  Stream failed with error (connection drop, timeout, HTTP error, etc.)
- DecodeError(reason: String)
  Failed to decode stream message from Erlang FFI (indicates library bug)
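Because every message except DecodeError carries a request ID, an actor can keep separate state for each in-flight stream. The following is a minimal sketch of that bookkeeping, not part of this library: Event is a hypothetical local mirror of StreamMessage with an Int standing in for the opaque RequestId, and track is an illustrative helper name.

```gleam
import gleam/dict.{type Dict}
import gleam/list
import gleam/result

/// Hypothetical local mirror of StreamMessage; Int stands in for RequestId.
pub type Event {
  Started(id: Int)
  Data(id: Int, bytes: BitArray)
  Ended(id: Int)
  Failed(id: Int, reason: String)
}

/// Update the per-stream chunk buffers for one incoming event.
pub fn track(
  buffers: Dict(Int, List(BitArray)),
  event: Event,
) -> Dict(Int, List(BitArray)) {
  case event {
    // Headers arrived: start tracking this stream
    Started(id) -> dict.insert(buffers, id, [])
    // Prepend the chunk to this stream's buffer (newest first)
    Data(id, bytes) -> {
      let chunks = result.unwrap(dict.get(buffers, id), [])
      dict.insert(buffers, id, [bytes, ..chunks])
    }
    // Either terminal message ends the stream, so stop tracking it
    Ended(id) | Failed(id, _) -> dict.delete(buffers, id)
  }
}

pub fn main() {
  // Two interleaved streams: stream 1 finishes, stream 2 is still in flight
  let events = [
    Started(1),
    Started(2),
    Data(1, <<"a":utf8>>),
    Data(2, <<"b":utf8>>),
    Ended(1),
  ]
  list.fold(events, dict.new(), track)
}
```

In a real actor the same case expression would live in the message handler, with the buffers kept in the actor state.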
Values
pub fn add_header(
client_request: ClientRequest,
name: String,
value: String,
) -> ClientRequest
Add a header to the request
Adds a single header to the existing headers list without replacing them. The new header is prepended to the list, so it will take precedence if there’s a duplicate header name.
Parameters
- client_request: The request to modify
- name: The header name (e.g., “Authorization”, “Content-Type”)
- value: The header value
Returns
A new ClientRequest with the header added.
Example
import dream_http_client/client
client.new
|> client.add_header("Authorization", "Bearer " <> token)
|> client.add_header("Content-Type", "application/json")
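The prepend behaviour described above can be illustrated on a plain list. This is a sketch only: prepend_header is a hypothetical stand-in for add_header, and it shows list ordering, not how httpc or a server treats duplicate headers on the wire.

```gleam
import gleam/list

/// Hypothetical stand-in for add_header: put the new pair at the front.
pub fn prepend_header(
  headers: List(#(String, String)),
  name: String,
  value: String,
) -> List(#(String, String)) {
  [#(name, value), ..headers]
}

pub fn main() {
  let headers =
    []
    |> prepend_header("Accept", "text/plain")
    |> prepend_header("Accept", "application/json")

  // key_find returns the first match, which is the most recently added value
  list.key_find(headers, "Accept")
}
```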
pub fn body(
client_request: ClientRequest,
body_value: String,
) -> ClientRequest
Set the body for the request
Sets the request body as a string. Typically used for POST, PUT, and PATCH requests. For JSON, serialize your data first.
Parameters
- client_request: The request to modify
- body_value: The request body as a string
Returns
A new ClientRequest with the body updated.
Example
import dream_http_client/client
import gleam/http
import gleam/json
let json_body = json.object([
  #("name", json.string("Alice")),
  #("email", json.string("alice@example.com")),
])
client.new
|> client.method(http.Post)
|> client.body(json.to_string(json_body))
pub fn cancel_stream(request_id: RequestId) -> Nil
Cancel an active streaming request
Cancels an HTTP stream that was started with stream_messages().
After cancellation, no more messages will be sent to your process.
Parameters
- request_id: The request ID returned from stream_messages()
Example
import dream_http_client/client.{host, stream_messages, cancel_stream}
let assert Ok(req_id) = client.new
|> host("api.example.com")
|> stream_messages()
// Later, cancel the stream
cancel_stream(req_id)
pub fn get_body(client_request: ClientRequest) -> String
Get the body from a request
Returns the request body as a string.
Example
import dream_http_client/client
let req = client.new |> client.body("{\"name\": \"Alice\"}")
let body = client.get_body(req)
// body == "{\"name\": \"Alice\"}"
pub fn get_headers(
client_request: ClientRequest,
) -> List(#(String, String))
Get the headers from a request
Returns the list of headers configured for the request.
Example
import dream_http_client/client
let req = client.new
|> client.add_header("Authorization", "Bearer token")
|> client.add_header("Content-Type", "application/json")
let headers = client.get_headers(req)
// headers == [#("Content-Type", "application/json"), #("Authorization", "Bearer token")]
pub fn get_host(client_request: ClientRequest) -> String
Get the host from a request
Returns the hostname configured for the request.
Example
import dream_http_client/client
let req = client.new |> client.host("api.example.com")
let host = client.get_host(req)
// host == "api.example.com"
pub fn get_method(client_request: ClientRequest) -> http.Method
Get the HTTP method from a request
Returns the HTTP method (GET, POST, etc.) configured for the request.
Example
import dream_http_client/client
import gleam/http.{Post}
let req = client.new |> client.method(Post)
let method = client.get_method(req)
// method == Post
pub fn get_path(client_request: ClientRequest) -> String
Get the path from a request
Returns the request path configured for the request.
Example
import dream_http_client/client
let req = client.new |> client.path("/api/users")
let path = client.get_path(req)
// path == "/api/users"
pub fn get_port(
client_request: ClientRequest,
) -> option.Option(Int)
Get the port from a request
Returns the optional port number configured for the request. If None, the default port for the scheme will be used (80 for HTTP, 443 for HTTPS).
Example
import dream_http_client/client
let req = client.new |> client.port(8080)
let port = client.get_port(req)
// port == Some(8080)
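For logging or tests it can be handy to resolve the port that will actually be used. A minimal sketch of the default rule stated above, assuming a hypothetical helper named effective_port that is not part of this module:

```gleam
import gleam/http.{type Scheme, Http, Https}
import gleam/option.{type Option, None, Some}

/// Resolve the port a request would use: the explicit port if one is set,
/// otherwise the scheme default (80 for HTTP, 443 for HTTPS).
pub fn effective_port(scheme: Scheme, port: Option(Int)) -> Int {
  case port, scheme {
    Some(explicit), _ -> explicit
    None, Http -> 80
    None, Https -> 443
  }
}
```

It would be called with the two getters, for example effective_port(client.get_scheme(req), client.get_port(req)).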
pub fn get_query(
client_request: ClientRequest,
) -> option.Option(String)
Get the query string from a request
Returns the optional query string configured for the request.
Example
import dream_http_client/client
let req = client.new |> client.query("page=1&limit=10")
let query = client.get_query(req)
// query == Some("page=1&limit=10")
pub fn get_recorder(
client_request: ClientRequest,
) -> option.Option(recorder.Recorder)
Get the recorder from a request
Returns the optional recorder attached to the request for recording or playback.
Example
import dream_http_client/client
import dream_http_client/recorder
import dream_http_client/matching
let assert Ok(rec) = recorder.start(
mode: recorder.Record(directory: "mocks"),
matching: matching.match_url_only(),
)
let req = client.new |> client.recorder(rec)
let recorder_opt = client.get_recorder(req)
// recorder_opt == Some(rec)
pub fn get_scheme(client_request: ClientRequest) -> http.Scheme
Get the URI scheme from a request
Returns the scheme (HTTP or HTTPS) configured for the request.
Example
import dream_http_client/client
import gleam/http.{Http}
let req = client.new |> client.scheme(Http)
let scheme = client.get_scheme(req)
// scheme == Http
pub fn get_timeout(
client_request: ClientRequest,
) -> option.Option(Int)
Get the timeout from a request
Returns the optional timeout in milliseconds configured for the request. If None, the default timeout (30000ms) will be used.
Example
import dream_http_client/client
let req = client.new |> client.timeout(5000)
let timeout = client.get_timeout(req)
// timeout == Some(5000)
pub fn headers(
client_request: ClientRequest,
headers_value: List(#(String, String)),
) -> ClientRequest
Set the headers for the request
Replaces all existing headers with the provided list. Use add_header()
to add a single header without replacing existing ones.
Parameters
- client_request: The request to modify
- headers_value: List of header tuples #(name, value)
Returns
A new ClientRequest with headers replaced.
Example
import dream_http_client/client
client.new
|> client.headers([
#("Authorization", "Bearer " <> token),
#("Content-Type", "application/json"),
])
pub fn host(
client_request: ClientRequest,
host_value: String,
) -> ClientRequest
Set the host for the request
Sets the server hostname or IP address. This is required for all requests.
Parameters
- client_request: The request to modify
- host_value: The hostname (e.g., “api.example.com” or “192.168.1.1”)
Returns
A new ClientRequest with the host updated.
Example
import dream_http_client/client
client.new
|> client.host("api.example.com")
pub fn method(
client_request: ClientRequest,
method_value: http.Method,
) -> ClientRequest
Set the HTTP method for the request
Configures the HTTP method (GET, POST, PUT, DELETE, etc.) for the request.
Parameters
- client_request: The request to modify
- method_value: The HTTP method to use
Returns
A new ClientRequest with the method updated.
Example
import dream_http_client/client
import gleam/http
client.new
|> client.method(http.Post)
pub const new: ClientRequest
Default client request configuration
Creates a new ClientRequest with sensible defaults:
- Method: GET
- Scheme: HTTPS
- Host: “localhost”
- Port: None (uses default for scheme)
- Path: “” (empty)
- Query: None
- Headers: [] (empty)
- Body: “” (empty)
- Timeout: None (uses default 30000ms)
Use this as the starting point for building requests with the builder pattern.
Example
import dream_http_client/client.{host, path, method}
import gleam/http.{Get}
client.new
|> host("api.example.com")
|> path("/users/123")
|> method(Get)
pub fn path(
client_request: ClientRequest,
path_value: String,
) -> ClientRequest
Set the path for the request
Sets the request path. Should start with “/” for absolute paths.
Parameters
- client_request: The request to modify
- path_value: The path (e.g., “/api/users” or “/api/users/123”)
Returns
A new ClientRequest with the path updated.
Example
import dream_http_client/client
client.new
|> client.path("/api/users/123")
pub fn port(
client_request: ClientRequest,
port_value: Int,
) -> ClientRequest
Set the port for the request
Sets a custom port number. If not set, defaults to 80 for HTTP and 443 for HTTPS. Only set this if you’re using a non-standard port.
Parameters
- client_request: The request to modify
- port_value: The port number (e.g., 8080, 3000)
Returns
A new ClientRequest with the port updated.
Example
import dream_http_client/client
client.new
|> client.host("localhost")
|> client.port(3000) // Use port 3000 instead of default
pub fn query(
client_request: ClientRequest,
query_value: String,
) -> ClientRequest
Set the query string for the request
Sets the query string portion of the URL. Do not include the leading “?”.
Parameters
- client_request: The request to modify
- query_value: The query string (e.g., “page=1&limit=10”)
Returns
A new ClientRequest with the query string updated.
Example
import dream_http_client/client
client.new
|> client.path("/api/users")
|> client.query("page=1&limit=10")
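When the parameters come from variables, the query string can be built instead of concatenated by hand. The sketch below uses uri.query_to_string from the Gleam standard library, which percent-encodes values and omits the leading “?”; paging_query is a hypothetical helper name used only for illustration.

```gleam
import gleam/int
import gleam/uri

/// Build a query string (no leading "?") from paging parameters.
pub fn paging_query(page: Int, limit: Int) -> String {
  uri.query_to_string([
    #("page", int.to_string(page)),
    #("limit", int.to_string(limit)),
  ])
}
```

The result can be passed straight to client.query(), for example client.query(paging_query(1, 10)).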
pub fn recorder(
client_request: ClientRequest,
recorder_value: recorder.Recorder,
) -> ClientRequest
Set the recorder for the request
Attaches a recorder to the request for recording or playback.
The recorder must be started with recorder.start() before use.
Parameters
- client_request: The request to modify
- recorder_value: The recorder to attach
Returns
A new ClientRequest with the recorder attached.
Example
import dream_http_client/client
import dream_http_client/matching
import dream_http_client/recorder
let assert Ok(rec) = recorder.start(
  mode: recorder.Record(directory: "mocks"),
  matching: matching.match_url_only(),
)
client.new
|> client.host("api.example.com")
|> client.recorder(rec)
pub fn scheme(
client_request: ClientRequest,
scheme_value: http.Scheme,
) -> ClientRequest
Set the scheme (protocol) for the request
Configures whether to use HTTP or HTTPS. Defaults to HTTPS for security.
Parameters
- client_request: The request to modify
- scheme_value: The protocol scheme (http.Http or http.Https)
Returns
A new ClientRequest with the scheme updated.
Example
import dream_http_client/client
import gleam/http
client.new
|> client.scheme(http.Http) // Use HTTP instead of HTTPS
pub fn select_stream_messages(
selector: process.Selector(msg),
mapper: fn(StreamMessage) -> msg,
) -> process.Selector(msg)
Add stream message handling to an OTP selector
Integrates HTTP stream messages into your OTP actor’s selector. This allows you to handle HTTP streams alongside other messages in your actor.
The mapper function converts StreamMessage to your actor’s message type.
Parameters
- selector: Your existing selector
- mapper: Function to wrap StreamMessage in your message type
Returns
Updated selector that handles stream messages
Example
import dream_http_client/client.{type StreamMessage, select_stream_messages}
import gleam/erlang/process.{type Selector, new_selector, selecting}
pub type Message {
HttpStream(StreamMessage)
OtherMessage(String)
}
fn build_selector() -> Selector(Message) {
new_selector()
|> select_stream_messages(HttpStream)
|> selecting(some_subject, OtherMessage)
}
pub fn send(
client_request: ClientRequest,
) -> Result(String, String)
Make a blocking HTTP request and get the complete response
Sends an HTTP request and collects all response chunks, returning the complete response body as a string. This is ideal for:
- JSON API responses
- Small files or documents
- Any case where you need the full response before processing
For large responses or when you need OTP compatibility, use
stream_yielder() or stream_messages() instead.
Parameters
client_request: The configured HTTP request
Returns
- Ok(String): The complete response body as a string
- Error(String): An error message if the request failed
Example
import dream_http_client/client.{host, path, add_header, send}
import gleam/json.{decode}
import gleam/string
let result = client.new
  |> host("api.example.com")
  |> path("/users/123")
  |> add_header("Authorization", "Bearer " <> token)
  |> send()
case result {
  Ok(body) -> {
    case decode(body, user_decoder) {
      Ok(user) -> Ok(user)
      Error(json_error) ->
        Error("Invalid JSON response: " <> string.inspect(json_error))
    }
  }
  Error(error_message) -> Error("Request failed: " <> error_message)
}
pub fn stream_messages(
client_request: ClientRequest,
) -> Result(RequestId, String)
Start a message-based streaming HTTP request (OTP compatible)
Sends an HTTP request and returns a request ID immediately. httpc sends stream messages directly to your process mailbox. Use this for:
- OTP actors handling multiple concurrent streams
- Long-lived connections that need cancellation
- Integration with OTP supervisors and selectors
For simple sequential streaming, use stream_yielder() instead.
Message Flow
Messages are sent to your process mailbox automatically:
- StreamStart(request_id, headers) - Headers received
- Chunk(request_id, data) - Zero or more data chunks
- StreamEnd(request_id, headers) or StreamError(request_id, reason) - Done
Parameters
- client_request: The configured HTTP request
Returns
- Ok(RequestId): Stream started, messages will arrive in your mailbox
- Error(String): Failed to start the stream
Example
import dream_http_client/client.{
  type StreamMessage, Chunk, DecodeError, StreamEnd, StreamError, StreamStart,
  select_stream_messages,
}
import gleam/otp/actor.{continue}
import gleam/erlang/process.{new_selector}
pub type Message {
  HttpStream(StreamMessage)
}
fn handle_message(msg: Message, state: State) {
  case msg {
    HttpStream(stream_msg) -> {
      case stream_msg {
        Chunk(req_id, data) -> process_chunk(data, state)
        StreamEnd(req_id, _) -> cleanup(req_id, state)
        StreamError(req_id, reason) -> handle_error(req_id, reason, state)
        StreamStart(_, _) -> continue(state)
        // Malformed FFI message: it carries no request ID, so keep running
        DecodeError(_) -> continue(state)
      }
    }
  }
}
fn init_selector() {
new_selector()
|> select_stream_messages(HttpStream)
}
pub fn stream_yielder(
client_request: ClientRequest,
) -> yielder.Yielder(Result(bytes_tree.BytesTree, String))
Stream HTTP response chunks using a yielder
Sends an HTTP request and returns a yielder that produces chunks of the response body as they arrive from the server. This allows you to process large responses incrementally without loading the entire response into memory.
Use this for simple sequential streaming:
- AI/LLM inference endpoints (stream tokens)
- Simple file downloads
- Scripts or one-off operations
For OTP actors with concurrency, use stream_messages() instead.
Error Semantics
The yielder produces Result(BytesTree, String) for each chunk:
- Ok(chunk) - Successful chunk, more may follow
- Error(reason) - Terminal error, stream is done
After an Error, the yielder immediately returns Done on the next call.
This design reflects that HTTP stream errors (timeouts, connection drops,
etc.) are not recoverable - you cannot continue reading from a broken stream.
Normal stream completion: When the stream finishes successfully, the yielder
returns Done (no more items). The stream does NOT yield an error for normal completion.
Possible error reasons (actual errors only):
- "timeout" - Request timed out
- Connection errors from httpc
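Since an Error is terminal, a consumer can stop folding as soon as it sees one. The sketch below shows this with yielder.try_fold over a simulated stream built with yielder.from_list, which stands in for the output of stream_yielder(); total_bytes is a hypothetical helper name, not part of this module.

```gleam
import gleam/bytes_tree.{type BytesTree}
import gleam/yielder.{type Yielder}

/// Sum the size of every chunk, stopping at the first (terminal) error.
pub fn total_bytes(
  chunks: Yielder(Result(BytesTree, String)),
) -> Result(Int, String) {
  yielder.try_fold(chunks, 0, fn(total, item) {
    case item {
      Ok(chunk) -> Ok(total + bytes_tree.byte_size(chunk))
      Error(reason) -> Error(reason)
    }
  })
}

pub fn main() {
  // Simulated stream that fails after one chunk
  let failing =
    yielder.from_list([Ok(bytes_tree.from_string("hi")), Error("timeout")])
  total_bytes(failing)
}
```

A stream that ends normally simply runs out of items, so the fold returns Ok with the accumulated total.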
Parameters
client_request: The configured HTTP request
Returns
A Yielder that produces Result(BytesTree, String). Always check each
result - errors are terminal and mean the stream has ended.
Examples
Streaming and processing chunks as they arrive:
import dream_http_client/client.{host, path, stream_yielder}
import gleam/bit_array
import gleam/bytes_tree
import gleam/io.{print, println_error}
import gleam/yielder.{each}
client.new
|> host("api.openai.com")
|> path("/v1/chat/completions")
|> stream_yielder()
|> each(fn(result) {
  case result {
    Ok(chunk) -> {
      // Chunks are raw bytes; decode them as UTF-8 before printing
      case bit_array.to_string(bytes_tree.to_bit_array(chunk)) {
        Ok(text) -> print(text)
        Error(_) -> println_error("Chunk was not valid UTF-8")
      }
    }
    Error(error_reason) -> {
      println_error("Stream error: " <> error_reason)
      // Stream is now done, no more chunks will arrive
    }
  }
})
Collecting all chunks into a list:
import dream_http_client/client.{host, path, stream_yielder}
import gleam/bit_array
import gleam/bytes_tree
import gleam/result
import gleam/yielder
// The stream automatically completes when done - no need to use take()!
let chunks =
  client.new
  |> host("example.com")
  |> path("/data")
  |> stream_yielder()
  |> yielder.to_list()
// result.all returns the first Error, or every chunk if all succeeded
case result.all(chunks) {
  Ok(chunk_list) -> {
    // Concatenate all chunks, then decode the bytes as UTF-8
    let bytes = bytes_tree.to_bit_array(bytes_tree.concat(chunk_list))
    bit_array.to_string(bytes)
    |> result.replace_error("Response body was not valid UTF-8")
  }
  Error(error_reason) -> Error("Stream failed: " <> error_reason)
}
pub fn timeout(
client_request: ClientRequest,
timeout_ms: Int,
) -> ClientRequest
Set the timeout for the request in milliseconds
Sets how long to wait for a response before timing out. If not set, defaults to 30000ms (30 seconds).
Parameters
- client_request: The request to modify
- timeout_ms: Timeout duration in milliseconds
Example
import dream_http_client/client.{host, timeout}
client.new
|> host("slow-api.example.com")
|> timeout(60_000) // 60 second timeout