PhoenixGenApi (PhoenixGenApi v2.20.1)

Copy Markdown View Source

PhoenixGenApi is a framework for building distributed API systems with Phoenix.

This library provides a comprehensive solution for handling API requests with support for multiple execution modes (sync, async, streaming), distributed node selection, permission checking, and automatic argument validation.

Features

  • Multiple Execution Modes: Support for synchronous, asynchronous, streaming, and fire-and-forget requests
  • Distributed Execution: Execute functions on remote nodes with automatic node selection
  • Node Selection Strategies: Random, hash-based, round-robin, and custom selection strategies
  • Automatic Argument Validation: Type checking and conversion for request arguments
  • Permission Control: Built-in permission checking for requests
  • Streaming Support: Handle long-running operations with streaming responses
  • Configuration Caching: Efficient caching of function configurations with automatic updates
  • Configuration Push: Remote nodes can actively push their service and function configs to the gateway
  • Rate Limiting: Global and per-API rate limiting with sliding window algorithm
  • Relay Messages: Group-based message relaying with public, private, and strict_private group types
  • Diagnostics: Runtime health checks, statistics, debug reports, and admin-gated tracing utilities

Architecture

The library consists of several key components:

Usage Example

Basic Setup

First, define your function configurations:

config = %PhoenixGenApi.Structs.FunConfig{
  request_type: "get_user",
  service: "user_service",
  nodes: ["user@node1", "user@node2"],
  choose_node_mode: :random,
  timeout: 5000,
  mfa: {UserService, :get_user, []},
  arg_types: %{"user_id" => :string},
  arg_orders: ["user_id"],
  response_type: :sync,
  check_permission: {:arg, "user_id"},
  request_info: false
}

# Add configuration to cache
PhoenixGenApi.ConfigDb.add(config)

Execute Requests

use PhoenixGenApi

# Create a request
request = %PhoenixGenApi.Structs.Request{
  request_id: "req_123",
  request_type: "get_user",
  user_id: "user_456",
  device_id: "device_789",
  args: %{"user_id" => "user_123"}
}

# Execute the request
response = PhoenixGenApi.Executor.execute!(request)

Streaming Requests

For long-running operations, use streaming mode:

stream_config = %PhoenixGenApi.Structs.FunConfig{
  request_type: "process_data",
  service: "processing_service",
  nodes: :local,
  choose_node_mode: :random,
  timeout: :infinity,
  mfa: {DataProcessor, :process_large_dataset, []},
  arg_types: %{"dataset_id" => :string},
  arg_orders: ["dataset_id"],
  response_type: :stream,
  check_permission: false,
  request_info: true
}

# The streaming function should send results using StreamHelper:
# StreamHelper.send_result(stream, chunk_data)
# StreamHelper.send_last_result(stream, final_data)
# Or: StreamHelper.send_complete(stream)

Channel Options

When using use PhoenixGenApi in a channel, the following options are available:

  • :event (default: "phoenix_gen_api") — the event name to handle.

  • :override_user_id (default: true) — when true, the user_id from socket.assigns is injected into the request payload, but only when socket.assigns.user_id is a verified non-empty binary. This prevents a client-supplied user_id from overriding the server-side value. The user_id in assigns must be set by a verified authentication step in Phoenix.Socket.connect/3, never from client payload.

  • :require_verified_user_id (default: true) — when true, requests are rejected immediately with "Authentication required" if socket.assigns.user_id is nil or empty. This prevents unauthenticated requests from reaching permission checks or function execution. Set to false for public endpoints that use check_permission: false.

    Security note: Setting this to false disables the early rejection of unauthenticated requests. Only do this for channels that serve public data.

Configuration

Add to your config.exs:

config :phoenix_gen_api, :gen_api,
  pull_timeout: 5_000,
  pull_interval: 30_000,
  detail_error: false,
  service_configs: [
    %{
      service: "user_service",
      nodes: ["user@node1", "user@node2"],
      module: "UserService",
      function: "get_config",
      args: []
    }
  ]

Rate Limiting Configuration

Configure rate limits in your config.exs:

config :phoenix_gen_api, :rate_limiter,
  enabled: true,
  fail_open: true,
  global_limits: [
    %{key: :user_id, max_requests: 2000, window_ms: 60_000},
    %{key: :device_id, max_requests: 10000, window_ms: 60_000}
  ],
  api_limits: [
    %{
      service: "data_service",
      request_type: "export_data",
      key: :user_id,
      max_requests: 10,
      window_ms: 60_000
    }
  ]

Learn More

For detailed information about specific components, see:

Summary

Functions

Adds a single global rate limit at runtime.

Attaches a telemetry handler to all PhoenixGenApi events.

[Shell Helper] Quick view of ConfigDb cache status.

[Shell Helper] Traces the call flow for a service/request_type from the gateway to its target nodes.

[Shell Helper] Print a formatted call flow trace to the console.

Checks rate limit for a request.

Cleans up expired failed config entries (older than 24h). Returns the number of entries removed.

Clears all failed config entries regardless of expiry.

[Shell Helper] Print a formatted cluster topology view to the console.

[Shell Helper] Returns the cluster topology view from this node.

[Shell Helper] Print a formatted debug report to the console.

Returns a debug-oriented runtime report.

Detaches all telemetry handlers with the given ID.

[Shell Helper] Quick view of failed FunConfig entries.

[Shell Helper] Print a formatted table of failed FunConfig entries to the console.

[Shell Helper] Print a summary of failed FunConfig entries to the console.

[Shell Helper] Print a formatted list of all registered call flows.

Gets the current global rate limits (may differ from config.exs if changed at runtime).

Gets all configured rate limits.

Gets current rate limit status for a key.

Returns a runtime health check for the VM, distribution, and PhoenixGenApi processes.

[Shell Helper] Print a formatted health check summary to the console.

[Shell Helper] Print a formatted request inspection to the console.

[Shell Helper] Inspects a request and returns its full execution plan.

[Shell Helper] Lists all registered call flows across all services.

[Shell Helper] Quick view of Worker Pool status.

Pushes a PushConfig to this server node.

[Shell Helper] Quick view of pushed services status.

Removes a global rate limit by key at runtime.

Resets rate limit counters for a specific key.

[Shell Helper] Quick view of current rate limit configuration.

[Shell Helper] Quick view and management of global rate limits.

[Shell Helper] Quick view of rate limit status for a user.

Sets (replaces) all global rate limits at runtime.

Returns runtime VM and PhoenixGenApi statistics.

[Shell Helper] Print formatted VM and PhoenixGenApi statistics to the console.

Stops an active streaming call.

Disables legacy Erlang tracing for processes or ports.

Disables call tracing for specific MFAs.

Enables call tracing for specific MFAs.

Enables legacy Erlang tracing for processes or ports.

Returns a small trace status snapshot.

Updates rate limit configuration at runtime.

Verifies that the server has the given service and config version.

Functions

add_global_limit(limit)

@spec add_global_limit(map()) :: :ok

Adds a single global rate limit at runtime.

If a limit with the same :key already exists, it will be replaced.

Parameters

  • limit - A map with :key, :max_requests, and :window_ms

Returns

  • :ok - Limit was added

Examples

PhoenixGenApi.add_global_limit(%{
  key: :ip_address,
  max_requests: 100,
  window_ms: 60_000
})

attach_telemetry(handler_id, function, config \\ %{})

@spec attach_telemetry(String.t(), function(), map()) :: :ok

Attaches a telemetry handler to all PhoenixGenApi events.

This is a convenience function that attaches handlers to both executor and rate limiter events with a single call.

Events

Executor Events

  • [:phoenix_gen_api, :executor, :request, :start]
  • [:phoenix_gen_api, :executor, :request, :stop]
  • [:phoenix_gen_api, :executor, :request, :exception]

Rate Limiter Events

  • [:phoenix_gen_api, :rate_limiter, :check]
  • [:phoenix_gen_api, :rate_limiter, :exceeded]
  • [:phoenix_gen_api, :rate_limiter, :reset]
  • [:phoenix_gen_api, :rate_limiter, :cleanup]

Parameters

  • handler_id - A unique string identifier for the handler
  • function - A 4-arity function: fn(event, measurements, metadata, config) -> any end
  • config - Optional configuration map passed to the handler (default: %{})

Examples

# Attach a simple logging handler
PhoenixGenApi.attach_telemetry("my-app", fn event, measurements, metadata, _config ->
  ...
end)

# Attach with custom config
PhoenixGenApi.attach_telemetry("metrics", &MyApp.Metrics.handle_event/4, %{prefix: "phoenix_gen_api"})

cache_status()

[Shell Helper] Quick view of ConfigDb cache status.

Usage in IEx

iex> PhoenixGenApi.cache_status()

call_flow(service, request_type, version \\ nil)

@spec call_flow(String.t() | atom(), String.t(), String.t() | nil) :: map()

[Shell Helper] Traces the call flow for a service/request_type from the gateway to its target nodes.

Usage in IEx

iex> PhoenixGenApi.call_flow("user_service", "get_user")

call_flow_print(service, request_type, version \\ nil)

[Shell Helper] Print a formatted call flow trace to the console.

Usage in IEx

iex> PhoenixGenApi.call_flow_print("user_service", "get_user")
iex> PhoenixGenApi.call_flow_print("user_service", "get_user", "1.0.0")

check_rate_limit(request)

@spec check_rate_limit(PhoenixGenApi.Structs.Request.t()) ::
  :ok | {:error, :rate_limited, map()}

Checks rate limit for a request.

This function checks both global and per-API rate limits. It is automatically called during request execution, but can also be called manually for custom rate limiting logic.

Parameters

  • request - The Request struct to check

Returns

  • :ok - Request is within all rate limits
  • {:error, :rate_limited, details} - Request exceeds a rate limit

Examples

request = %Request{user_id: "user_123", service: "my_service", request_type: "my_api"}

case PhoenixGenApi.check_rate_limit(request) do
  :ok ->
    # Proceed with execution

  {:error, :rate_limited, details} ->
    # Handle rate limit exceeded
end

cleanup_failed_configs()

@spec cleanup_failed_configs() :: non_neg_integer()

Cleans up expired failed config entries (older than 24h). Returns the number of entries removed.

clear_failed_configs()

@spec clear_failed_configs() :: :ok

Clears all failed config entries regardless of expiry.

cluster_print()

[Shell Helper] Print a formatted cluster topology view to the console.

Usage in IEx

iex> PhoenixGenApi.cluster_print()

cluster_view()

@spec cluster_view() :: map()

[Shell Helper] Returns the cluster topology view from this node.

Usage in IEx

iex> PhoenixGenApi.cluster_view()

debug_print(opts \\ [])

[Shell Helper] Print a formatted debug report to the console.

Usage in IEx

iex> PhoenixGenApi.debug_print(process_limit: 10)

debug_report(opts \\ [])

@spec debug_report(keyword()) :: map()

Returns a debug-oriented runtime report.

Delegates to PhoenixGenApi.Diagnostics.debug_report/1.

detach_telemetry(handler_id)

@spec detach_telemetry(String.t()) :: :ok

Detaches all telemetry handlers with the given ID.

Parameters

  • handler_id - The handler ID used when attaching

Examples

PhoenixGenApi.detach_telemetry("my-app")

failed_configs(opts \\ [])

[Shell Helper] Quick view of failed FunConfig entries.

Shows configs that failed validation during pull or push, with their service, request_type, version, source, node, and reason.

Usage in IEx

iex> PhoenixGenApi.failed_configs()
iex> PhoenixGenApi.failed_configs(source: :pull)
iex> PhoenixGenApi.failed_configs(source: :push, limit: 20)

failed_configs_print(opts \\ [])

[Shell Helper] Print a formatted table of failed FunConfig entries to the console.

Usage in IEx

iex> PhoenixGenApi.failed_configs_print()
iex> PhoenixGenApi.failed_configs_print(source: :pull, limit: 10)

failed_configs_summary()

[Shell Helper] Print a summary of failed FunConfig entries to the console.

Usage in IEx

iex> PhoenixGenApi.failed_configs_summary()

flows_print(opts \\ [])

[Shell Helper] Print a formatted list of all registered call flows.

Usage in IEx

iex> PhoenixGenApi.flows_print()
iex> PhoenixGenApi.flows_print(include_disabled: true)

get_global_limits()

@spec get_global_limits() :: [map()]

Gets the current global rate limits (may differ from config.exs if changed at runtime).

Returns

A list of global rate limit maps.

Examples

PhoenixGenApi.get_global_limits()
# => [%{key: :user_id, max_requests: 2000, window_ms: 60_000}]

get_rate_limit_config()

@spec get_rate_limit_config() :: %{global: list(), api: list()}

Gets all configured rate limits.

Returns

A map with :global and :api keys containing the configured limits.

get_rate_limit_status(key_value, scope, rate_limit_key)

@spec get_rate_limit_status(
  String.t(),
  :global | {String.t() | atom(), String.t()},
  atom() | String.t()
) :: map()

Gets current rate limit status for a key.

Returns

A list of maps with current usage information for all applicable rate limits.

health_check(opts \\ [])

@spec health_check(keyword()) :: map()

Returns a runtime health check for the VM, distribution, and PhoenixGenApi processes.

Delegates to PhoenixGenApi.Diagnostics.health_check/1.

health_print(opts \\ [])

[Shell Helper] Print a formatted health check summary to the console.

Usage in IEx

iex> PhoenixGenApi.health_print()
iex> PhoenixGenApi.health_print(max_memory_bytes: 100_000_000)

inspect_print(request)

[Shell Helper] Print a formatted request inspection to the console.

Usage in IEx

iex> PhoenixGenApi.inspect_print(%{service: "user_service", request_type: "get_user"})

inspect_request(request)

@spec inspect_request(map()) :: map()

[Shell Helper] Inspects a request and returns its full execution plan.

Usage in IEx

iex> PhoenixGenApi.inspect_request(%{service: "user_service", request_type: "get_user"})

list_call_flows(opts \\ [])

@spec list_call_flows(keyword()) :: [map()]

[Shell Helper] Lists all registered call flows across all services.

Usage in IEx

iex> PhoenixGenApi.list_call_flows()
iex> PhoenixGenApi.list_call_flows(include_disabled: true)

pool_status()

[Shell Helper] Quick view of Worker Pool status.

Usage in IEx

iex> PhoenixGenApi.pool_status()

push_config(push_config, opts \\ [])

@spec push_config(
  PhoenixGenApi.Structs.PushConfig.t() | map(),
  keyword()
) :: {:ok, :accepted} | {:ok, :skipped, term()} | {:error, term()}

Pushes a PushConfig to this server node.

This is the server-side API for receiving pushed configs from remote nodes. Remote nodes should use ConfigPusher.push/2 or ConfigPusher.push_on_startup/3 instead, which make RPC calls to this function.

Parameters

  • push_config - A %PushConfig{} struct or map that can be decoded into one
  • opts - Options keyword list:
    • :force - Force push even if version matches (default: false)

Returns

  • {:ok, :accepted} - New configs were stored successfully
  • {:ok, :skipped, reason} - Push was skipped (e.g., version matches)
  • {:error, reason} - Push failed (validation error, etc.)

Examples

alias PhoenixGenApi.Structs.PushConfig

push_config = %PushConfig{
  service: "my_service",
  nodes: [:"node1@host"],
  config_version: "1.0.0",
  fun_configs: [%FunConfig{...}]
}

{:ok, :accepted} = PhoenixGenApi.push_config(push_config)
{:ok, :skipped, :version_matches} = PhoenixGenApi.push_config(push_config)
{:ok, :accepted} = PhoenixGenApi.push_config(push_config, force: true)

pushed_services_status()

[Shell Helper] Quick view of pushed services status.

Shows all services that have been registered via push, along with their config versions and auto-pull registration status.

Usage in IEx

iex> PhoenixGenApi.pushed_services_status()

remove_global_limit(key)

@spec remove_global_limit(atom() | String.t()) :: :ok

Removes a global rate limit by key at runtime.

Parameters

  • key - The rate limit key to remove (:user_id, :device_id, etc.)

Returns

  • :ok - Limit was removed (or didn't exist)

Examples

PhoenixGenApi.remove_global_limit(:ip_address)

reset_rate_limit(key_value, scope, rate_limit_key)

@spec reset_rate_limit(
  String.t(),
  :global | {String.t() | atom(), String.t()},
  atom() | String.t()
) :: :ok

Resets rate limit counters for a specific key.

Parameters

  • key_value - The key value to reset (e.g., user ID)
  • scope - Either :global or {service, request_type} tuple
  • rate_limit_key - The type of key (:user_id, :device_id, etc.)

Returns

  • :ok - Counters were reset

Examples

# Reset all rate limits for a user
PhoenixGenApi.reset_rate_limit("user_123", :global, :user_id)

# Reset API-specific rate limit
PhoenixGenApi.reset_rate_limit("user_123", {"my_service", "my_api"}, :user_id)

rl_config()

[Shell Helper] Quick view of current rate limit configuration.

Usage in IEx

iex> PhoenixGenApi.rl_config()

rl_global()

[Shell Helper] Quick view and management of global rate limits.

Usage in IEx

# View current global limits
iex> PhoenixGenApi.rl_global()

# Set new global limits
iex> PhoenixGenApi.rl_global([%{key: :user_id, max_requests: 2000, window_ms: 60_000}])

# Add a single limit
iex> PhoenixGenApi.rl_global(:add, %{key: :ip_address, max_requests: 100, window_ms: 60_000})

# Remove a limit by key
iex> PhoenixGenApi.rl_global(:remove, :ip_address)

rl_global(limits)

rl_global(atom, limit)

rl_status(user_id)

[Shell Helper] Quick view of rate limit status for a user.

Usage in IEx

iex> PhoenixGenApi.rl_status("user_123")

set_global_limits(limits)

@spec set_global_limits([map()]) :: :ok

Sets (replaces) all global rate limits at runtime.

Parameters

  • limits - A list of global rate limit maps, each with:
    • :key - The rate limit key (:user_id, :device_id, :ip_address, or custom string)
    • :max_requests - Maximum requests allowed in the window
    • :window_ms - Window duration in milliseconds

Returns

  • :ok - Limits were updated

Examples

PhoenixGenApi.set_global_limits([
  %{key: :user_id, max_requests: 2000, window_ms: 60_000},
  %{key: :device_id, max_requests: 10000, window_ms: 60_000}
])

statistics(opts \\ [])

@spec statistics(keyword()) :: map()

Returns runtime VM and PhoenixGenApi statistics.

Delegates to PhoenixGenApi.Diagnostics.statistics/1.

stats_print()

[Shell Helper] Print formatted VM and PhoenixGenApi statistics to the console.

Usage in IEx

iex> PhoenixGenApi.stats_print()

stop_stream(request_id)

@spec stop_stream(pid()) :: :ok

Stops an active streaming call.

This function gracefully terminates a streaming call process and sends a completion message to the receiver. The stream call process is identified by its PID.

Parameters

  • stream_pid - The PID of the streaming call process to stop

Returns

  • :ok - The stop signal was sent successfully

Examples

# Start a stream
{:ok, stream_pid} = StreamCall.start_link(%{
  request: request,
  fun_config: config,
  receiver: self()
})

# Later, stop the stream
PhoenixGenApi.stop_stream(stream_pid)

# Receive the completion message
receive do
  {:stream_response, response} ->
    assert response.has_more == false
end

Notes

  • The stream call will send a completion response to its receiver before terminating
  • This does not notify the data generator process; it only stops the stream relay
  • If you need to stop the data generation itself, handle that in your generator function

stop_trace(targets \\ :all, opts \\ [])

@spec stop_trace(
  term(),
  keyword()
) :: {:ok, map()} | {:error, term()}

Disables legacy Erlang tracing for processes or ports.

Requires the :disable_tracing admin action.

stop_trace_functions(mfas \\ :all)

@spec stop_trace_functions(term()) :: {:ok, map()} | {:error, term()}

Disables call tracing for specific MFAs.

Requires the :disable_tracing admin action.

trace_functions(mfas, opts \\ [])

@spec trace_functions(
  term(),
  keyword()
) :: {:ok, map()} | {:error, term()}

Enables call tracing for specific MFAs.

Requires the :enable_tracing admin action.

trace_processes(targets, opts \\ [])

@spec trace_processes(
  term(),
  keyword()
) :: {:ok, map()} | {:error, term()}

Enables legacy Erlang tracing for processes or ports.

Requires the :enable_tracing admin action.

trace_status()

@spec trace_status() :: map()

Returns a small trace status snapshot.

update_rate_limit_config(config)

@spec update_rate_limit_config(map()) :: :ok

Updates rate limit configuration at runtime.

Parameters

  • config - A map with :global_limits and/or :api_limits keys

Returns

  • :ok - Configuration was updated

Examples

PhoenixGenApi.update_rate_limit_config(%{
  global_limits: [
    %{key: :user_id, max_requests: 2000, window_ms: 60_000}
  ]
})

verify_config(service, config_version)

@spec verify_config(String.t() | atom(), String.t()) ::
  {:ok, :matched} | {:ok, :mismatch, String.t()} | {:error, :not_found}

Verifies that the server has the given service and config version.

Useful for checking whether a push is necessary before sending the full configuration. Remote nodes should use ConfigPusher.verify/3 instead.

Parameters

  • service - The service name (string or atom)
  • config_version - The config version string to verify

Returns

  • {:ok, :matched} - Version matches what is stored
  • {:ok, :mismatch, stored_version} - Version differs from what is stored
  • {:error, :not_found} - Service is not known

Examples

{:ok, :matched} = PhoenixGenApi.verify_config("my_service", "1.0.0")
{:ok, :mismatch, "0.9.0"} = PhoenixGenApi.verify_config("my_service", "1.0.0")
{:error, :not_found} = PhoenixGenApi.verify_config("unknown_service", "1.0.0")