dream_http_client/client

Type-safe HTTP client with streaming support

Gleam doesn’t have a built-in HTTPS client, so this module wraps Erlang’s battle-hardened httpc. Use this for calling external APIs, downloading files, streaming AI responses, or building OTP-compatible services with concurrent HTTP streams.

Quick Example - Blocking Request

import dream_http_client/client.{host, path, add_header, send}

pub fn call_api() {
  let result = client.new
    |> host("api.example.com")
    |> path("/users/123")
    |> add_header("Authorization", "Bearer " <> token)
    |> send()

  case result {
    Ok(body) -> decode_json(body)
    Error(msg) -> handle_error(msg)
  }
}

Execution Modes

This module provides three ways to execute HTTP requests:

1. Blocking - client.send()

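Get the complete response at once. Perfect for JSON API responses, small files or documents, and any case where you need the full response before processing.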

2. Yielder Streaming - client.stream_yielder()

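Get a yielder.Yielder that produces chunks sequentially. Perfect for AI/LLM inference endpoints (streaming tokens), simple file downloads, and scripts or one-off operations.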

Note: This is a pull-based synchronous API. It blocks the calling process while waiting for chunks, making it unsuitable for OTP actors that need to handle multiple concurrent operations.

import dream_http_client/client.{host, path, stream_yielder}
import gleam/bit_array
import gleam/bytes_tree
import gleam/io.{print, println_error}
import gleam/yielder.{each}

client.new
|> host("api.openai.com")
|> path("/v1/chat/completions")
|> stream_yielder()
|> each(fn(result) {
  case result {
    // Chunks are raw bytes, so decode them as UTF-8 before printing
    Ok(chunk) ->
      case bit_array.to_string(bytes_tree.to_bit_array(chunk)) {
        Ok(text) -> print(text)
        Error(_) -> println_error("Chunk was not valid UTF-8")
      }
    Error(reason) -> println_error("Stream error: " <> reason)
  }
})

3. Message-Based Streaming - client.stream_messages()

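Get messages sent to your process mailbox. Perfect for OTP actors handling multiple concurrent streams, long-lived connections that need cancellation, and integration with OTP supervisors and selectors.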

This is a push-based asynchronous API fully compatible with OTP patterns.

import dream_http_client/client.{
  type StreamMessage, Chunk, DecodeError, StreamEnd, StreamError, StreamStart,
  select_stream_messages,
}
import gleam/otp/actor.{continue}
import gleam/erlang/process.{new_selector}

pub type Message {
  HttpStream(StreamMessage)
}

fn init_selector() {
  new_selector()
  |> select_stream_messages(HttpStream)
}

fn handle_message(msg: Message, state: State) {
  case msg {
    HttpStream(Chunk(req_id, data)) -> process_chunk(data, state)
    HttpStream(StreamEnd(req_id, _)) -> cleanup(req_id, state)
    HttpStream(StreamError(req_id, reason)) -> handle_error(req_id, reason, state)
    HttpStream(StreamStart(_, _)) -> continue(state)
    HttpStream(DecodeError(reason)) -> {
      // FFI corruption - report as bug
      log_critical_error("DecodeError: " <> reason)
      continue(state)
    }
  }
}

Configuration

All execution modes share the same builder functions for configuration: method, scheme, host, port, path, query, headers, add_header, body, timeout, and recorder.

Example with timeout:

import dream_http_client/client.{host, timeout, send}

client.new
|> host("slow-api.example.com")
|> timeout(60_000)  // 60 second timeout
|> send()

Inspecting Requests

The ClientRequest type is opaque to ensure API stability. Use getter functions to inspect request properties for logging, testing, or middleware:

import dream_http_client/client
import gleam/io

let req = client.new
  |> client.host("api.example.com")
  |> client.path("/users/123")

// Inspect the request before sending
io.println("Calling: " <> client.get_host(req) <> client.get_path(req))
// Prints: "Calling: api.example.com/users/123"

let result = client.send(req)

Available getters: get_method, get_scheme, get_host, get_port, get_path, get_query, get_headers, get_body, get_timeout, get_recorder
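As a rough sketch of how the getters can support logging, the helper below rebuilds a request's target URL from its parts. The name describe_url is hypothetical and not part of this module; the sketch assumes only the getters listed above plus scheme_to_string from gleam/http.

```gleam
import dream_http_client/client.{type ClientRequest}
import gleam/http
import gleam/int
import gleam/option.{None, Some}

/// Hypothetical helper (not part of the library): renders the URL that a
/// request targets, for log lines or test assertions.
pub fn describe_url(req: ClientRequest) -> String {
  // Only show the port when one was set explicitly
  let port = case client.get_port(req) {
    Some(p) -> ":" <> int.to_string(p)
    None -> ""
  }
  // The query is stored without the leading "?", so add it back here
  let query = case client.get_query(req) {
    Some(q) -> "?" <> q
    None -> ""
  }
  http.scheme_to_string(client.get_scheme(req))
  <> "://"
  <> client.get_host(req)
  <> port
  <> client.get_path(req)
  <> query
}
```

Given the documented default scheme (HTTPS), a request built with host "api.example.com", path "/users/123" and query "page=1" should render as "https://api.example.com/users/123?page=1".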

Types

HTTP client request configuration

Represents a complete HTTP request with all its components. Use the builder pattern with functions like host(), path(), method(), etc. to configure the request, then execute it with send(), stream_yielder(), or stream_messages().

Fields

  • method: The HTTP method (GET, POST, etc.)
  • scheme: The protocol (HTTP or HTTPS)
  • host: The server hostname
  • port: Optional port number (defaults to 80 for HTTP, 443 for HTTPS)
  • path: The request path (e.g., “/api/users”)
  • query: Optional query string without the leading “?” (e.g., “page=1&limit=10”)
  • headers: List of header name-value pairs
  • body: The request body as a string
  • timeout: Optional timeout in milliseconds (defaults to 30000ms)
  • recorder: Optional recorder for request/response recording and playback

The type is opaque to ensure API stability. Use new with builder functions to construct requests, and the getter functions to inspect request properties.

pub opaque type ClientRequest

Opaque request identifier for message-based streaming

Returned by stream_messages() and used to identify which stream a message belongs to when handling multiple concurrent streams.

pub opaque type RequestId

Stream message types sent to your process mailbox

When using stream_messages(), httpc sends these messages directly to your process. Use select_stream_messages() to integrate with OTP selectors.

Message Flow

  1. StreamStart - Headers received, body chunks coming
  2. Chunk - Zero or more data chunks
  3. StreamEnd or StreamError - Stream completed normally (StreamEnd) or failed (StreamError)
  4. DecodeError - FFI layer corruption (rare, should be reported as a bug)

DecodeError

DecodeError indicates the Erlang→Gleam FFI boundary received a malformed message from httpc. This is not a normal HTTP error - it means either:

  • Erlang/OTP version incompatibility with this library
  • Memory corruption or other serious runtime issue
  • A bug in this library’s FFI code

What to do: If you see a DecodeError, please report it as a bug at https://github.com/maxdeviant/dream/issues with the full error message. The error message includes debug information to help diagnose the issue.

Unlike StreamError which has a RequestId, DecodeError does not because the request ID itself could not be decoded from the corrupted message.

pub type StreamMessage {
  StreamStart(
    request_id: RequestId,
    headers: List(#(String, String)),
  )
  Chunk(request_id: RequestId, data: BitArray)
  StreamEnd(
    request_id: RequestId,
    headers: List(#(String, String)),
  )
  StreamError(request_id: RequestId, reason: String)
  DecodeError(reason: String)
}

Constructors

  • StreamStart(
      request_id: RequestId,
      headers: List(#(String, String)),
    )

    Stream started, headers received

  • Chunk(request_id: RequestId, data: BitArray)

    Data chunk received

  • StreamEnd(
      request_id: RequestId,
      headers: List(#(String, String)),
    )

    Stream completed successfully

  • StreamError(request_id: RequestId, reason: String)

    Stream failed with error (connection drop, timeout, HTTP error, etc.)

  • DecodeError(reason: String)

    Failed to decode stream message from Erlang FFI (indicates library bug)
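To illustrate how the request ID ties messages to streams, here is a minimal sketch that keeps one byte buffer per active stream. The Buffers alias and the track function are hypothetical names introduced for this example; only the StreamMessage constructors come from this module.

```gleam
import dream_http_client/client.{
  type RequestId, type StreamMessage, Chunk, DecodeError, StreamEnd,
  StreamError, StreamStart,
}
import gleam/bytes_tree.{type BytesTree}
import gleam/dict.{type Dict}

/// Hypothetical state: one buffer per in-flight stream, keyed by request ID.
pub type Buffers =
  Dict(RequestId, BytesTree)

/// Hypothetical helper: applies one stream message to the buffers.
pub fn track(buffers: Buffers, message: StreamMessage) -> Buffers {
  case message {
    // A new stream starts with an empty buffer
    StreamStart(id, _headers) -> dict.insert(buffers, id, bytes_tree.new())
    // Append the chunk to the buffer of the stream it belongs to
    Chunk(id, data) ->
      case dict.get(buffers, id) {
        Ok(buffer) -> dict.insert(buffers, id, bytes_tree.append(buffer, data))
        // Chunk for a stream we are not tracking: ignore it
        Error(Nil) -> buffers
      }
    // Finished or failed streams are dropped from the state
    StreamEnd(id, _headers) -> dict.delete(buffers, id)
    StreamError(id, _reason) -> dict.delete(buffers, id)
    // No request ID is available, so no buffer can be updated
    DecodeError(_reason) -> buffers
  }
}
```

A real actor would usually hand the completed buffer to a caller on StreamEnd instead of discarding it; the sketch only shows the routing by request ID.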

Values

pub fn add_header(
  client_request: ClientRequest,
  name: String,
  value: String,
) -> ClientRequest

Add a header to the request

Adds a single header to the existing headers list without replacing them. The new header is prepended to the list, so it will take precedence if there’s a duplicate header name.

Parameters

  • client_request: The request to modify
  • name: The header name (e.g., “Authorization”, “Content-Type”)
  • value: The header value

Returns

A new ClientRequest with the header added.

Example

import dream_http_client/client

client.new
|> client.add_header("Authorization", "Bearer " <> token)
|> client.add_header("Content-Type", "application/json")
pub fn body(
  client_request: ClientRequest,
  body_value: String,
) -> ClientRequest

Set the body for the request

Sets the request body as a string. Typically used for POST, PUT, and PATCH requests. For JSON, serialize your data first.

Parameters

  • client_request: The request to modify
  • body_value: The request body as a string

Returns

A new ClientRequest with the body updated.

Example

import dream_http_client/client
import gleam/http
import gleam/json

let json_body = json.object([
  #("name", json.string("Alice")),
  #("email", json.string("alice@example.com")),
])

client.new
|> client.method(http.Post)
|> client.body(json.to_string(json_body))
pub fn cancel_stream(request_id: RequestId) -> Nil

Cancel an active streaming request

Cancels an HTTP stream that was started with stream_messages(). After cancellation, no more messages will be sent to your process.

Parameters

  • request_id: The request ID returned from stream_messages()

Example

import dream_http_client/client.{host, stream_messages, cancel_stream}

let assert Ok(req_id) = client.new
  |> host("api.example.com")
  |> stream_messages()

// Later, cancel the stream
cancel_stream(req_id)
pub fn get_body(client_request: ClientRequest) -> String

Get the body from a request

Returns the request body as a string.

Example

import dream_http_client/client

let req = client.new |> client.body("{\"name\": \"Alice\"}")
let body = client.get_body(req)
// body == "{\"name\": \"Alice\"}"
pub fn get_headers(
  client_request: ClientRequest,
) -> List(#(String, String))

Get the headers from a request

Returns the list of headers configured for the request.

Example

import dream_http_client/client

let req = client.new
  |> client.add_header("Authorization", "Bearer token")
  |> client.add_header("Content-Type", "application/json")
let headers = client.get_headers(req)
// headers == [#("Content-Type", "application/json"), #("Authorization", "Bearer token")]
pub fn get_host(client_request: ClientRequest) -> String

Get the host from a request

Returns the hostname configured for the request.

Example

import dream_http_client/client

let req = client.new |> client.host("api.example.com")
let host = client.get_host(req)
// host == "api.example.com"
pub fn get_method(client_request: ClientRequest) -> http.Method

Get the HTTP method from a request

Returns the HTTP method (GET, POST, etc.) configured for the request.

Example

import dream_http_client/client
import gleam/http.{Post}

let req = client.new |> client.method(Post)
let method = client.get_method(req)
// method == Post
pub fn get_path(client_request: ClientRequest) -> String

Get the path from a request

Returns the request path configured for the request.

Example

import dream_http_client/client

let req = client.new |> client.path("/api/users")
let path = client.get_path(req)
// path == "/api/users"
pub fn get_port(
  client_request: ClientRequest,
) -> option.Option(Int)

Get the port from a request

Returns the optional port number configured for the request. If None, the default port for the scheme will be used (80 for HTTP, 443 for HTTPS).

Example

import dream_http_client/client

let req = client.new |> client.port(8080)
let port = client.get_port(req)
// port == Some(8080)
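
The fallback described above can be made explicit when a concrete port is needed, for example in logs. This is a small sketch; effective_port is a hypothetical helper, not a function of this module.

```gleam
import dream_http_client/client.{type ClientRequest}
import gleam/http.{Http, Https}
import gleam/option

/// Hypothetical helper: the port a request would use, applying the
/// documented scheme defaults when none was set explicitly.
pub fn effective_port(req: ClientRequest) -> Int {
  let default = case client.get_scheme(req) {
    Http -> 80
    Https -> 443
  }
  option.unwrap(client.get_port(req), default)
}
```

With the documented defaults, effective_port(client.new) should be 443, and setting client.port(8080) should make it 8080.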
pub fn get_query(
  client_request: ClientRequest,
) -> option.Option(String)

Get the query string from a request

Returns the optional query string configured for the request.

Example

import dream_http_client/client

let req = client.new |> client.query("page=1&limit=10")
let query = client.get_query(req)
// query == Some("page=1&limit=10")
pub fn get_recorder(
  client_request: ClientRequest,
) -> option.Option(recorder.Recorder)

Get the recorder from a request

Returns the optional recorder attached to the request for recording or playback.

Example

import dream_http_client/client
import dream_http_client/recorder
import dream_http_client/matching

let assert Ok(rec) = recorder.start(
  mode: recorder.Record(directory: "mocks"),
  matching: matching.match_url_only(),
)
let req = client.new |> client.recorder(rec)
let recorder_opt = client.get_recorder(req)
// recorder_opt == Some(rec)
pub fn get_scheme(client_request: ClientRequest) -> http.Scheme

Get the URI scheme from a request

Returns the scheme (HTTP or HTTPS) configured for the request.

Example

import dream_http_client/client
import gleam/http.{Http}

let req = client.new |> client.scheme(Http)
let scheme = client.get_scheme(req)
// scheme == Http
pub fn get_timeout(
  client_request: ClientRequest,
) -> option.Option(Int)

Get the timeout from a request

Returns the optional timeout in milliseconds configured for the request. If None, the default timeout (30000ms) will be used.

Example

import dream_http_client/client

let req = client.new |> client.timeout(5000)
let timeout = client.get_timeout(req)
// timeout == Some(5000)
pub fn headers(
  client_request: ClientRequest,
  headers_value: List(#(String, String)),
) -> ClientRequest

Set the headers for the request

Replaces all existing headers with the provided list. Use add_header() to add a single header without replacing existing ones.

Parameters

  • client_request: The request to modify
  • headers_value: List of header tuples #(name, value)

Returns

A new ClientRequest with headers replaced.

Example

import dream_http_client/client

client.new
|> client.headers([
  #("Authorization", "Bearer " <> token),
  #("Content-Type", "application/json"),
])
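
The difference between headers() and add_header() is easiest to see when they are combined. The sketch below relies only on the behaviour documented in this module (headers() replaces, add_header() prepends); example_headers is a hypothetical name.

```gleam
import dream_http_client/client

/// Hypothetical example: mixes add_header() and headers() and returns
/// the resulting header list.
pub fn example_headers() -> List(#(String, String)) {
  client.new
  |> client.add_header("X-Trace", "abc")
  // headers() replaces the whole list, so X-Trace is dropped here
  |> client.headers([#("Accept", "application/json")])
  // add_header() prepends to whatever is currently set
  |> client.add_header("Authorization", "Bearer example")
  |> client.get_headers
}
```

If the documented semantics hold, the result contains Authorization first, then Accept, and no X-Trace entry.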
pub fn host(
  client_request: ClientRequest,
  host_value: String,
) -> ClientRequest

Set the host for the request

Sets the server hostname or IP address. This is required for all requests.

Parameters

  • client_request: The request to modify
  • host_value: The hostname (e.g., “api.example.com” or “192.168.1.1”)

Returns

A new ClientRequest with the host updated.

Example

import dream_http_client/client

client.new
|> client.host("api.example.com")
pub fn method(
  client_request: ClientRequest,
  method_value: http.Method,
) -> ClientRequest

Set the HTTP method for the request

Configures the HTTP method (GET, POST, PUT, DELETE, etc.) for the request.

Parameters

  • client_request: The request to modify
  • method_value: The HTTP method to use

Returns

A new ClientRequest with the method updated.

Example

import dream_http_client/client
import gleam/http

client.new
|> client.method(http.Post)
pub const new: ClientRequest

Default client request configuration

A ClientRequest constant with sensible defaults:

  • Method: GET
  • Scheme: HTTPS
  • Host: “localhost”
  • Port: None (uses default for scheme)
  • Path: “” (empty)
  • Query: None
  • Headers: [] (empty)
  • Body: “” (empty)
  • Timeout: None (uses default 30000ms)

Use this as the starting point for building requests with the builder pattern.

Example

import dream_http_client/client.{host, path, method}
import gleam/http.{Get}

client.new
|> host("api.example.com")
|> path("/users/123")
|> method(Get)
pub fn path(
  client_request: ClientRequest,
  path_value: String,
) -> ClientRequest

Set the path for the request

Sets the request path. Should start with “/” for absolute paths.

Parameters

  • client_request: The request to modify
  • path_value: The path (e.g., “/api/users” or “/api/users/123”)

Returns

A new ClientRequest with the path updated.

Example

import dream_http_client/client

client.new
|> client.path("/api/users/123")
pub fn port(
  client_request: ClientRequest,
  port_value: Int,
) -> ClientRequest

Set the port for the request

Sets a custom port number. If not set, defaults to 80 for HTTP and 443 for HTTPS. Only set this if you’re using a non-standard port.

Parameters

  • client_request: The request to modify
  • port_value: The port number (e.g., 8080, 3000)

Returns

A new ClientRequest with the port updated.

Example

import dream_http_client/client

client.new
|> client.host("localhost")
|> client.port(3000)  // Use port 3000 instead of default
pub fn query(
  client_request: ClientRequest,
  query_value: String,
) -> ClientRequest

Set the query string for the request

Sets the query string portion of the URL. Do not include the leading “?”.

Parameters

  • client_request: The request to modify
  • query_value: The query string (e.g., “page=1&limit=10”)

Returns

A new ClientRequest with the query string updated.

Example

import dream_http_client/client

client.new
|> client.path("/api/users")
|> client.query("page=1&limit=10")
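
When the parameters are only known at runtime, the query string can be assembled with gleam/uri instead of by hand. This is a sketch under the assumption that percent-encoded output from uri.query_to_string is acceptable to the server; with_params is a hypothetical helper.

```gleam
import dream_http_client/client.{type ClientRequest}
import gleam/uri

/// Hypothetical helper: encodes key-value pairs and sets them as the
/// request's query string (no leading "?").
pub fn with_params(
  req: ClientRequest,
  params: List(#(String, String)),
) -> ClientRequest {
  client.query(req, uri.query_to_string(params))
}
```

For example, with_params(client.new, [#("page", "1"), #("limit", "10")]) sets the query to "page=1&limit=10".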
pub fn recorder(
  client_request: ClientRequest,
  recorder_value: recorder.Recorder,
) -> ClientRequest

Set the recorder for the request

Attaches a recorder to the request for recording or playback. The recorder must be started with recorder.start() before use.

Parameters

  • client_request: The request to modify
  • recorder_value: The recorder to attach

Returns

A new ClientRequest with the recorder attached.

Example

import dream_http_client/client
import dream_http_client/matching
import dream_http_client/recorder

let assert Ok(rec) = recorder.start(
  mode: recorder.Record(directory: "mocks"),
  matching: matching.match_url_only(),
)

client.new
|> client.host("api.example.com")
|> client.recorder(rec)
pub fn scheme(
  client_request: ClientRequest,
  scheme_value: http.Scheme,
) -> ClientRequest

Set the scheme (protocol) for the request

Configures whether to use HTTP or HTTPS. Defaults to HTTPS for security.

Parameters

  • client_request: The request to modify
  • scheme_value: The protocol scheme (http.Http or http.Https)

Returns

A new ClientRequest with the scheme updated.

Example

import dream_http_client/client
import gleam/http

client.new
|> client.scheme(http.Http)  // Use HTTP instead of HTTPS
pub fn select_stream_messages(
  selector: process.Selector(msg),
  mapper: fn(StreamMessage) -> msg,
) -> process.Selector(msg)

Add stream message handling to an OTP selector

Integrates HTTP stream messages into your OTP actor’s selector. This allows you to handle HTTP streams alongside other messages in your actor.

The mapper function converts StreamMessage to your actor’s message type.

Parameters

  • selector: Your existing selector
  • mapper: Function to wrap StreamMessage in your message type

Returns

Updated selector that handles stream messages

Example

import dream_http_client/client.{type StreamMessage, select_stream_messages}
import gleam/erlang/process.{type Selector, new_selector, selecting}

pub type Message {
  HttpStream(StreamMessage)
  OtherMessage(String)
}

fn build_selector() -> Selector(Message) {
  new_selector()
  |> select_stream_messages(HttpStream)
  |> selecting(some_subject, OtherMessage)
}
pub fn send(
  client_request: ClientRequest,
) -> Result(String, String)

Make a blocking HTTP request and get the complete response

Sends an HTTP request and collects all response chunks, returning the complete response body as a string. This is ideal for:

  • JSON API responses
  • Small files or documents
  • Any case where you need the full response before processing

For large responses or when you need OTP compatibility, use stream_yielder() or stream_messages() instead.

Parameters

  • client_request: The configured HTTP request

Returns

  • Ok(String): The complete response body as a string
  • Error(String): An error message if the request failed

Example

import dream_http_client/client.{host, path, add_header, send}
import gleam/json.{decode}

let result = client.new
  |> host("api.example.com")
  |> path("/users/123")
  |> add_header("Authorization", "Bearer " <> token)
  |> send()

case result {
  Ok(body) -> {
    case decode(body, user_decoder) {
      Ok(user) -> Ok(user)
      Error(_) -> Error("Invalid JSON response")
    }
  }
  Error(msg) -> Error("Request failed: " <> msg)
}
pub fn stream_messages(
  req: ClientRequest,
) -> Result(RequestId, String)

Start a message-based streaming HTTP request (OTP compatible)

Sends an HTTP request and returns a request ID immediately. httpc sends stream messages directly to your process mailbox. Use this for:

  • OTP actors handling multiple concurrent streams
  • Long-lived connections that need cancellation
  • Integration with OTP supervisors and selectors

For simple sequential streaming, use stream_yielder() instead.

Message Flow

Messages are sent to your process mailbox automatically:

  1. StreamStart(request_id, headers) - Headers received
  2. Chunk(request_id, data) - Zero or more data chunks
  3. StreamEnd(request_id, headers) or StreamError(request_id, reason) - Done

Parameters

  • req: The configured HTTP request

Returns

  • Ok(RequestId): Stream started, messages will arrive in your mailbox
  • Error(String): Failed to start the stream

Example

import dream_http_client/client.{
  type StreamMessage, Chunk, DecodeError, StreamEnd, StreamError, StreamStart,
  select_stream_messages,
}
import gleam/otp/actor.{continue}
import gleam/erlang/process.{new_selector}

pub type Message {
  HttpStream(StreamMessage)
}

fn handle_message(msg: Message, state: State) {
  case msg {
    HttpStream(stream_msg) -> {
      case stream_msg {
        Chunk(req_id, data) -> process_chunk(data, state)
        StreamEnd(req_id, _) -> cleanup(req_id, state)
        StreamError(req_id, reason) -> handle_error(req_id, reason, state)
        StreamStart(_, _) -> continue(state)
        // Must be matched too, since case expressions are exhaustive.
        // Indicates an FFI problem that should be reported as a bug.
        DecodeError(_) -> continue(state)
      }
    }
  }
}

fn init_selector() {
  new_selector()
  |> select_stream_messages(HttpStream)
}
pub fn stream_yielder(
  req: ClientRequest,
) -> yielder.Yielder(Result(bytes_tree.BytesTree, String))

Stream HTTP response chunks using a yielder

Sends an HTTP request and returns a yielder that produces chunks of the response body as they arrive from the server. This allows you to process large responses incrementally without loading the entire response into memory.

Use this for simple sequential streaming:

  • AI/LLM inference endpoints (stream tokens)
  • Simple file downloads
  • Scripts or one-off operations

For OTP actors with concurrency, use stream_messages() instead.

Error Semantics

The yielder produces Result(BytesTree, String) for each chunk:

  • Ok(chunk) - Successful chunk, more may follow
  • Error(reason) - Terminal error, stream is done

After an Error, the yielder immediately returns Done on the next call. This design reflects that HTTP stream errors (timeouts, connection drops, etc.) are not recoverable - you cannot continue reading from a broken stream.

Normal stream completion: When the stream finishes successfully, the yielder returns Done (no more items). The stream does NOT yield an error for normal completion.

Possible error reasons (actual errors only):

  • "timeout" - Request timed out
  • Connection errors from httpc

Parameters

  • req: The configured HTTP request

Returns

A Yielder that produces Result(BytesTree, String). Always check each result - errors are terminal and mean the stream has ended.

Examples

Streaming and processing chunks as they arrive:

import dream_http_client/client.{host, path, stream_yielder}
import gleam/bit_array
import gleam/bytes_tree
import gleam/io.{print, println_error}
import gleam/yielder.{each}

client.new
  |> host("api.openai.com")
  |> path("/v1/chat/completions")
  |> stream_yielder()
  |> each(fn(result) {
    case result {
      // Chunks are raw bytes, so decode them as UTF-8 before printing
      Ok(chunk) ->
        case bit_array.to_string(bytes_tree.to_bit_array(chunk)) {
          Ok(text) -> print(text)
          Error(_) -> println_error("Chunk was not valid UTF-8")
        }
      Error(reason) -> {
        println_error("Stream error: " <> reason)
        // Stream is now done, no more chunks will arrive
      }
    }
  })

Collecting all chunks into a list:

import dream_http_client/client.{host, path, stream_yielder}
import gleam/bit_array
import gleam/bytes_tree
import gleam/result
import gleam/yielder

// The stream automatically completes when done - no need to use take()!
let chunks =
  client.new
  |> host("example.com")
  |> path("/data")
  |> stream_yielder()
  |> yielder.to_list()

// Handle results: result.all returns the first Error, if any
case result.all(chunks) {
  Ok(chunk_list) ->
    // Concatenate all chunks, then decode the bytes as UTF-8 once
    chunk_list
    |> bytes_tree.concat
    |> bytes_tree.to_bit_array
    |> bit_array.to_string
    |> result.replace_error("Response was not valid UTF-8")
  Error(reason) -> Error("Stream failed: " <> reason)
}
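
Stopping at the first error:

Because an Error item is terminal, a fold can stop as soon as one appears instead of draining the yielder into a list first. The sketch below works on any yielder of the same item type, so it can be tried without a network call; collect is a hypothetical helper and not part of this module.

```gleam
import gleam/bytes_tree.{type BytesTree}
import gleam/list.{Continue, Stop}
import gleam/yielder.{type Yielder}

/// Hypothetical helper: joins all chunks into one BytesTree, or returns
/// the first error reason and stops consuming the stream.
pub fn collect(
  stream: Yielder(Result(BytesTree, String)),
) -> Result(BytesTree, String) {
  yielder.fold_until(stream, Ok(bytes_tree.new()), fn(acc, item) {
    case acc, item {
      // Another chunk arrived: append it and keep reading
      Ok(body), Ok(chunk) -> Continue(Ok(bytes_tree.append_tree(body, chunk)))
      // Terminal error: stop and report the reason
      _, Error(reason) -> Stop(Error(reason))
      // Defensive branch: never reached, because folding stops on Error
      Error(_), _ -> Stop(acc)
    }
  })
}
```

Passing the result of stream_yielder() to collect gives either the full body or the reason the stream failed, without keeping an intermediate list of results.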
pub fn timeout(
  client_request: ClientRequest,
  timeout_ms: Int,
) -> ClientRequest

Set the timeout for the request in milliseconds

Sets how long to wait for a response before timing out. If not set, defaults to 30000ms (30 seconds).

Parameters

  • client_request: The request to modify
  • timeout_ms: Timeout duration in milliseconds

Returns

A new ClientRequest with the timeout updated.

Example

import dream_http_client/client.{host, timeout}

client.new
|> host("slow-api.example.com")
|> timeout(60_000)  // 60 second timeout