Add method "desttroy"

This commit is contained in:
Andrew Derbitov
2019-03-11 19:21:45 +03:00
parent 5f51bfa48a
commit 075766c921
3 changed files with 778 additions and 671 deletions

194
index.js
View File

@@ -128,88 +128,7 @@ function isObject(obj) {
} }
class QueryCursor { class Rs extends stream.Transform {
constructor(query, reqParams, opts) {
this.isInsert = !!query.match(/^insert/i);
this.fieldList = null;
this.query = query;
this.reqParams = _.merge({}, reqParams);
this.opts = opts;
this.useTotals = false;
}
exec(cb) {
let me = this;
if (me.opts.debug) {
console.log('exec req headers', me.reqParams.headers);
}
request.post(me.reqParams, (err, res) => {
if (me.opts.debug) {
console.log('exec', err, _.pick(res, [
'statusCode',
'body',
'statusMessage'
]));
}
if (err) {
return cb(err);
} else if (res.statusCode !== 200) {
return cb(
getErrorObj(res)
);
}
if ( ! res.body) {
return cb(null, {r: 1});
}
if (me.opts.debug) {
console.log('exec res headers', res.headers);
}
try {
let json = JSON.parse(res.body);
cb(null, me.useTotals ? json : json.data);
} catch (err2) {
cb(err2);
}
});
}
withTotals() {
this.useTotals = true;
return this;
}
toPromise() {
let me = this;
return new Promise((resolve, reject) => {
me.exec(function (err, data) {
if (err) return reject(err);
resolve(data);
})
});
}
stream() {
const
me = this,
isDebug = me.opts.debug;
if (isDebug) {
console.log('stream req headers', me.reqParams.headers);
}
if (me.isInsert) {
class Rs extends stream.Transform {
constructor(reqParams) { constructor(reqParams) {
super(); super();
@@ -303,19 +222,105 @@ class QueryCursor {
} }
}); });
} }
}
class QueryCursor {
constructor(query, reqParams, opts) {
this.isInsert = !!query.match(/^insert/i);
this.fieldList = null;
this.query = query;
this.reqParams = _.merge({}, reqParams);
this.opts = opts;
this.useTotals = false;
this._request = null;
} }
let rs = new Rs(this.reqParams);
exec(cb) {
let me = this;
if (me.opts.debug) {
console.log('exec req headers', me.reqParams.headers);
}
me._request = request.post(me.reqParams, (err, res) => {
if (me.opts.debug) {
console.log('exec', err, _.pick(res, [
'statusCode',
'body',
'statusMessage'
]));
}
if (err) {
return cb(err);
} else if (res.statusCode !== 200) {
return cb(
getErrorObj(res)
);
}
if ( ! res.body) {
return cb(null, {r: 1});
}
if (me.opts.debug) {
console.log('exec res headers', res.headers);
}
try {
let json = JSON.parse(res.body);
cb(null, me.useTotals ? json : json.data);
} catch (err2) {
cb(err2);
}
});
}
withTotals() {
this.useTotals = true;
return this;
}
toPromise() {
let me = this;
return new Promise((resolve, reject) => {
me.exec(function (err, data) {
if (err) return reject(err);
resolve(data);
})
});
}
stream() {
const
me = this,
isDebug = me.opts.debug;
if (isDebug) {
console.log('stream req headers', me.reqParams.headers);
}
if (me.isInsert) {
const rs = new Rs(this.reqParams);
rs.query = this.query; rs.query = this.query;
me._request = rs;
return rs; return rs;
} else { } else {
let toJSON = JSONStream.parse(['data', true]); const toJSON = JSONStream.parse(['data', true]);
let rs = new stream.Readable({ objectMode: true }); const rs = new stream.Readable({ objectMode: true });
rs._read = () => {}; rs._read = () => {};
rs.query = this.query; rs.query = this.query;
let tf = new stream.Transform({ objectMode: true }); const tf = new stream.Transform({ objectMode: true });
let isFirstChunck = true; let isFirstChunck = true;
tf._transform = function (chunk, encoding, cb) { tf._transform = function (chunk, encoding, cb) {
@@ -338,7 +343,7 @@ class QueryCursor {
let metaData = {}; let metaData = {};
let requestStream = request.post(this.reqParams); const requestStream = request.post(this.reqParams);
// Не делаем .pipe(rs) потому что rs - Readable, // Не делаем .pipe(rs) потому что rs - Readable,
// а для pipe нужен Writable // а для pipe нужен Writable
@@ -385,9 +390,24 @@ class QueryCursor {
toJSON.resume(); toJSON.resume();
}; };
me._request = rs;
return stream2asynciter(rs); return stream2asynciter(rs);
} }
} }
destroy() {
if (this._request instanceof stream.Readable) {
return this._request.destroy();
}
if (this._request) {
return this._request.abort();
}
throw new Error('QueryCursor.destroy error: private field _request is invalid');
}
} }

View File

@@ -1,10 +1,55 @@
{ {
"_from": "clickhouse@^1.2.15",
"_id": "clickhouse@1.2.17",
"_inBundle": false,
"_integrity": "sha512-uBB8BnlOJm09620hhvsrSJv9RmrgQ/AHJsQE1fwr7nD0Aof2kPjCw6r+ag3YE/f0Pgurk5cYzoPxyR2Io6VHwA==",
"_location": "/clickhouse",
"_phantomChildren": {
"aws-sign2": "0.7.0",
"aws4": "1.8.0",
"caseless": "0.12.0",
"combined-stream": "1.0.7",
"extend": "3.0.2",
"forever-agent": "0.6.1",
"form-data": "2.3.3",
"har-validator": "5.1.3",
"http-signature": "1.2.0",
"is-typedarray": "1.0.0",
"isstream": "0.1.2",
"json-stringify-safe": "5.0.1",
"jsonparse": "1.3.1",
"mime-types": "2.1.22",
"oauth-sign": "0.9.0",
"performance-now": "2.1.0",
"safe-buffer": "5.1.2",
"through": "2.3.8",
"tough-cookie": "2.4.3",
"tunnel-agent": "0.6.0"
},
"_requested": {
"type": "range",
"registry": true,
"raw": "clickhouse@^1.2.15",
"name": "clickhouse",
"escapedName": "clickhouse",
"rawSpec": "^1.2.15",
"saveSpec": null,
"fetchSpec": "^1.2.15"
},
"_requiredBy": [
"/"
],
"_resolved": "https://registry.npmjs.org/clickhouse/-/clickhouse-1.2.17.tgz",
"_shasum": "4922b86acdf32f0233d94704d8cfb57b9578f68d",
"_spec": "clickhouse@^1.2.15",
"_where": "/home/www/ukit_dev",
"author": { "author": {
"name": "Dmitry Berezhnov" "name": "Dmitry Berezhnov"
}, },
"scripts": { "bugs": {
"test": "mocha --timeout 60000 --slow 5000" "url": "https://github.com/TimonKK/clickhouse/issues"
}, },
"bundleDependencies": false,
"dependencies": { "dependencies": {
"JSONStream": "1.3.4", "JSONStream": "1.3.4",
"lodash": "4.17.11", "lodash": "4.17.11",
@@ -12,6 +57,7 @@
"request": "2.88.0", "request": "2.88.0",
"stream2asynciter": "1.0.0" "stream2asynciter": "1.0.0"
}, },
"deprecated": false,
"description": "Client for ClickHouse", "description": "Client for ClickHouse",
"devDependencies": { "devDependencies": {
"expect.js": "0.3.1", "expect.js": "0.3.1",
@@ -22,6 +68,10 @@
"shasum": "cf85ab5364055c479b1ee71f12ba206a3ecb80b2", "shasum": "cf85ab5364055c479b1ee71f12ba206a3ecb80b2",
"tarball": "https://registry.npmjs.org/clickhouse/-/clickhouse-1.0.0.tgz" "tarball": "https://registry.npmjs.org/clickhouse/-/clickhouse-1.0.0.tgz"
}, },
"homepage": "https://github.com/TimonKK/clickhouse#readme",
"keywords": [
"clickhouse"
],
"license": "ISC", "license": "ISC",
"main": "index.js", "main": "index.js",
"maintainers": [ "maintainers": [
@@ -31,13 +81,13 @@
} }
], ],
"name": "clickhouse", "name": "clickhouse",
"version": "1.2.17",
"repository": { "repository": {
"type": "git", "type": "git",
"url": "https://github.com/TimonKK/clickhouse.git" "url": "git+https://github.com/TimonKK/clickhouse.git"
}, },
"keywords": [ "scripts": {
"clickhouse" "test": "mocha --timeout 60000 --slow 5000"
], },
"types": "index.d.ts" "types": "index.d.ts",
"version": "1.2.18"
} }

View File

@@ -11,7 +11,8 @@ const
database : database, database : database,
debug : false debug : false
}), }),
rowCount = _.random(50 * 1024, 128 * 1024), minRnd = 50 * 1024,
rowCount = _.random(minRnd, 128 * 1024),
sql = `SELECT sql = `SELECT
number, number,
toString(number * 2) AS str, toString(number * 2) AS str,
@@ -553,3 +554,39 @@ describe('Exec system queries', () => {
} }
}); });
}); });
describe('Abort query', () => {
it('exec & abort', cb => {
const $q = clickhouse.query(`SELECT number FROM system.numbers LIMIT ${rowCount}`);
let i = 0,
error = null;
const stream = $q.stream()
.on('data', () => {
++i;
if (i > minRnd) {
stream.pause();
}
})
.on('error', err => error = err)
.on('close', () => {
expect(error).to.not.be.ok();
expect(i).to.be.below(rowCount);
cb();
})
.on('end', () => {
cb(new Error('no way!'));
});
setTimeout(() => $q.destroy(), 10 * 1000);
});
});
after(async () => {
await clickhouse.query(`DROP DATABASE IF EXISTS ${database}`).toPromise();
});