117 lines
3.6 KiB
TypeScript
117 lines
3.6 KiB
TypeScript
import { ExponentialBackoff, Websocket, WebsocketBuilder } from 'websocket-ts'
|
|
import { ActionClientMessage, ActionServerMessage, ConfirmServerMessage, HandshakeClientMessage, HandshakeServerMessage, ServerMessage, KetchupSynced, HeartbeatClientMessage } from 'ketchup-common'
|
|
import EventEmitter from 'events'
|
|
|
|
/**
 * Client side of a Ketchup session.
 *
 * Opens a websocket to `url`, sends a `handshake` message when the socket
 * opens, and keeps a local {@link KetchupSynced} replica that is fed by the
 * server's `action` and `confirm` messages. Every projection change reported
 * by the replica is re-emitted on this object as a `'projection'` event.
 *
 * When the socket closes, all per-connection state (`synced`, `clientId`,
 * `lastTimestamp`, any pending ack timer) is discarded; a fresh replica is
 * built from the next server handshake.
 */
export default class KetchupClient<State, Action> extends EventEmitter {
  /** Underlying websocket, built with an `ExponentialBackoff(100, 7)` policy. */
  readonly ws: Websocket

  /** Local replica; `undefined` until the server handshake arrives and after the socket closes. */
  synced?: KetchupSynced<State, Action>

  /** Id taken from the server handshake; `undefined` while not handshaken. */
  clientId?: number

  /** Timestamp of the last action or heartbeat this client sent on the current connection. */
  lastTimestamp?: number

  /** Handle of the pending debounced heartbeat, if one is scheduled. */
  scheduledAck?: ReturnType<typeof setTimeout>

  /**
   * @param url - Websocket endpoint to connect to.
   * @param reducer - Reducer handed to the local `KetchupSynced` replica.
   * @param timeSource - Clock used to stamp outgoing messages; defaults to `Date.now`.
   * @param ackDebounce - Threshold/delay used by {@link considerAck}; it is passed
   *   to `setTimeout`, so it is treated as milliseconds. Defaults to 5000.
   */
  constructor(
    url: string,
    readonly reducer: (state: State, action: Action) => State,
    private readonly timeSource = Date.now,
    private readonly ackDebounce = 5000,
  ) {
    super()
    this.ws = new WebsocketBuilder(url)
      .onClose(() => {
        // Drop everything tied to the connection that just went away.
        this.synced = undefined
        this.clientId = undefined
        this.lastTimestamp = undefined
        this.cancelAck()
      })
      .onOpen(ws => {
        // NOTE(review): the handshake timestamp is not recorded in
        // `lastTimestamp`, so `considerAck` stays inert until the first
        // dispatch or heartbeat. Confirm against the server protocol.
        ws.send(JSON.stringify({
          message: 'handshake',
          timestamp: this.timeSource(),
        } as HandshakeClientMessage))
      })
      .onMessage((_, event) => {
        this.onMessage(JSON.parse(event.data))
      })
      .withBackoff(new ExponentialBackoff(100, 7))
      .build()
  }

  /**
   * Returns the live replica, or throws a descriptive error when no handshake
   * has completed on the current connection.
   */
  private requireSynced(intent: string): KetchupSynced<State, Action> {
    if (this.synced === undefined) {
      throw new Error(`Cannot ${intent} before the server handshake has completed`)
    }
    return this.synced
  }

  /**
   * Routes one decoded server message.
   *
   * @throws Error if the message kind is unknown, or if an `action`/`confirm`
   *   message arrives while there is no replica to apply it to.
   */
  private onMessage(msg: ServerMessage<State, Action>): void {
    switch (msg.message) {
      case 'handshake': {
        const handshake = msg as HandshakeServerMessage<State, Action>
        this.clientId = handshake.clientId
        const synced = new KetchupSynced(
          handshake.confState,
          handshake.confBefore,
          this.reducer,
          handshake.unconfActions,
        )
        this.synced = synced
        synced.on('projection', proj => this.emit('projection', proj))
        // Publish the initial projection right away.
        this.emit('projection', synced.projState)
        this.considerAck()
        break
      }

      case 'action': {
        this.requireSynced('process a server action').tryProcess(msg as ActionServerMessage<Action>)
        this.considerAck()
        break
      }

      case 'confirm': {
        this.requireSynced('process a server confirmation').confirmBefore((msg as ConfirmServerMessage).confBefore)
        this.considerAck()
        break
      }

      default:
        // Serialise explicitly: interpolating the object would print "[object Object]".
        throw new Error(`Unknown message from server: ${JSON.stringify(msg)}`)
    }
  }

  /**
   * Applies `action` to the local replica and, if the replica accepts it,
   * sends it to the server.
   *
   * @param action - Action to apply and transmit.
   * @param timestamp - Timestamp for the action; defaults to the current time
   *   from `timeSource`.
   * @throws Error if called before the server handshake has completed.
   */
  dispatch(action: Action, timestamp = this.timeSource()): void {
    const synced = this.requireSynced('dispatch an action')
    const clientId = this.clientId
    if (clientId === undefined) {
      throw new Error('Cannot dispatch an action before the server handshake has assigned a client id')
    }

    // In case of timestamp collision, shift event 1ms into future until timestamp is novel.
    // TODO: Better handling of events being submitted out of timestamp order
    if (this.lastTimestamp !== undefined && timestamp <= this.lastTimestamp) {
      timestamp = this.lastTimestamp + 1
    }

    if (synced.tryProcess({ timestamp, action, clientId })) {
      // This action replaces any pending heartbeat, so cancel it.
      this.cancelAck()
      this.lastTimestamp = timestamp
      this.ws.send(JSON.stringify({
        message: 'action',
        action,
        timestamp,
      } as ActionClientMessage<Action>))
    }
  }

  /**
   * Decides whether a heartbeat is needed, based on how far the replica's
   * oldest unconfirmed timestamp is ahead of the last timestamp we sent.
   *
   * - More than `ackDebounce` ahead: send a heartbeat immediately.
   * - Between 0 and `ackDebounce` ahead: schedule one for `ackDebounce - gap`
   *   from now; {@link dispatch} cancels it if an action goes out first.
   * - Otherwise, or if a heartbeat is already scheduled: do nothing.
   */
  considerAck(): void {
    if (this.scheduledAck !== undefined) {
      return
    }

    const oldestUnconfTimestamp = this.synced?.oldestUnconfTimestamp
    if (oldestUnconfTimestamp === undefined || this.lastTimestamp === undefined) {
      return
    }

    const behind = oldestUnconfTimestamp - this.lastTimestamp
    if (behind > this.ackDebounce) {
      // Way behind, ACK now
      this.sendAck()
    } else if (behind >= 0) {
      // Somewhat behind, ACK soon if we don't send an action
      this.scheduledAck = setTimeout(() => {
        // Clear the handle first so a failing send cannot leave a stale
        // handle that would block all future scheduling.
        this.scheduledAck = undefined
        this.sendAck()
      }, this.ackDebounce - behind)
    }
  }

  /** Sends a `heartbeat` stamped with the current time and records that time as `lastTimestamp`. */
  sendAck(): void {
    const now = this.timeSource()
    this.lastTimestamp = now
    this.ws.send(JSON.stringify({
      message: 'heartbeat',
      timestamp: now,
    } as HeartbeatClientMessage))
  }

  /** Cancels the pending debounced heartbeat, if any. */
  cancelAck(): void {
    if (this.scheduledAck !== undefined) {
      clearTimeout(this.scheduledAck)
      this.scheduledAck = undefined
    }
  }
}
|