lua atomic script implemented

This commit is contained in:
mike k
2021-01-23 13:12:37 -05:00
parent 9e02037e3f
commit 50b2e8893a
11 changed files with 484 additions and 3 deletions

View File

@@ -0,0 +1,33 @@
// const redis = require('redis')
const Redis = require("ioredis")
// const RedisClustr = require('redis-clustr')
const {promisify} = require('util')
// const {
// execMultiAsync
// } = require('../../utils')
module.exports = async function connectToRedis({redisCreds, promisifyFnNames=[]}){
let {
host,
port,
password
} = redisCreds
// debugger
// console.log(redisCreds)
port = port || 6379
let asyncRedis = new Redis({
host,
port,
password,
// Not the best solution, but we know
// that Redis Elasticasche on AWS
// has a weird tls related issue,
// so just set this to true solves it
// (not an issue when local)
tls: /amazonaws\.com$/.test(host) ? true : undefined
})
return asyncRedis
}

View File

@@ -1,3 +1,53 @@
const connectToRedis = require('../../library/connect-to-redis')
let asyncRedis
async function beforeTests(){
console.log('hello from setup!') const {
DOCKER_REDIS_HOST,
DOCKER_REDIS_PORT,
DOCKER_REDIS_PASSWORD
} = process.env
const redisCreds = {
host: DOCKER_REDIS_HOST,
port: DOCKER_REDIS_PORT || null, // testing inside of docker-compose so you don't need this
password: DOCKER_REDIS_PASSWORD
}
this.redisCreds = redisCreds
// nock.disableNetConnect()
asyncRedis = await connectToRedis({redisCreds})
}
before(async function(){
await (beforeTests.bind(this))()
})
beforeEach(async function(){
this.timeout(0)
// await dropAllMongoCollections()
// FLUSH ALL REDIS
await asyncRedis.flushall("ASYNC")
// KILL EVERYTHING
await asyncRedis.client('KILL', 'TYPE', 'normal')
await asyncRedis.client('KILL', 'TYPE', 'master')
await asyncRedis.client('KILL', 'TYPE', 'slave')
await asyncRedis.client('KILL', 'TYPE', 'pubsub')
let left = (await asyncRedis.client('list')).match(/(?<=id\=)(\d+)/g)
this.timeout(10000)
// console.log("redis connections left", left)
// debugger
})
after(async function(){
await asyncRedis.quit()
})

92
src/package-lock.json generated
View File

@@ -416,6 +416,12 @@
"mimic-response": "^1.0.0" "mimic-response": "^1.0.0"
} }
}, },
"cluster-key-slot": {
"version": "1.1.0",
"resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.0.tgz",
"integrity": "sha512-2Nii8p3RwAPiFwsnZvukotvow2rIHM+yQ6ZcBXGHdniadkYGZYiGmkHJIbZPIV9nfv7m/U1IPMVVcAhoWFeklw==",
"dev": true
},
"color-convert": { "color-convert": {
"version": "1.9.3", "version": "1.9.3",
"resolved": "https://registry.npmjs.org/color-convert/-/color-convert-1.9.3.tgz", "resolved": "https://registry.npmjs.org/color-convert/-/color-convert-1.9.3.tgz",
@@ -511,6 +517,12 @@
"object-keys": "^1.0.12" "object-keys": "^1.0.12"
} }
}, },
"denque": {
"version": "1.5.0",
"resolved": "https://registry.npmjs.org/denque/-/denque-1.5.0.tgz",
"integrity": "sha512-CYiCSgIF1p6EUByQPlGkKnP1M9g0ZV3qMIrqMqZqdwazygIA/YP2vrbcyl1h/WppKJTdl1F85cXIle+394iDAQ==",
"dev": true
},
"diff": { "diff": {
"version": "3.5.0", "version": "3.5.0",
"resolved": "https://registry.npmjs.org/diff/-/diff-3.5.0.tgz", "resolved": "https://registry.npmjs.org/diff/-/diff-3.5.0.tgz",
@@ -838,6 +850,41 @@
"integrity": "sha512-iKpRpXP+CrP2jyrxvg1kMUpXDyRUFDWurxbnVT1vQPx+Wz9uCYsMIqYuSBLV+PAaZG/d7kRLKRFc9oDMsH+mFQ==", "integrity": "sha512-iKpRpXP+CrP2jyrxvg1kMUpXDyRUFDWurxbnVT1vQPx+Wz9uCYsMIqYuSBLV+PAaZG/d7kRLKRFc9oDMsH+mFQ==",
"dev": true "dev": true
}, },
"ioredis": {
"version": "4.19.4",
"resolved": "https://registry.npmjs.org/ioredis/-/ioredis-4.19.4.tgz",
"integrity": "sha512-3haQWw9dpEjcfVcRktXlayVNrrqvvc2io7Q/uiV2UsYw8/HC2YwwJr78Wql7zu5bzwci0x9bZYA69U7KkevAvw==",
"dev": true,
"requires": {
"cluster-key-slot": "^1.1.0",
"debug": "^4.1.1",
"denque": "^1.1.0",
"lodash.defaults": "^4.2.0",
"lodash.flatten": "^4.4.0",
"p-map": "^2.1.0",
"redis-commands": "1.6.0",
"redis-errors": "^1.2.0",
"redis-parser": "^3.0.0",
"standard-as-callback": "^2.0.1"
},
"dependencies": {
"debug": {
"version": "4.3.1",
"resolved": "https://registry.npmjs.org/debug/-/debug-4.3.1.tgz",
"integrity": "sha512-doEwdvm4PCeK4K3RQN2ZC2BYUBaxwLARCqZmMjtF8a51J2Rb0xpVloFRnCODwqjpwnAoao4pelN8l3RJdv3gRQ==",
"dev": true,
"requires": {
"ms": "2.1.2"
}
},
"ms": {
"version": "2.1.2",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz",
"integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==",
"dev": true
}
}
},
"is-binary-path": { "is-binary-path": {
"version": "2.1.0", "version": "2.1.0",
"resolved": "https://registry.npmjs.org/is-binary-path/-/is-binary-path-2.1.0.tgz", "resolved": "https://registry.npmjs.org/is-binary-path/-/is-binary-path-2.1.0.tgz",
@@ -1027,6 +1074,18 @@
"integrity": "sha512-PlhdFcillOINfeV7Ni6oF1TAEayyZBoZ8bcshTHqOYJYlrqzRK5hagpagky5o4HfCzzd1TRkXPMFq6cKk9rGmA==", "integrity": "sha512-PlhdFcillOINfeV7Ni6oF1TAEayyZBoZ8bcshTHqOYJYlrqzRK5hagpagky5o4HfCzzd1TRkXPMFq6cKk9rGmA==",
"dev": true "dev": true
}, },
"lodash.defaults": {
"version": "4.2.0",
"resolved": "https://registry.npmjs.org/lodash.defaults/-/lodash.defaults-4.2.0.tgz",
"integrity": "sha1-0JF4cW/+pN3p5ft7N/bwgCJ0WAw=",
"dev": true
},
"lodash.flatten": {
"version": "4.4.0",
"resolved": "https://registry.npmjs.org/lodash.flatten/-/lodash.flatten-4.4.0.tgz",
"integrity": "sha1-8xwiIlqWMtK7+OSt2+8kCqdlph8=",
"dev": true
},
"log-symbols": { "log-symbols": {
"version": "2.2.0", "version": "2.2.0",
"resolved": "https://registry.npmjs.org/log-symbols/-/log-symbols-2.2.0.tgz", "resolved": "https://registry.npmjs.org/log-symbols/-/log-symbols-2.2.0.tgz",
@@ -1283,6 +1342,12 @@
"p-limit": "^2.0.0" "p-limit": "^2.0.0"
} }
}, },
"p-map": {
"version": "2.1.0",
"resolved": "https://registry.npmjs.org/p-map/-/p-map-2.1.0.tgz",
"integrity": "sha512-y3b8Kpd8OAN444hxfBbFfj1FY/RjtTd8tzYwhUqNYXx0fXx2iX4maP4Qr6qhIKbQXI02wTLAda4fYUbDagTUFw==",
"dev": true
},
"p-try": { "p-try": {
"version": "2.2.0", "version": "2.2.0",
"resolved": "https://registry.npmjs.org/p-try/-/p-try-2.2.0.tgz", "resolved": "https://registry.npmjs.org/p-try/-/p-try-2.2.0.tgz",
@@ -1391,6 +1456,27 @@
"picomatch": "^2.2.1" "picomatch": "^2.2.1"
} }
}, },
"redis-commands": {
"version": "1.6.0",
"resolved": "https://registry.npmjs.org/redis-commands/-/redis-commands-1.6.0.tgz",
"integrity": "sha512-2jnZ0IkjZxvguITjFTrGiLyzQZcTvaw8DAaCXxZq/dsHXz7KfMQ3OUJy7Tz9vnRtZRVz6VRCPDvruvU8Ts44wQ==",
"dev": true
},
"redis-errors": {
"version": "1.2.0",
"resolved": "https://registry.npmjs.org/redis-errors/-/redis-errors-1.2.0.tgz",
"integrity": "sha1-62LSrbFeTq9GEMBK/hUpOEJQq60=",
"dev": true
},
"redis-parser": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/redis-parser/-/redis-parser-3.0.0.tgz",
"integrity": "sha1-tm2CjNyv5rS4pCin3vTGvKwxyLQ=",
"dev": true,
"requires": {
"redis-errors": "^1.0.0"
}
},
"registry-auth-token": { "registry-auth-token": {
"version": "4.2.1", "version": "4.2.1",
"resolved": "https://registry.npmjs.org/registry-auth-token/-/registry-auth-token-4.2.1.tgz", "resolved": "https://registry.npmjs.org/registry-auth-token/-/registry-auth-token-4.2.1.tgz",
@@ -1471,6 +1557,12 @@
"integrity": "sha1-BOaSb2YolTVPPdAVIDYzuFcpfiw=", "integrity": "sha1-BOaSb2YolTVPPdAVIDYzuFcpfiw=",
"dev": true "dev": true
}, },
"standard-as-callback": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/standard-as-callback/-/standard-as-callback-2.0.1.tgz",
"integrity": "sha512-NQOxSeB8gOI5WjSaxjBgog2QFw55FV8TkS6Y07BiB3VJ8xNTvUYm0wl0s8ObgQ5NhdpnNfigMIKjgPESzgr4tg==",
"dev": true
},
"string-width": { "string-width": {
"version": "2.1.1", "version": "2.1.1",
"resolved": "https://registry.npmjs.org/string-width/-/string-width-2.1.1.tgz", "resolved": "https://registry.npmjs.org/string-width/-/string-width-2.1.1.tgz",

View File

@@ -4,7 +4,7 @@
"description": "Redis leader election implementation that does not have any race conditions", "description": "Redis leader election implementation that does not have any race conditions",
"main": "index.js", "main": "index.js",
"scripts": { "scripts": {
"test": "NODE_ENV=test NODE_PATH='./' mocha $DEBUG_NODE $WATCH --watch --inspect-brk=0.0.0:9229 --opts ./mochaConfig/mocha.opts ./src/**/*.test.js" "test": "NODE_ENV=test NODE_PATH='./' mocha $DEBUG_NODE $WATCH --watch --inspect-brk=0.0.0:9229 --opts ./mochaConfig/mocha.opts ./src/atomicGetIsEqualDelete/**/*.test.js"
}, },
"author": "", "author": "",
"license": "ISC", "license": "ISC",
@@ -14,6 +14,7 @@
"lodash": "^4.17.14", "lodash": "^4.17.14",
"mocha": "^6.2.2", "mocha": "^6.2.2",
"nock": "^12.0.2", "nock": "^12.0.2",
"nodemon": "^2.0.4" "nodemon": "^2.0.4",
"ioredis": "^4.16.0"
} }
} }

BIN
src/src/.DS_Store vendored

Binary file not shown.

View File

@@ -0,0 +1,26 @@
const fs = require('fs')
const util = require('util')
const readFile = util.promisify(fs.readFile)
async function atomicGetIsEqualDelete({
asyncRedis,
key,
id
}){
// do lua stuff
if(!asyncRedis.getIsEqualDelete){
const file = await readFile(`${__dirname}/lua/index.lua`, 'utf8')
asyncRedis.defineCommand("getIsEqualDelete", {
numberOfKeys: 1,
lua: file
})
}
const res = await asyncRedis.getIsEqualDelete(key, id)
return res
}
module.exports.atomicGetIsEqualDelete = atomicGetIsEqualDelete

View File

@@ -0,0 +1,10 @@
local key = KEYS[1]
local id = ARGV[1]
local current_id = redis.call('GET', key)
if (id == current_id) then
redis.call('DEL', key)
return 1
else
return 0
end

View File

@@ -0,0 +1,64 @@
const {atomicGetIsEqualDelete} = require('../index')
const connectToRedis = require('../../../library/connect-to-redis')
const {tryCatchIgnore} = require('../../testHelpers')
const {assert} = require('chai')
describe("atomicGetIsEqualDelete", function(){
afterEach(async function(){
await tryCatchIgnore(async()=> this.asyncRedis && await this.asyncRedis.quit(), "could not shutdown asyncRedis")
})
it("should get, compare equality, and not delete if null id in lua script for redis", async function(){
const key = "my-key"
const id = "the-id"
const asyncRedis = await connectToRedis({redisCreds: this.redisCreds})
this.asyncRedis = asyncRedis
const didDelete = await atomicGetIsEqualDelete({
asyncRedis,
key,
id
})
assert.isFalse(!!didDelete, "lua script claimed to delete id when it was supposed to be null")
})
it("should get, compare equality, and not delete if different id in lua script for redis", async function(){
const key = "my-key"
const id = "the-id"
const asyncRedis = await connectToRedis({redisCreds: this.redisCreds})
this.asyncRedis = asyncRedis
await asyncRedis.set(key, "different-id")
const didDelete = await atomicGetIsEqualDelete({
asyncRedis,
key,
id
})
assert.isFalse(!!didDelete, "lua script claimed to delete id when it was supposed to be a different id")
})
it("should get, compare equality, and delete when id is equal in lua script for redis", async function(){
const key = "my-key"
const id = "the-id"
const asyncRedis = await connectToRedis({redisCreds: this.redisCreds})
this.asyncRedis = asyncRedis
await asyncRedis.set(key, id)
const didDelete = await atomicGetIsEqualDelete({
asyncRedis,
key,
id
})
console.log(didDelete)
assert.isTrue(!!didDelete, "lua script claimed to skip delete when it was supposed to be the same id")
})
})

187
src/src/index.js Normal file
View File

@@ -0,0 +1,187 @@
'use strict';
var crypto = require('crypto');
var uuid = require('uuid');
var EventEmitter = require('events').EventEmitter;
const {atomicGetIsEqualDelete} = require('./atomicGetIsEqualDelete')
// Make the key less prone to collision
var hashKey = function(key) {
return 'leader:' + crypto.createHash('sha1').update(key).digest('hex');
};
const random = ()=>
crypto.randomBytes(32).toString("base64")
class MainEmitter extends EventEmitter{}
async function createSafeRedisLeader({
asyncRedis,
ttl,
wait,
key
}){
const emitter = new MainEmitter()
const id = hashKey(random())
key = hashKey(key || random());
let renewTimeoutId,
electTimeoutId
async function renew(){
const leading = await isLeader()
if(leading){
await asyncRedis.pexpire(key, ttl)
setTimeout(renew, ttl / 2)
}
else{
clearTimeout(renewTimeoutId)
electTimeoutId = setTimeout(elect, wait);
}
}
async function elect(){
let res
try{
res = await asyncRedis.set(key, id, 'PX', ttl, 'NX')
}
catch(e){
return emitter.emit('error', e)
}
if(res !== null) {
emitter.emit('elected')
renewTimeoutId = setTimeout(renew, ttl / 2)
}
else{
electTimeoutId = setTimeout(elect, wait)
}
}
async function isLeader(){
const curId = await asyncRedis.get(this.key)
return id === curId
}
async function stop(){
// real atomic get -> isEqual -> delete
await atomicGetIsEqualDelete({key, id})
}
function on(name, fn){
emitter.on(name, fn)
}
function off(name, fn){
emitter.off(name, fn)
}
function once(name, fn){
emitter.once(name, fn)
}
function removeAllListeners(){
emitter.removeAllListeners()
}
return {
elect,
isLeader,
stop,
on,
off,
once,
removeAllListeners
}
}
module.exports.createSafeRedisLeader = createSafeRedisLeader
// function Leader(redis, options) {
// options = options || {};
// this.id = uuid.v4();
// this.redis = redis;
// this.options = {};
// this.options.ttl = options.ttl || 10000; // Lock time to live in milliseconds
// this.options.wait = options.wait || 1000; // time between 2 tries to get lock
// this.key = hashKey(options.key || 'default');
// }
// util.inherits(Leader, EventEmitter);
// /**
// * Renew leader as elected
// */
// Leader.prototype._renew = function _renew() {
// // it is safer to check we are still leader
// this.isLeader(function(err, isLeader) {
// if(isLeader) {
// this.redis.pexpire(this.key, this.options.ttl, function(err) {
// if(err) {
// this.emit('error', err);
// }
// }.bind(this));
// } else {
// clearInterval(this.renewId);
// this.electId = setTimeout(Leader.prototype.elect.bind(this), this.options.wait);
// this.emit('revoked');
// }
// }.bind(this));
// };
// /**
// * Try to get elected as leader
// */
// Leader.prototype.elect = function elect() {
// // atomic redis set
// this.redis.set(this.key, this.id, 'PX', this.options.ttl, 'NX', function(err, res) {
// if(err) {
// return this.emit('error', err);
// }
// if(res !== null) {
// this.emit('elected');
// this.renewId = setInterval(Leader.prototype._renew.bind(this), this.options.ttl / 2);
// } else {
// // use setTimeout to avoid max call stack error
// this.electId = setTimeout(Leader.prototype.elect.bind(this), this.options.wait);
// }
// }.bind(this));
// };
// Leader.prototype.isLeader = function isLeader(done) {
// this.redis.get(this.key, function(err, id) {
// if(err) {
// return done(err);
// }
// done(null, (id === this.id));
// }.bind(this));
// };
// /**
// * if leader, stop being a leader
// * stop trying to be a leader
// */
// Leader.prototype.stop = function stop() {
// this.isLeader(function(err, isLeader) {
// if(isLeader) {
// // possible race condition, cause we need atomicity on get -> isEqual -> delete
// this.redis.del(this.key, function(err) {
// if(err) {
// return this.emit('error', err);
// }
// this.emit('revoked');
// }.bind(this));
// }
// clearInterval(this.renewId);
// clearTimeout(this.electId);
// }.bind(this));
// };
// module.exports = Leader;

View File

@@ -0,0 +1,17 @@
async function tryCatchIgnore(fn, errorMessage){
try{
await fn()
}
catch(e){
if(errorMessage){
console.error(errorMessage, e)
}
// we don't care about this since this
// function is strictly used during shutdown
// (afterEach)
}
}
module.exports.tryCatchIgnore = tryCatchIgnore

View File

@@ -1,4 +1,5 @@
describe("Main", function(){ describe("Main", function(){
it("should run", async function(){ it("should run", async function(){
console.log('it ran!') console.log('it ran!')