feature: ts-safe-redis-leader

This commit is contained in:
Marc G
2021-07-15 15:32:56 +00:00
parent 34b46e2fdb
commit 4c6e730a9f
36 changed files with 11724 additions and 2877 deletions

72
.eslintrc.js Normal file
View File

@@ -0,0 +1,72 @@
module.exports = {
parser: "@typescript-eslint/parser",
parserOptions: {
project: "tsconfig.json",
sourceType: "module",
},
plugins: ["@typescript-eslint/eslint-plugin", "jest"],
extends: ["airbnb-base"],
root: true,
env: {
node: true,
jest: true,
},
ignorePatterns: [".eslintrc.js"],
rules: {
"@typescript-eslint/interface-name-prefix": "off",
"@typescript-eslint/explicit-function-return-type": "off",
"@typescript-eslint/explicit-module-boundary-types": "off",
"@typescript-eslint/no-explicit-any": "off",
quotes: [2, "double", { avoidEscape: true }],
indent: [2, 2, { SwitchCase: 1 }],
"consistent-return": "off",
// overwritting AirBnB styleguide because this doesn<t play well with eslint-prettier
// "operator-linebreak": [2, "after", { overrides: { "?": "after" } }],
"class-methods-use-this": "off",
"import/prefer-default-export": "off",
"@typescript-eslint/explicit-member-accessibility": [
"error",
{
accessibility: "explicit",
overrides: {
constructors: "no-public",
},
},
],
"no-empty-function": "off",
"@typescript-eslint/no-empty-function": ["error", { allow: ["constructors"] }],
"@typescript-eslint/no-inferrable-types": "error",
"@typescript-eslint/explicit-function-return-type": "error",
"import/extensions": [
"error",
"ignorePackages",
{
ts: "never",
tsx: "never",
},
],
"no-shadow": "off",
"@typescript-eslint/no-shadow": "error",
"no-unused-vars": "off",
"@typescript-eslint/no-unused-vars": ["error", { ignoreRestSiblings: true }],
"no-useless-constructor": "off",
"@typescript-eslint/no-useless-constructor": "error",
"import/no-extraneous-dependencies": "off",
"object-curly-newline": ["error", { ObjectPattern: { multiline: true } }],
"no-unused-expressions": ["error", { allowShortCircuit: true, allowTernary: true }],
"no-use-before-define": "off",
"no-underscore-dangle": "off",
"jest/no-disabled-tests": "warn",
"jest/no-focused-tests": "error",
"jest/no-identical-title": "error",
"jest/prefer-to-have-length": "warn",
"jest/valid-expect": "error",
},
settings: {
"import/resolver": {
node: {
extensions: [".js", ".jsx", ".ts", ".tsx"],
},
},
},
};

5
.gitignore vendored
View File

@@ -1,2 +1,3 @@
dev-docker-data-cache
node_modules
node_modules
.vscode
**/.DS_Store

117
README.md
View File

@@ -1,113 +1,46 @@
# Safe Redis Leader
Fork of: [Safe Redis Leader](https://www.npmjs.com/package/safe-redis-leader)
## Goal
The Safe Redis Leader JS module is designed to provide a leader election implementation that provides tested gaurentees that there is only a single leader elected from a group of clients at one time.
The Safe Redis Leader TS module is designed to provide a leader election implementation that provides tested gaurentees that there is only a single leader elected from a group of clients at one time.
The implementation is a port of the stale [Redis Leader npm package](https://github.com/pierreinglebert/redis-leader) that implements a solution to the [known race condition](https://github.com/pierreinglebert/redis-leader/blob/c3b4db5df9802908728ad0ae4310a52e74acb462/index.js#L81). Additionally, this rewritten package:
1. Removes the usage of `.bind` and `this`, as well as prototype inheritance (Without introducing classes in the main impl)
2. Only exposes public api functions that should be exposed (no more public-but-should-be-private `_elect` fn)
3. has a test suite within docker-compose using a real redis instance, which allows anyone to run the tests with no heavy dependency setup
4. Has tests to assert the known race condition can no longer occur
5. removes the need for `new`, by providing a simple `createSafeRedisLeader(...)` public fn
6. Replace callback-hell with async/await
1. Only exposes public api functions that should be exposed (no more public-but-should-be-private `_elect` fn)
2. has a test suite within docker-compose using a real redis instance, which allows anyone to run the tests with no heavy dependency setup
3. Has tests to assert the known race condition can no longer occur
4. Replace callback-hell with async/await
5. Built with typescript
## Usage
Install the package:
```bash
npm install --save safe-redis-leader
npm install ts-safe-redis-leader
```
Exemple:
```typescript
import * as Redis from "ioredis";
import { SafeRedisLeader } from "../src";
const redisConfig: Redis.RedisOptions = {
port: 6379,
host: "localhost",
autoResubscribe: false,
lazyConnect: true,
maxRetriesPerRequest: 0,
};
const redisClient = new Redis(redisConfig);
in one terminal, run the follow index.js:
const leaderElectionKey = 'the-election';
const safeLeader = new SafeRedisLeader(redisClients[i], 1500, 3000, leaderElectionKey);
```javascript
const {createSafeRedisLeader} = require('safe-redis-leader')
const Redis = require('ioredis')
async function main(){
const asyncRedis = new Redis({
host: "locahost",
port: 6379,
password: "some-password"
})
const leaderElectionKey = 'the-election'
const safeLeader = await createSafeRedisLeader({
asyncRedis: asyncRedis,
ttl: 1500,
wait: 3000,
key: leaderElectionKey
})
safeLeader.on("elected", ()=>{
console.log("I'm the leader - 1")
})
await safeLeader.elect()
}
main().catch((e)=>{
console.error(e)
process.exit(1)
})
await safeLeader.elect();
```
In a seperate terminal/tab, run the following index.js:
```javascript
const {createSafeRedisLeader} = require('safe-redis-leader')
const Redis = require('ioredis')
async function main(){
const asyncRedis = new Redis({
host: "locahost",
port: 6379,
password: "some-password"
})
const leaderElectionKey = 'the-election'
const safeLeader = await createSafeRedisLeader({
asyncRedis: asyncRedis,
ttl: 1500,
wait: 3000,
key: leaderElectionKey
})
safeLeader.on("elected", ()=>{
console.log("I'm the leader - 2")
})
await safeLeader.elect()
}
main().catch((e)=>{
console.error(e)
process.exit(1)
})
```
## Run Library Tests
npm run docker:test
# License
MIT

View File

@@ -1,17 +0,0 @@
version: '3.7'
services:
redis_dev:
# container_name: hero-dev-redis
image: redis:alpine
restart: always
volumes:
- ${PWD}/dev-docker-data-cache/redis-data:/data
command: ['redis-server', '--appendonly', 'yes', '--requirepass', 'redis_dev_password']
ports:
- ${DOCKER_REDIS_PORT_PUBLIC:-6760}:${DOCKER_REDIS_PORT:-6379}
networks:
- safe_redis_leader_dev
networks:
safe_redis_leader_dev:
name: safe_redis_leader_dev
driver: bridge

View File

@@ -1,38 +0,0 @@
version: '3.7'
services:
safe_redis_leader_backend:
build:
context: ../../
dockerfile: ./docker/dockerfiles/Dockerfile
# container_name: safe_redis_leader_dev_backend
environment:
NODE_ENV: ${NODE_ENV}
DOCKER_REDIS_HOST: redis_dev
DOCKER_REDIS_PASSWORD: redis_dev_password
DOCKER_REDIS_PORT: ${DOCKER_REDIS_PORT:-6379}
NETWORK_NAME: safe_redis_leader_dev
tty: true
stdin_open: true
working_dir: /app
volumes:
- ${PWD}/docker:/docker:delegated
- ${PWD}/src:/app:delegated
- ${PWD}/package.json:/package.json
- ${PWD}/package-lock.json:/package-lock.json
- ${PWD}/.env:/.env
- node_modules_vol:/app/node_modules:delegated
- /var/run/docker.sock:/var/run/docker.sock
user: root
command: >
sh -c 'npm install --ignore-scripts --save --loglevel verbose && ${COMPOSE_COMMAND}'
ports:
- ${PUBLIC_NODE_DEBUG_PORT:-9220}:9229
networks:
- safe_redis_leader_dev
networks:
safe_redis_leader_dev:
name: safe_redis_leader_dev
driver: bridge
volumes:
node_modules_vol:
name: ${CLIENT_PREFIX_ID:-test-}package_node_modules

View File

@@ -1 +0,0 @@
FROM node:12-alpine3.10

View File

@@ -1,148 +0,0 @@
const { spawn, exec } = require('child_process')
async function run(){
let composeCommand = 'npm run test'
const isTest = process.env.NODE_ENV === 'test'
const exampleName = process.env.EXAMPLE
let command = exec(
`mkdir -p ./dev-docker-data-cache && mkdir -p ./dev-docker-data-cache/node_modules && mkdir -p ./dev-docker-data-cache/redis-data`,
{
env: {
...process.env,
}
}
)
await waitForCommandStatusWithStdout(command, {onError: ()=>new Error('could not create dev-docker-data-cache directories')})
if(isTest){
await startTests({composeCommand})
}
else if(exampleName === 'multi-client'){
await startMultiClientExample()
}
}
async function startTests({composeCommand}){
const child2 = spawn(
`docker-compose`,
[
"--project-name",
"safe-redis-leader",
"--project-directory",
"./docker/compose",
"-f",
"./docker/compose/test.yml",
"-f",
"./docker/compose/redis.yml",
"up",
// "--build"
],
{
env: {
...process.env,
COMPOSE_COMMAND: composeCommand
},
stdio: 'inherit'
}
);
child2.on("exit", (code, signal)=>{
process.exit(code)
})
}
async function startMultiClientExample(){
const projectName = 'safe-redis-leader-multi-client-example'
const child1 = spawn(
`docker-compose`,
[
"--project-name",
projectName,
"--project-directory",
"./docker/compose",
"-f",
"./docker/compose/redis.yml",
"up",
// "--build"
],
{
env: {
...process.env
},
stdio: 'inherit'
}
);
child1.on("exit", (code, signal)=>{
process.exit(code)
})
const totalClients = 2
for(let i = 0; i < totalClients; i++){
await startSingleClient({
projectName: `${projectName}-${i}`,
id: i,
composeCommand: `SCRIPT_CLIENT_ID=${i} npm run example:multi-client`
})
}
}
async function startSingleClient({composeCommand, projectName, id}){
const child1 = spawn(
`docker-compose`,
[
"--project-name",
projectName,
"--project-directory",
"./docker/compose",
"-f",
"./docker/compose/test.yml",
"up",
// "--build"
],
{
env: {
...process.env,
COMPOSE_COMMAND: composeCommand,
PUBLIC_NODE_DEBUG_PORT: `922${id}`,
CLIENT_PREFIX_ID: `client-${id}-`
},
stdio: 'inherit'
}
);
child1.on("exit", (code, signal)=>{
process.exit(code)
})
}
function waitForCommandStatusWithStdout(command, {onError}){
command.stdout.pipe(process.stdout)
command.stderr.pipe(process.stderr)
return new Promise((res,rej)=>command.on('close', (code) => {
if(code === 0){
res()
}
else{
rej(onError(code))
}
}))
}
run().catch((e)=>{
console.error(e)
process.exit(1)
})

11228
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,11 +1,13 @@
{
"name": "safe-redis-leader",
"version": "0.0.6",
"description": "Redis leader election implementation that does not have any race conditions",
"main": "src/src/index.js",
"name": "ts-safe-redis-leader",
"version": "0.0.1",
"description": "Redis leader election implementation that does not have any race conditions in Typescript",
"main": "src/index.js",
"scripts": {
"test": "npm install && NODE_ENV=test node ./docker/scripts/runner.js",
"example:multi-client": "npm install && EXAMPLE='multi-client' node ./docker/scripts/runner.js"
"eslint": "eslint '{src,apps,libs,test}/**/*.ts' --fix",
"test:debug": "node --inspect=0.0.0.0:9229 node_modules/.bin/jest --config ./test/jest-e2e.json --runInBand",
"test": "node_modules/.bin/jest --config ./test/jest-e2e.json",
"prepublish": "npm run eslint && npm run test"
},
"author": "Michael Khirallah",
"license": "MIT",
@@ -22,5 +24,26 @@
"bugs": {
"url": "https://github.com/mkralla11/safe-redis-leader/issues"
},
"homepage": "https://github.com/mkralla11/safe-redis-leader#readme"
"homepage": "https://github.com/mkralla11/safe-redis-leader#readme",
"dependencies": {
"ioredis": "^4.27.6"
},
"devDependencies": {
"@types/ioredis": "^4.26.5",
"@types/jest": "^26.0.24",
"@types/lodash": "^4.14.171",
"@types/node": "^16.3.2",
"@typescript-eslint/eslint-plugin": "^4.28.3",
"@typescript-eslint/parser": "^4.28.3",
"eslint": "^7.30.0",
"eslint-config-airbnb-base": "^14.2.1",
"eslint-config-prettier": "^8.3.0",
"eslint-plugin-import": "^2.23.4",
"eslint-plugin-jest": "^24.3.6",
"eslint-plugin-prettier": "^3.4.0",
"jest": "^27.0.6",
"lodash": "^4.17.21",
"ts-jest": "^27.0.3",
"typescript": "^4.3.5"
}
}

View File

@@ -4,7 +4,7 @@ local id = ARGV[1]
local current_id = redis.call('GET', key)
if (id == current_id) then
redis.call('DEL', key)
return 1
return true
else
return 0
return false
end

View File

@@ -0,0 +1,27 @@
import { readFile as fsReadFile } from "fs";
import IORedis from "ioredis";
import { promisify } from "util";
const readFile = promisify(fsReadFile);
type getIsEqualDeleteType = {
getIsEqualDelete?: (key: IORedis.KeyType, id: string) => Promise<boolean>;
};
export async function atomicGetIsEqualDelete(
asyncRedis: IORedis.Redis & getIsEqualDeleteType,
key: IORedis.KeyType,
id: string,
): Promise<boolean> {
if (!asyncRedis.getIsEqualDelete) {
const file = await readFile(`${__dirname}/index.lua`, "utf8");
asyncRedis.defineCommand("getIsEqualDelete", {
numberOfKeys: 1,
lua: file,
});
}
const res = await asyncRedis.getIsEqualDelete!(key, id);
return res;
}

View File

@@ -5,7 +5,7 @@ local ms = ARGV[2]
local current_id = redis.call('GET', key)
if (id == current_id) then
redis.call('PEXPIRE', key, ms)
return 1
return true
else
return 0
return false
end

View File

@@ -0,0 +1,28 @@
import { readFile as fsReadFile } from "fs";
import IORedis from "ioredis";
import { promisify } from "util";
const readFile = promisify(fsReadFile);
type getIsEqualSetPExpireType = {
getIsEqualSetPExpire?: (key: IORedis.KeyType, id: string, ms: number) => Promise<boolean>;
};
export async function atomicGetIsEqualSetPExpire(
asyncRedis: IORedis.Redis & getIsEqualSetPExpireType,
key: IORedis.KeyType,
id: string,
ms: number,
): Promise<boolean> {
if (!asyncRedis.getIsEqualSetPExpire) {
const file = await readFile(`${__dirname}/index.lua`, "utf8");
asyncRedis.defineCommand("getIsEqualSetPExpire", {
numberOfKeys: 1,
lua: file,
});
}
const res = await asyncRedis.getIsEqualSetPExpire!(key, id, ms);
return res;
}

3
src/emitter.ts Normal file
View File

@@ -0,0 +1,3 @@
import { EventEmitter } from "events";
export class MainEmitter extends EventEmitter {}

View File

@@ -1,70 +0,0 @@
const {createSafeRedisLeader} = require('../../src')
const Redis = require('ioredis')
function randomIntFromInterval(min, max) {
return Math.floor(Math.random() * (max - min + 1) + min)
}
function delay(ms){
return new Promise((res)=>{
setTimeout(res, ms)
})
}
async function main(){
const {
DOCKER_REDIS_HOST,
DOCKER_REDIS_PORT,
DOCKER_REDIS_PASSWORD,
SCRIPT_CLIENT_ID
} = process.env
const redisCreds = {
host: DOCKER_REDIS_HOST,
port: DOCKER_REDIS_PORT || null, // inside of docker-compose so you don't need this
password: DOCKER_REDIS_PASSWORD
}
const asyncRedis = new Redis(redisCreds)
const leaderElectionKey = 'the-election'
const safeLeader = await createSafeRedisLeader({
asyncRedis: asyncRedis,
ttl: 1000,
wait: 2000,
key: leaderElectionKey
})
safeLeader.on("elected", ()=>{
console.log(`SCRIPT_CLIENT_ID - ${SCRIPT_CLIENT_ID} - current leader`)
})
safeLeader.on("demoted", ()=>{
console.log(`SCRIPT_CLIENT_ID - ${SCRIPT_CLIENT_ID} - demoted`)
})
await delay(2000)
console.log(`SCRIPT_CLIENT_ID - ${SCRIPT_CLIENT_ID} - starting`)
await safeLeader.elect()
while(true){
await delay(randomIntFromInterval(1, 4) * 1000)
console.log(`SCRIPT_CLIENT_ID - ${SCRIPT_CLIENT_ID} - removing self from candidate pool`)
await safeLeader.stop()
await delay(randomIntFromInterval(1, 4) * 1000)
console.log(`SCRIPT_CLIENT_ID - ${SCRIPT_CLIENT_ID} - re-entering candidate pool`)
await safeLeader.elect()
}
}
main().catch((e)=>{
console.error(e)
process.exit(1)
})

149
src/index.ts Normal file
View File

@@ -0,0 +1,149 @@
import { BinaryLike, createHash, randomBytes } from "crypto";
import IORedis from "ioredis";
import { atomicGetIsEqualDelete } from "./atomicGetIsEqualDelete";
import { atomicGetIsEqualSetPExpire } from "./atomicGetIsEqualSetPExpire";
import { MainEmitter } from "./emitter";
export enum EmitterEnum {
NOT_ELECTED = "notElected",
ELECTED = "elected",
DEMOTED = "demoted",
ERROR = "error"
}
const hashKey = (key: BinaryLike): string => `leader: ${createHash("sha1").update(key).digest("hex")}`;
const random = (): string => randomBytes(32).toString("base64");
export class SafeRedisLeader {
public isStarted = false;
public wasLeading = false;
public canLead = false;
public emitter = new MainEmitter();
public id = hashKey(random());
public renewTimeoutId!: ReturnType<typeof setTimeout>;
public electTimeoutId!: ReturnType<typeof setTimeout>;
public asyncRedis: IORedis.Redis;
public ttl: number;
public wait: number;
public key: IORedis.KeyType;
constructor(
asyncRedis: IORedis.Redis,
ttl: number,
wait: number,
key: IORedis.KeyType,
) {
this.asyncRedis = asyncRedis;
this.ttl = ttl;
this.wait = wait;
this.key = hashKey(key || random());
}
public async renew(): Promise<void> {
try {
const isLeading = await atomicGetIsEqualSetPExpire(
this.asyncRedis,
this.key,
this.id,
this.ttl,
);
if (isLeading) {
this.wasLeading = true;
this.renewTimeoutId = setTimeout(this.renew.bind(this), this.ttl / 2);
} else {
if (this.wasLeading) {
this.wasLeading = false;
this.emitter.emit("demoted");
}
clearTimeout(this.renewTimeoutId);
this.electTimeoutId = setTimeout(this.runElection.bind(this), this.wait);
}
} catch (err) {
if (this.isStarted) {
this.emitter.emit(EmitterEnum.ERROR, err);
}
}
}
public async runElection(): Promise<void> {
try {
const res = await this.asyncRedis.set(this.key, this.id, "PX", this.ttl, "NX");
if (res) {
this.emitter.emit(EmitterEnum.ELECTED);
this.wasLeading = true;
if (!this.canLead) {
return this.stop();
}
this.renewTimeoutId = setTimeout(this.renew.bind(this), this.ttl / 2);
} else {
this.emitter.emit(EmitterEnum.NOT_ELECTED);
this.electTimeoutId = setTimeout(this.runElection.bind(this), this.wait);
}
} catch (err) {
if (this.isStarted) {
this.emitter.emit(EmitterEnum.ERROR, err);
}
}
}
public async elect(): Promise<void> {
this.isStarted = true;
this.canLead = true;
await this.runElection();
}
public async isLeader(): Promise<boolean> {
const curId = await this.asyncRedis.get(this.key);
return this.id === curId;
}
public async stop(): Promise<void> {
this.canLead = false;
this.renewTimeoutId && clearTimeout(this.renewTimeoutId);
this.electTimeoutId && clearTimeout(this.electTimeoutId);
const res = await atomicGetIsEqualDelete(this.asyncRedis, this.key, this.id);
if (res) {
this.emitter.emit(EmitterEnum.DEMOTED);
}
this.wasLeading = false;
}
public on(name: string, fn: () => void): void {
this.emitter.on(name, fn);
}
public off(name: string, fn: () => void): void {
this.emitter.off(name, fn);
}
public once(name: string, fn: () => void): void {
this.emitter.once(name, fn);
}
public removeAllListeners(): void {
this.emitter.removeAllListeners();
}
public async shutdown(): Promise<void> {
this.isStarted = false;
this.canLead = false;
this.renewTimeoutId && clearTimeout(this.renewTimeoutId);
this.electTimeoutId && clearTimeout(this.electTimeoutId);
await this.stop();
}
}

View File

@@ -1,33 +0,0 @@
// const redis = require('redis')
const Redis = require("ioredis")
// const RedisClustr = require('redis-clustr')
const {promisify} = require('util')
// const {
// execMultiAsync
// } = require('../../utils')
module.exports = async function connectToRedis({redisCreds, promisifyFnNames=[]}){
let {
host,
port,
password
} = redisCreds
// debugger
// console.log(redisCreds)
port = port || 6379
let asyncRedis = new Redis({
host,
port,
password,
// Not the best solution, but we know
// that Redis Elasticasche on AWS
// has a weird tls related issue,
// so just set this to true solves it
// (not an issue when local)
tls: /amazonaws\.com$/.test(host) ? true : undefined
})
return asyncRedis
}

View File

@@ -1,4 +0,0 @@
--reporter spec
--file ./mochaConfig/setup/index.js
--exclude node_modules/**/*
--timeout 10000

View File

@@ -1,53 +0,0 @@
const connectToRedis = require('../../library/connect-to-redis')
let asyncRedis
async function beforeTests(){
const {
DOCKER_REDIS_HOST,
DOCKER_REDIS_PORT,
DOCKER_REDIS_PASSWORD
} = process.env
const redisCreds = {
host: DOCKER_REDIS_HOST,
port: DOCKER_REDIS_PORT || null, // testing inside of docker-compose so you don't need this
password: DOCKER_REDIS_PASSWORD
}
this.redisCreds = redisCreds
// nock.disableNetConnect()
asyncRedis = await connectToRedis({redisCreds})
}
before(async function(){
await (beforeTests.bind(this))()
})
beforeEach(async function(){
this.timeout(0)
// await dropAllMongoCollections()
// FLUSH ALL REDIS
await asyncRedis.flushall("ASYNC")
// KILL EVERYTHING
await asyncRedis.client('KILL', 'TYPE', 'normal')
await asyncRedis.client('KILL', 'TYPE', 'master')
await asyncRedis.client('KILL', 'TYPE', 'slave')
await asyncRedis.client('KILL', 'TYPE', 'pubsub')
let left = (await asyncRedis.client('list')).match(/(?<=id\=)(\d+)/g)
this.timeout(10000)
// console.log("redis connections left", left)
// debugger
})
after(async function(){
await asyncRedis.quit()
})

1997
src/package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,21 +0,0 @@
{
"name": "safe-redis-leader",
"version": "0.0.1",
"description": "Redis leader election implementation that does not have any race conditions",
"main": "index.js",
"scripts": {
"test": "NODE_ENV=test NODE_PATH='./' mocha $DEBUG_NODE $WATCH --watch --inspect-brk=0.0.0:9229 --opts ./mochaConfig/mocha.opts './src/**/*.test.js'",
"example:multi-client": "node ./examples/multi-client/index.js"
},
"author": "",
"license": "ISC",
"devDependencies": {
"chai": "^4.2.0",
"dotenv": "^8.2.0",
"lodash": "^4.17.14",
"mocha": "^6.2.2",
"nock": "^12.0.2",
"nodemon": "^2.0.4",
"ioredis": "^4.16.0"
}
}

BIN
src/src/.DS_Store vendored

Binary file not shown.

Binary file not shown.

View File

@@ -1,26 +0,0 @@
const fs = require('fs')
const util = require('util')
const readFile = util.promisify(fs.readFile)
async function atomicGetIsEqualDelete({
asyncRedis,
key,
id
}){
// do lua stuff
if(!asyncRedis.getIsEqualDelete){
const file = await readFile(`${__dirname}/lua/index.lua`, 'utf8')
asyncRedis.defineCommand("getIsEqualDelete", {
numberOfKeys: 1,
lua: file
})
}
const res = await asyncRedis.getIsEqualDelete(key, id)
return res
}
module.exports.atomicGetIsEqualDelete = atomicGetIsEqualDelete

View File

@@ -1,63 +0,0 @@
const {atomicGetIsEqualDelete} = require('../index')
const connectToRedis = require('../../../library/connect-to-redis')
const {tryCatchIgnore} = require('../../testHelpers')
const {assert} = require('chai')
describe("atomicGetIsEqualDelete", function(){
afterEach(async function(){
await tryCatchIgnore(async()=> this.asyncRedis && await this.asyncRedis.quit(), "could not shutdown asyncRedis")
})
it("should get, compare equality, and not delete if null id in lua script for redis", async function(){
const key = "my-key"
const id = "the-id"
const asyncRedis = await connectToRedis({redisCreds: this.redisCreds})
this.asyncRedis = asyncRedis
const didDelete = await atomicGetIsEqualDelete({
asyncRedis,
key,
id
})
assert.isFalse(!!didDelete, "lua script claimed to delete id when it was supposed to be null")
})
it("should get, compare equality, and not delete if different id in lua script for redis", async function(){
const key = "my-key"
const id = "the-id"
const asyncRedis = await connectToRedis({redisCreds: this.redisCreds})
this.asyncRedis = asyncRedis
await asyncRedis.set(key, "different-id")
const didDelete = await atomicGetIsEqualDelete({
asyncRedis,
key,
id
})
assert.isFalse(!!didDelete, "lua script claimed to delete id when it was supposed to be a different id")
})
it("should get, compare equality, and delete when id is equal in lua script for redis", async function(){
const key = "my-key"
const id = "the-id"
const asyncRedis = await connectToRedis({redisCreds: this.redisCreds})
this.asyncRedis = asyncRedis
await asyncRedis.set(key, id)
const didDelete = await atomicGetIsEqualDelete({
asyncRedis,
key,
id
})
assert.isTrue(!!didDelete, "lua script claimed to skip delete when it was supposed to be the same id")
})
})

Binary file not shown.

View File

@@ -1,27 +0,0 @@
const fs = require('fs')
const util = require('util')
const readFile = util.promisify(fs.readFile)
async function atomicGetIsEqualSetPExpire({
asyncRedis,
key,
id,
ms
}){
// do lua stuff
if(!asyncRedis.getIsEqualSetPExpire){
const file = await readFile(`${__dirname}/lua/index.lua`, 'utf8')
asyncRedis.defineCommand("getIsEqualSetPExpire", {
numberOfKeys: 1,
lua: file
})
}
const res = await asyncRedis.getIsEqualSetPExpire(key, id, ms)
return res
}
module.exports.atomicGetIsEqualSetPExpire = atomicGetIsEqualSetPExpire

View File

@@ -1,67 +0,0 @@
const {atomicGetIsEqualSetPExpire} = require('../index')
const connectToRedis = require('../../../library/connect-to-redis')
const {tryCatchIgnore} = require('../../testHelpers')
const {assert} = require('chai')
describe("atomicGetIsEqualSetPExpire", function(){
afterEach(async function(){
await tryCatchIgnore(async()=> this.asyncRedis && await this.asyncRedis.quit(), "could not shutdown asyncRedis")
})
it("should get, compare equality, and not set pexpire when id not equal to self in lua script for redis", async function(){
const key = "my-key"
const id = "the-id"
const ms = 3000
const asyncRedis = await connectToRedis({redisCreds: this.redisCreds})
this.asyncRedis = asyncRedis
await this.asyncRedis.set(key, "different-id")
const didExtend = await atomicGetIsEqualSetPExpire({
asyncRedis,
key,
id,
ms
})
assert.isFalse(!!didExtend, "lua script claimed to extend pexpire id should have been inequal")
})
it("should get, compare equality, and not set pexpire when id is null in lua script for redis", async function(){
const key = "my-key"
const id = "the-id"
const ms = 3000
const asyncRedis = await connectToRedis({redisCreds: this.redisCreds})
this.asyncRedis = asyncRedis
// this.asyncRedis.set(key, "different-id")
const didExtend = await atomicGetIsEqualSetPExpire({
asyncRedis,
key,
id,
ms
})
assert.isFalse(!!didExtend, "lua script claimed to extend pexpire id should have been null")
})
it("should get, compare equality, and not set pexpire when id is null in lua script for redis", async function(){
const key = "my-key"
const id = "the-id"
const ms = 3000
const asyncRedis = await connectToRedis({redisCreds: this.redisCreds})
this.asyncRedis = asyncRedis
await this.asyncRedis.set(key, id)
const didExtend = await atomicGetIsEqualSetPExpire({
asyncRedis,
key,
id,
ms
})
assert.isTrue(!!didExtend, "lua script didn't extend pexpire when id should have been equal")
})
})

View File

@@ -1,156 +0,0 @@
'use strict';
var crypto = require('crypto');
var EventEmitter = require('events').EventEmitter;
const {atomicGetIsEqualDelete} = require('./atomicGetIsEqualDelete')
const {atomicGetIsEqualSetPExpire} = require('./atomicGetIsEqualSetPExpire')
// Make the key less prone to collision
var hashKey = function(key) {
return 'leader:' + crypto.createHash('sha1').update(key).digest('hex');
};
const random = ()=>
crypto.randomBytes(32).toString("base64")
class MainEmitter extends EventEmitter{}
async function createSafeRedisLeader({
asyncRedis,
ttl,
wait,
key
}){
const emitter = new MainEmitter()
const id = hashKey(random())
key = hashKey(key || random());
let renewTimeoutId,
electTimeoutId
let isStarted = false
let wasLeading = false
let canLead = false
async function renew(){
await emitOnError(async ()=>{
const isLeading = await atomicGetIsEqualSetPExpire({
asyncRedis,
key,
id,
ms: ttl
})
if(isLeading){
wasLeading = true
renewTimeoutId = setTimeout(renew, ttl / 2)
}
else{
if(wasLeading){
wasLeading = false
emitter.emit('demoted')
}
clearTimeout(renewTimeoutId)
electTimeoutId = setTimeout(runElection, wait)
}
})
}
async function runElection(){
await emitOnError(async ()=>{
const res = await asyncRedis.set(key, id, 'PX', ttl, 'NX')
if(res !== null) {
emitter.emit('elected')
wasLeading = true
if(!canLead){
return await stop()
}
renewTimeoutId = setTimeout(renew, ttl / 2)
}
else{
electTimeoutId = setTimeout(runElection, wait)
}
})
}
async function elect(){
isStarted = true
canLead = true
await runElection()
}
async function isLeader(){
const curId = await asyncRedis.get(key)
return id === curId
}
async function stop(){
canLead = false
// real atomic get -> isEqual -> delete
renewTimeoutId && clearTimeout(renewTimeoutId)
electTimeoutId && clearTimeout(electTimeoutId)
const res = await atomicGetIsEqualDelete({asyncRedis, key, id})
// a 1 indicates that we successfully deleted
// our leadership id which means we were
// the leader at time time of stop
if(res === 1){
emitter.emit('demoted')
}
wasLeading = false
}
function on(name, fn){
emitter.on(name, fn)
}
function off(name, fn){
emitter.off(name, fn)
}
function once(name, fn){
emitter.once(name, fn)
}
function removeAllListeners(){
emitter.removeAllListeners()
}
async function emitOnError(fn){
try{
await fn()
}
catch(e){
if(isStarted){
emitter.emit('error', e)
}
}
}
async function shutdown(){
isStarted = false
canLead = false
renewTimeoutId && clearTimeout(renewTimeoutId)
electTimeoutId && clearTimeout(electTimeoutId)
await stop()
}
return {
elect,
isLeader,
stop,
on,
off,
once,
removeAllListeners,
shutdown
}
}
module.exports.createSafeRedisLeader = createSafeRedisLeader

View File

@@ -1,24 +0,0 @@
async function tryCatchIgnore(fn, errorMessage){
try{
await fn()
}
catch(e){
if(errorMessage){
console.error(errorMessage, e)
}
// we don't care about this since this
// function is strictly used during shutdown
// (afterEach)
}
}
module.exports.tryCatchIgnore = tryCatchIgnore
async function delay(ms) {
return new Promise(resolve => setTimeout(resolve, ms))
}
module.exports.delay = delay

View File

@@ -1,24 +0,0 @@
const {createSafeRedisLeader} = require('../index')
const connectToRedis = require('../../library/connect-to-redis')
const {tryCatchIgnore, delay} = require('../testHelpers')
describe("createSafeRedisLeader", function(){
afterEach(async function(){
await tryCatchIgnore(async()=> this.safeLeader && await this.safeLeader.shutdown(), "could not shutdown safeLeader")
await tryCatchIgnore(async()=> this.asyncRedis && await this.asyncRedis.quit(), "could not shutdown asyncRedis")
})
it("should instantiate a safeRedisLeader", async function(){
const key = "safe-leader"
this.asyncRedis = await connectToRedis({redisCreds: this.redisCreds})
this.safeLeader = await createSafeRedisLeader({
asyncRedis: this.asyncRedis,
ttl: 1500,
wait: 3000,
key
})
await delay(1000)
})
})

7
test/docker-compose.yml Normal file
View File

@@ -0,0 +1,7 @@
version: "3.6"
services:
redis:
image: redis:alpine
command: redis-server
ports:
- 6378:6379

41
test/helper.ts Normal file
View File

@@ -0,0 +1,41 @@
/* eslint-disable no-async-promise-executor */
import { EmitterEnum, SafeRedisLeader } from "../src";
export async function waitForElection(safeLeader: SafeRedisLeader): Promise<boolean> {
return new Promise<boolean>(async (resolve) => {
safeLeader.on(EmitterEnum.ELECTED, async () => {
resolve(true);
});
safeLeader.once(EmitterEnum.NOT_ELECTED, async () => {
resolve(false);
});
await safeLeader.elect();
});
}
export async function waitForDemotion(safeLeader: SafeRedisLeader): Promise<void> {
return new Promise<void>(async (resolve) => {
safeLeader.on(EmitterEnum.DEMOTED, async () => {
resolve();
});
});
}
export async function getLeaderIndex(safeLeaders: SafeRedisLeader[]): Promise<number> {
for (let i = 0; i < safeLeaders.length; i += 1) {
const safeLeader = safeLeaders[i];
// eslint-disable-next-line no-await-in-loop
const isLeading = await safeLeader.isLeader();
if (isLeading) {
return i;
}
}
return -1;
}
export async function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

10
test/jest-e2e.json Normal file
View File

@@ -0,0 +1,10 @@
{
"moduleFileExtensions": ["js", "json", "ts"],
"rootDir": ".",
"testEnvironment": "node",
"testRegex": ".e2e-spec.ts$",
"transform": {
"^.+\\.(t|j)s$": "ts-jest"
}
}

View File

@@ -0,0 +1,84 @@
import * as Redis from "ioredis";
import { SafeRedisLeader } from "../src";
import { getLeaderIndex, sleep, waitForElection } from "./helper";
/* eslint-disable no-await-in-loop */
jest.setTimeout(1000 * 15);
describe("SafeRedisLeader", () => {
let redisClients: Redis.Redis[] = [];
let safeLeader: SafeRedisLeader;
let safeLeaders: SafeRedisLeader[] = [];
beforeAll(async () => {
const redisConfig: Redis.RedisOptions = {
port: 6378,
host: "localhost",
autoResubscribe: false,
lazyConnect: true,
maxRetriesPerRequest: 0,
};
redisClients = Array.from(Array(5)).map(() => new Redis(redisConfig));
safeLeaders = Array.from(Array(5)).map((_, i) => new SafeRedisLeader(redisClients[i], 1500, 3000, "test"));
});
afterEach(async () => {
await safeLeader?.shutdown();
await Promise.all(safeLeaders.map((sL) => sL.shutdown()));
safeLeaders.length = 0;
});
it("should be connected to Redis", async () => {
const setValue = await redisClients[0].set("test_key", "test_value");
const getValue = await redisClients[0].get("test_key");
expect(setValue).toEqual("OK");
expect(getValue).toEqual("test_value");
});
it("should initialize SafeRedisLeader", async () => {
safeLeader = new SafeRedisLeader(redisClients[0], 1500, 3000, "test");
expect(safeLeader.id).toBeTruthy();
});
it("should elect a Leader", async () => {
safeLeader = new SafeRedisLeader(redisClients[0], 1500, 3000, "test");
const isLeader = await waitForElection(safeLeader);
expect(isLeader).toEqual(true);
});
it("should only elect one Leader ", async () => {
safeLeaders = Array.from(Array(5)).map((_, i) => new SafeRedisLeader(redisClients[i], 1500, 3000, "test"));
for (let i = 0; i < safeLeaders.length; i += 1) {
const sL = safeLeaders[i];
const isLeader = await waitForElection(sL);
expect(isLeader).toEqual(i === 0);
}
});
it("Should re-elect a leader if the leader get disconnected", async () => {
safeLeaders = Array.from(Array(5)).map((_, i) => new SafeRedisLeader(redisClients[i], 500, 1000, "test"));
for (let i = 0; i < safeLeaders.length; i += 1) {
const sL = safeLeaders[i];
await waitForElection(sL);
}
const currentLeaderIndex = await getLeaderIndex(safeLeaders);
expect(currentLeaderIndex).not.toEqual(-1);
await safeLeaders[currentLeaderIndex].stop();
const afterStopLeader = await getLeaderIndex(safeLeaders);
expect(afterStopLeader).toEqual(-1);
await sleep(1000);
const newLeaderIndex = await getLeaderIndex(safeLeaders);
expect(newLeaderIndex).not.toEqual(-1);
});
});

16
tsconfig.json Normal file
View File

@@ -0,0 +1,16 @@
{
"compilerOptions": {
"module": "commonjs",
"declaration": true,
"removeComments": true,
"emitDecoratorMetadata": true,
"experimentalDecorators": true,
"allowSyntheticDefaultImports": true,
"target": "es2017",
"sourceMap": true,
"outDir": "./dist",
"baseUrl": "./",
"incremental": true,
"skipLibCheck": true
}
}