space/packages/ketchup-common/src/index.ts
2021-10-22 17:55:18 +11:00

143 lines
3.5 KiB
TypeScript

import EventEmitter from 'events'
import last from 'lodash/last'
/**
 * Minimal shape required to order items: a timestamp plus an optional client
 * id. `compareTimed` sorts by `timestamp` first and uses `clientId` only to
 * break ties.
 */
interface Timed {
/** Primary sort key. */
timestamp: number
/**
 * Tie-breaker for equal timestamps. An item without a client id sorts before
 * any item that has one at the same timestamp.
 */
clientId?: number
}
/**
 * Comparator for `Timed` values.
 *
 * Orders by `timestamp`; when timestamps are equal, a value with no `clientId`
 * sorts before a value that has one, and two values that both have one are
 * ordered by `clientId`.
 *
 * @param k0 - left-hand value
 * @param k1 - right-hand value
 * @returns a negative number if `k0` sorts first, a positive number if `k1`
 *   sorts first, and `0` if they are equivalent
 */
function compareTimed(k0: Timed, k1: Timed): number {
  const timeDelta = k0.timestamp - k1.timestamp
  if (timeDelta !== 0) {
    return timeDelta
  }

  // Timestamps tie: fall back to the client ids.
  const leftId = k0.clientId
  const rightId = k1.clientId
  if (leftId === undefined && rightId === undefined) {
    return 0
  }
  if (leftId === undefined) {
    return -1
  }
  if (rightId === undefined) {
    return 1
  }
  return leftId - rightId
}
/**
 * An action paired with a timestamp and a client id. Unlike the base `Timed`
 * shape, `clientId` is required here.
 *
 * NOTE(review): "Unconf" presumably stands for "unconfirmed" (compare
 * `KetchupSynced.confirmBefore`) — confirm.
 */
export interface UnconfAction<Action> extends Timed {
/** Payload handed to the reducer. */
action: Action
/** Required client id; used as the tie-breaker when timestamps are equal. */
clientId: number
}
/**
 * An `UnconfAction` together with the state obtained by reducing it on top of
 * the preceding entry's state (or on top of the confirmed state when it is the
 * first entry).
 */
interface Projection<State, Action> extends UnconfAction<Action> {
/** Result of applying this entry's action to the previous state. */
state: State
}
/**
 * Holds a confirmed state plus an ordered list of projections: actions that
 * have been accepted but not yet confirmed, each stored with the state that
 * results from applying it.
 *
 * Projections are kept sorted by timestamp, then client id. Inserting an
 * action in the middle of the list re-runs the reducer for every later entry.
 *
 * Events: projection — emitted with the latest projected state every time
 * `tryProcess` accepts an action.
 */
export class KetchupSynced<State, Action> extends EventEmitter {
  /** Accepted-but-unconfirmed actions with their resulting states, in sorted order. */
  projs: Projection<State, Action>[] = []

  /**
   * @param confState - state that all projections build upon
   * @param confBefore - actions with a timestamp below this value are rejected
   * @param reducer - computes the next state from a state and an action
   * @param unconfActions - initial actions, each fed through `tryProcess`
   */
  constructor(
    public confState: State,
    public confBefore: number,
    readonly reducer: (state: State, action: Action) => State,
    unconfActions: UnconfAction<Action>[] = [],
  ) {
    super()
    for (const pending of unconfActions) {
      this.tryProcess(pending)
    }
  }

  /** State after the newest projection, or the confirmed state when there is none. */
  get projState(): State {
    const newest = last(this.projs)
    return newest?.state ?? this.confState
  }

  /** Returns the stored actions, in order, without their projected states. */
  unconfActions(): UnconfAction<Action>[] {
    return this.projs.map(proj => {
      const { action, timestamp, clientId } = proj
      return { action, timestamp, clientId }
    })
  }

  /**
   * Inserts an action at its sorted position and recomputes every projection
   * from that position onward, then emits `projection`.
   *
   * @param msg - the action to insert
   * @returns `false` (and nothing changes) when `msg.timestamp` is below
   *   `confBefore` or an entry with the same timestamp and client id already
   *   exists; `true` otherwise
   */
  tryProcess(msg: UnconfAction<Action>): boolean {
    if (msg.timestamp < this.confBefore) {
      return false
    }

    const found = this.indexSearch(msg)
    if (found.exact) {
      return false
    }

    const insertAt = found.idx
    // The new entry builds on its predecessor, or on the confirmed state when
    // it becomes the first entry.
    const baseState = this.projs[insertAt - 1]?.state ?? this.confState
    this.projs.splice(insertAt, 0, {
      ...msg,
      state: this.reducer(baseState, msg.action),
    })

    // Everything after the insertion point was computed from a now-stale
    // predecessor, so replay those actions in order.
    for (let i = insertAt + 1; i < this.projs.length; i++) {
      const current = this.projs[i]
      current.state = this.reducer(this.projs[i - 1].state, current.action)
    }

    this.emit('projection', this.projState)
    return true
  }

  /**
   * Advances the confirmation cutoff to `timestamp`: every projection with a
   * strictly smaller timestamp is dropped and the state of the last dropped
   * one becomes the confirmed state.
   *
   * @param timestamp - the new cutoff
   * @returns the number of projections removed; `0` without any change when
   *   `timestamp` is not greater than the current cutoff
   */
  confirmBefore(timestamp: number): number {
    if (timestamp <= this.confBefore) {
      return 0
    }

    // A probe without a client id sorts before every entry that shares its
    // timestamp, so the returned index counts the strictly older entries.
    const confirmedCount = this.indexSearch({ timestamp }).idx
    if (confirmedCount > 0) {
      const lastConfirmed = this.projs[confirmedCount - 1]
      this.confState = lastConfirmed.state
      this.projs.splice(0, confirmedCount)
    }

    this.confBefore = timestamp
    return confirmedCount
  }

  /**
   * Binary search over `projs` using `compareTimed`.
   *
   * @param x - the value to locate
   * @returns the index of an equivalent entry with `exact: true`, or the index
   *   at which `x` would have to be inserted with `exact: false`
   */
  private indexSearch(x: Timed): { idx: number, exact: boolean } {
    let low = 0
    let high = this.projs.length

    while (low < high) {
      const mid = Math.floor((low + high) / 2)
      const order = compareTimed(x, this.projs[mid])
      if (order > 0) {
        low = mid + 1
      } else if (order === 0) {
        return { idx: mid, exact: true }
      } else {
        high = mid
      }
    }

    return { idx: low, exact: false }
  }

  /**
   * Timestamp of the first (oldest) projection. Evaluates to `undefined` at
   * runtime when there are no projections.
   */
  get oldestUnconfTimestamp() {
    const oldest = this.projs?.[0]
    return oldest?.timestamp
  }
}
/**
 * `ServerMessage` variant tagged `'action'`. Carries the full `UnconfAction`
 * fields (`action`, `timestamp`, `clientId`).
 */
export interface ActionServerMessage<Action> extends UnconfAction<Action> {
/** Discriminant of the `ServerMessage` union. */
message: 'action'
}
/**
 * `ServerMessage` variant tagged `'confirm'`.
 *
 * NOTE(review): `confBefore` is presumably meant to be passed to
 * `KetchupSynced.confirmBefore` — verify against the consumer.
 */
export interface ConfirmServerMessage {
/** Discriminant of the `ServerMessage` union. */
message: 'confirm'
/** Confirmation cutoff timestamp. */
confBefore: number
}
/**
 * `ServerMessage` variant tagged `'handshake'`. Its `confState`, `confBefore`
 * and `unconfActions` fields have the same types as the corresponding
 * `KetchupSynced` constructor arguments.
 */
export interface HandshakeServerMessage<State, Action> {
/** Discriminant of the `ServerMessage` union. */
message: 'handshake'
/** NOTE(review): presumably the id assigned to the receiving client — confirm. */
clientId: number
/** Confirmed state. */
confState: State
/** Confirmation cutoff timestamp. */
confBefore: number
/** Actions that are not yet covered by `confState`. */
unconfActions: UnconfAction<Action>[]
}
/**
 * Union of the server message variants, discriminated by the `message` field
 * (`'action'`, `'confirm'` or `'handshake'`).
 */
export type ServerMessage<State, Action> =
ActionServerMessage<Action> |
ConfirmServerMessage |
HandshakeServerMessage<State, Action>
/**
 * `ClientMessage` variant tagged `'action'`. Unlike `ActionServerMessage`, it
 * has no `clientId` field.
 */
export interface ActionClientMessage<Action> {
/** Discriminant of the `ClientMessage` union. */
message: 'action'
/** The action payload. */
action: Action
/** Timestamp attached to the action. */
timestamp: number
}
/** `ClientMessage` variant tagged `'handshake'`; carries only a timestamp. */
export interface HandshakeClientMessage {
/** Discriminant of the `ClientMessage` union. */
message: 'handshake'
/** Timestamp attached to the message. */
timestamp: number
}
/** `ClientMessage` variant tagged `'heartbeat'`; carries only a timestamp. */
export interface HeartbeatClientMessage {
/** Discriminant of the `ClientMessage` union. */
message: 'heartbeat'
/** Timestamp attached to the message. */
timestamp: number
}
/**
 * Union of the client message variants, discriminated by the `message` field
 * (`'action'`, `'handshake'` or `'heartbeat'`).
 *
 * NOTE(review): the `State` type parameter is not referenced by any member of
 * the union.
 */
export type ClientMessage<State, Action> =
ActionClientMessage<Action> |
HandshakeClientMessage |
HeartbeatClientMessage