diff --git a/transport/internet/browser_dialer/dialer.go b/transport/internet/browser_dialer/dialer.go index 1991284d607f..7018fedc9aec 100644 --- a/transport/internet/browser_dialer/dialer.go +++ b/transport/internet/browser_dialer/dialer.go @@ -6,70 +6,150 @@ import ( _ "embed" "encoding/base64" "encoding/json" + "io" "net/http" + "strings" + "sync" "time" "github.com/gorilla/websocket" "github.com/xtls/xray-core/common/errors" "github.com/xtls/xray-core/common/platform" + u "github.com/xtls/xray-core/common/utils" "github.com/xtls/xray-core/common/uuid" ) //go:embed dialer.html var webpage []byte -type task struct { - Method string `json:"method"` - URL string `json:"url"` - Extra any `json:"extra,omitempty"` +//go:embed dialer.mjs +var dialerModule []byte + +type pageWithConnMap struct { + UUID string + ControlConn *websocket.Conn + ConnMap map[string]chan *websocket.Conn + ConnMapLock sync.Mutex } -var conns chan *websocket.Conn +var globalConnMap *u.TypedSyncMap[string, *pageWithConnMap] + +type task struct { + Method string `json:"m"` // request method + URL string `json:"u"` // destination URL + ConnUUID string `json:"c"` // connection UUID + Extra any `json:"e,omitempty"` // extra information (headers, WS subprotocol, referrer...) +} var upgrader = &websocket.Upgrader{ ReadBufferSize: 0, WriteBufferSize: 0, HandshakeTimeout: time.Second * 4, CheckOrigin: func(r *http.Request) bool { - return true + if r.URL.Query().Get("token") == csrfToken { + return true + } else { + errors.LogError(context.Background(), "Browser dialer rejected connection: Invalid CSRF token") + return false + } }, } +var csrfToken string + func init() { addr := platform.NewEnvFlag(platform.BrowserDialerAddress).GetValue(func() string { return "" }) - if addr != "" { - token := uuid.New() - csrfToken := token.String() - webpage = bytes.ReplaceAll(webpage, []byte("csrfToken"), []byte(csrfToken)) - conns = make(chan *websocket.Conn, 256) - go http.ListenAndServe(addr, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path == "/websocket" { - if r.URL.Query().Get("token") == csrfToken { - if conn, err := upgrader.Upgrade(w, r, nil); err == nil { - conns <- conn - } else { - errors.LogError(context.Background(), "Browser dialer http upgrade unexpected error") - } + if addr == "" { + return + } + token := uuid.New() + csrfToken = token.String() + globalConnMap = u.NewTypedSyncMap[string, *pageWithConnMap]() + webpage = bytes.ReplaceAll(webpage, []byte("__CSRF_TOKEN__"), []byte(csrfToken)) + go http.ListenAndServe(addr, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // user requests the HTML page + if r.URL.Path == "/dialer.mjs" { + w.Header().Set("Content-Type", "text/javascript; charset=utf-8") + w.Write(dialerModule) + return + } + if !strings.HasPrefix(r.URL.Path, "/ws") { + w.Write(webpage) + return + } + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + errors.LogError(context.Background(), "Browser dialer failed: Unhandled error") + return + } + path := strings.TrimPrefix(r.URL.Path, "/ws/") + pathParts := strings.Split(path, "/") + if len(pathParts) < 2 { + errors.LogError(context.Background(), "Browser dialer failed WebSocket upgrade: Insufficient UUID") + return + } + pageUUID := pathParts[0] + connUUID := pathParts[1] + if connUUID == "ctrl" { + page := &pageWithConnMap{ + UUID: pageUUID, + ControlConn: conn, + ConnMap: make(map[string]chan *websocket.Conn), + } + if _, ok := globalConnMap.Load(pageUUID); ok { + errors.LogError(context.Background(), "Browser dialer received duplicate control connection with same page UUID") + conn.Close() + return + } + globalConnMap.Store(pageUUID, page) + go func() { + _, reader, err := conn.NextReader() + if err != nil { + return } - } else { - w.Write(webpage) + // design and implement control message handling in the future if needed + io.Copy(io.Discard, reader) + }() + } else { + var page *pageWithConnMap + if page, _ = globalConnMap.Load(pageUUID); page == nil { + errors.LogError(context.Background(), "Browser dialer received sub-connection without existing control connection") + conn.Close() + return } - })) - } + page.ConnMapLock.Lock() + c := page.ConnMap[connUUID] + page.ConnMapLock.Unlock() + if c == nil { + errors.LogError(context.Background(), "Browser dialer received a sub-connection but we didn't request it") + conn.Close() + return + } + select { + case c <- conn: + case <-time.After(5 * time.Second): + conn.Close() + errors.LogError(context.Background(), "Browser dialer http upgrade unexpected error") + } + } + })) + go monitor() } func HasBrowserDialer() bool { - return conns != nil + return globalConnMap != nil } type webSocketExtra struct { - Protocol string `json:"protocol,omitempty"` + Protocol string `json:"p,omitempty"` } func DialWS(uri string, ed []byte) (*websocket.Conn, error) { + UUID := uuid.New() task := task{ - Method: "WS", - URL: uri, + Method: "WS", + URL: uri, + ConnUUID: UUID.String(), } if ed != nil { @@ -82,8 +162,8 @@ func DialWS(uri string, ed []byte) (*websocket.Conn, error) { } type httpExtra struct { - Referrer string `json:"referrer,omitempty"` - Headers map[string]string `json:"headers,omitempty"` + Referrer string `json:"r,omitempty"` + Headers map[string]string `json:"h,omitempty"` } func httpExtraFromHeaders(headers http.Header) *httpExtra { @@ -108,20 +188,24 @@ func httpExtraFromHeaders(headers http.Header) *httpExtra { } func DialGet(uri string, headers http.Header) (*websocket.Conn, error) { + UUID := uuid.New() task := task{ - Method: "GET", - URL: uri, - Extra: httpExtraFromHeaders(headers), + Method: "GET", + URL: uri, + ConnUUID: UUID.String(), + Extra: httpExtraFromHeaders(headers), } return dialTask(task) } func DialPost(uri string, headers http.Header, payload []byte) error { + UUID := uuid.New() task := task{ - Method: "POST", - URL: uri, - Extra: httpExtraFromHeaders(headers), + Method: "POST", + URL: uri, + ConnUUID: UUID.String(), + Extra: httpExtraFromHeaders(headers), } conn, err := dialTask(task) @@ -134,11 +218,6 @@ func DialPost(uri string, headers http.Header, payload []byte) error { return err } - err = CheckOK(conn) - if err != nil { - return err - } - conn.Close() return nil } @@ -149,31 +228,51 @@ func dialTask(task task) (*websocket.Conn, error) { return nil, err } - var conn *websocket.Conn - for { - conn = <-conns - if conn.WriteMessage(websocket.TextMessage, data) != nil { - conn.Close() - } else { - break - } + var Page *pageWithConnMap + // the order of iterating a map is random + globalConnMap.Range(func(_ string, page *pageWithConnMap) bool { + Page = page + return false + }) + if Page == nil { + return nil, errors.New("no control connection available") } - err = CheckOK(conn) + var conn *websocket.Conn + connChan := make(chan *websocket.Conn, 1) + Page.ConnMapLock.Lock() + Page.ConnMap[task.ConnUUID] = connChan + Page.ConnMapLock.Unlock() + defer func() { + Page.ConnMapLock.Lock() + delete(Page.ConnMap, task.ConnUUID) + Page.ConnMapLock.Unlock() + }() + err = Page.ControlConn.WriteMessage(websocket.TextMessage, data) if err != nil { - return nil, err + return nil, errors.New("failed to send task to control connection").Base(err) + } + select { + case conn = <-connChan: + return conn, nil + case <-time.After(5 * time.Second): + return nil, errors.New("timeout waiting for connection") } - - return conn, nil } -func CheckOK(conn *websocket.Conn) error { - if _, p, err := conn.ReadMessage(); err != nil { - conn.Close() - return err - } else if s := string(p); s != "ok" { - conn.Close() - return errors.New(s) +func monitor() { + ticker := time.NewTicker(16 * time.Second) + defer ticker.Stop() + for { + <-ticker.C + var pageToDel []*pageWithConnMap + globalConnMap.Range(func(_ string, page *pageWithConnMap) bool { + if err := page.ControlConn.WriteControl(websocket.PingMessage, []byte{}, time.Time{}); err != nil { + pageToDel = append(pageToDel, page) + } + return true + }) + for _, page := range pageToDel { + globalConnMap.Delete(page.UUID) + } } - - return nil } diff --git a/transport/internet/browser_dialer/dialer.html b/transport/internet/browser_dialer/dialer.html index c62135ae6b70..28f8e5e392ed 100644 --- a/transport/internet/browser_dialer/dialer.html +++ b/transport/internet/browser_dialer/dialer.html @@ -1,172 +1,46 @@ - + + Browser Dialer - - - + + - diff --git a/transport/internet/browser_dialer/dialer.mjs b/transport/internet/browser_dialer/dialer.mjs new file mode 100644 index 000000000000..27c7e2b84016 --- /dev/null +++ b/transport/internet/browser_dialer/dialer.mjs @@ -0,0 +1,297 @@ +// 2025 (C) Team Cloudchaser +// Licensed under MIT License + +"use strict"; + +const FutureSignal = class FutureSignal { + #trueResolve; + finished = false; + onfinish; + constructor() { + let upThis = this; + upThis.onfinish = new Promise((p) => { + upThis.#trueResolve = p; + }); + }; + resolve() { + this.finished = true; + this.#trueResolve(); + }; +}; +const denoClient = self.Deno?.createHttpClient({ + "allowHost": true +}); + +export default class AppatDialer { + report = true; // Set to false to disable response status reporting + instanceId; + #wsNext = true; // Set to true to trigger WS fallback + #rqNext = true; // Set to true to trigger fetch fallback + #isBrowser = 2; // 2 for half duplex browser, 1 for full duplex, 0 for non-browser + #prefix; + #csrf; + #compiledPrefix; + #compiledWsPrefix; + #controller; + #aborter; + #uploader = new Map(); + #uploadDeny = new Set(); + removeUploader(key) { + this.#uploader.get(key)[1].close(); + this.#uploader.delete(key); + }; + constructor(prefix, csrf) { + if (!Request.prototype.hasOwnProperty("body")) { + this.#rqNext = false; + throw(new Error("Fetch requests do not support streamable bodies")); + }; + if (typeof self?.WebSocketStream !== "function") { + this.#wsNext = false; + throw(new Error("WebSocket does not support streaming")); + }; + if (typeof self?.Deno !== "undefined") { + this.#isBrowser = 0; + }; + this.#csrf = csrf; + this.#prefix = prefix; + }; + async start() { + let upThis = this; + if (upThis.#controller) { + switch (upThis.#controller.readyState) { + case WebSocket.CLOSING: + case WebSocket.CLOSED: { + throw(new Error(`Attempted reconnection for an active dialer`)); + break; + }; + }; + }; + // Generate a new page ID + upThis.instanceId = self.crypto?.randomUUID(); + upThis.#compiledPrefix = `${upThis.#prefix}/ws/${upThis.instanceId}`; + upThis.#compiledWsPrefix = upThis.#compiledPrefix.replace("http", "ws"); + upThis.#controller = new WebSocket(`${upThis.#compiledPrefix}/ctrl?token=${upThis.#csrf}`); + upThis.#controller.addEventListener("error", (ev) => { + console.warn(`Control socket has errored out:`, ev.error); + }); + upThis.#controller.addEventListener("close", (ev) => { + console.warn(`Control socket closed.`); + upThis.#aborter?.abort(); + }); + upThis.#controller.addEventListener("opened", (ev) => { + console.warn(`Control socket is now ready.`); + upThis.#aborter = new AbortController(); + }); + upThis.#controller.addEventListener("message", async (ev) => { + let data = JSON.parse(ev.data); + console.debug(data); + if (upThis.#uploader.has(data.c)) { + upThis.#controller.send(`{"c":"${data.c}","s":0,"t":"AppatError","e":"appat.cidCollision"}`); + console.warn(`Connection "${data.c}" already existed. Request cancelled.`); + return; + }; + switch (data.m) { + case "PING": { + console.debug(`Pong!`); + break; + }; + case "APPAT": { + switch (data.e?.appat) { + case "requestEnd": { + if (upThis.#uploader.has(data.c)) { + upThis.removeUploader(data.c)(); + console.info(`Closed an ongoing upload.`); + } else { + upThis.#uploadDeny.add(data.c); + console.info(`Closed a future upload.`); + }; + break; + }; + }; + break; + }; + case "WS": { + //console.debug("Mweh!"); + if (upThis.#uploadDeny.has(data.c)) { + // Just cancel initiation altogether. + upThis.#uploadDeny.delete(data.c); + break; + }; + let destUrl = data.u; + if (destUrl.substring(0, 4) === "http") { + destUrl = `ws${destUrl.substring(4)}`; + }; + let wsOpt = {}; + if (data.e?.hasOwnProperty("p")) { + wsOpt.protocols = [data.e.p]; + }; + try { + let wsStream = new WebSocketStream(`${upThis.#compiledWsPrefix}/${data.c}?token=${upThis.#csrf}`); + let wsExternal = new WebSocketStream(destUrl, wsOpt); + upThis.#uploader.set(data.c, [ + data.m, + wsStream + ]); + let wsClosed = 0; // not closed, local close, remote close + wsStream.closed.then((closeObj) => { + if (wsClosed === 0) { + wsClosed = 1; + wsExternal.close(closeObj); + }; + }); + wsExternal.closed.then((closeObj) => { + if (wsClosed === 0) { + wsClosed = 2; + wsStream.close(closeObj); + if (upThis.report) { + let report = { + "c": data.c, + "s": closeObj.closeCode, + "t": closeObj.reason + }; + upThis.#controller.send(JSON.stringify(report)); + }; + }; + }); + let wsTunLocal = await wsStream.opened; + let wsTunRemote = await wsExternal.opened; + if (upThis.report) { + let report = { + "c": data.c, + "s": 101, + "t": "WebSocket upgrade" + }; + upThis.#controller.send(JSON.stringify(report)); + }; + wsTunLocal.readable.pipeTo(wsTunRemote.writable); + wsTunRemote.readable.pipeTo(wsTunLocal.writable); + } catch (err) { + console.warn(err); + if (upThis.report) { + let report = { + "c": data.c, + "s": 0, + "t": err.name, + "e": `${err.message}\n${err.stack}` + }; + upThis.#controller.send(JSON.stringify(report)); + }; + }; + break; + }; + case "WT": { + console.warn("WebTransport is not yet supported."); + break; + }; + case "HEAD": + case "GET": + case "POST": + case "PUT": + case "DELETE": + case "OPTIONS": + case "PATCH": { + let opt = { + "method": data.m, + "signal": upThis.#aborter + }; + if (data.hasOwnProperty("e")) { + if (data.e.hasOwnProperty("r")) { + opt.referrerPolicy = "unsafe-url"; + // Would this change when web safety gets disabled? + let rUrl = new URL(data.e.r); + opt.referrer = data.e.r.replace(`${rUrl.protocol}//${rUrl.hostname}`, ""); + }; + if (data.e.hasOwnProperty("h")) { + opt.headers = data.e.h; + }; + }; + try { + let wsStream = new WebSocketStream(`${upThis.#compiledWsPrefix}/${data.c}?token=${upThis.#csrf}`); + let wssTun = await wsStream.opened; + switch (data.m) { + case "POST": + case "PUT": + case "DELETE": + case "OPTIONS": + case "PATCH": { + // Add the request body + let sourceReader = wssTun.readable.getReader(); + opt.body = new ReadableStream({ + "queueingStrategy": new ByteLengthQueuingStrategy({ + "highWaterMark": 65536 + }), + "start": async (controller) => { + upThis.#uploader.set(data.c, [ + data.m, + controller, + sourceReader + ]) + }, + "pull": async (controller) => { + if (upThis.#uploadDeny.has(data.c)) { + upThis.removeUploader(data.c)(); + upThis.#uploadDeny.delete(data.c); + } else if (upThis.#uploader.has(data.c)) { + // Only pipe when still active + let {value, done} = await sourceReader.read(); + if (value !== undefined) { + controller.enqueue(value); + }; + if (done || upThis.#uploadDeny.has(data.c)) { + upThis.removeUploader(data.c)(); + upThis.#uploadDeny.delete(data.c); + }; + }; + } + }); + if (upThis.#isBrowser > 1) { + opt.duplex = "half"; + upThis.#controller.send(`{"c":"${data.c}","s":1,"t":"AppatError","e":"appat.halfDuplex"}`); + } else { + opt.duplex = "full"; + }; + break; + }; + }; + if (denoClient) { + opt.client = denoClient; + }; + let req = await fetch(data.u, opt); + if (upThis.report) { + let report = { + "c": data.c, + "s": req.status, + "t": req.statusText, + "h": {} + }; + for (const [k, v] of req.headers.entries()) { + report.h[k] = v; + }; + upThis.#controller.send(JSON.stringify(report)); + }; + if (data.m === "HEAD") { + wssTun.close(); + } else { + await req.body.pipeTo(wssTun.writable); + }; + } catch (err) { + console.warn(err); + if (upThis.report) { + let report = { + "c": data.c, + "s": 0, + "t": err.name, + "e": `${err.message}\n${err.stack}` + }; + upThis.#controller.send(JSON.stringify(report)); + }; + }; + break; + }; + default: { + console.warn(`Unsupported method: ${data.m}`); + }; + }; + }); + }; +};