109 lines
3.2 KiB
TypeScript
109 lines
3.2 KiB
TypeScript
import WebSocket from 'ws'
|
|
import min from 'lodash/min'
|
|
import { ActionClientMessage, KetchupSynced as KetchupSynced, ActionServerMessage, HandshakeServerMessage, ConfirmServerMessage, ClientMessage, HandshakeClientMessage } from 'ketchup-common'
|
|
|
|
/**
 * Bookkeeping the server keeps for one connected client.
 */
interface RemoteClient {
  /** Socket used to talk to this client. */
  ws: WebSocket

  /** Becomes true once the server has answered this client's handshake. */
  handshook: boolean

  /**
   * Timestamp taken from the client's latest handshake or accepted action.
   * Left unset until the client's first handshake.
   */
  syncedAt?: number
}
|
|
|
|
/**
 * Server side of a Ketchup session.
 *
 * Accepts WebSocket clients, answers their handshakes with a snapshot of the
 * synced state, relays accepted actions to the other clients and tells
 * everyone when older actions become confirmed.
 *
 * Messages from clients are treated as untrusted input: anything that is not
 * valid JSON, not an object, or not a known message kind causes that one
 * client to be dropped instead of throwing out of the socket event handler.
 */
export default class KetchupServer<State, Action> {
  /** Connected clients, keyed by the id assigned in `addRemoteClient`. */
  clients = new Map<number, RemoteClient>()

  /** Shared state container that processes and confirms actions. */
  synced: KetchupSynced<State, Action>

  /** Id that will be handed to the next client that connects. */
  nextClientId = 0

  /**
   * @param initState State the session starts from.
   * @param reducer Function that applies an action to a state.
   */
  constructor(
    initState: State,
    reducer: (state: State, action: Action) => State,
  ) {
    this.synced = new KetchupSynced<State, Action>(
      initState,
      // One second in the past; presumably the initial confirmation boundary
      // (see the `KetchupSynced` constructor to confirm).
      Date.now() - 1000, // TODO: Handle clients joining with clocks running significantly behind
      reducer,
    )
  }

  /**
   * Registers a newly connected socket under a fresh client id and wires up
   * its message and close handlers. The client takes no part in the session
   * until it has sent a handshake.
   *
   * @param ws Connected socket of the new client.
   */
  addRemoteClient(ws: WebSocket) {
    const clientId = this.nextClientId++
    this.clients.set(clientId, { ws, handshook: false })
    ws.onmessage = event => this.onRawMessage(clientId, event.data)
    ws.onclose = () => this.onRemoteClose(clientId)
  }

  /**
   * Parses a raw frame and forwards it to `onMessage`. Frames that are not
   * valid JSON, or whose JSON value is not an object, get the client dropped.
   */
  private onRawMessage(clientId: number, data: unknown) {
    let parsed: unknown
    try {
      parsed = JSON.parse(data as string)
    } catch (err) {
      this.rejectClient(clientId, `Message is not valid JSON (${String(err)})`)
      return
    }

    // `null` and primitives are valid JSON but would blow up on `.message`.
    if (typeof parsed !== 'object' || parsed === null) {
      this.rejectClient(clientId, `Invalid message from client: ${JSON.stringify(parsed)}`)
      return
    }

    this.onMessage(clientId, parsed as ClientMessage<State, Action>)
  }

  /**
   * Dispatches one parsed client message.
   *
   * - `handshake`: records the client's timestamp and replies with the
   *   current confirmed state, confirmation boundary and unconfirmed actions.
   * - `action`: processed only after the client has handshaken.
   * - anything else: the client is dropped.
   */
  private onMessage(clientId: number, msg: ClientMessage<State, Action>) {
    const client = this.clients.get(clientId)
    if (!client) {
      return
    }

    switch (msg.message) {
      case 'handshake':
        client.syncedAt = (msg as HandshakeClientMessage).timestamp
        client.ws.send(JSON.stringify({
          message: 'handshake',
          clientId,
          confState: this.synced.confState,
          confBefore: this.synced.confBefore,
          unconfActions: this.synced.unconfActions(),
        } as HandshakeServerMessage<State, Action>))
        // From here on the client is included in broadcasts.
        client.handshook = true
        break

      case 'action':
        if (client.handshook) {
          this.onActionMessage(clientId, msg)
        }
        break

      default:
        this.rejectClient(clientId, `Invalid message from client: ${JSON.stringify(msg)}`)
    }
  }

  /**
   * Tries to process an action from a client. When it is accepted, the action
   * is relayed to every other handshaken client, the sender's sync timestamp
   * is advanced and confirmation is re-evaluated.
   */
  private onActionMessage(clientId: number, cMsg: ActionClientMessage<Action>) {
    const sender = this.clients.get(clientId)
    if (!sender) {
      return
    }

    const accepted = this.synced.tryProcess({
      clientId,
      action: cMsg.action,
      timestamp: cMsg.timestamp,
    })
    if (!accepted) {
      return
    }

    const sMsg = JSON.stringify({
      message: 'action',
      clientId,
      action: cMsg.action,
      timestamp: cMsg.timestamp,
    } as ActionServerMessage<Action>)

    for (const [id, peer] of this.clients) {
      // Clients that have not handshaken yet are skipped: the handshake reply
      // hands them the unconfirmed actions, so relaying here as well would
      // deliver the action before (and in addition to) that snapshot.
      if (id !== clientId && peer.handshook) {
        peer.ws.send(sMsg)
      }
    }

    sender.syncedAt = cMsg.timestamp
    this.considerConfirm()
  }

  /**
   * Forgets a client. Confirmation is re-evaluated because the departed
   * client may have been the one holding the oldest sync timestamp.
   */
  private onRemoteClose(clientId: number) {
    if (this.clients.delete(clientId)) {
      this.considerConfirm()
    }
  }

  /**
   * Drops a client that sent something the server cannot interpret: logs the
   * reason, removes the client and closes its socket.
   */
  private rejectClient(clientId: number, reason: string) {
    const client = this.clients.get(clientId)
    if (!client) {
      return
    }
    console.error(`Dropping client ${clientId}: ${reason}`)
    // Remove first so the socket's own close event becomes a no-op and
    // confirmation is only re-evaluated once.
    this.onRemoteClose(clientId)
    // 1008 = policy violation; the close reason is kept short because close
    // frames have a small size limit.
    client.ws.close(1008, 'Invalid message')
  }

  /**
   * Asks the synced state to confirm everything older than the oldest client
   * sync timestamp and, if that confirmed anything, announces the new
   * confirmation boundary to all handshaken clients.
   */
  private considerConfirm() {
    // NOTE(review): relies on lodash `min` skipping `undefined` entries, so
    // clients without a `syncedAt` do not hold confirmation back — confirm.
    const oldestSync = min([...this.clients.values()].map(c => c.syncedAt))

    if (oldestSync !== undefined && this.synced.confirmBefore(oldestSync) > 0) {
      console.log('Confirming before', oldestSync)
      const msg = JSON.stringify({
        message: 'confirm',
        confBefore: this.synced.confBefore,
      } as ConfirmServerMessage)
      for (const client of this.clients.values()) {
        // Not-yet-handshaken clients learn `confBefore` from their handshake.
        if (client.handshook) {
          client.ws.send(msg)
        }
      }
    } else {
      console.log('Not confirming before', oldestSync)
    }
  }
}
|