Skip to content

Commit

Permalink
multi thread socket
Browse files Browse the repository at this point in the history
  • Loading branch information
simon300000 committed Mar 9, 2024
1 parent 254fb33 commit 1369489
Show file tree
Hide file tree
Showing 5 changed files with 300 additions and 292 deletions.
12 changes: 9 additions & 3 deletions api/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { vd } from './interface/vd.js'
import { hawk } from './interface/hawk.js'
import * as vdb from './interface/vdb.js'
import { socket as stateSocket } from './interface/state.js'
import { ioRaw, connectionLimit } from './interface/io.js'
import { ioRaw, setupConnectionLimit, setupConnectionMaster } from './interface/io.js'

import snake from './snake.js'

Expand All @@ -19,10 +19,17 @@ import httpAPI from './http.js'
const PARALLEL = 16
const INTERVAL = 1000 * 60 * 5

const wait = ms => new Promise(resolve => setTimeout(resolve, ms))

if (cluster.isPrimary) {
console.log('starting spider')
spider({ INTERVAL })
ant({ INTERVAL })
const server = http.createServer()
setupConnectionMaster(server)
await wait(2000)
server.listen(8001)
console.log('listening on 8001')
} else {
console.log('oh no, I am a worker')
stateSocket.on('log', log => ioRaw.to('state').emit('stateLog', log))
Expand All @@ -33,6 +40,5 @@ if (cluster.isPrimary) {
snake()
hawk(ioRaw)
ioRaw.on('connection', connect({ PARALLEL, INTERVAL }))
server.listen(8001)
connectionLimit(server, [ioRaw, vd])
setupConnectionLimit()
}
84 changes: 33 additions & 51 deletions api/interface/io.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import cluster from 'node:cluster'
import type { Server as HTTPServer } from 'node:http'

import { Server } from 'socket.io'
import { setupMaster, setupWorker } from '@socket.io/sticky'

import * as vdb from './vdb.js'

Expand Down Expand Up @@ -36,12 +37,10 @@ type Message = {
emitInfoArray?: boolean
sharedDB?: ShareDB
updateVDB?: true
close?: true
online?: number
}

type MessageToPrimary = {
overLimit?: true
online?: number
}

Expand Down Expand Up @@ -118,28 +117,46 @@ const rawEmit = (emit: Emit, to: To) => {
}
}

const fork = () => {
if (Object.keys(cluster.workers).length < 6) {
cluster.fork()
}
}

if (cluster.isPrimary) {
cluster.setupPrimary({
serialization: 'advanced'
})

cluster.on('disconnect', worker => {
console.log('Worker disconnect', worker.id)
fork()
})

cluster.on('exit', worker => {
console.log('Worker exited', worker.id)
fork()
})

let online = 0
cluster.fork()

setInterval(() => {
sendMessageToWorkers({ close: true })
cluster.fork()
}, 1000 * 60 * 60)
fork()
fork()
fork()
fork()
fork()
fork()

setInterval(async () => {
sendMessageToWorkers({ online })
}, ONLINE_REPORT_INTERVAL * 0.7)

cluster.on('message', (_, message: MessageToPrimary) => {
if (message.overLimit) {
cluster.fork()
}
if (message.online !== undefined) {
online += message.online
setTimeout(() => {
online -= message.online
}, ONLINE_REPORT_INTERVAL);
}, ONLINE_REPORT_INTERVAL)
}
})
} else {
Expand Down Expand Up @@ -197,45 +214,10 @@ export const updateVDB = () => {
}
}

const sumOnline = (ios: Server[]) => ios.reduce((sum, io) => sum + (io.engine as any).clientsCount as number, 0)

export const connectionLimit = (server: HTTPServer, ios: Server[]) => {
const worker = cluster.worker
let closed = false

const close = () => {
console.log('Closing Worker ', worker.id)
closed = true
server.close()
setInterval(() => {
if (sumOnline(ios) === 0) {
console.log('Killing Worker ', worker.id)
worker.kill()
}
}, 1000 * 60)
setTimeout(() => {
console.log('Timeout Worker ', worker.id)
worker.kill()
}, 1000 * 60 * 60 * 24)
}
export const setupConnectionMaster = (server: HTTPServer) => {
setupMaster(server)
}

ios.forEach(io => {
io.on('connection', () => {
if (closed) {
return
}
if (sumOnline(ios) >= 128) {
sendMessageToPrimary({ overLimit: true })
close()
}
})
})
process.on('message', ({ close: c }: Message) => {
if (c) {
if (closed) {
return
}
close()
}
})
export const setupConnectionLimit = () => {
setupWorker(ioRaw)
}
12 changes: 6 additions & 6 deletions api/socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ const wsRouter = ({ socket }) => ([key, ...rest], map = []) => {
fullInfo: async () => {
const vtbs = await vdb.getPure()
return (await Promise.all(vtbs.map(({ mid }) => mid).map(async mid => {
const i = await info.get(mid)
if (i) {
return { ...i, mid }
}
return undefined
})))
const i = await info.get(mid)
if (i) {
return { ...i, mid }
}
return undefined
})))
.filter(Boolean)
},
async guardMacroK([week = false]) {
Expand Down
Loading

0 comments on commit 1369489

Please sign in to comment.