space/packages/ketchup-client/index.ts
2021-10-15 19:00:18 +11:00

117 lines
3.6 KiB
TypeScript

import { ExponentialBackoff, Websocket, WebsocketBuilder } from 'websocket-ts'
import { ActionClientMessage, ActionServerMessage, ConfirmServerMessage, HandshakeClientMessage, HandshakeServerMessage, ServerMessage, KetchupSynced, HeartbeatClientMessage } from 'ketchup-common'
import EventEmitter from 'events'
/**
 * Websocket client that keeps a local {@link KetchupSynced} state in step with a server.
 *
 * Lifecycle, as implemented below:
 * - when the socket opens, a `handshake` message carrying the current time is sent;
 * - the server's `handshake` reply provides the client id and the initial state, from
 *   which a fresh `KetchupSynced` is built;
 * - server `action` / `confirm` messages are fed into that `KetchupSynced`;
 * - when the socket closes, all per-connection state is dropped.
 *
 * Emits `'projection'` with `synced.projState` right after a handshake, and re-emits
 * every `'projection'` event raised by the current `KetchupSynced`.
 *
 * NOTE(review): `lastTimestamp` is only set by `dispatch` and `sendAck`, and `considerAck`
 * does nothing while it is undefined, so a client that never dispatches never sends a
 * heartbeat. Confirm whether the handshake timestamp should seed `lastTimestamp`.
 */
export default class KetchupClient<State, Action> extends EventEmitter {
  /** Underlying socket, built once in the constructor with an exponential backoff. */
  readonly ws: Websocket
  /** Synchronised state for the current connection; undefined until the handshake reply arrives. */
  synced?: KetchupSynced<State, Action>
  /** Client id assigned by the server in its handshake reply. */
  clientId?: number
  /** Timestamp of the last action or heartbeat this client sent on the current connection. */
  lastTimestamp?: number
  /** Handle of the pending debounced heartbeat, if one is scheduled. */
  scheduledAck?: ReturnType<typeof setTimeout>

  /**
   * @param url Websocket URL handed to `WebsocketBuilder`.
   * @param reducer Reducer passed through to every `KetchupSynced` this client creates.
   * @param timeSource Clock used for handshake, action and heartbeat timestamps.
   * @param ackDebounce Threshold/delay (in the same unit as the timestamps) used by `considerAck`.
   */
  constructor(
    url: string,
    readonly reducer: (state: State, action: Action) => State,
    private readonly timeSource = Date.now,
    private readonly ackDebounce = 5000,
  ) {
    super()
    this.ws = new WebsocketBuilder(url)
      .onClose(() => {
        // Everything below is per-connection; a new handshake will rebuild it.
        this.synced = undefined
        this.clientId = undefined
        this.lastTimestamp = undefined
        this.cancelAck()
      })
      .onOpen(ws => {
        ws.send(JSON.stringify({
          message: 'handshake',
          timestamp: timeSource(),
        } as HandshakeClientMessage))
      })
      .onMessage((_, event) => {
        this.onMessage(JSON.parse(event.data))
      })
      .withBackoff(new ExponentialBackoff(100, 7))
      .build()
  }

  /**
   * Routes one decoded server message.
   *
   * @throws Error if an `action`/`confirm` message arrives before a handshake reply,
   *   or if the message kind is not recognised.
   */
  private onMessage(msg: ServerMessage<State, Action>) {
    switch (msg.message) {
      case 'handshake': {
        const hMsg = msg as HandshakeServerMessage<State, Action>
        this.clientId = hMsg.clientId
        const synced = new KetchupSynced(hMsg.confState, hMsg.confBefore, this.reducer, hMsg.unconfActions)
        this.synced = synced
        synced.on('projection', proj => this.emit('projection', proj))
        this.emit('projection', synced.projState)
        this.considerAck()
        break
      }
      case 'action':
        this.requireSynced('process a server action').tryProcess(msg as ActionServerMessage<Action>)
        this.considerAck()
        break
      case 'confirm':
        this.requireSynced('process a server confirmation').confirmBefore((msg as ConfirmServerMessage).confBefore)
        this.considerAck()
        break
      default:
        // Serialise explicitly: interpolating the object itself would print "[object Object]".
        throw new Error(`Unknown message from server: ${JSON.stringify(msg)}`)
    }
  }

  /**
   * Returns the current `KetchupSynced`, or throws a descriptive error when there is none.
   *
   * @param doing Short description of the attempted operation, used in the error message.
   */
  private requireSynced(doing: string): KetchupSynced<State, Action> {
    if (this.synced === undefined) {
      throw new Error(`Cannot ${doing}: handshake with server has not completed`)
    }
    return this.synced
  }

  /**
   * Applies an action locally and, if the local state accepts it, sends it to the server.
   *
   * @param action Action to apply.
   * @param timestamp Timestamp for the action; defaults to the current `timeSource()` value.
   * @throws Error if called before the handshake reply has been received.
   */
  dispatch(action: Action, timestamp = this.timeSource()) {
    const synced = this.requireSynced('dispatch an action')
    const clientId = this.clientId
    if (clientId === undefined) {
      throw new Error('Cannot dispatch an action: no client id has been assigned by the server')
    }
    // In case of timestamp collision, shift event 1ms into future until timestamp is novel.
    // TODO: Better handling of events being submitted out of timestamp order
    if (this.lastTimestamp !== undefined && timestamp <= this.lastTimestamp) {
      timestamp = this.lastTimestamp + 1
    }
    if (synced.tryProcess({ timestamp, action, clientId })) {
      // The outgoing action carries a timestamp itself, so a pending heartbeat is redundant.
      this.cancelAck()
      this.lastTimestamp = timestamp
      this.ws.send(JSON.stringify({
        message: 'action',
        action,
        timestamp,
      } as ActionClientMessage<Action>))
    }
  }

  /**
   * Decides whether a heartbeat should be sent now, later, or not at all.
   *
   * Does nothing if a heartbeat is already scheduled, if there is no unconfirmed action,
   * or if this client has not yet sent any timestamp on the current connection.
   */
  considerAck() {
    if (this.scheduledAck !== undefined) {
      return
    }
    const oldestUnconfTimestamp = this.synced?.oldestUnconfTimestamp
    if (oldestUnconfTimestamp === undefined || this.lastTimestamp === undefined) {
      return
    }
    // How far the oldest unconfirmed action lies beyond the last timestamp we sent.
    const behind = oldestUnconfTimestamp - this.lastTimestamp
    if (behind > this.ackDebounce) { // Way behind, ACK now
      this.sendAck()
    } else if (behind >= 0) { // Somewhat behind, ACK soon if we don't send an action
      this.scheduledAck = setTimeout(
        () => {
          // Clear the handle first so a failing send cannot block all future heartbeats.
          this.scheduledAck = undefined
          this.sendAck()
        },
        this.ackDebounce - behind,
      )
    }
  }

  /** Sends a `heartbeat` message stamped with the current time and records that timestamp. */
  sendAck() {
    this.lastTimestamp = this.timeSource()
    this.ws.send(JSON.stringify({
      message: 'heartbeat',
      timestamp: this.lastTimestamp,
    } as HeartbeatClientMessage))
  }

  /** Cancels the scheduled heartbeat, if any. */
  cancelAck() {
    if (this.scheduledAck !== undefined) {
      clearTimeout(this.scheduledAck)
      this.scheduledAck = undefined
    }
  }
}