import EventEmitter from 'events' import last from 'lodash/last' interface Timed { timestamp: number clientId?: number } function compareTimed(k0: Timed, k1: Timed): number { const dt = k0.timestamp - k1.timestamp if (dt !== 0) { return dt } else if (k0.clientId === undefined) { return k1.clientId === undefined ? 0 : -1 } else { return k1.clientId === undefined ? 1 : k0.clientId - k1.clientId } } export interface UnconfAction extends Timed { action: Action clientId: number } interface Projection extends UnconfAction { state: State } /** * Events: projection */ export class KetchupSynced extends EventEmitter { projs: Projection[] = [] constructor( public confState: State, public confBefore: number, readonly reducer: (state: State, action: Action) => State, unconfActions: UnconfAction[] = [], ) { super() unconfActions.forEach(a => this.tryProcess(a)) } get projState(): State { return last(this.projs)?.state ?? this.confState } unconfActions(): UnconfAction[] { return this.projs.map(({ action, timestamp, clientId }) => ({ action, timestamp, clientId })) } tryProcess(msg: UnconfAction): boolean { if (msg.timestamp < this.confBefore) { return false } let { idx, exact } = this.indexSearch(msg) if (exact) { return false } this.projs.splice(idx, 0, { ...msg, state: this.reducer(this.projs[idx - 1]?.state ?? this.confState, msg.action), }) while (++idx < this.projs.length) { this.projs[idx].state = this.reducer(this.projs[idx - 1].state, this.projs[idx].action) } this.emit('projection', this.projState) return true } confirmBefore(timestamp: number): number { if (timestamp <= this.confBefore) { return 0 } const { idx: numToDel } = this.indexSearch({ timestamp }) if (numToDel > 0) { this.confState = this.projs[numToDel - 1].state this.projs.splice(0, numToDel) } this.confBefore = timestamp return numToDel } private indexSearch(x: Timed): { idx: number, exact: boolean } { let lo = 0 let hi = this.projs.length while (lo < hi) { const mid = Math.floor((lo + hi) / 2) const comp = compareTimed(x, this.projs[mid]) if (comp === 0) { return { idx: mid, exact: true } } else if (comp > 0) { lo = mid + 1 } else { hi = mid } } return { idx: lo, exact: false } } get oldestUnconfTimestamp() { return this.projs?.[0]?.timestamp } } export interface ActionServerMessage extends UnconfAction { message: 'action' } export interface ConfirmServerMessage { message: 'confirm' confBefore: number } export interface HandshakeServerMessage { message: 'handshake' clientId: number confState: State confBefore: number unconfActions: UnconfAction[] } export type ServerMessage = ActionServerMessage | ConfirmServerMessage | HandshakeServerMessage export interface ActionClientMessage { message: 'action' action: Action timestamp: number } export interface HandshakeClientMessage { message: 'handshake' timestamp: number } export interface HeartbeatClientMessage { message: 'heartbeat' timestamp: number } export type ClientMessage = ActionClientMessage | HandshakeClientMessage | HeartbeatClientMessage