import crypto from 'crypto' const APP_CONSTANTS = { EVENTTYPES: { HBPING : 1, HBPONG : 2, STREAM_JOIN : 3, STREAM_LEAVE : 4, EVENTPAIR_RESP: 5, EVENTSUB_MSG : 6, }, } const make_kvsafe_stable_key_for_ns = function (str_ns, x) { const hash = crypto.createHash('md5').update(x).digest('hex') const out_key = str_ns + ':' + hash return out_key } const PAGE_TITLE = Bun.env['PAGE_TITLE'] const POLLING_SLEEP_MAINLOOP = 1000 const async_build_js_script = async (path_js_entry_script, build_options) => { const result = await Bun.build({ entrypoints: [ path_js_entry_script ], ...build_options, env: 'disable', minify: false, }) const str_out = await result.outputs[0].text() return str_out } const get_siteroot_html = function () {return ` ${PAGE_TITLE} `.trim()} const WS_SESSIONS_MAP = {} const wrap_send_event = (ws, subevt_type, subevt_data) => { if (ws.readyState === WebSocket.OPEN) { ws.send(new TextEncoder().encode(JSON.stringify([ subevt_type, subevt_data ]))) } else { console.warn(['ws readyState !== OPEN : dropping event send', { subevt_type, subevt_data }]) } return } const start_server = function ({ handlers, jsbuild_on_ws_connect }) { const server = Bun.serve({ hostname: Bun.env['CTX_WEBSERVER_HOST'] || '127.0.0.1', port : Bun.env['CTX_WEBSERVER_PORT'] ? parseInt(Bun.env['CTX_WEBSERVER_PORT']) : 8800, async fetch (req, server) { const tstamp_handled = Date.now() const url = new URL(req.url) let resp if (url.pathname === '/') { resp = new Response(get_siteroot_html(), { status: 200, headers: { 'Content-Type': 'text/html' } }) } else if (url.pathname === '/index.js') { const str_js = await jsbuild_on_ws_connect() resp = new Response( await async_build_js_script(import.meta.dir + '/../frontend/index.js', { define: { APP_CONSTANTS : JSON.stringify(APP_CONSTANTS), JS_ON_WS_CONNECT: JSON.stringify(str_js), } }), { status: 200, headers: { 'Content-Type': 'text/javascript' } } ) } else if (url.pathname === '/ws') { const uid_session = crypto.randomUUID() const ws_data = { uid_session, force_close: false, active_subs: {}, pending_teardowns: [], } if (server.upgrade(req, { data: ws_data, })) { // do not return a Response resp = undefined } resp = new Response('Upgrade failed', { status: 500 }) } else { resp = new Response(null, { status: 404 }) } return resp }, websocket: { message (ws, message) { try { const [ uid_envelope, evt_type, evt_data ] = JSON.parse(new TextDecoder().decode(message)) if (evt_type === APP_CONSTANTS.EVENTTYPES.HBPONG) { // distinctly does NOT need a "response" (b/c it already IS a response, to Ping) } else if (evt_type === APP_CONSTANTS.EVENTTYPES.STREAM_JOIN) { const { uid_sub, init_params, stream_key } = evt_data console.info({ uid_sub, init_params, stream_key }) const func_stream_handle = handlers[stream_key] if (func_stream_handle !== undefined) { // NOTE: allow client to set "uid_sub", BUT don't "trust" as safe key for our map const uid_safe_sub = make_kvsafe_stable_key_for_ns('safesub', uid_sub) const handler_meta = { init_params, do_stop: false, func_sub_send: (sub_msg) => { const is_final_message = false wrap_send_event(ws, APP_CONSTANTS.EVENTTYPES.EVENTSUB_MSG, [ uid_sub, { is_final_message, sub_msg } ]) return }, } func_stream_handle(handler_meta).then(() => { console.info('(stream handler finished)') const is_final_message = true const sub_msg = null wrap_send_event(ws, APP_CONSTANTS.EVENTTYPES.EVENTSUB_MSG, [ uid_sub, { is_final_message, sub_msg } ]) delete ws.data.active_subs[uid_safe_sub] return }).catch(console.error) ws.data.active_subs[uid_safe_sub] = { func_teardown: () => { handler_meta.do_stop = true return }, } wrap_send_event(ws, APP_CONSTANTS.EVENTTYPES.EVENTPAIR_RESP, [ uid_envelope, { success: true } ]) } else { wrap_send_event(ws, APP_CONSTANTS.EVENTTYPES.EVENTPAIR_RESP, [ uid_envelope, { success: false } ]) } } else if (evt_type === APP_CONSTANTS.EVENTTYPES.STREAM_LEAVE) { const { uid_sub } = evt_data // (re-make hashed key) const uid_safe_sub = make_kvsafe_stable_key_for_ns('safesub', uid_sub) const sub_info = ws.data.active_subs[uid_safe_sub] if (sub_info !== undefined) { sub_info.func_teardown() delete ws.data.active_subs[uid_safe_sub] console.info(['tore down (gracefully) Active Sub', { uid_safe_sub }]) } else { console.warn(['no sub_info for uid_safe_sub', { uid_safe_sub }]) } wrap_send_event(ws, APP_CONSTANTS.EVENTTYPES.EVENTPAIR_RESP, [ uid_envelope, { success: true } ]) } else { console.warn([ 'UNEXPECTED evt_type', evt_type ]) } } catch (err) { console.error(err) } return }, open (ws) { console.info('websocket OPEN') const { uid_session } = ws.data WS_SESSIONS_MAP[uid_session] = { ws, } return }, close (ws, code, message) { console.info('websocket CLOSE') const asyncfunc = async () => { // NOTE: blocking for (let func_teardown of ws.data.pending_teardowns) { await func_teardown() } return } asyncfunc().then(()=>{}).catch(console.error) return }, drain (ws) { console.info('websocket DRAIN') return }, }, }) return server } const async_run = async ({ handlers, jsbuild_on_ws_connect, }) => { const server = start_server({ handlers, jsbuild_on_ws_connect }) console.info(server) const ctx_obj = { do_shutdown: false, } process.on('SIGINT', () => { console.info('Received SIGINT') // NOTE: force close all WS for (let [ uid_session, { ws } ] of Object.entries(WS_SESSIONS_MAP)) { ws.close() } // let the loop "handle" whatever remains, then get out setTimeout(() => { ctx_obj.do_shutdown = true return }, 3 * 1000) return }) let loop_idx = 0 while (ctx_obj.do_shutdown === false) { loop_idx += 1 //console.info(`MAINLOOP : ${loop_idx}`) // process Clients // copy beforehand so can delete while iterating const lst_entries = [...Object.entries(WS_SESSIONS_MAP)] for (let [ uid_session, { ws } ] of lst_entries) { if (ws.readyState === WebSocket.OPEN) { if (ws.data.force_close === true) { console.warn([ 'FORCE CLOSING', ws.data.uid_session ]) ws.close() } else { wrap_send_event(ws, APP_CONSTANTS.EVENTTYPES.HBPING, true) } } else if (ws.readyState === WebSocket.CLOSED) { // handle Active Subs for (let [ uid_sub, sub_info ] of Object.entries(ws.data.active_subs)) { sub_info.func_teardown() console.info(['tore down hanging Active Sub', { uid_sub }]) } ws.data.active_subs = {} // cleanup delete WS_SESSIONS_MAP[uid_session] } } await Bun.sleep(POLLING_SLEEP_MAINLOOP) } process.exit() return } export { async_run }