From e96fbc27ba19cdbc9e0f38737cd1464f9cd0b184 Mon Sep 17 00:00:00 2001 From: Kenneth Allen Date: Fri, 15 Oct 2021 19:00:18 +1100 Subject: [PATCH] Add client ACK --- packages/ketchup-client/index.ts | 49 +++++++++++++++++-- packages/ketchup-common/index.ts | 13 ++++- packages/ketchup-server/index.ts | 16 +++--- .../wonders-client/src/components/App/App.tsx | 9 ++-- 4 files changed, 68 insertions(+), 19 deletions(-) diff --git a/packages/ketchup-client/index.ts b/packages/ketchup-client/index.ts index 9cd6d44..213a0dd 100644 --- a/packages/ketchup-client/index.ts +++ b/packages/ketchup-client/index.ts @@ -1,5 +1,5 @@ import { ExponentialBackoff, Websocket, WebsocketBuilder } from 'websocket-ts' -import { ActionClientMessage, ActionServerMessage, ConfirmServerMessage, HandshakeClientMessage, HandshakeServerMessage, ServerMessage, KetchupSynced } from 'ketchup-common' +import { ActionClientMessage, ActionServerMessage, ConfirmServerMessage, HandshakeClientMessage, HandshakeServerMessage, ServerMessage, KetchupSynced, HeartbeatClientMessage } from 'ketchup-common' import EventEmitter from 'events' export default class KetchupClient extends EventEmitter { @@ -7,11 +7,13 @@ export default class KetchupClient extends EventEmitter { synced?: KetchupSynced clientId?: number lastTimestamp?: number + scheduledAck?: NodeJS.Timeout constructor( url: string, readonly reducer: (state: State, action: Action) => State, - private readonly timeSource = Date.now + private readonly timeSource = Date.now, + private readonly ackDebounce = 5000, ) { super() this.ws = new WebsocketBuilder(url) @@ -19,6 +21,7 @@ export default class KetchupClient extends EventEmitter { this.synced = undefined this.clientId = undefined this.lastTimestamp = undefined + this.cancelAck() }) .onOpen(ws => { ws.send(JSON.stringify({ @@ -41,16 +44,19 @@ export default class KetchupClient extends EventEmitter { this.synced = new KetchupSynced(hMsg.confState, hMsg.confBefore, this.reducer, hMsg.unconfActions) this.synced.on('projection', proj => this.emit('projection', proj)) this.emit('projection', this.synced.projState) + this.considerAck() break case 'action': this.synced!.tryProcess(msg as ActionServerMessage) + this.considerAck() break case 'confirm': this.synced!.confirmBefore((msg as ConfirmServerMessage).confBefore) + this.considerAck() break - + default: throw new Error(`Unknown message from server: ${msg}`) } @@ -63,7 +69,8 @@ export default class KetchupClient extends EventEmitter { timestamp = this.lastTimestamp + 1 } - if (this.synced?.tryProcess({ timestamp, action, clientId: this.clientId! })) { + if (this.synced!.tryProcess({ timestamp, action, clientId: this.clientId! })) { + this.cancelAck() this.lastTimestamp = timestamp this.ws.send(JSON.stringify({ message: 'action', @@ -72,4 +79,38 @@ export default class KetchupClient extends EventEmitter { } as ActionClientMessage)) } } + + considerAck() { + if (this.scheduledAck) { + return + } + + let oldestUnconfTimestamp = this.synced?.oldestUnconfTimestamp + if (oldestUnconfTimestamp !== undefined && this.lastTimestamp !== undefined) { + let behind = oldestUnconfTimestamp - this.lastTimestamp + if (behind > this.ackDebounce) { // Way behind, ACK now + this.sendAck() + } else if (behind >= 0) { // Somewhat behind, ACK soon if we don't send an action + this.scheduledAck = setTimeout( + () => { this.sendAck(); this.scheduledAck = undefined }, + this.ackDebounce - behind, + ) + } + } + } + + sendAck() { + this.lastTimestamp = this.timeSource() + this.ws.send(JSON.stringify({ + message: 'heartbeat', + timestamp: this.lastTimestamp, + } as HeartbeatClientMessage)) + } + + cancelAck() { + if (this.scheduledAck) { + clearTimeout(this.scheduledAck) + this.scheduledAck = undefined + } + } } diff --git a/packages/ketchup-common/index.ts b/packages/ketchup-common/index.ts index c2e5b71..df6e857 100644 --- a/packages/ketchup-common/index.ts +++ b/packages/ketchup-common/index.ts @@ -88,7 +88,7 @@ export class KetchupSynced extends EventEmitter { while (lo < hi) { const mid = Math.floor((lo + hi) / 2) const comp = compareTimed(x, this.projs[mid]) - if (comp === 0 ) { + if (comp === 0) { return { idx: mid, exact: true } } else if (comp > 0) { lo = mid + 1 @@ -98,6 +98,10 @@ export class KetchupSynced extends EventEmitter { } return { idx: lo, exact: false } } + + get oldestUnconfTimestamp() { + return this.projs?.[0]?.timestamp + } } export interface ActionServerMessage extends UnconfAction { @@ -128,6 +132,11 @@ export interface HandshakeClientMessage { message: 'handshake' timestamp: number } +export interface HeartbeatClientMessage { + message: 'heartbeat' + timestamp: number +} export type ClientMessage = ActionClientMessage | - HandshakeClientMessage + HandshakeClientMessage | + HeartbeatClientMessage diff --git a/packages/ketchup-server/index.ts b/packages/ketchup-server/index.ts index b164b11..5f9f16b 100644 --- a/packages/ketchup-server/index.ts +++ b/packages/ketchup-server/index.ts @@ -1,6 +1,6 @@ import WebSocket from 'ws' import min from 'lodash/min' -import { ActionClientMessage, KetchupSynced as KetchupSynced, ActionServerMessage, HandshakeServerMessage, ConfirmServerMessage, ClientMessage, HandshakeClientMessage } from 'ketchup-common' +import { ActionClientMessage, KetchupSynced as KetchupSynced, ActionServerMessage, HandshakeServerMessage, ConfirmServerMessage, ClientMessage, HandshakeClientMessage, HeartbeatClientMessage } from 'ketchup-common' interface RemoteClient { syncedAt?: number @@ -15,11 +15,11 @@ export default class KetchupServer { constructor( initState: State, - reducer: (state: State, action: Action) => State, + reducer: (state: State, action: Action) => State, ) { this.synced = new KetchupSynced( - initState, - Date.now() - 1000, // TODO: Handle clients joining with clocks running significantly behind + initState, + Date.now() - (1000 * 60 * 60 * 24), // TODO: Handle clients joining with clocks running significantly behind reducer, ) } @@ -55,6 +55,11 @@ export default class KetchupServer { } break + case 'heartbeat': + client.syncedAt = (msg as HeartbeatClientMessage).timestamp + this.considerConfirm() + break + default: throw new Error(`Invalid message from client: ${JSON.stringify(msg)}`) } @@ -93,7 +98,6 @@ export default class KetchupServer { private considerConfirm() { const oldestSync = min([...this.clients.values()].map(c => c.syncedAt)) if (oldestSync !== undefined && this.synced.confirmBefore(oldestSync) > 0) { - console.log('Confirming before', oldestSync) const msg = JSON.stringify({ message: 'confirm', confBefore: this.synced.confBefore, @@ -101,8 +105,6 @@ export default class KetchupServer { for (const client of this.clients.values()) { client.ws.send(msg) } - } else { - console.log('Not confirming before', oldestSync) } } } diff --git a/packages/wonders-client/src/components/App/App.tsx b/packages/wonders-client/src/components/App/App.tsx index f082b3c..8c82593 100644 --- a/packages/wonders-client/src/components/App/App.tsx +++ b/packages/wonders-client/src/components/App/App.tsx @@ -1,4 +1,4 @@ -import { useEffect, useState } from 'react'; +import { useState } from 'react'; import useKetchup from 'ketchup-react' import { addPlayerAction, reducer, removePlayerAction, resetAction } from 'wonders-common' import './App.css'; @@ -10,9 +10,6 @@ import Row from 'react-bootstrap/Row' export default function App() { const [state, dispatch] = useKetchup('ws://localhost:4000', reducer) - useEffect(() => { - console.debug('State', state) - }, [state]) const [user, setUser] = useState() function selectUser(name?: string) { @@ -29,10 +26,10 @@ export default function App() { } return - p.name) ?? []} locked={state !== undefined && state.stage !== 'starting'}/> + p.name) ?? []} locked={state !== undefined && state.stage !== 'starting'} /> {state ? <> - + : Loading...