mirror of
https://github.com/lingble/safe-redis-leader.git
synced 2025-10-29 11:42:34 +00:00
refactor and ensure no timer related race conditions
This commit is contained in:
@@ -53,7 +53,6 @@ in one terminal, run the follow index.js:
|
||||
console.log("I'm the leader - 1")
|
||||
})
|
||||
|
||||
|
||||
await safeLeader.elect()
|
||||
}
|
||||
|
||||
@@ -92,7 +91,6 @@ In a seperate terminal/tab, run the following index.js:
|
||||
console.log("I'm the leader - 2")
|
||||
})
|
||||
|
||||
|
||||
await safeLeader.elect()
|
||||
}
|
||||
|
||||
|
||||
@@ -11,3 +11,7 @@ services:
|
||||
- ${DOCKER_REDIS_PORT_PUBLIC:-6760}:${DOCKER_REDIS_PORT:-6379}
|
||||
networks:
|
||||
- safe_redis_leader_dev
|
||||
networks:
|
||||
safe_redis_leader_dev:
|
||||
name: safe_redis_leader_dev
|
||||
driver: bridge
|
||||
@@ -20,18 +20,19 @@ services:
|
||||
- ${PWD}/package.json:/package.json
|
||||
- ${PWD}/package-lock.json:/package-lock.json
|
||||
- ${PWD}/.env:/.env
|
||||
- ${PWD}/dev-docker-data-cache/test_node_modules:/node_modules:delegated
|
||||
- ${PWD}/dev-docker-data-cache/package_node_modules:/app/node_modules:delegated
|
||||
- node_modules_vol:/app/node_modules:delegated
|
||||
- /var/run/docker.sock:/var/run/docker.sock
|
||||
user: root
|
||||
command: >
|
||||
sh -c 'npm install --ignore-scripts --save --loglevel verbose && ${COMPOSE_COMMAND}'
|
||||
ports:
|
||||
- 9220:9229
|
||||
- ${PUBLIC_NODE_DEBUG_PORT:-9220}:9229
|
||||
networks:
|
||||
- safe_redis_leader_dev
|
||||
networks:
|
||||
safe_redis_leader_dev:
|
||||
name: safe_redis_leader_dev
|
||||
driver: bridge
|
||||
# docker network ls -f driver=bridge --format "{{.ID}}" | xargs docker network inspect | grep Name
|
||||
volumes:
|
||||
node_modules_vol:
|
||||
name: ${CLIENT_PREFIX_ID:-test-}package_node_modules
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
const { spawn, exec } = require('child_process')
|
||||
|
||||
async function run(){
|
||||
let backendComposeCommand = 'npm run test'
|
||||
let composeCommand = 'npm run test'
|
||||
const isTest = process.env.NODE_ENV === 'test'
|
||||
const exampleName = process.env.EXAMPLE
|
||||
|
||||
|
||||
let command = exec(
|
||||
@@ -15,7 +16,15 @@ async function run(){
|
||||
)
|
||||
await waitForCommandStatusWithStdout(command, {onError: ()=>new Error('could not create dev-docker-data-cache directories')})
|
||||
|
||||
if(isTest){
|
||||
await startTests({composeCommand})
|
||||
}
|
||||
else if(exampleName === 'multi-client'){
|
||||
await startMultiClientExample()
|
||||
}
|
||||
}
|
||||
|
||||
async function startTests({composeCommand}){
|
||||
const child2 = spawn(
|
||||
`docker-compose`,
|
||||
[
|
||||
@@ -33,7 +42,7 @@ async function run(){
|
||||
{
|
||||
env: {
|
||||
...process.env,
|
||||
COMPOSE_COMMAND: backendComposeCommand
|
||||
COMPOSE_COMMAND: composeCommand
|
||||
},
|
||||
stdio: 'inherit'
|
||||
}
|
||||
@@ -42,7 +51,75 @@ async function run(){
|
||||
child2.on("exit", (code, signal)=>{
|
||||
process.exit(code)
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
async function startMultiClientExample(){
|
||||
const projectName = 'safe-redis-leader-multi-client-example'
|
||||
const child1 = spawn(
|
||||
`docker-compose`,
|
||||
[
|
||||
"--project-name",
|
||||
projectName,
|
||||
"--project-directory",
|
||||
"./docker/compose",
|
||||
"-f",
|
||||
"./docker/compose/redis.yml",
|
||||
"up",
|
||||
// "--build"
|
||||
],
|
||||
{
|
||||
env: {
|
||||
...process.env
|
||||
},
|
||||
stdio: 'inherit'
|
||||
}
|
||||
);
|
||||
|
||||
child1.on("exit", (code, signal)=>{
|
||||
process.exit(code)
|
||||
})
|
||||
|
||||
const totalClients = 2
|
||||
|
||||
for(let i = 0; i < totalClients; i++){
|
||||
await startSingleClient({
|
||||
projectName: `${projectName}-${i}`,
|
||||
id: i,
|
||||
composeCommand: `SCRIPT_CLIENT_ID=${i} npm run example:multi-client`
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
async function startSingleClient({composeCommand, projectName, id}){
|
||||
const child1 = spawn(
|
||||
`docker-compose`,
|
||||
[
|
||||
"--project-name",
|
||||
projectName,
|
||||
"--project-directory",
|
||||
"./docker/compose",
|
||||
"-f",
|
||||
"./docker/compose/test.yml",
|
||||
"up",
|
||||
// "--build"
|
||||
],
|
||||
{
|
||||
env: {
|
||||
...process.env,
|
||||
COMPOSE_COMMAND: composeCommand,
|
||||
PUBLIC_NODE_DEBUG_PORT: `922${id}`,
|
||||
CLIENT_PREFIX_ID: `client-${id}-`
|
||||
},
|
||||
|
||||
stdio: 'inherit'
|
||||
}
|
||||
);
|
||||
|
||||
child1.on("exit", (code, signal)=>{
|
||||
process.exit(code)
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
|
||||
2
package-lock.json
generated
2
package-lock.json
generated
@@ -1,5 +1,5 @@
|
||||
{
|
||||
"name": "safe-redis-leader",
|
||||
"version": "0.0.4",
|
||||
"version": "0.0.5",
|
||||
"lockfileVersion": 1
|
||||
}
|
||||
|
||||
@@ -4,7 +4,8 @@
|
||||
"description": "Redis leader election implementation that does not have any race conditions",
|
||||
"main": "src/src/index.js",
|
||||
"scripts": {
|
||||
"test": "npm install && node ./docker/scripts/runner.js"
|
||||
"test": "npm install && NODE_ENV=test node ./docker/scripts/runner.js",
|
||||
"example:multi-client": "npm install && EXAMPLE='multi-client' node ./docker/scripts/runner.js"
|
||||
},
|
||||
"author": "Michael Khirallah",
|
||||
"license": "MIT",
|
||||
|
||||
70
src/examples/multi-client/index.js
Normal file
70
src/examples/multi-client/index.js
Normal file
@@ -0,0 +1,70 @@
|
||||
const {createSafeRedisLeader} = require('../../src')
|
||||
const Redis = require('ioredis')
|
||||
|
||||
function randomIntFromInterval(min, max) {
|
||||
return Math.floor(Math.random() * (max - min + 1) + min)
|
||||
}
|
||||
|
||||
function delay(ms){
|
||||
return new Promise((res)=>{
|
||||
setTimeout(res, ms)
|
||||
})
|
||||
}
|
||||
|
||||
async function main(){
|
||||
const {
|
||||
DOCKER_REDIS_HOST,
|
||||
DOCKER_REDIS_PORT,
|
||||
DOCKER_REDIS_PASSWORD,
|
||||
SCRIPT_CLIENT_ID
|
||||
} = process.env
|
||||
|
||||
const redisCreds = {
|
||||
host: DOCKER_REDIS_HOST,
|
||||
port: DOCKER_REDIS_PORT || null, // inside of docker-compose so you don't need this
|
||||
password: DOCKER_REDIS_PASSWORD
|
||||
}
|
||||
|
||||
const asyncRedis = new Redis(redisCreds)
|
||||
|
||||
|
||||
const leaderElectionKey = 'the-election'
|
||||
|
||||
const safeLeader = await createSafeRedisLeader({
|
||||
asyncRedis: asyncRedis,
|
||||
ttl: 1000,
|
||||
wait: 2000,
|
||||
key: leaderElectionKey
|
||||
})
|
||||
|
||||
safeLeader.on("elected", ()=>{
|
||||
console.log(`SCRIPT_CLIENT_ID - ${SCRIPT_CLIENT_ID} - current leader`)
|
||||
})
|
||||
|
||||
safeLeader.on("demoted", ()=>{
|
||||
console.log(`SCRIPT_CLIENT_ID - ${SCRIPT_CLIENT_ID} - demoted`)
|
||||
})
|
||||
|
||||
await delay(2000)
|
||||
console.log(`SCRIPT_CLIENT_ID - ${SCRIPT_CLIENT_ID} - starting`)
|
||||
await safeLeader.elect()
|
||||
|
||||
|
||||
while(true){
|
||||
await delay(randomIntFromInterval(1, 4) * 1000)
|
||||
console.log(`SCRIPT_CLIENT_ID - ${SCRIPT_CLIENT_ID} - removing self from candidate pool`)
|
||||
await safeLeader.stop()
|
||||
await delay(randomIntFromInterval(1, 4) * 1000)
|
||||
console.log(`SCRIPT_CLIENT_ID - ${SCRIPT_CLIENT_ID} - re-entering candidate pool`)
|
||||
await safeLeader.elect()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
main().catch((e)=>{
|
||||
console.error(e)
|
||||
process.exit(1)
|
||||
})
|
||||
@@ -4,7 +4,8 @@
|
||||
"description": "Redis leader election implementation that does not have any race conditions",
|
||||
"main": "index.js",
|
||||
"scripts": {
|
||||
"test": "NODE_ENV=test NODE_PATH='./' mocha $DEBUG_NODE $WATCH --watch --inspect-brk=0.0.0:9229 --opts ./mochaConfig/mocha.opts ./src/**/*.test.js"
|
||||
"test": "NODE_ENV=test NODE_PATH='./' mocha $DEBUG_NODE $WATCH --watch --inspect-brk=0.0.0:9229 --opts ./mochaConfig/mocha.opts './src/**/*.test.js'",
|
||||
"example:multi-client": "node ./examples/multi-client/index.js"
|
||||
},
|
||||
"author": "",
|
||||
"license": "ISC",
|
||||
|
||||
BIN
src/src/atomicGetIsEqualDelete/.DS_Store
vendored
Normal file
BIN
src/src/atomicGetIsEqualDelete/.DS_Store
vendored
Normal file
Binary file not shown.
@@ -54,7 +54,6 @@ describe("atomicGetIsEqualDelete", function(){
|
||||
key,
|
||||
id
|
||||
})
|
||||
console.log(didDelete)
|
||||
assert.isTrue(!!didDelete, "lua script claimed to skip delete when it was supposed to be the same id")
|
||||
|
||||
})
|
||||
|
||||
BIN
src/src/atomicGetIsEqualSetPExpire/.DS_Store
vendored
Normal file
BIN
src/src/atomicGetIsEqualSetPExpire/.DS_Store
vendored
Normal file
Binary file not shown.
27
src/src/atomicGetIsEqualSetPExpire/index.js
Normal file
27
src/src/atomicGetIsEqualSetPExpire/index.js
Normal file
@@ -0,0 +1,27 @@
|
||||
const fs = require('fs')
|
||||
const util = require('util')
|
||||
const readFile = util.promisify(fs.readFile)
|
||||
|
||||
|
||||
async function atomicGetIsEqualSetPExpire({
|
||||
asyncRedis,
|
||||
key,
|
||||
id,
|
||||
ms
|
||||
}){
|
||||
// do lua stuff
|
||||
|
||||
if(!asyncRedis.getIsEqualSetPExpire){
|
||||
const file = await readFile(`${__dirname}/lua/index.lua`, 'utf8')
|
||||
|
||||
asyncRedis.defineCommand("getIsEqualSetPExpire", {
|
||||
numberOfKeys: 1,
|
||||
lua: file
|
||||
})
|
||||
}
|
||||
|
||||
const res = await asyncRedis.getIsEqualSetPExpire(key, id, ms)
|
||||
return res
|
||||
}
|
||||
|
||||
module.exports.atomicGetIsEqualSetPExpire = atomicGetIsEqualSetPExpire
|
||||
11
src/src/atomicGetIsEqualSetPExpire/lua/index.lua
Normal file
11
src/src/atomicGetIsEqualSetPExpire/lua/index.lua
Normal file
@@ -0,0 +1,11 @@
|
||||
local key = KEYS[1]
|
||||
local id = ARGV[1]
|
||||
local ms = ARGV[2]
|
||||
|
||||
local current_id = redis.call('GET', key)
|
||||
if (id == current_id) then
|
||||
redis.call('PEXPIRE', key, ms)
|
||||
return 1
|
||||
else
|
||||
return 0
|
||||
end
|
||||
67
src/src/atomicGetIsEqualSetPExpire/tests/index.test.js
Normal file
67
src/src/atomicGetIsEqualSetPExpire/tests/index.test.js
Normal file
@@ -0,0 +1,67 @@
|
||||
const {atomicGetIsEqualSetPExpire} = require('../index')
|
||||
const connectToRedis = require('../../../library/connect-to-redis')
|
||||
const {tryCatchIgnore} = require('../../testHelpers')
|
||||
const {assert} = require('chai')
|
||||
|
||||
describe("atomicGetIsEqualSetPExpire", function(){
|
||||
|
||||
afterEach(async function(){
|
||||
await tryCatchIgnore(async()=> this.asyncRedis && await this.asyncRedis.quit(), "could not shutdown asyncRedis")
|
||||
})
|
||||
|
||||
it("should get, compare equality, and not set pexpire when id not equal to self in lua script for redis", async function(){
|
||||
const key = "my-key"
|
||||
const id = "the-id"
|
||||
const ms = 3000
|
||||
const asyncRedis = await connectToRedis({redisCreds: this.redisCreds})
|
||||
this.asyncRedis = asyncRedis
|
||||
await this.asyncRedis.set(key, "different-id")
|
||||
|
||||
const didExtend = await atomicGetIsEqualSetPExpire({
|
||||
asyncRedis,
|
||||
key,
|
||||
id,
|
||||
ms
|
||||
})
|
||||
|
||||
assert.isFalse(!!didExtend, "lua script claimed to extend pexpire id should have been inequal")
|
||||
})
|
||||
|
||||
|
||||
it("should get, compare equality, and not set pexpire when id is null in lua script for redis", async function(){
|
||||
const key = "my-key"
|
||||
const id = "the-id"
|
||||
const ms = 3000
|
||||
const asyncRedis = await connectToRedis({redisCreds: this.redisCreds})
|
||||
this.asyncRedis = asyncRedis
|
||||
// this.asyncRedis.set(key, "different-id")
|
||||
|
||||
const didExtend = await atomicGetIsEqualSetPExpire({
|
||||
asyncRedis,
|
||||
key,
|
||||
id,
|
||||
ms
|
||||
})
|
||||
|
||||
assert.isFalse(!!didExtend, "lua script claimed to extend pexpire id should have been null")
|
||||
})
|
||||
|
||||
|
||||
it("should get, compare equality, and not set pexpire when id is null in lua script for redis", async function(){
|
||||
const key = "my-key"
|
||||
const id = "the-id"
|
||||
const ms = 3000
|
||||
const asyncRedis = await connectToRedis({redisCreds: this.redisCreds})
|
||||
this.asyncRedis = asyncRedis
|
||||
await this.asyncRedis.set(key, id)
|
||||
|
||||
const didExtend = await atomicGetIsEqualSetPExpire({
|
||||
asyncRedis,
|
||||
key,
|
||||
id,
|
||||
ms
|
||||
})
|
||||
assert.isTrue(!!didExtend, "lua script didn't extend pexpire when id should have been equal")
|
||||
})
|
||||
|
||||
})
|
||||
@@ -3,6 +3,8 @@
|
||||
var crypto = require('crypto');
|
||||
var EventEmitter = require('events').EventEmitter;
|
||||
const {atomicGetIsEqualDelete} = require('./atomicGetIsEqualDelete')
|
||||
const {atomicGetIsEqualSetPExpire} = require('./atomicGetIsEqualSetPExpire')
|
||||
|
||||
|
||||
// Make the key less prone to collision
|
||||
var hashKey = function(key) {
|
||||
@@ -28,18 +30,31 @@ async function createSafeRedisLeader({
|
||||
|
||||
let isStarted = false
|
||||
|
||||
let wasLeading = false
|
||||
let canLead = false
|
||||
|
||||
|
||||
async function renew(){
|
||||
await emitOnError(async ()=>{
|
||||
const leading = await isLeader()
|
||||
if(leading){
|
||||
await asyncRedis.pexpire(key, ttl)
|
||||
setTimeout(renew, ttl / 2)
|
||||
const isLeading = await atomicGetIsEqualSetPExpire({
|
||||
asyncRedis,
|
||||
key,
|
||||
id,
|
||||
ms: ttl
|
||||
})
|
||||
|
||||
|
||||
if(isLeading){
|
||||
wasLeading = true
|
||||
renewTimeoutId = setTimeout(renew, ttl / 2)
|
||||
}
|
||||
else{
|
||||
if(wasLeading){
|
||||
wasLeading = false
|
||||
emitter.emit('demoted')
|
||||
}
|
||||
clearTimeout(renewTimeoutId)
|
||||
electTimeoutId = setTimeout(elect, wait);
|
||||
electTimeoutId = setTimeout(runElection, wait)
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -47,19 +62,23 @@ async function createSafeRedisLeader({
|
||||
async function runElection(){
|
||||
await emitOnError(async ()=>{
|
||||
const res = await asyncRedis.set(key, id, 'PX', ttl, 'NX')
|
||||
|
||||
if(res !== null) {
|
||||
emitter.emit('elected')
|
||||
wasLeading = true
|
||||
if(!canLead){
|
||||
return await stop()
|
||||
}
|
||||
renewTimeoutId = setTimeout(renew, ttl / 2)
|
||||
}
|
||||
else{
|
||||
electTimeoutId = setTimeout(elect, wait)
|
||||
electTimeoutId = setTimeout(runElection, wait)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async function elect(){
|
||||
isStarted = true
|
||||
canLead = true
|
||||
await runElection()
|
||||
}
|
||||
|
||||
@@ -72,8 +91,18 @@ async function createSafeRedisLeader({
|
||||
}
|
||||
|
||||
async function stop(){
|
||||
canLead = false
|
||||
// real atomic get -> isEqual -> delete
|
||||
await atomicGetIsEqualDelete({asyncRedis, key, id})
|
||||
renewTimeoutId && clearTimeout(renewTimeoutId)
|
||||
electTimeoutId && clearTimeout(electTimeoutId)
|
||||
const res = await atomicGetIsEqualDelete({asyncRedis, key, id})
|
||||
// a 1 indicates that we successfully deleted
|
||||
// our leadership id which means we were
|
||||
// the leader at time time of stop
|
||||
if(res === 1){
|
||||
emitter.emit('demoted')
|
||||
}
|
||||
wasLeading = false
|
||||
}
|
||||
|
||||
function on(name, fn){
|
||||
@@ -105,6 +134,7 @@ async function createSafeRedisLeader({
|
||||
|
||||
async function shutdown(){
|
||||
isStarted = false
|
||||
canLead = false
|
||||
renewTimeoutId && clearTimeout(renewTimeoutId)
|
||||
electTimeoutId && clearTimeout(electTimeoutId)
|
||||
await stop()
|
||||
|
||||
Reference in New Issue
Block a user