134 lines
3.3 KiB
TypeScript
134 lines
3.3 KiB
TypeScript
import EventEmitter from 'events'
|
|
import last from 'lodash/last'
|
|
|
|
/**
 * Anything that can be placed on the shared timeline.
 *
 * `clientId` is optional so that a bare `{ timestamp }` can serve as a search
 * key; `compareTimed` orders an entry without a `clientId` before entries
 * with the same timestamp that carry one.
 */
interface Timed {
  timestamp: number
  clientId?: number
}

/**
 * Comparator for timeline entries.
 *
 * Entries are ordered by `timestamp` first. For equal timestamps an entry
 * without a `clientId` sorts before any entry that has one, and two entries
 * that both have a `clientId` are ordered by that id.
 *
 * @returns a negative number if `k0` sorts first, a positive number if `k1`
 *   sorts first, and `0` if the two keys are equivalent.
 */
function compareTimed(k0: Timed, k1: Timed): number {
  const dt = k0.timestamp - k1.timestamp
  if (dt !== 0) {
    return dt
  }
  // Same timestamp: fall back to clientId, with "no clientId" as the minimum.
  if (k0.clientId === undefined) {
    return k1.clientId === undefined ? 0 : -1
  }
  return k1.clientId === undefined ? 1 : k0.clientId - k1.clientId
}

/**
 * An action that has not been confirmed yet, tagged with its position on the
 * timeline. Unlike the base `Timed`, `clientId` is mandatory here, so every
 * unconfirmed action has a complete (timestamp, clientId) sort key.
 */
export interface UnconfAction<Action> extends Timed {
  action: Action
  clientId: number
}

/**
 * An unconfirmed action together with the state obtained by running the
 * reducer on it (on top of the state of the entry that precedes it).
 */
interface Projection<State, Action> extends UnconfAction<Action> {
  state: State
}

/**
 * Holds a confirmed state plus an ordered list of not-yet-confirmed actions,
 * each stored with the state that results from applying it ("projection").
 * Actions may arrive out of order; they are inserted at their sorted position
 * and every later projection is recomputed.
 *
 * Events: projection — emitted with the newest projected state each time
 * `tryProcess` accepts an action.
 */
export class KetchupSynced<State, Action> extends EventEmitter {
  /** Unconfirmed actions in `compareTimed` order, each with the state after applying it. */
  projs: Projection<State, Action>[] = []

  /**
   * @param confState state that the first unconfirmed action is applied to
   * @param confBefore actions with a timestamp strictly below this are rejected
   * @param reducer computes the next state from a state and an action
   * @param unconfActions initial actions; each is passed through `tryProcess`,
   *   so stale or duplicate entries are dropped
   */
  constructor(
    public confState: State,
    public confBefore: number,
    readonly reducer: (state: State, action: Action) => State,
    unconfActions: UnconfAction<Action>[] = [],
  ) {
    super()
    for (const unconf of unconfActions) {
      this.tryProcess(unconf)
    }
  }

  /** State after the last unconfirmed action, or `confState` when there is none. */
  get projState(): State {
    return this.stateBefore(this.projs.length)
  }

  /** Returns the pending actions in order, without their projected states. */
  unconfActions(): UnconfAction<Action>[] {
    return this.projs.map(({ action, timestamp, clientId }) => ({ action, timestamp, clientId }))
  }

  /**
   * Inserts `msg` at its sorted position and recomputes the projections that
   * follow it.
   *
   * @returns `false` (and changes nothing) when `msg.timestamp` is below
   *   `confBefore` or an entry with the same timestamp and clientId is already
   *   stored; `true` otherwise, after emitting `projection`.
   */
  tryProcess(msg: UnconfAction<Action>): boolean {
    if (msg.timestamp < this.confBefore) {
      return false
    }
    const { idx, exact } = this.indexSearch(msg)
    if (exact) {
      return false
    }
    this.projs.splice(idx, 0, {
      ...msg,
      state: this.reducer(this.stateBefore(idx), msg.action),
    })
    // Everything after the insertion point was computed on a now-outdated
    // predecessor state, so replay those actions in order.
    for (let i = idx + 1; i < this.projs.length; i++) {
      this.projs[i].state = this.reducer(this.projs[i - 1].state, this.projs[i].action)
    }
    this.emit('projection', this.projState)
    return true
  }

  /**
   * Folds every pending action with a timestamp strictly below `timestamp`
   * into `confState` and raises `confBefore` to `timestamp`.
   *
   * @returns the number of actions removed from the pending list; `0` without
   *   any change when `timestamp` is not greater than the current `confBefore`.
   */
  confirmBefore(timestamp: number): number {
    if (timestamp <= this.confBefore) {
      return 0
    }
    // A key without clientId sorts before every stored entry that shares its
    // timestamp, so idx counts exactly the entries with a smaller timestamp.
    const { idx: numToDel } = this.indexSearch({ timestamp })
    if (numToDel > 0) {
      this.confState = this.projs[numToDel - 1].state
      this.projs.splice(0, numToDel)
    }
    this.confBefore = timestamp
    return numToDel
  }

  /**
   * State that an action placed at position `idx` is applied to.
   *
   * Chosen by index rather than with `??`, so a projection whose state is
   * legitimately `null` or `undefined` is not mistaken for "no projection".
   */
  private stateBefore(idx: number): State {
    return idx > 0 ? this.projs[idx - 1].state : this.confState
  }

  /**
   * Binary search over `projs` using `compareTimed`.
   *
   * @returns the index of an equivalent entry with `exact: true`, or the
   *   index at which `x` would have to be inserted with `exact: false`.
   */
  private indexSearch(x: Timed): { idx: number, exact: boolean } {
    let lo = 0
    let hi = this.projs.length
    while (lo < hi) {
      const mid = Math.floor((lo + hi) / 2)
      const comp = compareTimed(x, this.projs[mid])
      if (comp === 0) {
        return { idx: mid, exact: true }
      }
      if (comp > 0) {
        lo = mid + 1
      } else {
        hi = mid
      }
    }
    return { idx: lo, exact: false }
  }
}

/**
 * Server message tagged `'action'`: an unconfirmed action (action, timestamp
 * and clientId) with the `message` discriminator added.
 */
export interface ActionServerMessage<Action> extends UnconfAction<Action> {
  message: 'action'
}
/**
 * Server message tagged `'confirm'` carrying a `confBefore` value.
 * NOTE(review): presumably the receiver passes `confBefore` to
 * `KetchupSynced.confirmBefore` — confirm against the caller.
 */
export interface ConfirmServerMessage {
  message: 'confirm'
  confBefore: number
}
/**
 * Server message tagged `'handshake'`.
 *
 * `confState`, `confBefore` and `unconfActions` have the same names and types
 * as the corresponding `KetchupSynced` constructor parameters.
 * NOTE(review): `clientId` is presumably the id assigned to the receiving
 * client — confirm against the server code.
 */
export interface HandshakeServerMessage<State, Action> {
  message: 'handshake'
  clientId: number
  confState: State
  confBefore: number
  unconfActions: UnconfAction<Action>[]
}
/** Union of the server message shapes, discriminated by the `message` field. */
export type ServerMessage<State, Action> =
  | ActionServerMessage<Action>
  | ConfirmServerMessage
  | HandshakeServerMessage<State, Action>

/**
 * Client message tagged `'action'`: an action and its timestamp. It has no
 * `clientId` field, unlike the server-side `UnconfAction`.
 */
export interface ActionClientMessage<Action> {
  message: 'action'
  action: Action
  timestamp: number
}
/** Client message tagged `'handshake'`, carrying only a timestamp. */
export interface HandshakeClientMessage {
  message: 'handshake'
  timestamp: number
}
/**
 * Union of the client message shapes, discriminated by the `message` field.
 *
 * The `State` type parameter is not used by either member; it is kept so
 * that existing `ClientMessage<State, Action>` references keep compiling.
 */
export type ClientMessage<State, Action> =
  | ActionClientMessage<Action>
  | HandshakeClientMessage