Gathering detailed insights and metrics for async-iterable-split
it-split
Splits Uint8Arrays emitted by an (async) iterable by a delimiter
chonkify
Ultra-lightweight chunker for everything — arrays, strings, sets, maps, async iterables and more.
ai-lines
Split an async iterable into lines.
@ushakov-igor/chonk
Ultra-lightweight chunker for everything — arrays, strings, sets, maps, async iterables and more.
npm install async-iterable-split
Latest Version: 3.0.2
Package Id: async-iterable-split@3.0.2
Unpacked Size: 35.89 kB
Size: 7.99 kB
File Count: 5
NPM Version: 10.8.1
Node Version: 22.3.0
Published on: Jun 23, 2024
Split an async iterable (whose values are Uint8Arrays) into multiple "sub async iterables (iterators)" by line, size, or needle.
npm i async-iterable-split
Split by line
Suppose we have a foo.txt file with the following contents:
this is the first line
hello world
this is the last line
import { createReadStream } from "node:fs";
import { Splitable } from "async-iterable-split";

const iterable = createReadStream("./foo.txt"); // Node.js readable streams are async iterable (since v10.0.0)
const splitable = new Splitable(iterable); // Any object implementing [Symbol.asyncIterator] can be passed to the Splitable constructor

while (await splitable.hasValue()) {
  const subIterator = splitable.splitLine(); // Create a sub async iterable (iterator) that yields exactly one line of the original data
  const chunks = [];
  for await (const chunk of subIterator) {
    chunks.push(chunk);
  }
  // When the for await...of loop exits, the whole line has been read
  console.dir(Buffer.concat(chunks).toString("utf-8"));
}
The output looks like this:
'this is the first line'
'hello world'
'this is the last line'
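Splitable accepts any object that implements [Symbol.asyncIterator], not only file streams. As a minimal sketch, assuming you want to try the examples without creating foo.txt, an async generator can serve as the source. The helper name `fromStrings` is hypothetical and is not part of async-iterable-split.

```javascript
// Hypothetical helper (not part of async-iterable-split): turns a list of
// strings into an async iterable whose values are UTF-8 encoded Uint8Arrays.
async function* fromStrings(parts) {
  const encoder = new TextEncoder();
  for (const part of parts) {
    yield encoder.encode(part);
  }
}

// Usage sketch: chunk boundaries deliberately fall in the middle of lines,
// which is exactly the situation a splitter has to handle.
// const splitable = new Splitable(fromStrings(["this is the fi", "rst line\nhello", " world\n"]));
```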
You can also use the readLine method to get a whole line at once:
import { createReadStream } from "node:fs";
import { Splitable } from "async-iterable-split";

const splitable = new Splitable(createReadStream("./foo.txt"));
while (await splitable.hasValue()) {
  const line = await splitable.readLine(); // "line" is a Uint8Array
  console.dir(Buffer.from(line).toString("utf-8"));
}
Line breaks can be either LF or CRLF.
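The LF/CRLF rule can be illustrated with a small standalone sketch. This is not how async-iterable-split is implemented: the library exposes each line as a sub async iterable, whereas this sketch buffers whole lines in memory (closer to readLine), and `splitLines` and `concatBytes` are hypothetical names. It also shows why splitting is non-trivial: a line break, or even the CR and LF of a single CRLF, can straddle two chunks.

```javascript
// Hypothetical sketch (not async-iterable-split's implementation): yields each
// line of an async iterable of Uint8Array chunks as its own Uint8Array.
// Lines end at LF (0x0a); a CR (0x0d) directly before the LF is removed, so
// both LF and CRLF line breaks are accepted, even when split across chunks.
const LF = 0x0a;
const CR = 0x0d;

// Joins several Uint8Arrays into one contiguous Uint8Array.
function concatBytes(parts) {
  const total = parts.reduce((sum, part) => sum + part.length, 0);
  const out = new Uint8Array(total);
  let offset = 0;
  for (const part of parts) {
    out.set(part, offset);
    offset += part.length;
  }
  return out;
}

async function* splitLines(iterable) {
  let pending = []; // pieces of the current, not yet terminated line
  for await (const chunk of iterable) {
    let start = 0;
    let index;
    while ((index = chunk.indexOf(LF, start)) !== -1) {
      pending.push(chunk.subarray(start, index));
      let line = concatBytes(pending);
      if (line.length > 0 && line[line.length - 1] === CR) {
        line = line.subarray(0, line.length - 1); // CRLF: drop the CR
      }
      yield line;
      pending = [];
      start = index + 1;
    }
    // Copy the unterminated tail, since the source may reuse its buffer.
    if (start < chunk.length) pending.push(chunk.slice(start));
  }
  // Emit a final line that has no trailing line break.
  if (pending.length > 0) yield concatBytes(pending);
}
```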
Split by size
Suppose we have a foo.txt file with the following contents:
abcdefghijklmnopqrstuvwxyz
import { createReadStream } from "node:fs";
import { Splitable } from "async-iterable-split";

const splitable = new Splitable(createReadStream("./foo.txt"));

while (await splitable.hasValue()) {
  const subIterator = splitable.splitSize(10); // Create a sub async iterable (iterator) that yields the next 10 bytes of the original data
  const chunks = [];
  for await (const chunk of subIterator) {
    chunks.push(chunk);
  }
  console.dir(Buffer.concat(chunks).toString("utf-8"));
}
The output looks like this:
'abcdefghij'
'klmnopqrst'
'uvwxyz'
You can also use the readSize method to get fixed-size data at once:
import { createReadStream } from "node:fs";
import { Splitable } from "async-iterable-split";

const splitable = new Splitable(createReadStream("./foo.txt"));
while (await splitable.hasValue()) {
  const part = await splitable.readSize(10);
  console.dir(Buffer.from(part).toString("utf-8"));
}
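A comparable sketch for splitting by size, again hypothetical (`splitBySize` is not a library function) and buffering each piece in memory: bytes are regrouped into pieces of exactly `size` bytes regardless of how the source chunked them, and only the final piece may be shorter, as with 'uvwxyz' above.

```javascript
// Hypothetical sketch: regroups an async iterable of Uint8Array chunks into
// pieces of exactly `size` bytes; only the last piece may be shorter.
async function* splitBySize(iterable, size) {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError("size must be a positive integer");
  }
  let piece = new Uint8Array(size);
  let filled = 0; // number of bytes already written into `piece`
  for await (const chunk of iterable) {
    let offset = 0;
    while (offset < chunk.length) {
      // Copy as many bytes as fit into the current piece.
      const take = Math.min(size - filled, chunk.length - offset);
      piece.set(chunk.subarray(offset, offset + take), filled);
      filled += take;
      offset += take;
      if (filled === size) {
        yield piece;
        piece = new Uint8Array(size); // fresh buffer so the yielded piece is not overwritten
        filled = 0;
      }
    }
  }
  if (filled > 0) yield piece.slice(0, filled);
}
```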
Split by needle
Suppose we have a foo.txt file with the following contents:
foobarbaz
import { createReadStream } from "node:fs";
import { Splitable } from "async-iterable-split";

const splitable = new Splitable(createReadStream("./foo.txt"));

while (await splitable.hasValue()) {
  const subIterator = splitable.splitBeforeNeedle(Buffer.from("ba")); // Create a sub async iterable (iterator) that yields the original data up to the next "ba"
  const chunks = [];
  for await (const chunk of subIterator) {
    chunks.push(chunk);
  }
  console.dir(Buffer.concat(chunks).toString("utf-8"));
}
The output looks like this:
'foo'
'r'
'z'
You can also use the readBeforeNeedle method to get all the data before the needle at once:
import { createReadStream } from "node:fs";
import { Splitable } from "async-iterable-split";

const splitable = new Splitable(createReadStream("./foo.txt"));
while (await splitable.hasValue()) {
  const part = await splitable.readBeforeNeedle(Buffer.from("ba"));
  console.dir(Buffer.from(part).toString("utf-8"));
}
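For needle splitting, the main difficulty is that the needle may span two chunks. A minimal sketch that handles this with Node's Buffer (hypothetical `splitByNeedle`, not the library's algorithm): it concatenates incoming data before searching, which covers the boundary case at the cost of re-copying buffered bytes, so it suits short segments better than large ones.

```javascript
// Hypothetical sketch: yields the bytes before each occurrence of `needle`
// in an async iterable of Uint8Array chunks. The needle itself is dropped,
// and whatever follows the last needle is yielded at the end (if non-empty).
async function* splitByNeedle(iterable, needle) {
  if (needle.length === 0) throw new RangeError("needle must not be empty");
  let buffered = Buffer.alloc(0);
  for await (const chunk of iterable) {
    // Concatenating first means a needle split across two chunks is still found.
    buffered = Buffer.concat([buffered, chunk]);
    let index;
    while ((index = buffered.indexOf(needle)) !== -1) {
      yield buffered.subarray(0, index);
      buffered = buffered.subarray(index + needle.length);
    }
  }
  if (buffered.length > 0) yield buffered;
}
```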
The three splitting methods above can be used on their own or combined. The following example parses HTTP request messages, splitting the request line and headers by line and the body by size:
Server:
import { createServer } from "node:net";
import { Splitable } from "async-iterable-split";

const server = createServer(async (socket) => {
  try {
    // A Node.js TCP socket is also async iterable
    const splitable = new Splitable(socket);
    while (await splitable.hasValue()) {
      // httpReqParser parses one HTTP request message at a time; one TCP connection may carry multiple HTTP messages
      console.log(await httpReqParser(splitable));
    }
  } catch (error) {
    console.log("got error", error);
    socket.destroy(error);
  }
}).listen(8888, "127.0.0.1", () => {
  console.log(server.address());
});

// This is only a simple HTTP request message parser; a production-ready parser needs more care.
async function httpReqParser(splitable) {
  const reqMsg = {};

  // Parse the request line, limiting it to 65536 bytes
  const reqLine = Buffer.from(await splitable.readLine(65536)).toString("ascii");
  const reqInfos = reqLine.split(" ");
  reqMsg.method = reqInfos.shift();
  reqMsg.uri = reqInfos.shift();
  reqMsg.httpVersion = reqInfos.shift().replace("HTTP/", "");

  // Parse the request headers, limiting each header line to 16384 bytes and the header count to 256
  reqMsg.headers = {};
  let headerCount = 0;
  while (await splitable.hasValue()) {
    if (headerCount > 256) {
      throw new Error("header count exceeded limit");
    }
    const header = Buffer.from(await splitable.readLine(16384)).toString("ascii");
    // A blank line marks the end of the request headers, so leave the loop
    if (header.length === 0) {
      break;
    }
    headerCount++;
    // Split on the first colon only: values such as "127.0.0.1:8888" may themselves contain colons
    const colonIndex = header.indexOf(":");
    if (colonIndex === -1) {
      throw new Error("malformed header line");
    }
    const key = header.slice(0, colonIndex);
    const value = header.slice(colonIndex + 1);
    reqMsg.headers[key.toLowerCase()] = value.trim();
  }

  // Parse the request body, limiting it to 1 MiB
  const bodySize = Number(reqMsg.headers["content-length"]) || 0;
  if (bodySize > 2 ** 20) {
    throw new Error("body size exceeded limit");
  }
  reqMsg.body = Buffer.from(await splitable.readSize(bodySize)).toString("utf-8");

  // Return the parsing result
  return reqMsg;
}
Client:
import { connect } from "node:net";

// Establish a TCP connection and, once it is established, send two consecutive HTTP request messages
const socket = connect({ host: "127.0.0.1", port: 8888 }, () => {
  socket.write(`GET / HTTP/1.1
Host: 127.0.0.1

`);

  socket.write(`POST /ping HTTP/1.1
Host: 127.0.0.1
Content-Type: text/plain; charset=utf-8
Content-Length: 8

👋ping`);
});
The server output looks like this:
{
method: 'GET',
uri: '/',
httpVersion: '1.1',
headers: { host: '127.0.0.1' },
body: ''
}
{
method: 'POST',
uri: '/ping',
httpVersion: '1.1',
headers: {
host: '127.0.0.1',
'content-type': 'text/plain; charset=utf-8',
'content-length': '8'
},
body: '👋ping'
}
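Content-Length counts bytes of the encoded body, not characters. That is why the client sends Content-Length: 8 for 👋ping: the emoji takes 4 bytes in UTF-8 and "ping" takes 4. A quick check with Node's Buffer.byteLength follows; the `request` string is illustrative only and uses CRLF line breaks, which the line splitting described above also accepts.

```javascript
// Content-Length is the byte length of the encoded body, not its character count.
const body = "👋ping";
const byteLength = Buffer.byteLength(body, "utf8"); // 4 bytes for the emoji + 4 for "ping"
const utf16Length = body.length; // String length counts UTF-16 code units (the emoji is 2)

// Deriving the header from the body avoids hard-coding a wrong length.
const request =
  "POST /ping HTTP/1.1\r\n" +
  "Host: 127.0.0.1\r\n" +
  "Content-Type: text/plain; charset=utf-8\r\n" +
  `Content-Length: ${byteLength}\r\n` +
  "\r\n" +
  body;

console.log(byteLength, utf16Length); // 8 6
```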
The sub async iterables (iterators) created by the splitXXX methods cannot be iterated concurrently. Wait for the previous iteration to finish before starting a new one.
When a readXXX method is called, the corresponding sub async iterable (iterator) is created and iterated immediately, and the chunks it yields are held in memory. When the iteration ends, all chunks are concatenated into a single Uint8Array, which the returned promise resolves with. Set a reasonable size limit when using these methods (for example, readLine(16384) in the HTTP example above) so that memory use cannot grow without bound.
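The buffering behavior described above, combined with a size limit, can be sketched as follows. `collect` is a hypothetical helper, not a library function, and the library's own limit handling may differ; the point is to stop reading as soon as the limit is crossed instead of letting memory grow with untrusted input.

```javascript
// Hypothetical sketch of buffering a whole sub-iteration in memory, as the
// readXXX methods are described to do, but failing fast once more than
// `limit` bytes have been received.
async function collect(iterable, limit) {
  const chunks = [];
  let total = 0;
  for await (const chunk of iterable) {
    total += chunk.length;
    if (total > limit) {
      // Leaving the loop via throw also closes the source iterator.
      throw new RangeError(`data exceeded limit of ${limit} bytes`);
    }
    chunks.push(chunk);
  }
  // Concatenate every buffered chunk into a single Uint8Array.
  const result = new Uint8Array(total);
  let offset = 0;
  for (const chunk of chunks) {
    result.set(chunk, offset);
    offset += chunk.length;
  }
  return result;
}
```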
splitable.hasValue() is an asynchronous method that checks whether any more data can be iterated. Do not call it again while a previous call is still pending, or while a sub async iterable (iterator) is being iterated.
After a sub async iterable (iterator) has finished, you can read the remaining original data by iterating splitable directly; it is itself an async iterable (iterator).
No vulnerabilities found.