AgUiClient

The AgUiClient class is the primary interface for connecting to AG-UI compatible servers. It handles HTTP communication, SSE streaming, binary protocol encoding/decoding, and provides a type-safe API for agent interactions.

Constructor

AgUiClient({
  required AgUiClientConfig config,
  http.Client? httpClient,
  Encoder? encoder,
  Decoder? decoder,
})

Parameters

  • config (required): Configuration object with server details
  • httpClient (optional): Custom HTTP client implementation
  • encoder (optional): Custom encoder for request serialization
  • decoder (optional): Custom decoder for response parsing

Methods

runAgent

Executes an agent and returns a stream of decoded events.

Stream<BaseEvent> runAgent(
  String agentId,
  RunAgentInput input, {
  Map<String, String>? headers,
})

Parameters

  • agentId: Unique identifier for the agent
  • input: Agent input containing messages, context, and configuration
  • headers: Optional additional headers for this request

Returns

A Stream<BaseEvent> that emits protocol events as they arrive.

Example

final input = SimpleRunAgentInput(
  messages: [
    UserMessage(id: 'msg_1', content: 'Hello, agent!'),
  ],
  context: {'sessionId': '12345'},
);

await for (final event in client.runAgent('chat-agent', input)) {
  switch (event) {
    case RunStartedEvent():
      print('Agent started');
    case TextMessageDeltaEvent(delta: final text):
      print('Agent says: $text');
    case RunFinishedEvent():
      print('Agent finished');
  }
}

runAgentRaw

Executes an agent and returns raw SSE messages without decoding.

Stream<SseMessage> runAgentRaw(
  String agentId,
  RunAgentInput input, {
  Map<String, String>? headers,
})

Use Cases

  • Custom event processing
  • Debugging and logging
  • Performance optimization when decoding isn't needed

Example

await for (final message in client.runAgentRaw('agent', input)) {
  print('Raw event: ${message.event}');
  print('Raw data: ${message.data}');
}

cancelAgent

Cancels an active agent execution.

Future<void> cancelAgent(String agentId)

Parameters

  • agentId: The agent ID to cancel

Behavior

  • Immediately closes the SSE connection
  • Cleans up resources
  • Causes the event stream to complete

Example

// Start long-running agent
final stream = client.runAgent('analysis-agent', input);

// Set up listener with timeout
final subscription = stream.listen(
  (event) => processEvent(event),
  onError: (error) => handleError(error),
);

// Cancel after 10 seconds
Timer(Duration(seconds: 10), () async {
  await client.cancelAgent('analysis-agent');
  await subscription.cancel();
});

dispose

Cleans up all resources held by the client.

void dispose()

Important

  • Call this when the client is no longer needed
  • Cancels all active streams
  • Closes HTTP client connections
  • Releases memory resources

Properties

config

final AgUiClientConfig config;

The configuration used to initialize the client. Read-only.

activeStreams

Map<String, bool> get activeStreams;

Returns a map of currently active agent IDs and their status.

Error Handling

The client throws specific exceptions for different error scenarios:

AgUiClientError

General client-side errors.

try {
  await for (final event in client.runAgent('agent', input)) {
    // Process events
  }
} on AgUiClientError catch (e) {
  print('Client error: ${e.message}');
  print('Error code: ${e.code}');
}

NetworkError

Network connectivity issues.

on NetworkError catch (e) {
  print('Network error: ${e.message}');
  // Implement retry logic
}

ValidationError

Input validation failures.

on ValidationError catch (e) {
  print('Validation failed: ${e.message}');
  print('Failed fields: ${e.fields}');
}

ServerError

Server-side errors (5xx status codes).

on ServerError catch (e) {
  print('Server error: ${e.statusCode}');
  print('Message: ${e.message}');
}

Advanced Usage

Custom HTTP Client

Provide a custom HTTP client for advanced scenarios:


final retryClient = RetryClient(http.Client());

final client = AgUiClient(
  config: AgUiClientConfig(baseUrl: 'http://localhost:8000'),
  httpClient: retryClient,
);

Custom Encoding/Decoding

Implement custom encoders/decoders for specialized formats:

class CustomEncoder implements Encoder {
  @override
  List<int> encode(RunAgentInput input) {
    // Custom encoding logic
    return utf8.encode(jsonEncode(input.toJson()));
  }
}

class CustomDecoder implements Decoder {
  @override
  BaseEvent decode(List<int> data) {
    // Custom decoding logic
    final json = jsonDecode(utf8.decode(data));
    return BaseEvent.fromJson(json);
  }
}

final client = AgUiClient(
  config: config,
  encoder: CustomEncoder(),
  decoder: CustomDecoder(),
);

Stream Transformations

Transform the event stream for specific use cases:

// Filter only message events
final messageStream = client
    .runAgent('agent', input)
    .where((event) => event is TextMessageEvent);

// Collect all text into a single string
final completeText = await client
    .runAgent('agent', input)
    .whereType<TextMessageDeltaEvent>()
    .map((event) => event.delta)
    .join();

Concurrent Agents

Run multiple agents concurrently:

final agent1 = client.runAgent('agent1', input1);
final agent2 = client.runAgent('agent2', input2);

// Process both streams
await Future.wait([
  agent1.forEach((event) => processAgent1(event)),
  agent2.forEach((event) => processAgent2(event)),
]);

Performance Considerations

Connection Pooling

The client reuses HTTP connections when possible. For high-throughput scenarios:

final httpClient = http.Client();
// Configure connection pooling
final client = AgUiClient(
  config: config,
  httpClient: httpClient,
);

Memory Management

For long-running streams:

  1. Process events immediately rather than buffering
  2. Cancel streams when no longer needed
  3. Dispose of the client when done

Binary Protocol

The binary protocol is more efficient than JSON for large payloads:

// Binary protocol is used automatically when supported
final stream = client.runAgent('agent', input);

Testing

Mock the client for unit tests:

class MockAgUiClient implements AgUiClient {
  @override
  Stream<BaseEvent> runAgent(String agentId, RunAgentInput input) {
    return Stream.fromIterable([
      RunStartedEvent(runId: 'test-run'),
      TextMessageStartedEvent(messageId: 'msg-1'),
      TextMessageDeltaEvent(messageId: 'msg-1', delta: 'Hello'),
      TextMessageFinishedEvent(messageId: 'msg-1'),
      RunFinishedEvent(runId: 'test-run'),
    ]);
  }
}