Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions packages/genui/lib/src/transport/a2ui_parser_transformer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class _A2uiParserStream {
late final StreamController<GenerationEvent> _controller;
StreamSubscription<String>? _subscription;
String _buffer = '';
bool _lastWasJson = false;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add comment explaining what means true and what means false for this flag?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey!

true if the last event was an A2uiMessageEvent, false otherwise. So the parser can tell how to treat whitespace.

Would you prefer something more explicit? Maybe _wasLastEventA2UI?


Stream<GenerationEvent> get stream => _controller.stream;

Expand Down Expand Up @@ -130,22 +131,31 @@ class _A2uiParserStream {
}

if (firstPotentialStart == -1) {
// No potential JSON start. Emit all.
// No potential JSON start.
if (_buffer.isNotEmpty) {
if (_lastWasJson && _buffer.trim().isEmpty) {
// Whitespace-only after a JSON message: treat as JSONL separator.
// Hold in buffer until more data arrives or stream ends.
break;
}
Comment on lines +136 to +140
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-high high

The newly introduced logic for handling JSONL separators in the _processBuffer method allows the _buffer to grow indefinitely if a JSON message is followed by a continuous stream of whitespace. When _lastWasJson is true and the buffer contains only whitespace, the code breaks out of the processing loop without clearing or emitting the buffer (lines 136-140). An attacker providing a stream of data can exploit this by sending a valid JSON message followed by an infinite or extremely large amount of whitespace, leading to memory exhaustion and a Denial of Service (DoS) crash of the application.

To remediate this, implement a limit on the maximum amount of whitespace that can be held in the buffer as a separator. If the whitespace-only buffer exceeds a reasonable threshold (e.g., 4KB), it should be cleared or emitted as a TextEvent to prevent unbounded memory growth.

          if (_lastWasJson && _buffer.trim().isEmpty) {
            if (_buffer.length > 4096) {
              _emitText(_buffer);
              _buffer = '';
            } else {
              break;
            }
          }

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, the non-JSON content would be interpreted as text, so I think that it's pretty reasonable for an infinite input stream to cause an OOM eventually. We can rely on other parts of the system to fail to prevent infinite input streams.

_emitText(_buffer);
_buffer = '';
}
break;
} else {
// Found a potential start at `firstPotentialStart`.
// Emit text BEFORE it.
if (firstPotentialStart > 0) {
_emitText(_buffer.substring(0, firstPotentialStart));
final String prefix = _buffer.substring(0, firstPotentialStart);
if (_lastWasJson && prefix.trim().isEmpty) {
// Skip whitespace-only prefix after a JSON message
// (JSONL separator).
_buffer = _buffer.substring(firstPotentialStart);
continue;
}
_emitText(prefix);
_buffer = _buffer.substring(firstPotentialStart);
}
// Now buffer starts with potential JSON.
// Since we already tried to parse and failed (if we are here),
// we must wait for more data.
// Buffer starts with potential JSON. Wait for more data.
break;
}
}
Expand All @@ -158,6 +168,7 @@ class _A2uiParserStream {
}

void _emitText(String text) {
_lastWasJson = false;
// Clean up protocol tags that might leak into text stream
final String cleanText = text
.replaceAll('<a2ui_message>', '')
Expand All @@ -172,22 +183,28 @@ class _A2uiParserStream {
if (json is Map<String, Object?>) {
try {
_controller.add(A2uiMessageEvent(A2uiMessage.fromJson(json)));
_lastWasJson = true;
} on A2uiValidationException catch (e) {
_controller.addError(e);
_lastWasJson = false;
} catch (_) {
// Failed to parse A2UI message structure (e.g. invalid type
// discriminator)
_controller.add(TextEvent(jsonEncode(json)));
_lastWasJson = false;
}
} else if (json is List) {
for (final Object? item in json) {
if (item is Map<String, Object?>) {
try {
_controller.add(A2uiMessageEvent(A2uiMessage.fromJson(item)));
_lastWasJson = true;
} on A2uiValidationException catch (e) {
_controller.addError(e);
_lastWasJson = false;
} catch (_) {
_controller.add(TextEvent(jsonEncode(item)));
_lastWasJson = false;
}
}
}
Expand Down
28 changes: 28 additions & 0 deletions packages/genui/test/transport/a2ui_parser_transformer_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -235,5 +235,33 @@ void main() {

await queue.cancel();
});

test('extracts messages from a JSONL stream', () async {
final StreamQueue<GenerationEvent> queue = StreamQueue(stream);

controller.add(
'{"version": "v0.9", "createSurface": '
'{"surfaceId": "foo", "catalogId": "genui"}'
'}',
);

controller.add('\n');

controller.add(
'{"version": "v0.9", "createSurface": '
'{"surfaceId": "bar", "catalogId": "genui"}'
'}',
);

final firstEvent = (await queue.next) as A2uiMessageEvent;
expect(firstEvent.message, isA<CreateSurface>());
expect((firstEvent.message as CreateSurface).surfaceId, 'foo');

final secondEvent = (await queue.next) as A2uiMessageEvent;
expect(secondEvent.message, isA<CreateSurface>());
expect((secondEvent.message as CreateSurface).surfaceId, 'bar');

await queue.cancel();
});
});
}