From a114f2c25e06fb7b29a8eea3fd0644d890b124f3 Mon Sep 17 00:00:00 2001 From: mike k Date: Mon, 1 Feb 2021 17:23:31 -0500 Subject: [PATCH] refactor and ensure no timer related race conditions --- README.md | 2 - docker/compose/redis.yml | 6 +- docker/compose/test.yml | 9 +- docker/scripts/runner.js | 81 +++++++++++++++++- package-lock.json | 2 +- package.json | 3 +- src/examples/multi-client/index.js | 70 +++++++++++++++ src/package.json | 3 +- src/src/atomicGetIsEqualDelete/.DS_Store | Bin 0 -> 6148 bytes .../tests/index.test.js | 1 - src/src/atomicGetIsEqualSetPExpire/.DS_Store | Bin 0 -> 6148 bytes src/src/atomicGetIsEqualSetPExpire/index.js | 27 ++++++ .../atomicGetIsEqualSetPExpire/lua/index.lua | 11 +++ .../tests/index.test.js | 67 +++++++++++++++ src/src/index.js | 46 ++++++++-- 15 files changed, 307 insertions(+), 21 deletions(-) create mode 100644 src/examples/multi-client/index.js create mode 100644 src/src/atomicGetIsEqualDelete/.DS_Store create mode 100644 src/src/atomicGetIsEqualSetPExpire/.DS_Store create mode 100644 src/src/atomicGetIsEqualSetPExpire/index.js create mode 100644 src/src/atomicGetIsEqualSetPExpire/lua/index.lua create mode 100644 src/src/atomicGetIsEqualSetPExpire/tests/index.test.js diff --git a/README.md b/README.md index 6531bda..5add50c 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,6 @@ in one terminal, run the follow index.js: console.log("I'm the leader - 1") }) - await safeLeader.elect() } @@ -92,7 +91,6 @@ In a seperate terminal/tab, run the following index.js: console.log("I'm the leader - 2") }) - await safeLeader.elect() } diff --git a/docker/compose/redis.yml b/docker/compose/redis.yml index 3555950..d8b4cfa 100644 --- a/docker/compose/redis.yml +++ b/docker/compose/redis.yml @@ -10,4 +10,8 @@ services: ports: - ${DOCKER_REDIS_PORT_PUBLIC:-6760}:${DOCKER_REDIS_PORT:-6379} networks: - - safe_redis_leader_dev \ No newline at end of file + - safe_redis_leader_dev +networks: + safe_redis_leader_dev: + name: safe_redis_leader_dev + driver: bridge \ No newline at end of file diff --git a/docker/compose/test.yml b/docker/compose/test.yml index 9998531..ddf9a78 100644 --- a/docker/compose/test.yml +++ b/docker/compose/test.yml @@ -20,18 +20,19 @@ services: - ${PWD}/package.json:/package.json - ${PWD}/package-lock.json:/package-lock.json - ${PWD}/.env:/.env - - ${PWD}/dev-docker-data-cache/test_node_modules:/node_modules:delegated - - ${PWD}/dev-docker-data-cache/package_node_modules:/app/node_modules:delegated + - node_modules_vol:/app/node_modules:delegated - /var/run/docker.sock:/var/run/docker.sock user: root command: > sh -c 'npm install --ignore-scripts --save --loglevel verbose && ${COMPOSE_COMMAND}' ports: - - 9220:9229 + - ${PUBLIC_NODE_DEBUG_PORT:-9220}:9229 networks: - safe_redis_leader_dev networks: safe_redis_leader_dev: name: safe_redis_leader_dev driver: bridge -# docker network ls -f driver=bridge --format "{{.ID}}" | xargs docker network inspect | grep Name +volumes: + node_modules_vol: + name: ${CLIENT_PREFIX_ID:-test-}package_node_modules diff --git a/docker/scripts/runner.js b/docker/scripts/runner.js index ad781e0..80ed527 100644 --- a/docker/scripts/runner.js +++ b/docker/scripts/runner.js @@ -1,8 +1,9 @@ const { spawn, exec } = require('child_process') async function run(){ - let backendComposeCommand = 'npm run test' + let composeCommand = 'npm run test' const isTest = process.env.NODE_ENV === 'test' + const exampleName = process.env.EXAMPLE let command = exec( @@ -15,7 +16,15 @@ async function run(){ ) await waitForCommandStatusWithStdout(command, {onError: ()=>new Error('could not create dev-docker-data-cache directories')}) + if(isTest){ + await startTests({composeCommand}) + } + else if(exampleName === 'multi-client'){ + await startMultiClientExample() + } +} +async function startTests({composeCommand}){ const child2 = spawn( `docker-compose`, [ @@ -33,7 +42,7 @@ async function run(){ { env: { ...process.env, - COMPOSE_COMMAND: backendComposeCommand + COMPOSE_COMMAND: composeCommand }, stdio: 'inherit' } @@ -42,7 +51,75 @@ async function run(){ child2.on("exit", (code, signal)=>{ process.exit(code) }) +} + +async function startMultiClientExample(){ + const projectName = 'safe-redis-leader-multi-client-example' + const child1 = spawn( + `docker-compose`, + [ + "--project-name", + projectName, + "--project-directory", + "./docker/compose", + "-f", + "./docker/compose/redis.yml", + "up", + // "--build" + ], + { + env: { + ...process.env + }, + stdio: 'inherit' + } + ); + + child1.on("exit", (code, signal)=>{ + process.exit(code) + }) + + const totalClients = 2 + + for(let i = 0; i < totalClients; i++){ + await startSingleClient({ + projectName: `${projectName}-${i}`, + id: i, + composeCommand: `SCRIPT_CLIENT_ID=${i} npm run example:multi-client` + }) + } + +} + +async function startSingleClient({composeCommand, projectName, id}){ + const child1 = spawn( + `docker-compose`, + [ + "--project-name", + projectName, + "--project-directory", + "./docker/compose", + "-f", + "./docker/compose/test.yml", + "up", + // "--build" + ], + { + env: { + ...process.env, + COMPOSE_COMMAND: composeCommand, + PUBLIC_NODE_DEBUG_PORT: `922${id}`, + CLIENT_PREFIX_ID: `client-${id}-` + }, + + stdio: 'inherit' + } + ); + + child1.on("exit", (code, signal)=>{ + process.exit(code) + }) } diff --git a/package-lock.json b/package-lock.json index 0846e9d..79c9c09 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,5 +1,5 @@ { "name": "safe-redis-leader", - "version": "0.0.4", + "version": "0.0.5", "lockfileVersion": 1 } diff --git a/package.json b/package.json index e160328..17b6d40 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,8 @@ "description": "Redis leader election implementation that does not have any race conditions", "main": "src/src/index.js", "scripts": { - "test": "npm install && node ./docker/scripts/runner.js" + "test": "npm install && NODE_ENV=test node ./docker/scripts/runner.js", + "example:multi-client": "npm install && EXAMPLE='multi-client' node ./docker/scripts/runner.js" }, "author": "Michael Khirallah", "license": "MIT", diff --git a/src/examples/multi-client/index.js b/src/examples/multi-client/index.js new file mode 100644 index 0000000..c67c94d --- /dev/null +++ b/src/examples/multi-client/index.js @@ -0,0 +1,70 @@ +const {createSafeRedisLeader} = require('../../src') +const Redis = require('ioredis') + +function randomIntFromInterval(min, max) { + return Math.floor(Math.random() * (max - min + 1) + min) +} + +function delay(ms){ + return new Promise((res)=>{ + setTimeout(res, ms) + }) +} + +async function main(){ + const { + DOCKER_REDIS_HOST, + DOCKER_REDIS_PORT, + DOCKER_REDIS_PASSWORD, + SCRIPT_CLIENT_ID + } = process.env + + const redisCreds = { + host: DOCKER_REDIS_HOST, + port: DOCKER_REDIS_PORT || null, // inside of docker-compose so you don't need this + password: DOCKER_REDIS_PASSWORD + } + + const asyncRedis = new Redis(redisCreds) + + + const leaderElectionKey = 'the-election' + + const safeLeader = await createSafeRedisLeader({ + asyncRedis: asyncRedis, + ttl: 1000, + wait: 2000, + key: leaderElectionKey + }) + + safeLeader.on("elected", ()=>{ + console.log(`SCRIPT_CLIENT_ID - ${SCRIPT_CLIENT_ID} - current leader`) + }) + + safeLeader.on("demoted", ()=>{ + console.log(`SCRIPT_CLIENT_ID - ${SCRIPT_CLIENT_ID} - demoted`) + }) + + await delay(2000) + console.log(`SCRIPT_CLIENT_ID - ${SCRIPT_CLIENT_ID} - starting`) + await safeLeader.elect() + + + while(true){ + await delay(randomIntFromInterval(1, 4) * 1000) + console.log(`SCRIPT_CLIENT_ID - ${SCRIPT_CLIENT_ID} - removing self from candidate pool`) + await safeLeader.stop() + await delay(randomIntFromInterval(1, 4) * 1000) + console.log(`SCRIPT_CLIENT_ID - ${SCRIPT_CLIENT_ID} - re-entering candidate pool`) + await safeLeader.elect() + } + +} + + + + +main().catch((e)=>{ + console.error(e) + process.exit(1) +}) \ No newline at end of file diff --git a/src/package.json b/src/package.json index 1571433..a8d5a3e 100644 --- a/src/package.json +++ b/src/package.json @@ -4,7 +4,8 @@ "description": "Redis leader election implementation that does not have any race conditions", "main": "index.js", "scripts": { - "test": "NODE_ENV=test NODE_PATH='./' mocha $DEBUG_NODE $WATCH --watch --inspect-brk=0.0.0:9229 --opts ./mochaConfig/mocha.opts ./src/**/*.test.js" + "test": "NODE_ENV=test NODE_PATH='./' mocha $DEBUG_NODE $WATCH --watch --inspect-brk=0.0.0:9229 --opts ./mochaConfig/mocha.opts './src/**/*.test.js'", + "example:multi-client": "node ./examples/multi-client/index.js" }, "author": "", "license": "ISC", diff --git a/src/src/atomicGetIsEqualDelete/.DS_Store b/src/src/atomicGetIsEqualDelete/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..681c500bdd9cdc884e269399f646b26ea3256ec9 GIT binary patch literal 6148 zcmeH~J!%6%427R!7lt%jrkutH$PET#pTHNeA&|z{5OC@_dY*n7Z&I5jJc0B^niaeI z6+0^cw!4440~3G^-4$C8Gc)EZTyV$v`}lRe+^(Ojcolewm>DY*X8W}*5djep0TB=Z z5m*p`JjJnjUC=Y>QA9ulmO;S34~_2HOGj#aIv8REpsraC<2q&uYV!iMmyT3cXjapM zRjb7q;`L~!wz{sDj?`?2)$n0;XY(nBX4wvFOla0a6huG-W&~E5Pk#O%=zq=svlgWy zAOio4fUWn({f;kHXX~%m^ZKc(z8-XHT+Zz6qLeq~xU{DZ& HrxN%9xC{}C literal 0 HcmV?d00001 diff --git a/src/src/atomicGetIsEqualDelete/tests/index.test.js b/src/src/atomicGetIsEqualDelete/tests/index.test.js index 6ab6397..eb0e486 100644 --- a/src/src/atomicGetIsEqualDelete/tests/index.test.js +++ b/src/src/atomicGetIsEqualDelete/tests/index.test.js @@ -54,7 +54,6 @@ describe("atomicGetIsEqualDelete", function(){ key, id }) - console.log(didDelete) assert.isTrue(!!didDelete, "lua script claimed to skip delete when it was supposed to be the same id") }) diff --git a/src/src/atomicGetIsEqualSetPExpire/.DS_Store b/src/src/atomicGetIsEqualSetPExpire/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..681c500bdd9cdc884e269399f646b26ea3256ec9 GIT binary patch literal 6148 zcmeH~J!%6%427R!7lt%jrkutH$PET#pTHNeA&|z{5OC@_dY*n7Z&I5jJc0B^niaeI z6+0^cw!4440~3G^-4$C8Gc)EZTyV$v`}lRe+^(Ojcolewm>DY*X8W}*5djep0TB=Z z5m*p`JjJnjUC=Y>QA9ulmO;S34~_2HOGj#aIv8REpsraC<2q&uYV!iMmyT3cXjapM zRjb7q;`L~!wz{sDj?`?2)$n0;XY(nBX4wvFOla0a6huG-W&~E5Pk#O%=zq=svlgWy zAOio4fUWn({f;kHXX~%m^ZKc(z8-XHT+Zz6qLeq~xU{DZ& HrxN%9xC{}C literal 0 HcmV?d00001 diff --git a/src/src/atomicGetIsEqualSetPExpire/index.js b/src/src/atomicGetIsEqualSetPExpire/index.js new file mode 100644 index 0000000..a355394 --- /dev/null +++ b/src/src/atomicGetIsEqualSetPExpire/index.js @@ -0,0 +1,27 @@ +const fs = require('fs') +const util = require('util') +const readFile = util.promisify(fs.readFile) + + +async function atomicGetIsEqualSetPExpire({ + asyncRedis, + key, + id, + ms +}){ + // do lua stuff + + if(!asyncRedis.getIsEqualSetPExpire){ + const file = await readFile(`${__dirname}/lua/index.lua`, 'utf8') + + asyncRedis.defineCommand("getIsEqualSetPExpire", { + numberOfKeys: 1, + lua: file + }) + } + + const res = await asyncRedis.getIsEqualSetPExpire(key, id, ms) + return res +} + +module.exports.atomicGetIsEqualSetPExpire = atomicGetIsEqualSetPExpire \ No newline at end of file diff --git a/src/src/atomicGetIsEqualSetPExpire/lua/index.lua b/src/src/atomicGetIsEqualSetPExpire/lua/index.lua new file mode 100644 index 0000000..68893e8 --- /dev/null +++ b/src/src/atomicGetIsEqualSetPExpire/lua/index.lua @@ -0,0 +1,11 @@ +local key = KEYS[1] +local id = ARGV[1] +local ms = ARGV[2] + +local current_id = redis.call('GET', key) +if (id == current_id) then + redis.call('PEXPIRE', key, ms) + return 1 +else + return 0 +end \ No newline at end of file diff --git a/src/src/atomicGetIsEqualSetPExpire/tests/index.test.js b/src/src/atomicGetIsEqualSetPExpire/tests/index.test.js new file mode 100644 index 0000000..ffc71f3 --- /dev/null +++ b/src/src/atomicGetIsEqualSetPExpire/tests/index.test.js @@ -0,0 +1,67 @@ +const {atomicGetIsEqualSetPExpire} = require('../index') +const connectToRedis = require('../../../library/connect-to-redis') +const {tryCatchIgnore} = require('../../testHelpers') +const {assert} = require('chai') + +describe("atomicGetIsEqualSetPExpire", function(){ + + afterEach(async function(){ + await tryCatchIgnore(async()=> this.asyncRedis && await this.asyncRedis.quit(), "could not shutdown asyncRedis") + }) + + it("should get, compare equality, and not set pexpire when id not equal to self in lua script for redis", async function(){ + const key = "my-key" + const id = "the-id" + const ms = 3000 + const asyncRedis = await connectToRedis({redisCreds: this.redisCreds}) + this.asyncRedis = asyncRedis + await this.asyncRedis.set(key, "different-id") + + const didExtend = await atomicGetIsEqualSetPExpire({ + asyncRedis, + key, + id, + ms + }) + + assert.isFalse(!!didExtend, "lua script claimed to extend pexpire id should have been inequal") + }) + + + it("should get, compare equality, and not set pexpire when id is null in lua script for redis", async function(){ + const key = "my-key" + const id = "the-id" + const ms = 3000 + const asyncRedis = await connectToRedis({redisCreds: this.redisCreds}) + this.asyncRedis = asyncRedis + // this.asyncRedis.set(key, "different-id") + + const didExtend = await atomicGetIsEqualSetPExpire({ + asyncRedis, + key, + id, + ms + }) + + assert.isFalse(!!didExtend, "lua script claimed to extend pexpire id should have been null") + }) + + + it("should get, compare equality, and not set pexpire when id is null in lua script for redis", async function(){ + const key = "my-key" + const id = "the-id" + const ms = 3000 + const asyncRedis = await connectToRedis({redisCreds: this.redisCreds}) + this.asyncRedis = asyncRedis + await this.asyncRedis.set(key, id) + + const didExtend = await atomicGetIsEqualSetPExpire({ + asyncRedis, + key, + id, + ms + }) + assert.isTrue(!!didExtend, "lua script didn't extend pexpire when id should have been equal") + }) + +}) \ No newline at end of file diff --git a/src/src/index.js b/src/src/index.js index 15eba7a..cd3f95e 100644 --- a/src/src/index.js +++ b/src/src/index.js @@ -3,6 +3,8 @@ var crypto = require('crypto'); var EventEmitter = require('events').EventEmitter; const {atomicGetIsEqualDelete} = require('./atomicGetIsEqualDelete') +const {atomicGetIsEqualSetPExpire} = require('./atomicGetIsEqualSetPExpire') + // Make the key less prone to collision var hashKey = function(key) { @@ -28,18 +30,31 @@ async function createSafeRedisLeader({ let isStarted = false + let wasLeading = false + let canLead = false async function renew(){ await emitOnError(async ()=>{ - const leading = await isLeader() - if(leading){ - await asyncRedis.pexpire(key, ttl) - setTimeout(renew, ttl / 2) + const isLeading = await atomicGetIsEqualSetPExpire({ + asyncRedis, + key, + id, + ms: ttl + }) + + + if(isLeading){ + wasLeading = true + renewTimeoutId = setTimeout(renew, ttl / 2) } else{ + if(wasLeading){ + wasLeading = false + emitter.emit('demoted') + } clearTimeout(renewTimeoutId) - electTimeoutId = setTimeout(elect, wait); + electTimeoutId = setTimeout(runElection, wait) } }) } @@ -47,19 +62,23 @@ async function createSafeRedisLeader({ async function runElection(){ await emitOnError(async ()=>{ const res = await asyncRedis.set(key, id, 'PX', ttl, 'NX') - if(res !== null) { emitter.emit('elected') + wasLeading = true + if(!canLead){ + return await stop() + } renewTimeoutId = setTimeout(renew, ttl / 2) } else{ - electTimeoutId = setTimeout(elect, wait) + electTimeoutId = setTimeout(runElection, wait) } }) } async function elect(){ isStarted = true + canLead = true await runElection() } @@ -72,8 +91,18 @@ async function createSafeRedisLeader({ } async function stop(){ + canLead = false // real atomic get -> isEqual -> delete - await atomicGetIsEqualDelete({asyncRedis, key, id}) + renewTimeoutId && clearTimeout(renewTimeoutId) + electTimeoutId && clearTimeout(electTimeoutId) + const res = await atomicGetIsEqualDelete({asyncRedis, key, id}) + // a 1 indicates that we successfully deleted + // our leadership id which means we were + // the leader at time time of stop + if(res === 1){ + emitter.emit('demoted') + } + wasLeading = false } function on(name, fn){ @@ -105,6 +134,7 @@ async function createSafeRedisLeader({ async function shutdown(){ isStarted = false + canLead = false renewTimeoutId && clearTimeout(renewTimeoutId) electTimeoutId && clearTimeout(electTimeoutId) await stop()