import { ExponentialBackoff, Websocket, WebsocketBuilder } from 'websocket-ts'
import {
  ActionClientMessage,
  ActionServerMessage,
  ConfirmServerMessage,
  HandshakeClientMessage,
  HandshakeServerMessage,
  ServerMessage,
  KetchupSynced,
  HeartbeatClientMessage,
} from 'ketchup-common'
import EventEmitter from 'events'

/**
 * WebSocket client that mirrors server state through a `KetchupSynced`
 * instance and forwards locally dispatched actions to the server.
 *
 * Emits a `'projection'` event once with the current projected state right
 * after each handshake, and again whenever the underlying `KetchupSynced`
 * emits `'projection'`.
 */
export default class KetchupClient extends EventEmitter {
  /** Underlying socket, built in the constructor with `ExponentialBackoff(100, 7)`. */
  readonly ws: Websocket

  /** Synchronized state; set on handshake, reset to `undefined` when the socket closes. */
  synced?: KetchupSynced

  /** Client id taken from the server handshake; reset when the socket closes. */
  clientId?: number

  /** Timestamp of the last action or heartbeat sent; reset when the socket closes. */
  lastTimestamp?: number

  /** Handle of the pending delayed heartbeat, if one is scheduled. */
  scheduledAck?: NodeJS.Timeout

  /**
   * @param url Address handed to `WebsocketBuilder`.
   * @param reducer Reducer passed through to every `KetchupSynced` created on handshake.
   * @param timeSource Clock used for handshake, action and heartbeat timestamps.
   * @param ackDebounce Threshold compared against timestamp differences and used
   *   as the base `setTimeout` delay for heartbeats (so timestamps are treated
   *   as milliseconds).
   */
  constructor(
    url: string,
    readonly reducer: (state: State, action: Action) => State,
    private readonly timeSource = Date.now,
    private readonly ackDebounce = 5000,
  ) {
    super()
    this.ws = new WebsocketBuilder(url)
      .onClose(() => {
        // Everything learned during the session is invalid once the socket closes.
        this.synced = undefined
        this.clientId = undefined
        this.lastTimestamp = undefined
        this.cancelAck()
      })
      .onOpen(ws => {
        ws.send(JSON.stringify({
          message: 'handshake',
          timestamp: timeSource(),
        } as HandshakeClientMessage))
      })
      .onMessage((_, event) => {
        this.onMessage(JSON.parse(event.data))
      })
      .withBackoff(new ExponentialBackoff(100, 7))
      .build()
  }

  /**
   * Handles one parsed server message.
   *
   * @throws Error if `msg.message` is not `'handshake'`, `'action'` or `'confirm'`.
   */
  private onMessage(msg: ServerMessage) {
    switch (msg.message) {
      case 'handshake': {
        const hMsg = msg as HandshakeServerMessage
        this.clientId = hMsg.clientId
        const synced = new KetchupSynced(hMsg.confState, hMsg.confBefore, this.reducer, hMsg.unconfActions)
        this.synced = synced
        // Re-emit projections from the synced state on this client.
        synced.on('projection', proj => this.emit('projection', proj))
        this.emit('projection', synced.projState)
        this.considerAck()
        break
      }
      case 'action':
        this.synced!.tryProcess(msg as ActionServerMessage)
        this.considerAck()
        break
      case 'confirm':
        this.synced!.confirmBefore((msg as ConfirmServerMessage).confBefore)
        this.considerAck()
        break
      default:
        // Serialize the payload: interpolating the object directly would only
        // ever print "[object Object]".
        throw new Error(`Unknown message from server: ${JSON.stringify(msg)}`)
    }
  }

  /**
   * Applies `action` locally and, if `KetchupSynced.tryProcess` accepts it,
   * sends it to the server and cancels any pending heartbeat.
   *
   * Throws a `TypeError` if called while `synced` is undefined, i.e. before
   * a handshake has been received or after the socket closed.
   *
   * @param action Action to apply and send.
   * @param timestamp Timestamp for the action; defaults to the time source.
   */
  dispatch(action: Action, timestamp = this.timeSource()) {
    // In case of timestamp collision, shift event 1ms into future until timestamp is novel.
    // TODO: Better handling of events being submitted out of timestamp order
    if (this.lastTimestamp !== undefined && timestamp <= this.lastTimestamp) {
      timestamp = this.lastTimestamp + 1
    }
    if (this.synced!.tryProcess({ timestamp, action, clientId: this.clientId! })) {
      // The outgoing action makes a separate heartbeat unnecessary.
      this.cancelAck()
      this.lastTimestamp = timestamp
      this.ws.send(JSON.stringify({
        message: 'action',
        action,
        timestamp,
      } as ActionClientMessage))
    }
  }

  /**
   * Decides whether a heartbeat is needed, based on how far the oldest
   * unconfirmed timestamp is ahead of the last timestamp this client sent.
   * Does nothing if a heartbeat is already scheduled, or if either value is
   * undefined.
   */
  considerAck() {
    if (this.scheduledAck) {
      return
    }
    const oldestUnconfTimestamp = this.synced?.oldestUnconfTimestamp
    if (oldestUnconfTimestamp === undefined || this.lastTimestamp === undefined) {
      return
    }
    const behind = oldestUnconfTimestamp - this.lastTimestamp
    if (behind > this.ackDebounce) {
      // Way behind, ACK now
      this.sendAck()
    } else if (behind >= 0) {
      // Somewhat behind, ACK soon if we don't send an action
      this.scheduledAck = setTimeout(() => {
        // Clear the handle first so a throwing send cannot leave a stale
        // handle that blocks every later considerAck() call.
        this.scheduledAck = undefined
        this.sendAck()
      }, this.ackDebounce - behind)
    }
  }

  /**
   * Sends a heartbeat stamped with the current time and records that time as
   * `lastTimestamp`.
   */
  sendAck() {
    // NOTE(review): this overwrites lastTimestamp unconditionally, so it can
    // move backwards if dispatch() previously recorded a timestamp ahead of
    // the time source - confirm whether the server tolerates that.
    this.lastTimestamp = this.timeSource()
    this.ws.send(JSON.stringify({
      message: 'heartbeat',
      timestamp: this.lastTimestamp,
    } as HeartbeatClientMessage))
  }

  /** Cancels the pending delayed heartbeat, if any. */
  cancelAck() {
    if (this.scheduledAck) {
      clearTimeout(this.scheduledAck)
      this.scheduledAck = undefined
    }
  }
}