From f42b32907f9a3aff358519322a68faa366804562 Mon Sep 17 00:00:00 2001 From: Pavel Date: Wed, 18 Sep 2019 17:55:17 +0400 Subject: [PATCH] added tsv csv parsers on select query # not completed parsers work on stream --- index.js | 95 +++++++++++++++++++++++++++------------------------- test/test.js | 35 ++++++++++++++----- 2 files changed, 76 insertions(+), 54 deletions(-) diff --git a/index.js b/index.js index 44d8a84..dc6b9ba 100644 --- a/index.js +++ b/index.js @@ -56,19 +56,19 @@ const DATABASE = 'default'; const USERNAME = 'default'; function parseCSV(body) { - return new Promise((resolve, reject) => { - csv.parse(body, { delimiter: ','}, (err, output) => { - if (err) { - reject(err); - return; - } - resolve(output); - }) - }); + const data = new tsv.Parser(SEPARATORS.CSV, { header: true }).parse(body); + data.splice(data.length - 1, 1); + return data; +} + +function parseJSON(body) { + return JSON.parse(body); } function parseTSV(body) { - return Promise.resolve().then(() => tsv.parse(body, {header: false})); + const data = new tsv.Parser(SEPARATORS.TSV, { header: true }).parse(body) + data.splice(data.length - 1, 1); + return data; } function encodeValue(quote, v, format, isArray) { @@ -274,27 +274,32 @@ class QueryCursor { if (me.opts.debug) { console.log('exec res headers', res.headers); } - - this._parseRowByFormat(res.body).then((result) => cb(null, me.useTotals ? result : result.data || result)).catch(cb); + + try { + const result = this._parseRowsByFormat(res.body); + cb(null, me.useTotals ? result : result.data || result) + } catch (e) { + cb(e); + } }); } - _parseRowByFormat(body) { - let format = Promise.resolve(); - switch (this.opts.format) { + _parseRowsByFormat(body) { + let rows = null; + switch (this.opts.sessionFormat || this.opts.format) { case "json": - format = format.then(() => JSON.parse(body)); + rows = parseJSON(body); break; case "tsv": - format = format.then(() => parseTSV(body)); + rows = parseTSV(body); break; case "csv": - format = format.then(() => parseCSV(body)); + rows = parseCSV(body); break; default: - format = format.then(() => body); + rows = body; } - return format; + return rows; }; withTotals() { @@ -448,7 +453,7 @@ class ClickHouse { output_format_json_quote_64bit_integers : 0, enable_http_compression : 0 }, - format: "json" // "json" || "csv" || "tsv" + format: "json", // "json" || "csv" || "tsv" }, opts ); @@ -554,10 +559,10 @@ class ClickHouse { format = this._parseFormat(query, " format JSON"); break; case "tsv": - format = this._parseFormat(query, " format TabSeparated"); + format = this._parseFormat(query, " format TabSeparatedWithNames"); break; case "csv": - format = this._parseFormat(query, " format CSV"); + format = this._parseFormat(query, " format CSVWithNames"); break; default: format = " "; @@ -565,19 +570,20 @@ class ClickHouse { return format; }; - _parseFormat(query, def) { - if (query.match(/format/mg) === null) { - return def; - } - if (query.match(/format JSON/mg) !== null) { - this.opts.format = "json"; - } else if (query.match(/format TabSeparated/mg) !== null) { - this.opts.format = "tsv"; - } else if (query.match(/format CSV/mg) !== null) { - this.opts.format = "csv"; - } - return ""; - } + _parseFormat(query, def) { + if (query.match(/format/mg) === null) { + this.opts.sessionFormat = this.opts.format; + return def; + } + if (query.match(/format JSON/mg) !== null) { + this.opts.sessionFormat = "json"; + } else if (query.match(/format TabSeparated/mg) !== null) { + this.opts.sessionFormat = "tsv"; + } else if (query.match(/format CSV/mg) !== null) { + this.opts.sessionFormat = "csv"; + } + return ""; + } _mapRowAsObject(fieldList, row) { return fieldList.map(f => encodeValue(false, row[f] != null ? row[f] : '', 'TabSeparated')).join('\t'); @@ -634,13 +640,10 @@ class ClickHouse { let sql = query.trim(); // Hack for Sequelize ORM - if (sql.charAt(sql.length - 1) === ';') { - sql = sql.substr(0, sql.length - 1); - } - + sql = sql.trimEnd().replace(/;$/gm, ""); + if (sql.match(/^(select|show|exists)/i)) { - reqParams['url'] = me.url + '?query=' + encodeURIComponent(query + this._getFormat(query)) + '&' + querystring.stringify(configQS); - + reqParams['url'] = me.url + '?query=' + encodeURIComponent(sql + this._getFormat(sql) + ';') + '&' + querystring.stringify(configQS); if (me.opts.username) { reqParams['url'] = reqParams['url'] + '&user=' + me.opts.username; } @@ -667,7 +670,7 @@ class ClickHouse { reqParams['formData'] = formData; } } else if (query.match(/^insert/i)) { - reqParams['url'] = me.url + '?query=' + encodeURIComponent(query + ' FORMAT TabSeparated') + '&' + querystring.stringify(configQS); + reqParams['url'] = me.url + '?query=' + encodeURIComponent(sql + ' FORMAT TabSeparated') + '&' + querystring.stringify(configQS); if (me.opts.username) { reqParams['url'] = reqParams['url'] + '&user=' + me.opts.username; @@ -678,10 +681,10 @@ class ClickHouse { } if (data) { - reqParams['body'] = me._getBodyForInsert(query, data); + reqParams['body'] = me._getBodyForInsert(sql, data); } } else { - reqParams['url'] = me.url + '?query=' + encodeURIComponent(query) + '&' + querystring.stringify(configQS); + reqParams['url'] = me.url + '?query=' + encodeURIComponent(sql + ";") + '&' + querystring.stringify(configQS); if (me.opts.username) { reqParams['url'] = reqParams['url'] + '&user=' + me.opts.username; @@ -726,6 +729,6 @@ class ClickHouse { } module.exports = { - ClickHouse + ClickHouse }; diff --git a/test/test.js b/test/test.js index ca7485a..ca8d514 100644 --- a/test/test.js +++ b/test/test.js @@ -79,30 +79,30 @@ describe('Select', () => { }); }); - it('use callback with csv format', callback => { - clickhouse.query(`${sql} format CSV`).exec((err, rows) => { + it('use callback #3 with csv format', callback => { + clickhouse.query(`${sql} format CSVWithNames`).exec((err, rows) => { expect(err).to.not.be.ok(); expect(rows).to.have.length(rowCount); - expect(rows[0]).to.eql([0, '0', '1970-01-02' ]); + expect(rows[0]).to.eql({ number: 0, str: 0, date: '1970-01-02' }); callback(); }); }); - it('use callback #2 with csv format', callback => { - clickhouse.query(`${sql} format CSV`, (err, rows) => { + it('use callback #4 with csv format', callback => { + clickhouse.query(`${sql} format CSVWithNames`, (err, rows) => { expect(err).to.not.be.ok(); expect(rows).to.have.length(rowCount); - expect(rows[0]).to.eql([0, '0', '1970-01-02' ]); + expect(rows[0]).to.eql({ number: 0, str: 0, date: '1970-01-02' }); callback(); }); }); - it('use callback with tsv format', callback => { + it('use callback #5 with tsv format', callback => { clickhouse.query(`${sql} format TabSeparatedWithNames`).exec((err, rows) => { expect(err).to.not.be.ok(); @@ -114,7 +114,7 @@ describe('Select', () => { }); - it('use callback #2 with tsv format', callback => { + it('use callback #6 with tsv format', callback => { clickhouse.query(`${sql} format TabSeparatedWithNames`, (err, rows) => { expect(err).to.not.be.ok(); @@ -134,6 +134,7 @@ describe('Select', () => { clickhouse.query(sql).stream() .on('data', () => ++i) + // TODO: on this case you should catch error .on('error', err => error = err) .on('end', () => { expect(error).to.not.be.ok(); @@ -525,6 +526,24 @@ describe('Constructor options', () => { new ClickHouse({ host: 'http://localhost:8124', port: 8123 + }), + + new ClickHouse({ + host: 'http://localhost:8124', + port: 8123, + format: "json" + }), + + new ClickHouse({ + host: 'http://localhost:8124', + port: 8123, + format: "tsv" + }), + + new ClickHouse({ + host: 'http://localhost:8124', + port: 8123, + format: "csv" }) ];