diff --git a/README.md b/README.md
index 9ec2bf4..8580103 100644
--- a/README.md
+++ b/README.md
@@ -24,6 +24,7 @@ const clickhouse = new ClickHouse({
 	debug: false,
 	basicAuth: null,
 	isUseGzip: false,
+	format: "json", // "json" || "csv" || "tsv"
 	config: {
 		session_timeout                         : 60,
 		output_format_json_quote_64bit_integers : 0,
diff --git a/index.js b/index.js
index dc6b9ba..7140558 100644
--- a/index.js
+++ b/index.js
@@ -8,9 +8,9 @@ const
 	stream = require('stream'),
 	querystring = require('querystring'),
 	JSONStream = require('JSONStream'),
+	through = require('through'),
 	stream2asynciter = require('stream2asynciter'),
 	{ URL } = require('url'),
-	csv = require('csv'),
 	tsv = require('tsv');
@@ -55,8 +55,8 @@ const PORT = 8123;
 const DATABASE = 'default';
 const USERNAME = 'default';
 
-function parseCSV(body) {
-	const data = new tsv.Parser(SEPARATORS.CSV, { header: true }).parse(body);
+function parseCSV(body, options = { header: true }) {
+	const data = new tsv.Parser(SEPARATORS.CSV, options).parse(body);
 	data.splice(data.length - 1, 1);
 	return data;
 }
@@ -65,12 +65,76 @@ function parseJSON(body) {
 	return JSON.parse(body);
 }
 
-function parseTSV(body) {
-	const data = new tsv.Parser(SEPARATORS.TSV, { header: true }).parse(body)
+function parseTSV(body, options = { header: true }) {
+	const data = new tsv.Parser(SEPARATORS.TSV, options).parse(body);
 	data.splice(data.length - 1, 1);
 	return data;
 }
 
+function parseCSVStream(s) {
+	let isFirst = true;
+	let ref = {
+		fields: []
+	};
+	return through(function (chunk) {
+		let str = chunk.toString();
+		let parsed = parseCSV(str, {header: isFirst});
+		let strarr = str.split("\n");
+		let plen = (isFirst && strarr.length - 1 || strarr.length) - parsed.length;
+
+		if (!isFirst) {
+			chunk = Buffer.concat([Buffer.from([...s].join("\n")), chunk]).toString();
+			parsed = parseCSV(str, {header: isFirst});
+			s = new Set();
+		}
+		strarr.splice(strarr.length - plen).forEach((value => s.add(value)));
+		chunkBuilder.call(this, isFirst, ref, str, parsed);
+		isFirst = false;
+	})
+}
+
+function parseJSONStream() {
+	return JSONStream.parse(['data', true]);
+}
+
+function parseTSVStream(s) {
+	let isFirst = true;
+	let ref = {
+		fields: []
+	};
+	return through(function (chunk) {
+		let str = chunk.toString();
+		let parsed = parseTSV(str, {header: isFirst});
+		let strarr = str.split("\n");
+		let plen = (isFirst && strarr.length - 1 || strarr.length) - parsed.length;
+
+		if (!isFirst) {
+			chunk = Buffer.concat([Buffer.from([...s].join("\n")), chunk]).toString();
+			parsed = parseTSV(str, {header: isFirst});
+			s = new Set();
+		}
+		strarr.splice(strarr.length - plen).forEach((value => s.add(value)));
+		chunkBuilder.call(this, isFirst, ref, str, parsed);
+		isFirst = false;
+	});
+}
+
+function chunkBuilder(isFirst, ref, chunk, parsed) {
+	if (isFirst) {
+		ref.fields = Object.keys(parsed[0]);
+		parsed.forEach((value) => {
+			this.queue(value);
+		});
+	} else {
+		parsed.forEach((value) => {
+			let result = {};
+			ref.fields.forEach((field, index) => (result[field] = value[index]));
+			this.queue(result);
+			result = null;
+		});
+	}
+}
+
 function encodeValue(quote, v, format, isArray) {
 	format = ALIASES[format] || format;
@@ -284,24 +348,29 @@ class QueryCursor {
 		});
 	}
 
-	_parseRowsByFormat(body) {
-		let rows = null;
-		switch (this.opts.sessionFormat || this.opts.format) {
+	_parseRowsByFormat(body, isStream = false) {
+		let result = null;
+		let ws;
+		switch (this._getFormat()) {
 			case "json":
-				rows = parseJSON(body);
+				result = !isStream && parseJSON(body) || parseJSONStream();
 				break;
"tsv": - rows = parseTSV(body); + result = !isStream && parseTSV(body) || parseTSVStream(new Set()); break; case "csv": - rows = parseCSV(body); + result = !isStream && parseCSV(body) || parseCSVStream(new Set()); break; default: - rows = body; + result = body; } - return rows; + return result; }; + _getFormat() { + return this.opts.sessionFormat || this.opts.format; + } + withTotals() { this.useTotals = true; return this; @@ -337,7 +406,7 @@ class QueryCursor { return rs; } else { - const toJSON = JSONStream.parse(['data', true]); + const streamParser = this._parseRowsByFormat(null, true); const rs = new stream.Readable({ objectMode: true }); rs._read = () => {}; @@ -346,14 +415,17 @@ class QueryCursor { const tf = new stream.Transform({ objectMode: true }); let isFirstChunck = true; tf._transform = function (chunk, encoding, cb) { - // Если для первого chuck первый символ блока данных не '{', тогда: // 1. в теле ответа не JSON // 2. сервер нашел ошибку в данных запроса - if (isFirstChunck && chunk[0] !== 123) { + if (isFirstChunck && ( + (me._getFormat() === "json" && chunk[0] !== 123) && + (me._getFormat() === "csv" && chunk[0] !== 110) && + (me._getFormat() === "tsv" && chunk[0] !== 110) + )) { this.error = new Error(chunk.toString()); - toJSON.emit("error", this.error); + streamParser.emit("error", this.error); rs.emit('close'); return cb(); @@ -373,9 +445,9 @@ class QueryCursor { let s = null; if (me.opts.isUseGzip) { const z = zlib.createGunzip(); - s = requestStream.pipe(z).pipe(tf).pipe(toJSON) + s = requestStream.pipe(z).pipe(tf).pipe(streamParser) } else { - s = requestStream.pipe(tf).pipe(toJSON) + s = requestStream.pipe(tf).pipe(streamParser) } @@ -403,14 +475,14 @@ class QueryCursor { rs.pause = () => { rs.__pause(); requestStream.pause(); - toJSON.pause(); + streamParser.pause(); }; rs.__resume = rs.resume; rs.resume = () => { rs.__resume(); requestStream.resume(); - toJSON.resume(); + streamParser.resume(); }; me._request = rs; diff --git a/package-lock.json b/package-lock.json index d594b36..dee6b60 100644 --- a/package-lock.json +++ b/package-lock.json @@ -112,32 +112,6 @@ "resolved": "https://registry.npmjs.org/core-util-is/-/core-util-is-1.0.2.tgz", "integrity": "sha1-tf1UIgqivFq1eqtxQMlAdUUDwac=" }, - "csv": { - "version": "5.1.2", - "resolved": "https://registry.npmjs.org/csv/-/csv-5.1.2.tgz", - "integrity": "sha512-2NA/Fp9FuRAJY6bu5mZsSyBFeqbDZjGg8z1WYETpBpuO30gyWOkxf9KjzBNEZiCsXuNxdQrU3rKrpejyucznMg==", - "requires": { - "csv-generate": "^3.2.3", - "csv-parse": "^4.4.5", - "csv-stringify": "^5.3.3", - "stream-transform": "^2.0.1" - } - }, - "csv-generate": { - "version": "3.2.3", - "resolved": "https://registry.npmjs.org/csv-generate/-/csv-generate-3.2.3.tgz", - "integrity": "sha512-IcR3K0Nx+nJAkcU2eAglVR7DuHnxcuhUM2w2cR+aHOW7bZp2S5LyN2HF3zTkp6BV/DjR6ykoKznUm+AjnWcOKg==" - }, - "csv-parse": { - "version": "4.4.5", - "resolved": "https://registry.npmjs.org/csv-parse/-/csv-parse-4.4.5.tgz", - "integrity": "sha512-koPV9m9AjNJCK3ig4ErgRJalZsLxWI7NP0Fd3+CO9hgDZt3FSljTeESnfWTbyRc8qk/3/LgX1s5naDqLxiuK9w==" - }, - "csv-stringify": { - "version": "5.3.3", - "resolved": "https://registry.npmjs.org/csv-stringify/-/csv-stringify-5.3.3.tgz", - "integrity": "sha512-q8Qj+/lN74LRmG7Mg0LauE5WcnJOD5MEGe1gI57IYJCB61KWuEbAFHm1uIPDkI26aqElyBB57SlE2GGwq2EY5A==" - }, "dashdash": { "version": "1.14.1", "resolved": "https://registry.npmjs.org/dashdash/-/dashdash-1.14.1.tgz", @@ -387,11 +361,6 @@ "integrity": "sha1-hX/Kv8M5fSYluCKCYuhqp6ARsF0=", "dev": true }, - "mixme": { - "version": 
"0.3.2", - "resolved": "https://registry.npmjs.org/mixme/-/mixme-0.3.2.tgz", - "integrity": "sha512-tilCZOvIhRETXJuTmxxpz8mgplF7gmFhcH05JuR/YL+JLO98gLRQ1Mk4XpYQxxbPMKupSOv+Bidw7EKv8wds1w==" - }, "mkdirp": { "version": "0.5.1", "resolved": "https://registry.npmjs.org/mkdirp/-/mkdirp-0.5.1.tgz", @@ -524,14 +493,6 @@ "tweetnacl": "~0.14.0" } }, - "stream-transform": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/stream-transform/-/stream-transform-2.0.1.tgz", - "integrity": "sha512-GiTcO/rRvZP2R8WPwxmxCFP+Of1yIATuFAmYkvSLDfcD93X2WHiPwdgIqeFT2CvL1gyAsjQvu1nB6RDNQ5b2jw==", - "requires": { - "mixme": "^0.3.1" - } - }, "stream2asynciter": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/stream2asynciter/-/stream2asynciter-1.0.0.tgz", diff --git a/package.json b/package.json index ec76126..3bb86de 100644 --- a/package.json +++ b/package.json @@ -52,11 +52,11 @@ "bundleDependencies": false, "dependencies": { "JSONStream": "1.3.4", - "csv": "^5.1.2", "lodash": "4.17.15", "querystring": "0.2.0", "request": "2.88.0", "stream2asynciter": "1.0.0", + "through": "^2.3.8", "tsv": "^0.2.0" }, "deprecated": false, diff --git a/test/test.js b/test/test.js index ca8d514..18cd64f 100644 --- a/test/test.js +++ b/test/test.js @@ -135,6 +135,46 @@ describe('Select', () => { clickhouse.query(sql).stream() .on('data', () => ++i) // TODO: on this case you should catch error + .on('error', err => callback(err)) + .on('end', () => { + expect(error).to.not.be.ok(); + expect(i).to.be(rowCount); + + callback(); + }); + }); + + it('use stream with csv format', function(callback) { + // this.timeout(10000); + + let i = 0; + let error = null; + + clickhouse.query(`${sql} format CSVWithNames`).stream() + .on('data', () => { + ++i + }) + // TODO: on this case you should catch error + .on('error', err => callback(err)) + .on('end', () => { + expect(error).to.not.be.ok(); + expect(i).to.be(rowCount); + + callback(); + }); + }); + + it('use stream with tsv format', function(callback) { + // this.timeout(10000); + + let i = 0; + let error = null; + + clickhouse.query(`${sql} format TabSeparatedWithNames`).stream() + .on('data', () => { + ++i + }) + // TODO: on this case you should catch error .on('error', err => error = err) .on('end', () => { expect(error).to.not.be.ok(); @@ -191,6 +231,32 @@ describe('Select', () => { expect(i).to.be(rowCount); }); + + it('use async for with csv format', async function() { + let i = 0; + + for await (const row of clickhouse.query(`${sql} format CSVWithNames`).stream()) { + ++i; + expect(row).to.have.key('number'); + expect(row).to.have.key('str'); + expect(row).to.have.key('date'); + } + + expect(i).to.be(rowCount); + }); + + it('use async for with tsv format', async function() { + let i = 0; + + for await (const row of clickhouse.query(`${sql} format TabSeparatedWithNames`).stream()) { + ++i; + expect(row).to.have.key('number'); + expect(row).to.have.key('str'); + expect(row).to.have.key('date'); + } + + expect(i).to.be(rowCount); + }); } @@ -445,17 +511,17 @@ describe('response codes', () => { expect(err.code).to.be(60); } - // try { - // let result = await clickhouse.query('DROP TABLE session_temp2').toPromise(); - // expect(result).to.be.ok(); + try { + let result = await clickhouse.query('DROP TABLE session_temp2').toPromise(); + expect(result).to.be.ok(); - // await clickhouse.query('SELECT COUNT(*) AS count FROM session_temp2').toPromise(); - // expect().fail('You should not be here2'); - // } catch (err) { - // expect(err).to.be.ok(); - // 
-		// 	expect(err).to.have.key('code');
-		// 	expect(err.code).to.be(60);
-		// }
+			await clickhouse.query('SELECT COUNT(*) AS count FROM session_temp2').toPromise();
+			expect().fail('You should not be here2');
+		} catch (err) {
+			expect(err).to.be.ok();
+			expect(err).to.have.key('code');
+			expect(err.code).to.be(60);
+		}
 	});
});
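
For reference, a minimal usage sketch of what this change enables, pieced together from the README option above and the new tests; the require line, connection settings and query text are illustrative placeholders rather than part of the diff:

const { ClickHouse } = require('clickhouse');

// The new top-level `format` option defaults to "json"; "csv" and "tsv"
// switch the parser used for both buffered and streamed responses.
const clickhouse = new ClickHouse({
	url: 'http://localhost',
	port: 8123,
	format: 'csv',
});

// As in the new tests, the query requests a *WithNames format so the first
// row carries the column names that become the keys of each emitted object.
clickhouse
	.query('SELECT number FROM system.numbers LIMIT 10 format CSVWithNames')
	.stream()
	.on('data', row => console.log(row))    // one plain object per row
	.on('error', err => console.error(err))
	.on('end', () => console.log('done'));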