mirror of
https://github.com/lingble/safe-redis-leader.git
synced 2025-10-29 11:42:34 +00:00
error handling
This commit is contained in:
2
package-lock.json
generated
2
package-lock.json
generated
@@ -1,5 +1,5 @@
|
||||
{
|
||||
"name": "safe-redis-leader",
|
||||
"version": "0.0.1",
|
||||
"version": "0.0.4",
|
||||
"lockfileVersion": 1
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "safe-redis-leader",
|
||||
"version": "0.0.4",
|
||||
"version": "0.0.5",
|
||||
"description": "Redis leader election implementation that does not have any race conditions",
|
||||
"main": "src/src/index.js",
|
||||
"scripts": {
|
||||
|
||||
@@ -26,37 +26,41 @@ async function createSafeRedisLeader({
|
||||
let renewTimeoutId,
|
||||
electTimeoutId
|
||||
|
||||
let isStarted = false
|
||||
|
||||
|
||||
|
||||
async function renew(){
|
||||
const leading = await isLeader()
|
||||
if(leading){
|
||||
await asyncRedis.pexpire(key, ttl)
|
||||
setTimeout(renew, ttl / 2)
|
||||
}
|
||||
else{
|
||||
clearTimeout(renewTimeoutId)
|
||||
electTimeoutId = setTimeout(elect, wait);
|
||||
}
|
||||
await emitOnError(async ()=>{
|
||||
const leading = await isLeader()
|
||||
if(leading){
|
||||
await asyncRedis.pexpire(key, ttl)
|
||||
setTimeout(renew, ttl / 2)
|
||||
}
|
||||
else{
|
||||
clearTimeout(renewTimeoutId)
|
||||
electTimeoutId = setTimeout(elect, wait);
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async function runElection(){
|
||||
await emitOnError(async ()=>{
|
||||
const res = await asyncRedis.set(key, id, 'PX', ttl, 'NX')
|
||||
|
||||
if(res !== null) {
|
||||
emitter.emit('elected')
|
||||
renewTimeoutId = setTimeout(renew, ttl / 2)
|
||||
}
|
||||
else{
|
||||
electTimeoutId = setTimeout(elect, wait)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async function elect(){
|
||||
let res
|
||||
try{
|
||||
res = await asyncRedis.set(key, id, 'PX', ttl, 'NX')
|
||||
}
|
||||
catch(e){
|
||||
return emitter.emit('error', e)
|
||||
}
|
||||
|
||||
if(res !== null) {
|
||||
emitter.emit('elected')
|
||||
renewTimeoutId = setTimeout(renew, ttl / 2)
|
||||
}
|
||||
else{
|
||||
electTimeoutId = setTimeout(elect, wait)
|
||||
}
|
||||
isStarted = true
|
||||
await runElection()
|
||||
}
|
||||
|
||||
|
||||
@@ -88,7 +92,19 @@ async function createSafeRedisLeader({
|
||||
emitter.removeAllListeners()
|
||||
}
|
||||
|
||||
async function emitOnError(fn){
|
||||
try{
|
||||
await fn()
|
||||
}
|
||||
catch(e){
|
||||
if(isStarted){
|
||||
emitter.emit('error', e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function shutdown(){
|
||||
isStarted = false
|
||||
renewTimeoutId && clearTimeout(renewTimeoutId)
|
||||
electTimeoutId && clearTimeout(electTimeoutId)
|
||||
await stop()
|
||||
|
||||
@@ -14,4 +14,11 @@ async function tryCatchIgnore(fn, errorMessage){
|
||||
}
|
||||
}
|
||||
|
||||
module.exports.tryCatchIgnore = tryCatchIgnore
|
||||
module.exports.tryCatchIgnore = tryCatchIgnore
|
||||
|
||||
|
||||
async function delay(ms) {
|
||||
return new Promise(resolve => setTimeout(resolve, ms))
|
||||
}
|
||||
|
||||
module.exports.delay = delay
|
||||
@@ -1,6 +1,6 @@
|
||||
const {createSafeRedisLeader} = require('../index')
|
||||
const connectToRedis = require('../../library/connect-to-redis')
|
||||
const {tryCatchIgnore} = require('../testHelpers')
|
||||
const {tryCatchIgnore, delay} = require('../testHelpers')
|
||||
|
||||
describe("createSafeRedisLeader", function(){
|
||||
afterEach(async function(){
|
||||
@@ -18,7 +18,7 @@ describe("createSafeRedisLeader", function(){
|
||||
wait: 3000,
|
||||
key
|
||||
})
|
||||
|
||||
await delay(1000)
|
||||
|
||||
})
|
||||
})
|
||||
Reference in New Issue
Block a user