Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions crates/goat-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,14 @@ pub enum ClientError {
pub struct Attachment {
pub ops: mpsc::Sender<Op>,
pub events: mpsc::Receiver<Event>,
pub presence: mpsc::Receiver<usize>,
pub client_id: u64,
pub pump: JoinHandle<()>,
}

const OPS_CAPACITY: usize = 32;
const EVENTS_CAPACITY: usize = 512;
const PRESENCE_CAPACITY: usize = 16;

pub async fn connect(
socket_path: &Path,
Expand All @@ -52,17 +55,18 @@ pub async fn connect(
version: PROTOCOL_VERSION,
})
.await?;
match conn.recv().await? {
ServerFrame::Welcome { version, .. } => {
let client_id = match conn.recv().await? {
ServerFrame::Welcome { version, client_id } => {
if version != PROTOCOL_VERSION {
return Err(ClientError::VersionMismatch(version));
}
client_id.0
}
ServerFrame::VersionMismatch { daemon_version } => {
return Err(ClientError::VersionMismatch(daemon_version));
}
_ => return Err(ClientError::Handshake),
}
};

conn.send(&ClientFrame::OpenSession {
cwd: cwd.display().to_string(),
Expand All @@ -75,12 +79,13 @@ pub async fn connect(
_ => return Err(ClientError::Handshake),
};

Ok(spawn_pumps(conn, session))
Ok(spawn_pumps(conn, session, client_id))
}

fn spawn_pumps(conn: ClientConn<Stream>, session: SessionId) -> Attachment {
fn spawn_pumps(conn: ClientConn<Stream>, session: SessionId, client_id: u64) -> Attachment {
let (ops_tx, mut ops_rx) = mpsc::channel::<Op>(OPS_CAPACITY);
let (events_tx, events_rx) = mpsc::channel::<Event>(EVENTS_CAPACITY);
let (presence_tx, presence_rx) = mpsc::channel::<usize>(PRESENCE_CAPACITY);

let (sink, mut source) = conn.split();
let mut sink = Box::pin(sink);
Expand Down Expand Up @@ -140,6 +145,10 @@ fn spawn_pumps(conn: ClientConn<Stream>, session: SessionId) -> Attachment {
idmap.lock().await.record_correlation(*correlation, *task);
continue;
}
if let ServerFrame::Presence { clients, .. } = &frame {
let _ = presence_tx.try_send(clients.len());
continue;
}
if let ServerFrame::Snapshot { watermark, .. } = &frame {
expected_seq = Some(*watermark);
}
Expand All @@ -165,6 +174,8 @@ fn spawn_pumps(conn: ClientConn<Stream>, session: SessionId) -> Attachment {
Attachment {
ops: ops_tx,
events: events_rx,
presence: presence_rx,
client_id,
pump,
}
}
Expand Down
10 changes: 8 additions & 2 deletions crates/goat-code/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,15 @@ async fn run_tui(worktree_label: Option<String>, r#continue: bool) -> color_eyre
};

let attachment = goat_client::connect(&socket_path, &daemon_exe, cwd, resume).await?;
let goat_client::Attachment { ops, events, pump } = attachment;
let goat_client::Attachment {
ops,
events,
presence,
pump,
..
} = attachment;

goat_tui::run(ops, events, theme, Vec::new()).await?;
goat_tui::run(ops, events, presence, theme, Vec::new()).await?;
pump.abort();
Ok(())
}
Expand Down
51 changes: 28 additions & 23 deletions crates/goat-daemon/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,53 +59,58 @@ pub(crate) async fn handle_connection(
}
});

let mut graceful = false;
while let Some(Ok(frame)) = source.next().await {
if !dispatch(&manager, client_id, &out_tx, &shutdown, frame).await {
break;
match dispatch(&manager, client_id, &out_tx, &shutdown, frame).await {
Disposition::Continue => {}
Disposition::Closed => {
graceful = true;
break;
}
}
}

if graceful {
tracing::debug!(client = client_id.0, "client disconnected gracefully");
} else {
tracing::warn!(client = client_id.0, "client disconnected unexpectedly");
}
manager.drop_client(client_id).await;
writer.abort();
}

enum Disposition {
Continue,
Closed,
}

async fn dispatch(
manager: &Manager,
client_id: goat_wire::ClientId,
out_tx: &mpsc::Sender<ServerFrame>,
shutdown: &tokio_util::sync::CancellationToken,
frame: ClientFrame,
) -> bool {
) -> Disposition {
match frame {
ClientFrame::Hello { .. } => true,
ClientFrame::Hello { .. } => Disposition::Continue,
ClientFrame::OpenSession { cwd, resume } => {
let cwd_path = PathBuf::from(&cwd);
match manager.open_or_attach(cwd_path, resume).await {
Ok(session) => {
let _ = out_tx
.send(ServerFrame::SessionOpened {
session,
correlation_ok: true,
})
.await;
let _ = out_tx.send(ServerFrame::SessionOpened { session }).await;
let _ = manager.subscribe(session, client_id, out_tx.clone()).await;
}
Err(message) => {
let _ = out_tx.send(ServerFrame::Error { message }).await;
}
}
true
Disposition::Continue
}
ClientFrame::Attach { session } => {
if let Err(message) = manager.subscribe(session, client_id, out_tx.clone()).await {
let _ = out_tx.send(ServerFrame::Error { message }).await;
}
true
}
ClientFrame::Detach { session } => {
manager.unsubscribe(session, client_id).await;
let _ = out_tx.send(ServerFrame::Detached { session }).await;
true
Disposition::Continue
}
ClientFrame::Submit {
session,
Expand All @@ -115,29 +120,29 @@ async fn dispatch(
if let Err(message) = manager.submit(session, out_tx, correlation, op).await {
let _ = out_tx.send(ServerFrame::Error { message }).await;
}
true
Disposition::Continue
}
ClientFrame::Control { session, op } => {
if let Err(message) = manager.control(session, op).await {
let _ = out_tx.send(ServerFrame::Error { message }).await;
}
true
Disposition::Continue
}
ClientFrame::ListSessions => {
let sessions = manager.list_sessions().await;
let _ = out_tx.send(ServerFrame::SessionList { sessions }).await;
true
Disposition::Continue
}
ClientFrame::KillSession { session } => {
if let Err(message) = manager.kill_session(session).await {
let _ = out_tx.send(ServerFrame::Error { message }).await;
}
true
Disposition::Continue
}
ClientFrame::StopDaemon => {
shutdown.cancel();
false
Disposition::Closed
}
ClientFrame::Goodbye => false,
ClientFrame::Goodbye => Disposition::Closed,
}
}
18 changes: 0 additions & 18 deletions crates/goat-daemon/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,24 +191,6 @@ impl Manager {
Ok(())
}

pub(crate) async fn unsubscribe(&self, session: SessionId, client: ClientId) {
let live = {
let table = self.inner.sessions.lock().await;
table.get(&session).cloned()
};
let mut evict = false;
if let Some(live) = live {
let mut inner = live.inner.lock().await;
crate::session::subscriber_map_remove(&mut inner.subscribers, client);
let clients = inner.presence();
broadcast_presence(&mut inner, clients);
evict = inner.evictable();
}
if evict {
self.evict_if_idle(session).await;
}
}

pub(crate) async fn drop_client(&self, client: ClientId) {
let lives: Vec<(SessionId, LiveSession)> = {
let table = self.inner.sessions.lock().await;
Expand Down
46 changes: 45 additions & 1 deletion crates/goat-tui/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ pub(crate) enum AppEvent {
Tick,
Engine(EngineEvent),
EngineClosed,
Presence(usize),
}

#[allow(clippy::struct_excessive_bools)]
Expand All @@ -97,6 +98,7 @@ pub struct App {
pub(crate) highlighter: SyntectHighlighter,
pub(crate) cwd: String,
pub(crate) next_task: u64,
pub(crate) window_count: usize,
pub(crate) spinner: usize,
pub(crate) quit_arm: Option<u16>,
pub(crate) clear_arm: Option<u16>,
Expand Down Expand Up @@ -174,6 +176,7 @@ impl App {
highlighter: SyntectHighlighter::new(),
cwd,
next_task: 1,
window_count: 1,
spinner: 0,
quit_arm: None,
clear_arm: None,
Expand Down Expand Up @@ -300,6 +303,13 @@ impl App {
self.should_quit = true;
Vec::new()
}
AppEvent::Presence(count) => {
if self.window_count != count {
self.window_count = count;
self.dirty = true;
}
Vec::new()
}
}
}

Expand Down Expand Up @@ -1022,13 +1032,22 @@ fn shorten_home(path: &Path) -> String {
pub async fn run(
ops: Sender<Op>,
mut events: Receiver<EngineEvent>,
mut presence: Receiver<usize>,
theme: Theme,
initial_ops: Vec<Op>,
) -> color_eyre::Result<()> {
let mut app = App::new(theme);
let (mut terminal, picker) = tui::init(app.mouse_capture)?;
app.picker = picker;
let result = event_loop(&mut terminal, &ops, &mut events, app, initial_ops).await;
let result = event_loop(
&mut terminal,
&ops,
&mut events,
&mut presence,
app,
initial_ops,
)
.await;
tui::restore();
let _ = ops.send(Op::Shutdown).await;
result
Expand All @@ -1038,6 +1057,7 @@ async fn event_loop(
terminal: &mut DefaultTerminal,
ops: &Sender<Op>,
events: &mut Receiver<EngineEvent>,
presence: &mut Receiver<usize>,
mut app: App,
initial_ops: Vec<Op>,
) -> color_eyre::Result<()> {
Expand All @@ -1062,6 +1082,7 @@ async fn event_loop(
Some(ev) => AppEvent::Engine(ev),
None => AppEvent::EngineClosed,
},
Some(count) = presence.recv() => AppEvent::Presence(count),
};

for op in app.update(event) {
Expand Down Expand Up @@ -2092,4 +2113,27 @@ mod tests {
);
assert!(app.current_context_window().is_none());
}

#[test]
fn presence_updates_window_count_and_marks_dirty() {
let mut app = App::new(Theme::dark());
app.take_dirty();
assert_eq!(app.window_count, 1);

let ops = app.update(super::AppEvent::Presence(3));
assert!(ops.is_empty());
assert_eq!(app.window_count, 3);
assert!(app.take_dirty());
}

#[test]
fn presence_with_same_count_is_not_dirty() {
let mut app = App::new(Theme::dark());
app.update(super::AppEvent::Presence(2));
app.take_dirty();

let ops = app.update(super::AppEvent::Presence(2));
assert!(ops.is_empty());
assert!(!app.take_dirty());
}
}
29 changes: 27 additions & 2 deletions crates/goat-tui/src/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,10 @@ fn ctx_label(app: &App) -> Option<(String, f32)> {
.map(|(pct, _, _)| (format!("ctx {pct:.0}%"), pct))
}

pub(crate) fn window_label(window_count: usize) -> Option<String> {
(window_count > 1).then(|| format!("\u{29c9} {window_count}"))
}

fn render_header(frame: &mut Frame, area: Rect, app: &App, theme: Theme) {
let row = Rect { height: 1, ..area }.inner(Margin {
horizontal: PAD_X,
Expand All @@ -353,10 +357,15 @@ fn render_header(frame: &mut Frame, area: Rect, app: &App, theme: Theme) {

let model = model_label(app);
let ctx = ctx_label(app);
let windows = window_label(app.window_count);
let model_w = model.as_ref().map_or(0, |label| label.width());
let ctx_w = ctx.as_ref().map_or(0, |(label, _)| label.width());
let status_gap = usize::from(model.is_some()) * 2 + usize::from(ctx.is_some()) * 2;
let status_w = model_w + ctx_w + status_gap;
let windows_w = windows.as_ref().map_or(0, |label| label.width());
let status_gap = (usize::from(model.is_some())
+ usize::from(ctx.is_some())
+ usize::from(windows.is_some()))
* 2;
let status_w = model_w + ctx_w + windows_w + status_gap;
let cwd = fit_cwd(app.cwd(), inner_w.saturating_sub(status_w));

let mut spans: Vec<Span> = vec![Span::styled(cwd.clone(), theme.muted())];
Expand All @@ -365,6 +374,10 @@ fn render_header(frame: &mut Frame, area: Rect, app: &App, theme: Theme) {
if pad > 0 {
spans.push(Span::raw(" ".repeat(pad)));
}
if let Some(label) = windows {
spans.push(Span::raw(" "));
spans.push(Span::styled(label, theme.muted()));
}
if let Some(label) = model {
spans.push(Span::raw(" "));
spans.push(Span::styled(label, theme.key()));
Expand Down Expand Up @@ -568,4 +581,16 @@ mod tests {
"anthropic:work/claude-sonnet-4"
);
}

#[test]
fn window_label_hidden_for_single_window() {
assert_eq!(super::window_label(0), None);
assert_eq!(super::window_label(1), None);
}

#[test]
fn window_label_shown_for_multiple_windows() {
assert_eq!(super::window_label(2), Some("\u{29c9} 2".to_owned()));
assert_eq!(super::window_label(5), Some("\u{29c9} 5".to_owned()));
}
}
Loading
Loading