refactor(index): global refactoring

This commit is contained in:
Dmitry
2020-02-05 16:35:30 +03:00
parent 05f46b49b2
commit d0d19ca18e
5 changed files with 566 additions and 484 deletions

View File

@@ -26,10 +26,11 @@ const clickhouse = new ClickHouse({
isUseGzip: false, isUseGzip: false,
format: "json", // "json" || "csv" || "tsv" format: "json", // "json" || "csv" || "tsv"
config: { config: {
session_id : 'session_id if neeed',
session_timeout : 60, session_timeout : 60,
output_format_json_quote_64bit_integers : 0, output_format_json_quote_64bit_integers : 0,
enable_http_compression : 0, enable_http_compression : 0,
database : 'my_database_name', database : 'my_database_name',
}, },
// This object merge with request params (see request lib docs) // This object merge with request params (see request lib docs)

692
index.js
View File

@@ -1,17 +1,15 @@
'use strict'; 'use strict';
const zlib = require('zlib'); const zlib = require('zlib');
const _ = require('lodash');
const const request = require('request');
_ = require('lodash'), const { Transform, Readable, } = require('stream');
request = require('request'), const JSONStream = require('JSONStream');
stream = require('stream'), const through = require('through');
querystring = require('querystring'), const stream2asynciter = require('stream2asynciter');
JSONStream = require('JSONStream'), const { URL } = require('url');
through = require('through'), const tsv = require('tsv');
stream2asynciter = require('stream2asynciter'), const uuidv4 = require('uuid/v4');
{ URL } = require('url'),
tsv = require('tsv');
/** /**
@@ -24,26 +22,38 @@ const
* session_timeout * session_timeout
*/ */
var SEPARATORS = { const SEPARATORS = {
TSV: "\t", TSV: "\t",
CSV: ",", CSV: ",",
Values: "," Values: ","
}; };
var ALIASES = { const ALIASES = {
TabSeparated: "TSV" TabSeparated: "TSV"
}; };
var ESCAPE_STRING = { var ESCAPE_STRING = {
TSV: function (v, quote) {return v.replace (/\\/g, '\\\\').replace (/\\/g, '\\').replace(/\t/g, '\\t').replace(/\n/g, '\\n')}, /**
CSV: function (v, quote) {return v.replace (/\"/g, '""')}, * @return {string}
*/
TSV: function (value) {
return value
.replace(/\\/g, '\\\\')
.replace(/\\/g, '\\')
.replace(/\t/g, '\\t')
.replace(/\n/g, '\\n');
},
CSV: function (value) {
return value.replace (/\"/g, '""');
},
}; };
var ESCAPE_NULL = { var ESCAPE_NULL = {
TSV: "\\N", TSV: "\\N",
CSV: "\\N", CSV: "\\N",
Values: "\\N", Values: "\\N",
// JSONEachRow: "\\N", JSONEachRow: "\\N",
}; };
const R_ERROR = new RegExp('Code: ([0-9]{2}), .*Exception:'); const R_ERROR = new RegExp('Code: ([0-9]{2}), .*Exception:');
@@ -55,27 +65,43 @@ const PORT = 8123;
const DATABASE = 'default'; const DATABASE = 'default';
const USERNAME = 'default'; const USERNAME = 'default';
const FORMATS = {
'json': 'JSON',
'tsv': 'TabSeparatedWithNames',
'csv': 'CSVWithNames',
};
const REVERSE_FORMATS = Object.keys(FORMATS).reduce(
function(obj, format) {
obj[FORMATS[format]] = format;
return obj;
},
{}
);
const R_FORMAT_PARSER = new RegExp(
`FORMAT (${Object.keys(FORMATS).map(k => FORMATS[k]).join('|')})`,
'mi'
);
function parseCSV(body, options = { header: true }) { function parseCSV(body, options = { header: true }) {
const data = new tsv.Parser(SEPARATORS.CSV, options).parse(body); const data = new tsv.Parser(SEPARATORS.CSV, options).parse(body);
data.splice(data.length - 1, 1); data.splice(data.length - 1, 1);
return data; return data;
} }
function parseJSON(body) {
return JSON.parse(body);
}
function parseTSV(body, options = { header: true }) { function parseTSV(body, options = { header: true }) {
const data = new tsv.Parser(SEPARATORS.TSV, options).parse(body); const data = new tsv.Parser(SEPARATORS.TSV, options).parse(body);
data.splice(data.length - 1, 1); data.splice(data.length - 1, 1);
return data; return data;
} }
function parseCSVStream(s) { function parseCSVStream(s = new Set()) {
let isFirst = true; let isFirst = true;
let ref = { let ref = {
fields: [] fields: []
}; };
return through(function (chunk) { return through(function (chunk) {
let str = chunk.toString(); let str = chunk.toString();
let parsed = parseCSV(str, {header: isFirst}); let parsed = parseCSV(str, {header: isFirst});
@@ -97,11 +123,12 @@ function parseJSONStream() {
return JSONStream.parse(['data', true]); return JSONStream.parse(['data', true]);
} }
function parseTSVStream(s) { function parseTSVStream(s = new Set()) {
let isFirst = true; let isFirst = true;
let ref = { let ref = {
fields: [] fields: []
}; };
return through(function (chunk) { return through(function (chunk) {
let str = chunk.toString(); let str = chunk.toString();
let parsed = parseTSV(str, {header: isFirst}); let parsed = parseTSV(str, {header: isFirst});
@@ -135,25 +162,33 @@ function chunkBuilder(isFirst, ref, chunk, parsed) {
} }
} }
function encodeValue(quote, v, format, isArray) { function encodeValue(quote, v, _format, isArray) {
format = ALIASES[format] || format; const format = ALIASES[_format] || _format;
switch (typeof v) { switch (typeof v) {
case 'string': case 'string':
if (isArray) { if (isArray) {
return `'${ESCAPE_STRING[format] ? ESCAPE_STRING[format](v, quote) : v}'`; return `'${ESCAPE_STRING[format] ? ESCAPE_STRING[format](v, quote) : v}'`;
} else {
return ESCAPE_STRING[format] ? ESCAPE_STRING[format](v, quote) : v;
} }
return ESCAPE_STRING[format] ? ESCAPE_STRING[format](v, quote) : v;
case 'number': case 'number':
if (isNaN (v)) if (isNaN(v)) {
return 'nan'; return 'nan';
if (v === +Infinity) }
if (v === +Infinity) {
return '+inf'; return '+inf';
if (v === -Infinity) }
if (v === -Infinity) {
return '-inf'; return '-inf';
if (v === Infinity) }
if (v === Infinity) {
return 'inf'; return 'inf';
}
return v; return v;
case 'object': case 'object':
@@ -164,7 +199,6 @@ function encodeValue(quote, v, format, isArray) {
// you can add array items // you can add array items
if (v instanceof Array) { if (v instanceof Array) {
// return '[' + v.map(encodeValue.bind(this, true, format)).join (',') + ']';
return '[' + v.map(function (i) { return '[' + v.map(function (i) {
return encodeValue(true, i, format, true); return encodeValue(true, i, format, true);
}).join(',') + ']'; }).join(',') + ']';
@@ -172,7 +206,7 @@ function encodeValue(quote, v, format, isArray) {
// TODO: tuples support // TODO: tuples support
if (!format) { if (!format) {
console.trace (); console.trace();
} }
if (v === null) { if (v === null) {
@@ -182,17 +216,19 @@ function encodeValue(quote, v, format, isArray) {
return format in ESCAPE_NULL ? ESCAPE_NULL[format] : v; return format in ESCAPE_NULL ? ESCAPE_NULL[format] : v;
case 'boolean': case 'boolean':
return v === true ? 1 : 0; return v === true ? 1 : 0;
default:
return v;
} }
} }
function getErrorObj(res) { function getErrorObj(res) {
let err = new Error(`${res.statusCode}: ${res.body || res.statusMessage}`); const err = new Error(`${res.statusCode}: ${res.body || res.statusMessage}`);
if (res.body) { if (res.body) {
let m = res.body.match(R_ERROR); const m = res.body.match(R_ERROR);
if (m) { if (m) {
if (m[1] && isNaN(parseInt(m[1])) === false) { if (m[1] && isNaN(parseInt(m[1])) === false) {
err.code = parseInt(m[1]); err.code = parseInt(m[1]);
} }
if (m[2]) { if (m[2]) {
@@ -210,11 +246,11 @@ function isObject(obj) {
} }
class Rs extends stream.Transform { class Rs extends Transform {
constructor(reqParams) { constructor(reqParams) {
super(); super();
let me = this; const me = this;
me.ws = request.post(reqParams); me.ws = request.post(reqParams);
@@ -296,30 +332,197 @@ class Rs extends stream.Transform {
class QueryCursor { class QueryCursor {
constructor(query, reqParams, opts) { constructor(connection, query, data, opts = {}) {
this.isInsert = !!query.match(/^insert/i); this.connection = connection;
this.fieldList = null;
this.query = query; this.query = query;
this.reqParams = _.merge({}, reqParams); this.data = data;
this.opts = opts;
this.opts = _.merge({}, opts, { format: this.connection.opts.format });
// Sometime needs to override format by query
const formatFromQuery = ClickHouse.getFormatFromQuery(this.query);
if (formatFromQuery && formatFromQuery !== this.format) {
this.opts.format = formatFromQuery;
}
this.useTotals = false; this.useTotals = false;
this._request = null; this._request = null;
this.queryId = opts.queryId || uuidv4();
console.log('QueryCursor', {query: this.query, data: this.data, opts: this.opts})
}
get isInsert() {
return !!this.query.match(/^insert/i);
}
get isDebug() {
return this.connection.opts.debug;
}
get format() {
return this.opts.format;
}
// TODO Add check for white list of formats
set format(newFormat) {
this.opts.format = newFormat;
}
_getBodyForInsert() {
const me = this;
let query = me.query;
let data = me.data;
let values = [],
fieldList = [],
isFirstElObject = false;
if (Array.isArray(data) && Array.isArray(data[0])) {
values = data;
} else if (Array.isArray(data) && isObject(data[0])) {
values = data;
isFirstElObject = true;
} else if (isObject(data)) {
values = [data];
isFirstElObject = true;
} else {
throw new Error('ClickHouse._getBodyForInsert: data is invalid format');
}
if (isFirstElObject) {
let m = query.match(/INSERT INTO (.+?) \((.+?)\)/);
if (m) {
fieldList = m[2].split(',').map(s => s.trim());
} else {
throw new Error('insert query wasnt parsed field list after TABLE_NAME');
}
}
return values.map(row => {
if (isFirstElObject) {
return ClickHouse.mapRowAsObject(fieldList, row);
} else {
return ClickHouse.mapRowAsArray(row);
}
}).join('\n');
} }
exec(cb) { _getReqParams() {
let me = this; const me = this;
if (me.opts.debug) { const {
console.log('exec req headers', me.reqParams.headers); reqParams,
config,
username,
password,
database,
} = me.connection.opts;
const params = _.merge({
headers: {
'Content-Type': 'text/plain'
},
}, reqParams);
const configQS = _.merge({}, config, {
query_id: me.queryId,
});
if (database) {
configQS.database = database;
} }
me._request = request.post(me.reqParams, (err, res) => { const url = new URL(me.connection.url);
if (me.opts.debug) {
console.log('exec', err, _.pick(res, [ if (username) {
url.searchParams.append('user', username);
}
if (password) {
url.searchParams.append('password', password);
}
Object.keys(configQS).forEach(k => {
url.searchParams.append(k, configQS[k]);
});
let data = me.data;
let query = me.query;
if (typeof query === 'string') {
if (/with totals/i.test(query)) {
me.useTotals = true;
}
// Hack for Sequelize ORM
query = query.trim().trimEnd().replace(/;$/gm, "");
if (query.match(/^(select|show|exists)/i)) {
if ( ! R_FORMAT_PARSER.test(query)) {
query += ` FORMAT ${ClickHouse.getFullFormatName(me.format)}`;
}
query += ';';
if (data && data.external) {
params['formData'] = data.external.reduce(
function(formData, external) {
url.searchParams.append(
`${external.name}_structure`,
external.structure || 'str String'
);
formData[external.name] = {
value: external.data.join('\n'),
options: {
filename: external.name,
contentType: 'text/plain'
}
};
return formData;
},
{}
);
}
} else if (query.match(/^insert/i)) {
query += ' FORMAT TabSeparated';
if (data) {
params['body'] = me._getBodyForInsert();
}
}
}
url.searchParams.append('query', query);
if (me.connection.isUseGzip) {
params.headers['Accept-Encoding'] = 'gzip';
}
if (me.isDebug) {
console.log('QueryCursor._getReqParams: params', me.query, params);
}
params['url'] = url.toString();
return params;
}
exec(cb) {
const me = this;
const reqParams = me._getReqParams();
me._request = request.post(reqParams, (err, res) => {
if (me.isDebug) {
console.log('QueryCursor.exec: result', me.query, err, _.pick(res, [
'statusCode', 'statusCode',
'body', 'body',
'statusMessage' 'statusMessage',
'headers'
])); ]));
} }
@@ -334,42 +537,66 @@ class QueryCursor {
if ( ! res.body) { if ( ! res.body) {
return cb(null, {r: 1}); return cb(null, {r: 1});
} }
if (me.opts.debug) {
console.log('exec res headers', res.headers);
}
try { try {
const result = this._parseRowsByFormat(res.body); let data = me.getBodyParser()(res.body);
cb(null, me.useTotals ? result : result.data || result)
} catch (e) { if (me.format === 'json') {
cb(e); data = data.data;
}
if (me.useTotals) {
const totals = JSON.parse(res.headers['x-clickhouse-summary']);
return cb(
null,
{
meta: {},
data: data,
totals,
rows: data.length,
statistics: {},
}
);
}
cb(null, data);
} catch (err) {
cb(err);
} }
}); });
} }
_parseRowsByFormat(body, isStream = false) { getBodyParser() {
let result = null; if (this.format === 'json') {
let ws; return JSON.parse;
switch (this._getFormat()) {
case "json":
result = !isStream && parseJSON(body) || parseJSONStream();
break;
case "tsv":
result = !isStream && parseTSV(body) || parseTSVStream(new Set());
break;
case "csv":
result = !isStream && parseCSV(body) || parseCSVStream(new Set());
break;
default:
result = body;
} }
return result;
if (this.format === 'tsv') {
return parseTSV;
}
if (this.format === 'csv') {
return parseCSV;
}
throw new Error(`CursorQuery.getBodyParser: unknown format "${this.format}"`);
}; };
_getFormat() { getStreamParser() {
return this.opts.sessionFormat || this.opts.format; if (this.format === 'json') {
} return parseJSONStream;
}
if (this.format === 'tsv') {
return parseTSVStream;
}
if (this.format === 'csv') {
return parseCSVStream;
}
throw new Error(`CursorQuery.getStreamParser: unknown format "${this.format}"`);
}
withTotals() { withTotals() {
this.useTotals = true; this.useTotals = true;
@@ -388,69 +615,59 @@ class QueryCursor {
}); });
} }
stream() { stream() {
const const me = this;
me = this,
isDebug = me.opts.debug;
if (isDebug) { const reqParams = me._getReqParams();
console.log('stream req headers', me.reqParams.headers);
}
if (me.isInsert) { if (me.isInsert) {
const rs = new Rs(this.reqParams); const rs = new Rs(reqParams);
rs.query = this.query; rs.query = me.query;
me._request = rs; me._request = rs;
return rs; return rs;
} else { } else {
const streamParser = this._parseRowsByFormat(null, true); const streamParser = this.getStreamParser()();
const rs = new stream.Readable({ objectMode: true }); const rs = new Readable({ objectMode: true });
rs._read = () => {}; rs._read = () => {};
rs.query = this.query; rs.query = me.query;
const tf = new stream.Transform({ objectMode: true }); const tf = new Transform({ objectMode: true });
let isFirstChunck = true; let isFirstChunk = true;
tf._transform = function (chunk, encoding, cb) { tf._transform = function (chunk, encoding, cb) {
// Если для первого chuck первый символ блока данных не '{', тогда:
// 1. в теле ответа не JSON
// 2. сервер нашел ошибку в данных запроса
if (isFirstChunck && (
(me._getFormat() === "json" && chunk[0] !== 123) &&
(me._getFormat() === "csv" && chunk[0] !== 110) &&
(me._getFormat() === "tsv" && chunk[0] !== 110)
)) {
this.error = new Error(chunk.toString());
streamParser.emit("error", this.error);
rs.emit('close');
return cb();
}
isFirstChunck = false; // В независимости от формата, в случае ошибки, в теле ответа будет текс,
// подпадающий под регулярку R_ERROR.
if (isFirstChunk) {
isFirstChunk = false;
if (R_ERROR.test(chunk.toString())) {
streamParser.emit('error', new Error(chunk.toString()));
rs.emit('close');
return cb();
}
}
cb(null, chunk); cb(null, chunk);
}; };
let metaData = {}; let metaData = {};
const requestStream = request.post(this.reqParams); const requestStream = request.post(reqParams);
// Не делаем .pipe(rs) потому что rs - Readable, // Не делаем .pipe(rs) потому что rs - Readable,
// а для pipe нужен Writable // а для pipe нужен Writable
let s = null; let s;
if (me.opts.isUseGzip) { if (me.connection.isUseGzip) {
const z = zlib.createGunzip(); const z = zlib.createGunzip();
s = requestStream.pipe(z).pipe(tf).pipe(streamParser) s = requestStream.pipe(z).pipe(tf).pipe(streamParser)
} else { } else {
s = requestStream.pipe(tf).pipe(streamParser) s = requestStream.pipe(tf).pipe(streamParser)
} }
s s
.on('error', function (err) { .on('error', function (err) {
rs.emit('error', err); rs.emit('error', err);
@@ -493,12 +710,32 @@ class QueryCursor {
destroy() { destroy() {
if (this._request instanceof stream.Readable) { const me = this;
return this._request.destroy();
let isCallDestroy = false;
if (me._request instanceof Readable) {
isCallDestroy = true;
me._request.destroy();
} else if (me._request) {
isCallDestroy = true;
me._request.abort();
} }
if (this._request) { // To trying to kill query by query id
return this._request.abort(); if (me.queryId) {
// Because this realesation work with session witout any ideas,
// we need use this hack
me.connection.query(
`KILL QUERY WHERE query_id = '${me.queryId}' SYNC`, {}, {
sessionId: Date.now(),
}
).exec(() => {});
}
if (isCallDestroy) {
return ;
} }
throw new Error('QueryCursor.destroy error: private field _request is invalid'); throw new Error('QueryCursor.destroy error: private field _request is invalid');
@@ -507,11 +744,7 @@ class QueryCursor {
class ClickHouse { class ClickHouse {
constructor(opts) { constructor(opts = {}) {
if ( ! opts) {
opts = {};
}
this.opts = _.extend( this.opts = _.extend(
{ {
debug: false, debug: false,
@@ -520,12 +753,11 @@ class ClickHouse {
basicAuth: null, basicAuth: null,
isUseGzip: false, isUseGzip: false,
config: { config: {
// session_id : Date.now(),
session_timeout : 60, session_timeout : 60,
output_format_json_quote_64bit_integers : 0, output_format_json_quote_64bit_integers : 0,
enable_http_compression : 0 enable_http_compression : 0
}, },
format: "json", // "json" || "csv" || "tsv" format: 'json',
}, },
opts opts
); );
@@ -602,205 +834,79 @@ class ClickHouse {
return this; return this;
} }
get isUseGzip() { get isUseGzip() {
return this.opts.isUseGzip; return this.opts.isUseGzip;
} }
set isUseGzip(val) { set isUseGzip(val) {
this.opts.isUseGzip = !!val; this.opts.isUseGzip = !!val;
this.opts.config.enable_http_compression = this.opts.isUseGzip ? 1 : 0; this.opts.config.enable_http_compression = this.opts.isUseGzip ? 1 : 0;
} }
escape(str) {
return str.replace(/\t|\n/g, '\\t');
}
static mapRowAsArray(row) { static mapRowAsArray(row) {
return row.map(function(value) { return row
return encodeValue(false, value, 'TabSeparated'); .map(value => encodeValue(false, value, 'TabSeparated'))
}).join('\t'); .join('\t');
}
_getFormat(query) {
let format = "";
switch (this.opts.format) {
case "json":
format = this._parseFormat(query, " format JSON");
break;
case "tsv":
format = this._parseFormat(query, " format TabSeparatedWithNames");
break;
case "csv":
format = this._parseFormat(query, " format CSVWithNames");
break;
default:
format = " ";
}
return format;
};
_parseFormat(query, def) {
if (query.match(/format/mg) === null) {
this.opts.sessionFormat = this.opts.format;
return def;
}
if (query.match(/format JSON/mg) !== null) {
this.opts.sessionFormat = "json";
} else if (query.match(/format TabSeparated/mg) !== null) {
this.opts.sessionFormat = "tsv";
} else if (query.match(/format CSV/mg) !== null) {
this.opts.sessionFormat = "csv";
}
return "";
}
_mapRowAsObject(fieldList, row) {
return fieldList.map(f => encodeValue(false, row[f] != null ? row[f] : '', 'TabSeparated')).join('\t');
} }
static mapRowAsObject(fieldList, row) {
_getBodyForInsert(query, data) { return fieldList
let values = [], .map(f => {
fieldList = [], return encodeValue(false, row[f] != null ? row[f] : '', 'TabSeparated');
isFirstElObject = false; })
.join('\t');
if (Array.isArray(data) && Array.isArray(data[0])) {
values = data;
} else if (Array.isArray(data) && isObject(data[0])) {
values = data;
isFirstElObject = true;
} else if (isObject(data)) {
values = [data];
isFirstElObject = true;
} else {
throw new Error('ClickHouse._getBodyForInsert: data is invalid format');
}
if (isFirstElObject) {
let m = query.match(/INSERT INTO (.+?) \((.+?)\)/);
if (m) {
fieldList = m[2].split(',').map(s => s.trim());
} else {
throw new Error('insert query wasnt parsed field list after TABLE_NAME');
}
}
return values.map(row => {
if (isFirstElObject) {
return this._mapRowAsObject(fieldList, row);
} else {
return ClickHouse.mapRowAsArray(row);
}
}).join('\n');
} }
static getFullFormatName(format = '') {
_getReqParams(query, data) { if ( ! FORMATS[format]) {
let me = this; throw new Error(`Clickhouse.getFullFormatName: unknown format "${format}`);
let reqParams = _.merge({}, me.opts.reqParams),
configQS = _.merge({}, me.opts.config);
if (me.opts.database) {
configQS.database = me.opts.database;
} }
if (typeof query === 'string') { return FORMATS[format];
let sql = query.trim(); }
// Hack for Sequelize ORM static getFormatFromQuery(query = '') {
sql = sql.trimEnd().replace(/;$/gm, ""); if ( ! query) {
throw new Error(`Clickhouse.getFormatFromQuery: query is empty!`);
if (sql.match(/^(select|show|exists)/i)) { }
reqParams['url'] = me.url + '?query=' + encodeURIComponent(sql + this._getFormat(sql) + ';') + '&' + querystring.stringify(configQS);
if (me.opts.username) { // We use regexp with "g" flag then match doen't return first group.
reqParams['url'] = reqParams['url'] + '&user=' + me.opts.username; // So, use exec.
} const m = R_FORMAT_PARSER.exec(query);
if (m) {
if (this.opts.password) { const format = m[1];
reqParams['url'] = reqParams['url'] + '&password=' + me.opts.password; if ( ! REVERSE_FORMATS[format]) {
} throw new Error(`Clickhouse.getFormatFromQuery: unknown format "${format}"!`);
if (data && data.external) {
let formData = {};
for (let external of data.external) {
reqParams.url += `&${external.name}_structure=${external.structure || 'str String'}`;
formData[external.name] = {
value: external.data.join('\n'),
options: {
filename: external.name,
contentType: 'text/plain'
}
}
}
reqParams['formData'] = formData;
}
} else if (query.match(/^insert/i)) {
reqParams['url'] = me.url + '?query=' + encodeURIComponent(sql + ' FORMAT TabSeparated') + '&' + querystring.stringify(configQS);
if (me.opts.username) {
reqParams['url'] = reqParams['url'] + '&user=' + me.opts.username;
}
if (this.opts.password) {
reqParams['url'] = reqParams['url'] + '&password=' + me.opts.password;
}
if (data) {
reqParams['body'] = me._getBodyForInsert(sql, data);
}
} else {
reqParams['url'] = me.url + '?query=' + encodeURIComponent(sql + ";") + '&' + querystring.stringify(configQS);
if (me.opts.username) {
reqParams['url'] = reqParams['url'] + '&user=' + me.opts.username;
}
if (this.opts.password) {
reqParams['url'] = reqParams['url'] + '&password=' + me.opts.password;
}
} }
reqParams['headers'] = { return REVERSE_FORMATS[format];
'Content-Type': 'text/plain'
}
} }
if (me.opts.isUseGzip) { return '';
//reqParams.headers['Content-Encoding'] = 'gzip';
reqParams.headers['Accept-Encoding'] = 'gzip';
// reqParams['gzip'] = true;
}
if (me.opts.debug) {
console.log('DEBUG', reqParams);
}
return reqParams;
} }
static getFormats() {
return Object.keys(FORMATS).map(k => ({ format: k, fullFormatExpr: FORMATS[k], }));
}
query(...args) { query(...args) {
if (args.length === 2 && typeof args[args.length - 1] === 'function') { if (typeof args[args.length - 1] === 'function') {
return new QueryCursor(args[0], this._getReqParams(args[0], null), this.opts).exec(args[args.length - 1]); const newArgs = args.slice(0, args.length);
} else { const cb = args[args.length - 1];
return new QueryCursor(args[0], this._getReqParams(args[0], args[1]), this.opts);
return new QueryCursor(this, ...newArgs).exec(cb);
} }
return new QueryCursor(this, ...args);
} }
insert(query, data) { insert(query, data) {
return new QueryCursor(query, this._getReqParams(query, data), this.opts); return new QueryCursor(this, query, data);
} }
} }
module.exports = { module.exports = {
ClickHouse ClickHouse,
}; };

8
package-lock.json generated
View File

@@ -1,6 +1,6 @@
{ {
"name": "clickhouse", "name": "clickhouse",
"version": "1.2.21", "version": "1.2.24",
"lockfileVersion": 1, "lockfileVersion": 1,
"requires": true, "requires": true,
"dependencies": { "dependencies": {
@@ -555,9 +555,9 @@
} }
}, },
"uuid": { "uuid": {
"version": "3.3.2", "version": "3.4.0",
"resolved": "https://registry.npmjs.org/uuid/-/uuid-3.3.2.tgz", "resolved": "https://registry.npmjs.org/uuid/-/uuid-3.4.0.tgz",
"integrity": "sha512-yXJmeNaw3DnnKAOKJE51sL/ZaYfWJRl1pK9dr19YFCu0ObS231AB1/LbqTKRAQ5kw8A90rA6fr4riOUpTZvQZA==" "integrity": "sha512-HjSDRw6gZE5JMggctHBcjVak08+KEVhSIiDzFnT9S9aegmp85S/bReBVTb4QTFaRNptJ9kuYaNhnbNEOkbKb/A=="
}, },
"verror": { "verror": {
"version": "1.10.0", "version": "1.10.0",

View File

@@ -56,8 +56,9 @@
"querystring": "0.2.0", "querystring": "0.2.0",
"request": "2.88.0", "request": "2.88.0",
"stream2asynciter": "1.0.0", "stream2asynciter": "1.0.0",
"through": "^2.3.8", "through": "2.3.8",
"tsv": "^0.2.0" "tsv": "0.2.0",
"uuid": "3.4.0"
}, },
"deprecated": false, "deprecated": false,
"description": "Client for ClickHouse", "description": "Client for ClickHouse",
@@ -91,5 +92,5 @@
"test": "mocha --bail --timeout 60000 --slow 5000" "test": "mocha --bail --timeout 60000 --slow 5000"
}, },
"types": "index.d.ts", "types": "index.d.ts",
"version": "1.2.24" "version": "2.0.0"
} }

View File

@@ -1,8 +1,12 @@
const 'use strict';
stream = require('stream'),
expect = require('expect.js'), const stream = require('stream');
_ = require('lodash'), const expect = require('expect.js');
{ ClickHouse } = require('../.'); const _ = require('lodash');
const https = require('https');
const fs = require('fs');
const { ClickHouse } = require('../.');
const database = 'test_' + _.random(1000, 100000); const database = 'test_' + _.random(1000, 100000);
@@ -44,7 +48,14 @@ describe('Exec', () => {
) )
ENGINE=MergeTree(date, (mark, time), 8192)`, ENGINE=MergeTree(date, (mark, time), 8192)`,
'OPTIMIZE TABLE session_temp PARTITION 201807 FINAL' 'OPTIMIZE TABLE session_temp PARTITION 201807 FINAL',
`
SELECT
*
FROM session_temp
LIMIT 10
`,
]; ];
for(const query of sqlList) { for(const query of sqlList) {
@@ -56,7 +67,7 @@ describe('Exec', () => {
}); });
describe('Select', () => { describe('Select', () => {
it('use callback', callback => { it('callback #1', callback => {
clickhouse.query(sql).exec((err, rows) => { clickhouse.query(sql).exec((err, rows) => {
expect(err).to.not.be.ok(); expect(err).to.not.be.ok();
@@ -67,8 +78,7 @@ describe('Select', () => {
}); });
}); });
it('callback #2', callback => {
it('use callback #2', callback => {
clickhouse.query(sql, (err, rows) => { clickhouse.query(sql, (err, rows) => {
expect(err).to.not.be.ok(); expect(err).to.not.be.ok();
@@ -78,103 +88,20 @@ describe('Select', () => {
callback(); callback();
}); });
}); });
it('use callback #3 with csv format', callback => { it('promise and await/async', async () => {
clickhouse.query(`${sql} format CSVWithNames`).exec((err, rows) => { const rows = await clickhouse.query(sql).toPromise();
expect(err).to.not.be.ok();
expect(rows).to.have.length(rowCount);
expect(rows[0]).to.eql({ number: 0, str: 0, date: '1970-01-02' });
callback();
});
});
it('use callback #4 with csv format', callback => {
clickhouse.query(`${sql} format CSVWithNames`, (err, rows) => {
expect(err).to.not.be.ok();
expect(rows).to.have.length(rowCount);
expect(rows[0]).to.eql({ number: 0, str: 0, date: '1970-01-02' });
callback();
});
});
it('use callback #5 with tsv format', callback => {
clickhouse.query(`${sql} format TabSeparatedWithNames`).exec((err, rows) => {
expect(err).to.not.be.ok();
expect(rows).to.have.length(rowCount);
expect(rows[0]).to.eql({ number: 0, str: '0', date: '1970-01-02' });
callback();
});
});
it('use callback #6 with tsv format', callback => {
clickhouse.query(`${sql} format TabSeparatedWithNames`, (err, rows) => {
expect(err).to.not.be.ok();
expect(rows).to.have.length(rowCount);
expect(rows[0]).to.eql({ number: 0, str: '0', date: '1970-01-02' });
callback();
});
});
it('use stream', function(callback) {
this.timeout(10000);
expect(rows).to.have.length(rowCount);
expect(rows[0]).to.eql({ number: 0, str: '0', date: '1970-01-02' });
});
it('stream', function(callback) {
let i = 0; let i = 0;
let error = null; let error = null;
clickhouse.query(sql).stream() clickhouse.query(sql).stream()
.on('data', () => ++i) .on('data', () => ++i)
// TODO: on this case you should catch error
.on('error', err => callback(err))
.on('end', () => {
expect(error).to.not.be.ok();
expect(i).to.be(rowCount);
callback();
});
});
it('use stream with csv format', function(callback) {
// this.timeout(10000);
let i = 0;
let error = null;
clickhouse.query(`${sql} format CSVWithNames`).stream()
.on('data', () => {
++i
})
// TODO: on this case you should catch error
.on('error', err => callback(err))
.on('end', () => {
expect(error).to.not.be.ok();
expect(i).to.be(rowCount);
callback();
});
});
it('use stream with tsv format', function(callback) {
// this.timeout(10000);
let i = 0;
let error = null;
clickhouse.query(`${sql} format TabSeparatedWithNames`).stream()
.on('data', () => {
++i
})
// TODO: on this case you should catch error
.on('error', err => error = err) .on('error', err => error = err)
.on('end', () => { .on('end', () => {
expect(error).to.not.be.ok(); expect(error).to.not.be.ok();
@@ -184,8 +111,7 @@ describe('Select', () => {
}); });
}); });
it('stream with pause/resume', function(callback) {
it('use stream with pause/resume', function(callback) {
const const
count = 10, count = 10,
pause = 1000, pause = 1000,
@@ -216,10 +142,9 @@ describe('Select', () => {
}) })
}); });
const nodeVersion = process.version.split('.')[0].substr(1); const nodeVersion = process.version.split('.')[0].substr(1);
if (parseInt(nodeVersion, 10) >= 10) { if (parseInt(nodeVersion, 10) >= 10) {
it('use async for', async function() { it('async for', async function() {
let i = 0; let i = 0;
for await (const row of clickhouse.query(sql).stream()) { for await (const row of clickhouse.query(sql).stream()) {
@@ -231,58 +156,93 @@ describe('Select', () => {
expect(i).to.be(rowCount); expect(i).to.be(rowCount);
}); });
it('use async for with csv format', async function() {
let i = 0;
for await (const row of clickhouse.query(`${sql} format CSVWithNames`).stream()) {
++i;
expect(row).to.have.key('number');
expect(row).to.have.key('str');
expect(row).to.have.key('date');
}
expect(i).to.be(rowCount);
});
it('use async for with tsv format', async function() {
let i = 0;
for await (const row of clickhouse.query(`${sql} format TabSeparatedWithNames`).stream()) {
++i;
expect(row).to.have.key('number');
expect(row).to.have.key('str');
expect(row).to.have.key('date');
}
expect(i).to.be(rowCount);
});
} }
it('use promise and await/async', async () => { it('select with external', async () => {
let rows = await clickhouse.query(sql).toPromise(); const result = await clickhouse.query(
'SELECT count(*) AS count FROM temp_table',
expect(rows).to.have.length(rowCount); {
expect(rows[0]).to.eql({ number: 0, str: '0', date: '1970-01-02' }); external: [
}); {
name: 'temp_table',
data: _.range(0, rowCount).map(i => `str${i}`)
it('use select with external', async () => { },
const result = await clickhouse.query('SELECT count(*) AS count FROM temp_table', { ]
external: [ }
{ ).toPromise();
name: 'temp_table',
data: _.range(0, rowCount).map(i => `str${i}`)
},
]
}).toPromise();
expect(result).to.be.ok(); expect(result).to.be.ok();
expect(result).to.have.length(1); expect(result).to.have.length(1);
expect(result[0]).to.have.key('count'); expect(result[0]).to.have.key('count');
expect(result[0].count).to.be(rowCount); expect(result[0].count).to.be(rowCount);
}); });
it('catch error', async () => {
try {
await clickhouse.query(sql + '1').toPromise();
} catch (err) {
expect(err).to.be.ok();
}
});
[
{
format: 'fake_name',
fullFormatExpr: 'FakeName',
},
...ClickHouse.getFormats(),
].forEach(({ format, fullFormatExpr }) => {
// The string "foRmat" is used because different forms of writing are found in real code.
const fullSql = sql + (format === 'fake_name' ? '' : ` foRmat ${fullFormatExpr}`);
it(`with "${fullFormatExpr}" into sql`, async () => {
const rows = await clickhouse.query(fullSql).toPromise();
expect(rows).to.have.length(rowCount);
expect(rows[0]).to.eql({ number: 0, str: '0', date: '1970-01-02' });
});
if (format !== 'fake_name') {
it(`with "${fullFormatExpr}" in options`, async () => {
const rows = await clickhouse.query(sql, {}, { format }).toPromise();
expect(rows).to.have.length(rowCount);
expect(rows[0]).to.eql({ number: 0, str: '0', date: '1970-01-02' });
});
}
it(`with promise "${fullFormatExpr}"`, async () => {
try {
await clickhouse.query(`SELECT * FROM random_table_name ${fullFormatExpr}`).toPromise();
} catch (err) {
expect(err).to.be.ok();
}
});
it(`with stream ${fullFormatExpr}"`, cb => {
let i = 0,
error = null;
const stream = clickhouse.query(`SELECT * FROM random_table_name ${fullFormatExpr}`).stream();
stream
.on('data', () => ++i)
.on('error', err => error = err)
.on('close', () => {
expect(error).to.be.ok();
expect(error.toString()).to.match(new RegExp(`Table ${database}.random_table_name doesn\'t exist`));
expect(i).to.eql(0);
cb();
})
.on('end', () => {
cb(new Error('no way #2'));
});
});
});
}); });
@@ -310,21 +270,16 @@ describe('session', () => {
}); });
}); });
// You can use all settings from request library (https://github.com/request/request#tlsssl-protocol) // You can use all settings from request library (https://github.com/request/request#tlsssl-protocol)
describe('TLS/SSL Protocol', () => { describe('TLS/SSL Protocol', () => {
it('use TLS/SSL Protocol', async () => { it('use TLS/SSL Protocol', async () => {
const
https = require('https'),
fs = require('fs');
let server = null; let server = null;
try { try {
server = https.createServer( server = https.createServer(
{ {
key : fs.readFileSync('test/cert/server.key'), key : fs.readFileSync('test/cert/server.key'),
cert : fs.readFileSync('test/cert/server.crt') cert : fs.readFileSync('test/cert/server.crt'),
}, },
(req, res) => { (req, res) => {
res.writeHead(200); res.writeHead(200);
@@ -341,12 +296,11 @@ describe('TLS/SSL Protocol', () => {
cert: fs.readFileSync('test/cert/server.crt'), cert: fs.readFileSync('test/cert/server.crt'),
key: fs.readFileSync('test/cert/server.key'), key: fs.readFileSync('test/cert/server.key'),
} }
} },
debug : false,
}); });
const r = await temp.query('SELECT 1 + 1 Format JSON').toPromise();
const r = await temp.query('SELECT 1 + 1').toPromise();
expect(r).to.be.ok(); expect(r).to.be.ok();
expect(r[0]).to.have.key('plus(1, 1)'); expect(r[0]).to.have.key('plus(1, 1)');
expect(r[0]['plus(1, 1)']).to.be(2); expect(r[0]['plus(1, 1)']).to.be(2);
@@ -364,7 +318,6 @@ describe('TLS/SSL Protocol', () => {
}); });
}); });
describe('queries', () => { describe('queries', () => {
it('insert field as array', async () => { it('insert field as array', async () => {
clickhouse.sessionId = Date.now(); clickhouse.sessionId = Date.now();
@@ -398,16 +351,14 @@ describe('queries', () => {
} }
]; ];
const r2 = await clickhouse.insert(
const r2 = await clickhouse.insert('INSERT INTO test_array (date, str, arr, arr2, arr3)', rows).toPromise(); 'INSERT INTO test_array (date, str, arr, arr2, arr3)',
rows
).toPromise();
expect(r2).to.be.ok(); expect(r2).to.be.ok();
clickhouse.sessionId = null;
}); });
it('select, insert and two pipes', async () => {
it('queries', async () => {
const result = await clickhouse.query('DROP TABLE IF EXISTS session_temp').toPromise(); const result = await clickhouse.query('DROP TABLE IF EXISTS session_temp').toPromise();
expect(result).to.be.ok(); expect(result).to.be.ok();
@@ -473,30 +424,24 @@ describe('queries', () => {
const result9 = await clickhouse.query('SELECT count(*) AS count FROM session_temp').toPromise(); const result9 = await clickhouse.query('SELECT count(*) AS count FROM session_temp').toPromise();
const result10 = await clickhouse.query('SELECT count(*) AS count FROM session_temp2').toPromise(); const result10 = await clickhouse.query('SELECT count(*) AS count FROM session_temp2').toPromise();
expect(result9).to.eql(result10); expect(result9).to.eql(result10);
});
it('select number as number', async () => {
const result = await clickhouse.query('DROP TABLE IF EXISTS test_int_temp').toPromise();
expect(result).to.be.ok();
const result11 = await clickhouse.query('SELECT date FROM test_array GROUP BY date WITH TOTALS').withTotals().toPromise(); const result1 = await clickhouse.query('CREATE TABLE test_int_temp (int_value Int8 ) ENGINE=Memory').toPromise();
expect(result11).to.have.key('meta'); expect(result1).to.be.ok();
expect(result11).to.have.key('data');
expect(result11).to.have.key('totals');
expect(result11).to.have.key('rows');
expect(result11).to.have.key('statistics');
const result111 = await clickhouse.query('DROP TABLE IF EXISTS test_int_temp').toPromise();
expect(result111).to.be.ok();
const result12 = await clickhouse.query('CREATE TABLE test_int_temp (int_value Int8 ) ENGINE=Memory').toPromise();
expect(result12).to.be.ok();
const int_value_data = [{int_value: 0}]; const int_value_data = [{int_value: 0}];
const result13 = await clickhouse.insert('INSERT INTO test_int_temp (int_value)', int_value_data).toPromise(); const result3 = await clickhouse.insert('INSERT INTO test_int_temp (int_value)', int_value_data).toPromise();
expect(result13).to.be.ok(); expect(result3).to.be.ok();
const result14 = await clickhouse.query('SELECT int_value FROM test_int_temp').toPromise(); const result4 = await clickhouse.query('SELECT int_value FROM test_int_temp').toPromise();
expect(result14).to.eql(int_value_data); expect(result4).to.eql(int_value_data);
}); });
}); });
describe('response codes', () => { describe('response codes', () => {
it('table is not exists', async () => { it('table is not exists', async () => {
try { try {
@@ -534,7 +479,7 @@ describe('set database', () => {
expect(r).to.be.ok(); expect(r).to.be.ok();
const temp = new ClickHouse({ const temp = new ClickHouse({
database: noDefaultDb database: noDefaultDb,
}); });
@@ -717,6 +662,35 @@ describe('Abort query', () => {
}); });
}); });
describe('Select and WITH TOTALS statement', () => {
[false, true].forEach(withTotals => {
it(`is ${withTotals}`, async () => {
const query = clickhouse.query(
`SELECT
number % 3 AS i,
groupArray(number) as kList
FROM (
SELECT number FROM system.numbers LIMIT 14
)
GROUP BY i ${withTotals ? '' : 'WITH TOTALS'}
FORMAT TabSeparatedWithNames
`
);
if (withTotals) {
query.withTotals();
}
const result = await query.toPromise();
expect(result).to.have.key('meta');
expect(result).to.have.key('data');
expect(result).to.have.key('totals');
expect(result).to.have.key('rows');
expect(result).to.have.key('statistics');
});
});
});
after(async () => { after(async () => {
await clickhouse.query(`DROP DATABASE IF EXISTS ${database}`).toPromise(); await clickhouse.query(`DROP DATABASE IF EXISTS ${database}`).toPromise();