AgUiClient
The AgUiClient class is the primary interface for connecting to AG-UI compatible servers. It handles HTTP communication, SSE streaming, binary protocol encoding/decoding, and provides a type-safe API for agent interactions.
Constructor
AgUiClient({
required AgUiClientConfig config,
http.Client? httpClient,
Encoder? encoder,
Decoder? decoder,
})Parameters
config(required): Configuration object with server detailshttpClient(optional): Custom HTTP client implementationencoder(optional): Custom encoder for request serializationdecoder(optional): Custom decoder for response parsing
Methods
runAgent
Executes an agent and returns a stream of decoded events.
Stream<BaseEvent> runAgent(
String agentId,
RunAgentInput input, {
Map<String, String>? headers,
})Parameters
agentId: Unique identifier for the agentinput: Agent input containing messages, context, and configurationheaders: Optional additional headers for this request
Returns
A Stream<BaseEvent> that emits protocol events as they arrive.
Example
final input = SimpleRunAgentInput(
messages: [
UserMessage(id: 'msg_1', content: 'Hello, agent!'),
],
context: {'sessionId': '12345'},
);
await for (final event in client.runAgent('chat-agent', input)) {
switch (event) {
case RunStartedEvent():
print('Agent started');
case TextMessageDeltaEvent(delta: final text):
print('Agent says: $text');
case RunFinishedEvent():
print('Agent finished');
}
}runAgentRaw
Executes an agent and returns raw SSE messages without decoding.
Stream<SseMessage> runAgentRaw(
String agentId,
RunAgentInput input, {
Map<String, String>? headers,
})Use Cases
- Custom event processing
- Debugging and logging
- Performance optimization when decoding isn't needed
Example
await for (final message in client.runAgentRaw('agent', input)) {
print('Raw event: ${message.event}');
print('Raw data: ${message.data}');
}cancelAgent
Cancels an active agent execution.
Future<void> cancelAgent(String agentId)Parameters
agentId: The agent ID to cancel
Behavior
- Immediately closes the SSE connection
- Cleans up resources
- Causes the event stream to complete
Example
// Start long-running agent
final stream = client.runAgent('analysis-agent', input);
// Set up listener with timeout
final subscription = stream.listen(
(event) => processEvent(event),
onError: (error) => handleError(error),
);
// Cancel after 10 seconds
Timer(Duration(seconds: 10), () async {
await client.cancelAgent('analysis-agent');
await subscription.cancel();
});dispose
Cleans up all resources held by the client.
void dispose()Important
- Call this when the client is no longer needed
- Cancels all active streams
- Closes HTTP client connections
- Releases memory resources
Properties
config
final AgUiClientConfig config;The configuration used to initialize the client. Read-only.
activeStreams
Map<String, bool> get activeStreams;Returns a map of currently active agent IDs and their status.
Error Handling
The client throws specific exceptions for different error scenarios:
AgUiClientError
General client-side errors.
try {
await for (final event in client.runAgent('agent', input)) {
// Process events
}
} on AgUiClientError catch (e) {
print('Client error: ${e.message}');
print('Error code: ${e.code}');
}NetworkError
Network connectivity issues.
on NetworkError catch (e) {
print('Network error: ${e.message}');
// Implement retry logic
}ValidationError
Input validation failures.
on ValidationError catch (e) {
print('Validation failed: ${e.message}');
print('Failed fields: ${e.fields}');
}ServerError
Server-side errors (5xx status codes).
on ServerError catch (e) {
print('Server error: ${e.statusCode}');
print('Message: ${e.message}');
}Advanced Usage
Custom HTTP Client
Provide a custom HTTP client for advanced scenarios:
final retryClient = RetryClient(http.Client());
final client = AgUiClient(
config: AgUiClientConfig(baseUrl: 'http://localhost:8000'),
httpClient: retryClient,
);Custom Encoding/Decoding
Implement custom encoders/decoders for specialized formats:
class CustomEncoder implements Encoder {
@override
List<int> encode(RunAgentInput input) {
// Custom encoding logic
return utf8.encode(jsonEncode(input.toJson()));
}
}
class CustomDecoder implements Decoder {
@override
BaseEvent decode(List<int> data) {
// Custom decoding logic
final json = jsonDecode(utf8.decode(data));
return BaseEvent.fromJson(json);
}
}
final client = AgUiClient(
config: config,
encoder: CustomEncoder(),
decoder: CustomDecoder(),
);Stream Transformations
Transform the event stream for specific use cases:
// Filter only message events
final messageStream = client
.runAgent('agent', input)
.where((event) => event is TextMessageEvent);
// Collect all text into a single string
final completeText = await client
.runAgent('agent', input)
.whereType<TextMessageDeltaEvent>()
.map((event) => event.delta)
.join();Concurrent Agents
Run multiple agents concurrently:
final agent1 = client.runAgent('agent1', input1);
final agent2 = client.runAgent('agent2', input2);
// Process both streams
await Future.wait([
agent1.forEach((event) => processAgent1(event)),
agent2.forEach((event) => processAgent2(event)),
]);Performance Considerations
Connection Pooling
The client reuses HTTP connections when possible. For high-throughput scenarios:
final httpClient = http.Client();
// Configure connection pooling
final client = AgUiClient(
config: config,
httpClient: httpClient,
);Memory Management
For long-running streams:
- Process events immediately rather than buffering
- Cancel streams when no longer needed
- Dispose of the client when done
Binary Protocol
The binary protocol is more efficient than JSON for large payloads:
// Binary protocol is used automatically when supported
final stream = client.runAgent('agent', input);Testing
Mock the client for unit tests:
class MockAgUiClient implements AgUiClient {
@override
Stream<BaseEvent> runAgent(String agentId, RunAgentInput input) {
return Stream.fromIterable([
RunStartedEvent(runId: 'test-run'),
TextMessageStartedEvent(messageId: 'msg-1'),
TextMessageDeltaEvent(messageId: 'msg-1', delta: 'Hello'),
TextMessageFinishedEvent(messageId: 'msg-1'),
RunFinishedEvent(runId: 'test-run'),
]);
}
}