fix(query): add fix for query

1. add setting for generation of session id for each query
2. fix handle of error from stream query
This commit is contained in:
Dmitry
2020-04-30 15:52:38 +03:00
parent b2b02c2e77
commit 5fc7689cd7
3 changed files with 103 additions and 11 deletions

View File

@@ -318,9 +318,17 @@ class Rs extends Transform {
return resolve({ r: 1 });
}
return reject(
getErrorObj(res)
)
let body = '';
res
.on('data', data => body += data)
.on('end', () => {
res.body = body;
return reject(
getErrorObj(res)
);
});
});
if ( ! me.isPiped) {
@@ -429,10 +437,15 @@ class QueryCursor {
'Content-Type': 'text/plain'
},
}, reqParams);
const configQS = _.merge({}, config, {
query_id: me.queryId,
});
if (me.connection.opts.isSessionPerQuery) {
configQS.session_id = uuidv4();
}
if (database) {
configQS.database = database;
}
@@ -491,11 +504,15 @@ class QueryCursor {
{}
);
}
} else if (query.match(/^insert/i)) {
query += ' FORMAT TabSeparated';
if (data) {
params['body'] = me._getBodyForInsert();
} else if (me.isInsert) {
if (query.match(/values/i)) {
//
} else {
query += ' FORMAT TabSeparated';
if (data) {
params['body'] = me._getBodyForInsert();
}
}
}
}
@@ -733,7 +750,7 @@ class QueryCursor {
// we need use this hack
me.connection.query(
`KILL QUERY WHERE query_id = '${me.queryId}' SYNC`, {}, {
sessionId: Date.now(),
sessionId: uuidv4(),
}
).exec(() => {});
}
@@ -762,6 +779,7 @@ class ClickHouse {
enable_http_compression : 0
},
format: 'json',
isSessionPerQuery: false,
},
opts
);
@@ -802,6 +820,16 @@ class ClickHouse {
return this;
}
get sessionPerQuery() {
return this.opts.isSessionPerQuery;
}
setSessionPerQuery(value) {
this.opts.isSessionPerQuery = !!value;
return this;
}
get sessionTimeout() {
return this.opts.config.session_timeout;
}

View File

@@ -92,5 +92,5 @@
"test": "mocha --bail --timeout 60000 --slow 5000"
},
"types": "index.d.ts",
"version": "2.1.1"
"version": "2.1.2"
}

View File

@@ -274,7 +274,6 @@ describe('Select', () => {
});
});
describe('session', () => {
it('use session', async () => {
const sessionId = clickhouse.sessionId;
@@ -297,6 +296,53 @@ describe('session', () => {
clickhouse.sessionId = sessionId;
});
it('use setSessionPerQuery #1', async () => {
const sessionPerQuery = clickhouse.sessionPerQuery;
clickhouse.setSessionPerQuery(false);
const result = await clickhouse.query(
`CREATE TEMPORARY TABLE test_table
(_id String, str String)
ENGINE=Memory`
).toPromise();
expect(result).to.be.ok();
try {
await clickhouse.query(
`CREATE TEMPORARY TABLE test_table
(_id String, str String)
ENGINE=Memory`
).toPromise();
expect(1).to.be(0);
} catch (err) {
expect(err).to.be.ok();
expect(err.code).to.be(57);
}
clickhouse.setSessionPerQuery(sessionPerQuery);
});
it('use setSessionPerQuery #2', async () => {
const sessionPerQuery = clickhouse.sessionPerQuery;
clickhouse.setSessionPerQuery(true);
const result = await clickhouse.query(
`CREATE TEMPORARY TABLE test_table
(_id String, str String)
ENGINE=Memory`
).toPromise();
expect(result).to.be.ok();
await clickhouse.query(
`CREATE TEMPORARY TABLE test_table
(_id String, str String)
ENGINE=Memory`
).toPromise();
clickhouse.setSessionPerQuery(sessionPerQuery);
});
});
// You can use all settings from request library (https://github.com/request/request#tlsssl-protocol)
@@ -457,6 +503,24 @@ describe('queries', () => {
expect(result9).to.eql(result10);
});
it('insert && errors', async () => {
try {
const stream = clickhouse.insert('INSERT INTO BAD_TABLE').stream();
for (let i = 0; i < 10; i++) {
stream.writeRow([i, `test: ${i}`]);
}
await stream.exec();
// no way!
expect(1).to.be.equal(0);
} catch (err) {
expect(err).to.be.ok();
expect(err.code).to.be(60);
}
});
it('select number as number', async () => {
const result = await clickhouse.query('DROP TABLE IF EXISTS test_int_temp').toPromise();
expect(result).to.be.ok();