diff --git a/README.md b/README.md index f3fa0d5..bea0723 100644 --- a/README.md +++ b/README.md @@ -26,10 +26,11 @@ const clickhouse = new ClickHouse({ isUseGzip: false, format: "json", // "json" || "csv" || "tsv" config: { + session_id : 'session_id if neeed', session_timeout : 60, output_format_json_quote_64bit_integers : 0, enable_http_compression : 0, - database : 'my_database_name', + database : 'my_database_name', }, // This object merge with request params (see request lib docs) diff --git a/index.js b/index.js index 7140558..0e32f36 100644 --- a/index.js +++ b/index.js @@ -1,17 +1,15 @@ 'use strict'; const zlib = require('zlib'); - -const - _ = require('lodash'), - request = require('request'), - stream = require('stream'), - querystring = require('querystring'), - JSONStream = require('JSONStream'), - through = require('through'), - stream2asynciter = require('stream2asynciter'), - { URL } = require('url'), - tsv = require('tsv'); +const _ = require('lodash'); +const request = require('request'); +const { Transform, Readable, } = require('stream'); +const JSONStream = require('JSONStream'); +const through = require('through'); +const stream2asynciter = require('stream2asynciter'); +const { URL } = require('url'); +const tsv = require('tsv'); +const uuidv4 = require('uuid/v4'); /** @@ -24,26 +22,38 @@ const * session_timeout */ -var SEPARATORS = { +const SEPARATORS = { TSV: "\t", CSV: ",", Values: "," }; -var ALIASES = { +const ALIASES = { TabSeparated: "TSV" }; var ESCAPE_STRING = { - TSV: function (v, quote) {return v.replace (/\\/g, '\\\\').replace (/\\/g, '\\').replace(/\t/g, '\\t').replace(/\n/g, '\\n')}, - CSV: function (v, quote) {return v.replace (/\"/g, '""')}, + /** + * @return {string} + */ + TSV: function (value) { + return value + .replace(/\\/g, '\\\\') + .replace(/\\/g, '\\') + .replace(/\t/g, '\\t') + .replace(/\n/g, '\\n'); + }, + + CSV: function (value) { + return value.replace (/\"/g, '""'); + }, }; var ESCAPE_NULL = { TSV: "\\N", CSV: "\\N", Values: "\\N", - // JSONEachRow: "\\N", + JSONEachRow: "\\N", }; const R_ERROR = new RegExp('Code: ([0-9]{2}), .*Exception:'); @@ -55,27 +65,43 @@ const PORT = 8123; const DATABASE = 'default'; const USERNAME = 'default'; +const FORMATS = { + 'json': 'JSON', + 'tsv': 'TabSeparatedWithNames', + 'csv': 'CSVWithNames', +}; + +const REVERSE_FORMATS = Object.keys(FORMATS).reduce( + function(obj, format) { + obj[FORMATS[format]] = format; + return obj; + }, + {} +); + +const R_FORMAT_PARSER = new RegExp( + `FORMAT (${Object.keys(FORMATS).map(k => FORMATS[k]).join('|')})`, + 'mi' +); + function parseCSV(body, options = { header: true }) { const data = new tsv.Parser(SEPARATORS.CSV, options).parse(body); data.splice(data.length - 1, 1); return data; } -function parseJSON(body) { - return JSON.parse(body); -} - function parseTSV(body, options = { header: true }) { const data = new tsv.Parser(SEPARATORS.TSV, options).parse(body); data.splice(data.length - 1, 1); return data; } -function parseCSVStream(s) { +function parseCSVStream(s = new Set()) { let isFirst = true; let ref = { fields: [] }; + return through(function (chunk) { let str = chunk.toString(); let parsed = parseCSV(str, {header: isFirst}); @@ -97,11 +123,12 @@ function parseJSONStream() { return JSONStream.parse(['data', true]); } -function parseTSVStream(s) { +function parseTSVStream(s = new Set()) { let isFirst = true; let ref = { fields: [] }; + return through(function (chunk) { let str = chunk.toString(); let parsed = parseTSV(str, {header: isFirst}); @@ -135,25 +162,33 @@ function chunkBuilder(isFirst, ref, chunk, parsed) { } } -function encodeValue(quote, v, format, isArray) { - format = ALIASES[format] || format; +function encodeValue(quote, v, _format, isArray) { + const format = ALIASES[_format] || _format; switch (typeof v) { case 'string': if (isArray) { return `'${ESCAPE_STRING[format] ? ESCAPE_STRING[format](v, quote) : v}'`; - } else { - return ESCAPE_STRING[format] ? ESCAPE_STRING[format](v, quote) : v; } + + return ESCAPE_STRING[format] ? ESCAPE_STRING[format](v, quote) : v; case 'number': - if (isNaN (v)) + if (isNaN(v)) { return 'nan'; - if (v === +Infinity) + } + + if (v === +Infinity) { return '+inf'; - if (v === -Infinity) + } + + if (v === -Infinity) { return '-inf'; - if (v === Infinity) + } + + if (v === Infinity) { return 'inf'; + } + return v; case 'object': @@ -164,7 +199,6 @@ function encodeValue(quote, v, format, isArray) { // you can add array items if (v instanceof Array) { - // return '[' + v.map(encodeValue.bind(this, true, format)).join (',') + ']'; return '[' + v.map(function (i) { return encodeValue(true, i, format, true); }).join(',') + ']'; @@ -172,7 +206,7 @@ function encodeValue(quote, v, format, isArray) { // TODO: tuples support if (!format) { - console.trace (); + console.trace(); } if (v === null) { @@ -182,17 +216,19 @@ function encodeValue(quote, v, format, isArray) { return format in ESCAPE_NULL ? ESCAPE_NULL[format] : v; case 'boolean': return v === true ? 1 : 0; + default: + return v; } } function getErrorObj(res) { - let err = new Error(`${res.statusCode}: ${res.body || res.statusMessage}`); + const err = new Error(`${res.statusCode}: ${res.body || res.statusMessage}`); if (res.body) { - let m = res.body.match(R_ERROR); + const m = res.body.match(R_ERROR); if (m) { if (m[1] && isNaN(parseInt(m[1])) === false) { - err.code = parseInt(m[1]); + err.code = parseInt(m[1]); } if (m[2]) { @@ -210,11 +246,11 @@ function isObject(obj) { } -class Rs extends stream.Transform { +class Rs extends Transform { constructor(reqParams) { super(); - let me = this; + const me = this; me.ws = request.post(reqParams); @@ -296,30 +332,197 @@ class Rs extends stream.Transform { class QueryCursor { - constructor(query, reqParams, opts) { - this.isInsert = !!query.match(/^insert/i); - this.fieldList = null; - this.query = query; - this.reqParams = _.merge({}, reqParams); - this.opts = opts; + constructor(connection, query, data, opts = {}) { + this.connection = connection; + + this.query = query; + this.data = data; + + this.opts = _.merge({}, opts, { format: this.connection.opts.format }); + + // Sometime needs to override format by query + const formatFromQuery = ClickHouse.getFormatFromQuery(this.query); + if (formatFromQuery && formatFromQuery !== this.format) { + this.opts.format = formatFromQuery; + } + this.useTotals = false; - this._request = null; + this._request = null; + this.queryId = opts.queryId || uuidv4(); + console.log('QueryCursor', {query: this.query, data: this.data, opts: this.opts}) + } + + get isInsert() { + return !!this.query.match(/^insert/i); + } + + get isDebug() { + return this.connection.opts.debug; + } + + get format() { + return this.opts.format; + } + + // TODO Add check for white list of formats + set format(newFormat) { + this.opts.format = newFormat; + } + + _getBodyForInsert() { + const me = this; + + let query = me.query; + let data = me.data; + + let values = [], + fieldList = [], + isFirstElObject = false; + + if (Array.isArray(data) && Array.isArray(data[0])) { + values = data; + } else if (Array.isArray(data) && isObject(data[0])) { + values = data; + isFirstElObject = true; + } else if (isObject(data)) { + values = [data]; + isFirstElObject = true; + } else { + throw new Error('ClickHouse._getBodyForInsert: data is invalid format'); + } + + if (isFirstElObject) { + let m = query.match(/INSERT INTO (.+?) \((.+?)\)/); + if (m) { + fieldList = m[2].split(',').map(s => s.trim()); + } else { + throw new Error('insert query wasnt parsed field list after TABLE_NAME'); + } + } + + return values.map(row => { + if (isFirstElObject) { + return ClickHouse.mapRowAsObject(fieldList, row); + } else { + return ClickHouse.mapRowAsArray(row); + } + }).join('\n'); } - exec(cb) { - let me = this; + _getReqParams() { + const me = this; - if (me.opts.debug) { - console.log('exec req headers', me.reqParams.headers); + const { + reqParams, + config, + username, + password, + database, + } = me.connection.opts; + + const params = _.merge({ + headers: { + 'Content-Type': 'text/plain' + }, + }, reqParams); + const configQS = _.merge({}, config, { + query_id: me.queryId, + }); + + if (database) { + configQS.database = database; } - me._request = request.post(me.reqParams, (err, res) => { - if (me.opts.debug) { - console.log('exec', err, _.pick(res, [ + const url = new URL(me.connection.url); + + if (username) { + url.searchParams.append('user', username); + } + + if (password) { + url.searchParams.append('password', password); + } + + Object.keys(configQS).forEach(k => { + url.searchParams.append(k, configQS[k]); + }); + + + let data = me.data; + let query = me.query; + + if (typeof query === 'string') { + if (/with totals/i.test(query)) { + me.useTotals = true; + } + + // Hack for Sequelize ORM + query = query.trim().trimEnd().replace(/;$/gm, ""); + + if (query.match(/^(select|show|exists)/i)) { + if ( ! R_FORMAT_PARSER.test(query)) { + query += ` FORMAT ${ClickHouse.getFullFormatName(me.format)}`; + } + + query += ';'; + + if (data && data.external) { + params['formData'] = data.external.reduce( + function(formData, external) { + url.searchParams.append( + `${external.name}_structure`, + external.structure || 'str String' + ); + + formData[external.name] = { + value: external.data.join('\n'), + options: { + filename: external.name, + contentType: 'text/plain' + } + }; + + return formData; + }, + {} + ); + } + } else if (query.match(/^insert/i)) { + query += ' FORMAT TabSeparated'; + + if (data) { + params['body'] = me._getBodyForInsert(); + } + } + } + + url.searchParams.append('query', query); + + if (me.connection.isUseGzip) { + params.headers['Accept-Encoding'] = 'gzip'; + } + + if (me.isDebug) { + console.log('QueryCursor._getReqParams: params', me.query, params); + } + + params['url'] = url.toString(); + + return params; + } + + exec(cb) { + const me = this; + const reqParams = me._getReqParams(); + + me._request = request.post(reqParams, (err, res) => { + if (me.isDebug) { + console.log('QueryCursor.exec: result', me.query, err, _.pick(res, [ 'statusCode', 'body', - 'statusMessage' + 'statusMessage', + 'headers' ])); } @@ -334,42 +537,66 @@ class QueryCursor { if ( ! res.body) { return cb(null, {r: 1}); } - - if (me.opts.debug) { - console.log('exec res headers', res.headers); - } try { - const result = this._parseRowsByFormat(res.body); - cb(null, me.useTotals ? result : result.data || result) - } catch (e) { - cb(e); + let data = me.getBodyParser()(res.body); + + if (me.format === 'json') { + data = data.data; + } + + if (me.useTotals) { + const totals = JSON.parse(res.headers['x-clickhouse-summary']); + return cb( + null, + { + meta: {}, + data: data, + totals, + rows: data.length, + statistics: {}, + } + ); + } + + cb(null, data); + } catch (err) { + cb(err); } }); } - _parseRowsByFormat(body, isStream = false) { - let result = null; - let ws; - switch (this._getFormat()) { - case "json": - result = !isStream && parseJSON(body) || parseJSONStream(); - break; - case "tsv": - result = !isStream && parseTSV(body) || parseTSVStream(new Set()); - break; - case "csv": - result = !isStream && parseCSV(body) || parseCSVStream(new Set()); - break; - default: - result = body; + getBodyParser() { + if (this.format === 'json') { + return JSON.parse; } - return result; + + if (this.format === 'tsv') { + return parseTSV; + } + + if (this.format === 'csv') { + return parseCSV; + } + + throw new Error(`CursorQuery.getBodyParser: unknown format "${this.format}"`); }; - - _getFormat() { - return this.opts.sessionFormat || this.opts.format; - } + + getStreamParser() { + if (this.format === 'json') { + return parseJSONStream; + } + + if (this.format === 'tsv') { + return parseTSVStream; + } + + if (this.format === 'csv') { + return parseCSVStream; + } + + throw new Error(`CursorQuery.getStreamParser: unknown format "${this.format}"`); + } withTotals() { this.useTotals = true; @@ -388,69 +615,59 @@ class QueryCursor { }); } - stream() { - const - me = this, - isDebug = me.opts.debug; + const me = this; - if (isDebug) { - console.log('stream req headers', me.reqParams.headers); - } + const reqParams = me._getReqParams(); if (me.isInsert) { - const rs = new Rs(this.reqParams); - rs.query = this.query; + const rs = new Rs(reqParams); + rs.query = me.query; me._request = rs; return rs; } else { - const streamParser = this._parseRowsByFormat(null, true); + const streamParser = this.getStreamParser()(); - const rs = new stream.Readable({ objectMode: true }); + const rs = new Readable({ objectMode: true }); rs._read = () => {}; - rs.query = this.query; + rs.query = me.query; - const tf = new stream.Transform({ objectMode: true }); - let isFirstChunck = true; + const tf = new Transform({ objectMode: true }); + let isFirstChunk = true; tf._transform = function (chunk, encoding, cb) { - // Если для первого chuck первый символ блока данных не '{', тогда: - // 1. в теле ответа не JSON - // 2. сервер нашел ошибку в данных запроса - if (isFirstChunck && ( - (me._getFormat() === "json" && chunk[0] !== 123) && - (me._getFormat() === "csv" && chunk[0] !== 110) && - (me._getFormat() === "tsv" && chunk[0] !== 110) - )) { - this.error = new Error(chunk.toString()); - - streamParser.emit("error", this.error); - rs.emit('close'); - - return cb(); - } - isFirstChunck = false; + // В независимости от формата, в случае ошибки, в теле ответа будет текс, + // подпадающий под регулярку R_ERROR. + if (isFirstChunk) { + isFirstChunk = false; + + if (R_ERROR.test(chunk.toString())) { + streamParser.emit('error', new Error(chunk.toString())); + rs.emit('close'); + + return cb(); + } + } cb(null, chunk); }; let metaData = {}; - const requestStream = request.post(this.reqParams); + const requestStream = request.post(reqParams); // Не делаем .pipe(rs) потому что rs - Readable, // а для pipe нужен Writable - let s = null; - if (me.opts.isUseGzip) { + let s; + if (me.connection.isUseGzip) { const z = zlib.createGunzip(); s = requestStream.pipe(z).pipe(tf).pipe(streamParser) } else { s = requestStream.pipe(tf).pipe(streamParser) } - s .on('error', function (err) { rs.emit('error', err); @@ -493,12 +710,32 @@ class QueryCursor { destroy() { - if (this._request instanceof stream.Readable) { - return this._request.destroy(); + const me = this; + + let isCallDestroy = false; + + if (me._request instanceof Readable) { + isCallDestroy = true; + me._request.destroy(); + } else if (me._request) { + isCallDestroy = true; + me._request.abort(); } - if (this._request) { - return this._request.abort(); + // To trying to kill query by query id + if (me.queryId) { + + // Because this realesation work with session witout any ideas, + // we need use this hack + me.connection.query( + `KILL QUERY WHERE query_id = '${me.queryId}' SYNC`, {}, { + sessionId: Date.now(), + } + ).exec(() => {}); + } + + if (isCallDestroy) { + return ; } throw new Error('QueryCursor.destroy error: private field _request is invalid'); @@ -507,11 +744,7 @@ class QueryCursor { class ClickHouse { - constructor(opts) { - if ( ! opts) { - opts = {}; - } - + constructor(opts = {}) { this.opts = _.extend( { debug: false, @@ -520,12 +753,11 @@ class ClickHouse { basicAuth: null, isUseGzip: false, config: { - // session_id : Date.now(), session_timeout : 60, output_format_json_quote_64bit_integers : 0, enable_http_compression : 0 }, - format: "json", // "json" || "csv" || "tsv" + format: 'json', }, opts ); @@ -602,205 +834,79 @@ class ClickHouse { return this; } - get isUseGzip() { return this.opts.isUseGzip; } + set isUseGzip(val) { this.opts.isUseGzip = !!val; this.opts.config.enable_http_compression = this.opts.isUseGzip ? 1 : 0; } - - escape(str) { - return str.replace(/\t|\n/g, '\\t'); - } - - static mapRowAsArray(row) { - return row.map(function(value) { - return encodeValue(false, value, 'TabSeparated'); - }).join('\t'); - } - - _getFormat(query) { - let format = ""; - switch (this.opts.format) { - case "json": - format = this._parseFormat(query, " format JSON"); - break; - case "tsv": - format = this._parseFormat(query, " format TabSeparatedWithNames"); - break; - case "csv": - format = this._parseFormat(query, " format CSVWithNames"); - break; - default: - format = " "; - } - return format; - }; - - _parseFormat(query, def) { - if (query.match(/format/mg) === null) { - this.opts.sessionFormat = this.opts.format; - return def; - } - if (query.match(/format JSON/mg) !== null) { - this.opts.sessionFormat = "json"; - } else if (query.match(/format TabSeparated/mg) !== null) { - this.opts.sessionFormat = "tsv"; - } else if (query.match(/format CSV/mg) !== null) { - this.opts.sessionFormat = "csv"; - } - return ""; - } - - _mapRowAsObject(fieldList, row) { - return fieldList.map(f => encodeValue(false, row[f] != null ? row[f] : '', 'TabSeparated')).join('\t'); + return row + .map(value => encodeValue(false, value, 'TabSeparated')) + .join('\t'); } - - _getBodyForInsert(query, data) { - let values = [], - fieldList = [], - isFirstElObject = false; - - if (Array.isArray(data) && Array.isArray(data[0])) { - values = data; - } else if (Array.isArray(data) && isObject(data[0])) { - values = data; - isFirstElObject = true; - } else if (isObject(data)) { - values = [data]; - isFirstElObject = true; - } else { - throw new Error('ClickHouse._getBodyForInsert: data is invalid format'); - } - - if (isFirstElObject) { - let m = query.match(/INSERT INTO (.+?) \((.+?)\)/); - if (m) { - fieldList = m[2].split(',').map(s => s.trim()); - } else { - throw new Error('insert query wasnt parsed field list after TABLE_NAME'); - } - } - - return values.map(row => { - if (isFirstElObject) { - return this._mapRowAsObject(fieldList, row); - } else { - return ClickHouse.mapRowAsArray(row); - } - }).join('\n'); + static mapRowAsObject(fieldList, row) { + return fieldList + .map(f => { + return encodeValue(false, row[f] != null ? row[f] : '', 'TabSeparated'); + }) + .join('\t'); } - - _getReqParams(query, data) { - let me = this; - - let reqParams = _.merge({}, me.opts.reqParams), - configQS = _.merge({}, me.opts.config); - - if (me.opts.database) { - configQS.database = me.opts.database; + static getFullFormatName(format = '') { + if ( ! FORMATS[format]) { + throw new Error(`Clickhouse.getFullFormatName: unknown format "${format}`); } - if (typeof query === 'string') { - let sql = query.trim(); - - // Hack for Sequelize ORM - sql = sql.trimEnd().replace(/;$/gm, ""); - - if (sql.match(/^(select|show|exists)/i)) { - reqParams['url'] = me.url + '?query=' + encodeURIComponent(sql + this._getFormat(sql) + ';') + '&' + querystring.stringify(configQS); - if (me.opts.username) { - reqParams['url'] = reqParams['url'] + '&user=' + me.opts.username; - } - - if (this.opts.password) { - reqParams['url'] = reqParams['url'] + '&password=' + me.opts.password; - } - - if (data && data.external) { - let formData = {}; - - for (let external of data.external) { - reqParams.url += `&${external.name}_structure=${external.structure || 'str String'}`; - - formData[external.name] = { - value: external.data.join('\n'), - options: { - filename: external.name, - contentType: 'text/plain' - } - } - } - - reqParams['formData'] = formData; - } - } else if (query.match(/^insert/i)) { - reqParams['url'] = me.url + '?query=' + encodeURIComponent(sql + ' FORMAT TabSeparated') + '&' + querystring.stringify(configQS); - - if (me.opts.username) { - reqParams['url'] = reqParams['url'] + '&user=' + me.opts.username; - } - - if (this.opts.password) { - reqParams['url'] = reqParams['url'] + '&password=' + me.opts.password; - } - - if (data) { - reqParams['body'] = me._getBodyForInsert(sql, data); - } - } else { - reqParams['url'] = me.url + '?query=' + encodeURIComponent(sql + ";") + '&' + querystring.stringify(configQS); - - if (me.opts.username) { - reqParams['url'] = reqParams['url'] + '&user=' + me.opts.username; - } - - if (this.opts.password) { - reqParams['url'] = reqParams['url'] + '&password=' + me.opts.password; - } + return FORMATS[format]; + } + + static getFormatFromQuery(query = '') { + if ( ! query) { + throw new Error(`Clickhouse.getFormatFromQuery: query is empty!`); + } + + // We use regexp with "g" flag then match doen't return first group. + // So, use exec. + const m = R_FORMAT_PARSER.exec(query); + if (m) { + const format = m[1]; + if ( ! REVERSE_FORMATS[format]) { + throw new Error(`Clickhouse.getFormatFromQuery: unknown format "${format}"!`); } - reqParams['headers'] = { - 'Content-Type': 'text/plain' - } + return REVERSE_FORMATS[format]; } - if (me.opts.isUseGzip) { - //reqParams.headers['Content-Encoding'] = 'gzip'; - reqParams.headers['Accept-Encoding'] = 'gzip'; - // reqParams['gzip'] = true; - } - - if (me.opts.debug) { - console.log('DEBUG', reqParams); - } - - return reqParams; + return ''; } + static getFormats() { + return Object.keys(FORMATS).map(k => ({ format: k, fullFormatExpr: FORMATS[k], })); + } query(...args) { - if (args.length === 2 && typeof args[args.length - 1] === 'function') { - return new QueryCursor(args[0], this._getReqParams(args[0], null), this.opts).exec(args[args.length - 1]); - } else { - return new QueryCursor(args[0], this._getReqParams(args[0], args[1]), this.opts); + if (typeof args[args.length - 1] === 'function') { + const newArgs = args.slice(0, args.length); + const cb = args[args.length - 1]; + + return new QueryCursor(this, ...newArgs).exec(cb); } + + return new QueryCursor(this, ...args); } - insert(query, data) { - return new QueryCursor(query, this._getReqParams(query, data), this.opts); + return new QueryCursor(this, query, data); } } module.exports = { - ClickHouse + ClickHouse, }; diff --git a/package-lock.json b/package-lock.json index dee6b60..48eae98 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "clickhouse", - "version": "1.2.21", + "version": "1.2.24", "lockfileVersion": 1, "requires": true, "dependencies": { @@ -555,9 +555,9 @@ } }, "uuid": { - "version": "3.3.2", - "resolved": "https://registry.npmjs.org/uuid/-/uuid-3.3.2.tgz", - "integrity": "sha512-yXJmeNaw3DnnKAOKJE51sL/ZaYfWJRl1pK9dr19YFCu0ObS231AB1/LbqTKRAQ5kw8A90rA6fr4riOUpTZvQZA==" + "version": "3.4.0", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-3.4.0.tgz", + "integrity": "sha512-HjSDRw6gZE5JMggctHBcjVak08+KEVhSIiDzFnT9S9aegmp85S/bReBVTb4QTFaRNptJ9kuYaNhnbNEOkbKb/A==" }, "verror": { "version": "1.10.0", diff --git a/package.json b/package.json index ace4fbc..53fd1d5 100644 --- a/package.json +++ b/package.json @@ -56,8 +56,9 @@ "querystring": "0.2.0", "request": "2.88.0", "stream2asynciter": "1.0.0", - "through": "^2.3.8", - "tsv": "^0.2.0" + "through": "2.3.8", + "tsv": "0.2.0", + "uuid": "3.4.0" }, "deprecated": false, "description": "Client for ClickHouse", @@ -91,5 +92,5 @@ "test": "mocha --bail --timeout 60000 --slow 5000" }, "types": "index.d.ts", - "version": "1.2.24" + "version": "2.0.0" } diff --git a/test/test.js b/test/test.js index 0c31947..2e1d872 100644 --- a/test/test.js +++ b/test/test.js @@ -1,8 +1,12 @@ -const - stream = require('stream'), - expect = require('expect.js'), - _ = require('lodash'), - { ClickHouse } = require('../.'); +'use strict'; + +const stream = require('stream'); +const expect = require('expect.js'); +const _ = require('lodash'); +const https = require('https'); +const fs = require('fs'); + +const { ClickHouse } = require('../.'); const database = 'test_' + _.random(1000, 100000); @@ -44,7 +48,14 @@ describe('Exec', () => { ) ENGINE=MergeTree(date, (mark, time), 8192)`, - 'OPTIMIZE TABLE session_temp PARTITION 201807 FINAL' + 'OPTIMIZE TABLE session_temp PARTITION 201807 FINAL', + + ` + SELECT + * + FROM session_temp + LIMIT 10 + `, ]; for(const query of sqlList) { @@ -56,7 +67,7 @@ describe('Exec', () => { }); describe('Select', () => { - it('use callback', callback => { + it('callback #1', callback => { clickhouse.query(sql).exec((err, rows) => { expect(err).to.not.be.ok(); @@ -67,8 +78,7 @@ describe('Select', () => { }); }); - - it('use callback #2', callback => { + it('callback #2', callback => { clickhouse.query(sql, (err, rows) => { expect(err).to.not.be.ok(); @@ -78,103 +88,20 @@ describe('Select', () => { callback(); }); }); - - it('use callback #3 with csv format', callback => { - clickhouse.query(`${sql} format CSVWithNames`).exec((err, rows) => { - expect(err).to.not.be.ok(); - - expect(rows).to.have.length(rowCount); - expect(rows[0]).to.eql({ number: 0, str: 0, date: '1970-01-02' }); - - callback(); - }); - }); - - - it('use callback #4 with csv format', callback => { - clickhouse.query(`${sql} format CSVWithNames`, (err, rows) => { - expect(err).to.not.be.ok(); - - expect(rows).to.have.length(rowCount); - expect(rows[0]).to.eql({ number: 0, str: 0, date: '1970-01-02' }); - - callback(); - }); - }); - - it('use callback #5 with tsv format', callback => { - clickhouse.query(`${sql} format TabSeparatedWithNames`).exec((err, rows) => { - expect(err).to.not.be.ok(); - - expect(rows).to.have.length(rowCount); - expect(rows[0]).to.eql({ number: 0, str: '0', date: '1970-01-02' }); - - callback(); - }); - }); - - - it('use callback #6 with tsv format', callback => { - clickhouse.query(`${sql} format TabSeparatedWithNames`, (err, rows) => { - expect(err).to.not.be.ok(); - - expect(rows).to.have.length(rowCount); - expect(rows[0]).to.eql({ number: 0, str: '0', date: '1970-01-02' }); - - callback(); - }); - }); - - - it('use stream', function(callback) { - this.timeout(10000); + + it('promise and await/async', async () => { + const rows = await clickhouse.query(sql).toPromise(); + expect(rows).to.have.length(rowCount); + expect(rows[0]).to.eql({ number: 0, str: '0', date: '1970-01-02' }); + }); + + it('stream', function(callback) { let i = 0; let error = null; clickhouse.query(sql).stream() .on('data', () => ++i) - // TODO: on this case you should catch error - .on('error', err => callback(err)) - .on('end', () => { - expect(error).to.not.be.ok(); - expect(i).to.be(rowCount); - - callback(); - }); - }); - - it('use stream with csv format', function(callback) { - // this.timeout(10000); - - let i = 0; - let error = null; - - clickhouse.query(`${sql} format CSVWithNames`).stream() - .on('data', () => { - ++i - }) - // TODO: on this case you should catch error - .on('error', err => callback(err)) - .on('end', () => { - expect(error).to.not.be.ok(); - expect(i).to.be(rowCount); - - callback(); - }); - }); - - it('use stream with tsv format', function(callback) { - // this.timeout(10000); - - let i = 0; - let error = null; - - clickhouse.query(`${sql} format TabSeparatedWithNames`).stream() - .on('data', () => { - ++i - }) - // TODO: on this case you should catch error .on('error', err => error = err) .on('end', () => { expect(error).to.not.be.ok(); @@ -184,8 +111,7 @@ describe('Select', () => { }); }); - - it('use stream with pause/resume', function(callback) { + it('stream with pause/resume', function(callback) { const count = 10, pause = 1000, @@ -216,10 +142,9 @@ describe('Select', () => { }) }); - const nodeVersion = process.version.split('.')[0].substr(1); if (parseInt(nodeVersion, 10) >= 10) { - it('use async for', async function() { + it('async for', async function() { let i = 0; for await (const row of clickhouse.query(sql).stream()) { @@ -231,58 +156,93 @@ describe('Select', () => { expect(i).to.be(rowCount); }); - - it('use async for with csv format', async function() { - let i = 0; - - for await (const row of clickhouse.query(`${sql} format CSVWithNames`).stream()) { - ++i; - expect(row).to.have.key('number'); - expect(row).to.have.key('str'); - expect(row).to.have.key('date'); - } - - expect(i).to.be(rowCount); - }); - - it('use async for with tsv format', async function() { - let i = 0; - - for await (const row of clickhouse.query(`${sql} format TabSeparatedWithNames`).stream()) { - ++i; - expect(row).to.have.key('number'); - expect(row).to.have.key('str'); - expect(row).to.have.key('date'); - } - - expect(i).to.be(rowCount); - }); } - it('use promise and await/async', async () => { - let rows = await clickhouse.query(sql).toPromise(); - - expect(rows).to.have.length(rowCount); - expect(rows[0]).to.eql({ number: 0, str: '0', date: '1970-01-02' }); - }); - - - it('use select with external', async () => { - const result = await clickhouse.query('SELECT count(*) AS count FROM temp_table', { - external: [ - { - name: 'temp_table', - data: _.range(0, rowCount).map(i => `str${i}`) - }, - ] - }).toPromise(); + it('select with external', async () => { + const result = await clickhouse.query( + 'SELECT count(*) AS count FROM temp_table', + { + external: [ + { + name: 'temp_table', + data: _.range(0, rowCount).map(i => `str${i}`) + }, + ] + } + ).toPromise(); expect(result).to.be.ok(); expect(result).to.have.length(1); expect(result[0]).to.have.key('count'); expect(result[0].count).to.be(rowCount); }); + + it('catch error', async () => { + try { + await clickhouse.query(sql + '1').toPromise(); + } catch (err) { + expect(err).to.be.ok(); + } + }); + + [ + { + format: 'fake_name', + fullFormatExpr: 'FakeName', + }, + ...ClickHouse.getFormats(), + ].forEach(({ format, fullFormatExpr }) => { + + // The string "foRmat" is used because different forms of writing are found in real code. + const fullSql = sql + (format === 'fake_name' ? '' : ` foRmat ${fullFormatExpr}`); + + it(`with "${fullFormatExpr}" into sql`, async () => { + const rows = await clickhouse.query(fullSql).toPromise(); + + expect(rows).to.have.length(rowCount); + expect(rows[0]).to.eql({ number: 0, str: '0', date: '1970-01-02' }); + }); + + if (format !== 'fake_name') { + it(`with "${fullFormatExpr}" in options`, async () => { + const rows = await clickhouse.query(sql, {}, { format }).toPromise(); + + expect(rows).to.have.length(rowCount); + expect(rows[0]).to.eql({ number: 0, str: '0', date: '1970-01-02' }); + }); + } + + it(`with promise "${fullFormatExpr}"`, async () => { + try { + await clickhouse.query(`SELECT * FROM random_table_name ${fullFormatExpr}`).toPromise(); + } catch (err) { + expect(err).to.be.ok(); + } + }); + + it(`with stream ${fullFormatExpr}"`, cb => { + let i = 0, + error = null; + + const stream = clickhouse.query(`SELECT * FROM random_table_name ${fullFormatExpr}`).stream(); + + stream + .on('data', () => ++i) + .on('error', err => error = err) + .on('close', () => { + expect(error).to.be.ok(); + expect(error.toString()).to.match(new RegExp(`Table ${database}.random_table_name doesn\'t exist`)); + + expect(i).to.eql(0); + + cb(); + }) + .on('end', () => { + cb(new Error('no way #2')); + }); + }); + }); }); @@ -310,21 +270,16 @@ describe('session', () => { }); }); - // You can use all settings from request library (https://github.com/request/request#tlsssl-protocol) describe('TLS/SSL Protocol', () => { it('use TLS/SSL Protocol', async () => { - const - https = require('https'), - fs = require('fs'); - let server = null; try { server = https.createServer( { key : fs.readFileSync('test/cert/server.key'), - cert : fs.readFileSync('test/cert/server.crt') + cert : fs.readFileSync('test/cert/server.crt'), }, (req, res) => { res.writeHead(200); @@ -341,12 +296,11 @@ describe('TLS/SSL Protocol', () => { cert: fs.readFileSync('test/cert/server.crt'), key: fs.readFileSync('test/cert/server.key'), } - } + }, + debug : false, }); - - const r = await temp.query('SELECT 1 + 1').toPromise(); - + const r = await temp.query('SELECT 1 + 1 Format JSON').toPromise(); expect(r).to.be.ok(); expect(r[0]).to.have.key('plus(1, 1)'); expect(r[0]['plus(1, 1)']).to.be(2); @@ -364,7 +318,6 @@ describe('TLS/SSL Protocol', () => { }); }); - describe('queries', () => { it('insert field as array', async () => { clickhouse.sessionId = Date.now(); @@ -398,16 +351,14 @@ describe('queries', () => { } ]; - - const r2 = await clickhouse.insert('INSERT INTO test_array (date, str, arr, arr2, arr3)', rows).toPromise(); + const r2 = await clickhouse.insert( + 'INSERT INTO test_array (date, str, arr, arr2, arr3)', + rows + ).toPromise(); expect(r2).to.be.ok(); - - - clickhouse.sessionId = null; }); - - it('queries', async () => { + it('select, insert and two pipes', async () => { const result = await clickhouse.query('DROP TABLE IF EXISTS session_temp').toPromise(); expect(result).to.be.ok(); @@ -473,30 +424,24 @@ describe('queries', () => { const result9 = await clickhouse.query('SELECT count(*) AS count FROM session_temp').toPromise(); const result10 = await clickhouse.query('SELECT count(*) AS count FROM session_temp2').toPromise(); expect(result9).to.eql(result10); + }); + + it('select number as number', async () => { + const result = await clickhouse.query('DROP TABLE IF EXISTS test_int_temp').toPromise(); + expect(result).to.be.ok(); - const result11 = await clickhouse.query('SELECT date FROM test_array GROUP BY date WITH TOTALS').withTotals().toPromise(); - expect(result11).to.have.key('meta'); - expect(result11).to.have.key('data'); - expect(result11).to.have.key('totals'); - expect(result11).to.have.key('rows'); - expect(result11).to.have.key('statistics'); - - const result111 = await clickhouse.query('DROP TABLE IF EXISTS test_int_temp').toPromise(); - expect(result111).to.be.ok(); - - const result12 = await clickhouse.query('CREATE TABLE test_int_temp (int_value Int8 ) ENGINE=Memory').toPromise(); - expect(result12).to.be.ok(); + const result1 = await clickhouse.query('CREATE TABLE test_int_temp (int_value Int8 ) ENGINE=Memory').toPromise(); + expect(result1).to.be.ok(); const int_value_data = [{int_value: 0}]; - const result13 = await clickhouse.insert('INSERT INTO test_int_temp (int_value)', int_value_data).toPromise(); - expect(result13).to.be.ok(); + const result3 = await clickhouse.insert('INSERT INTO test_int_temp (int_value)', int_value_data).toPromise(); + expect(result3).to.be.ok(); - const result14 = await clickhouse.query('SELECT int_value FROM test_int_temp').toPromise(); - expect(result14).to.eql(int_value_data); + const result4 = await clickhouse.query('SELECT int_value FROM test_int_temp').toPromise(); + expect(result4).to.eql(int_value_data); }); }); - describe('response codes', () => { it('table is not exists', async () => { try { @@ -534,7 +479,7 @@ describe('set database', () => { expect(r).to.be.ok(); const temp = new ClickHouse({ - database: noDefaultDb + database: noDefaultDb, }); @@ -717,6 +662,35 @@ describe('Abort query', () => { }); }); +describe('Select and WITH TOTALS statement', () => { + [false, true].forEach(withTotals => { + it(`is ${withTotals}`, async () => { + const query = clickhouse.query( + `SELECT + number % 3 AS i, + groupArray(number) as kList + FROM ( + SELECT number FROM system.numbers LIMIT 14 + ) + GROUP BY i ${withTotals ? '' : 'WITH TOTALS'} + FORMAT TabSeparatedWithNames + ` + ); + + if (withTotals) { + query.withTotals(); + } + + const result = await query.toPromise(); + + expect(result).to.have.key('meta'); + expect(result).to.have.key('data'); + expect(result).to.have.key('totals'); + expect(result).to.have.key('rows'); + expect(result).to.have.key('statistics'); + }); + }); +}); after(async () => { await clickhouse.query(`DROP DATABASE IF EXISTS ${database}`).toPromise();