V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
nyxsonsleep
V2EX  ›  Node.js

node 怎么实现并行化执行传输不可序列化对象?

  •  
  •   nyxsonsleep · 152 天前 · 2758 次点击
    这是一个创建于 152 天前的主题,其中的信息可能已经有所发展或是发生改变。

    现在的需求是存在一个类内的方法,原来是串行的,现在需要改并行。现在我需要将一个对象 obj ,或者对象的方法 obj.run 传入子线程,然后回调执行。

    但是我尝试了几种方式,似乎是没办法将复杂的对象进行传递?以至于常规的回调函数的方式没办法在 node 的并行化中实现。

    1. worker_thread
    new Worker(moduleThreadFile, 
    workerData:{'obj':obj}) //ERROR
    

    会报告 Cannot set property code of which has only a getter.

    1. workerpool
    pool=workerpool.pool()
    pool.exec(obj,[])
    

    实际上传入的 obj 在子线程中是 undefined

    第 1 条附言  ·  151 天前
    计算密集型任务。
    51 条回复    2024-08-10 11:32:35 +08:00
    okakuyang
        1
    okakuyang  
       152 天前
    维护一个 map ,用 id 来区分哪个任务完成了,触发相应回调。
    nyxsonsleep
        2
    nyxsonsleep  
    OP
       152 天前
    @okakuyang #1 能简单举例一些伪代码来做示例吗? node 用得少。
    jifengg
        3
    jifengg  
       152 天前
    没用过 workerpool ,但是,以我的理解
    pool.exec(obj,[])
    exec 第一个参数应该是一个 function ?,第二个参数你写了空数组的[],应该是传给这个 function 的参数列表?
    yaodong0126
        4
    yaodong0126  
       152 天前
    传不了,看文档,写的清楚的不能更清楚了,用什么工具之前多看文档,多看文档
    shadowyue
        5
    shadowyue  
       152 天前
    看文档,worker 能传递的数据格式有要求的
    nyxsonsleep
        7
    nyxsonsleep  
    OP
       152 天前
    @jifengg #3 function 也试过,没区别
    nyxsonsleep
        8
    nyxsonsleep  
    OP
       152 天前
    @yaodong0126 #4 我想我需要的什么工具能支持传输完整原始对象的方法。
    photon006
        9
    photon006  
       152 天前
    bluebird.map()比较方便实现并行,还能设置并发量 concurrency

    http://bluebirdjs.com/docs/api/promise.map.html
    nyxsonsleep
        10
    nyxsonsleep  
    OP
       152 天前
    @shadowyue #5 这说明了 workerData 不能传输这种对象。那么有其他方法可以实现吗?
    luckyscript
        11
    luckyscript  
       151 天前
    workerData <any> Any JavaScript value that is cloned and made available as require('node:worker_threads').workerData. The cloning occurs as described in the HTML structured clone algorithm, and an error is thrown if the object cannot be cloned (e.g. because it contains functions).

    ---

    一定要通过子线程的方案来实现吗?把需要执行的方法改造成异步的形式是不是也可以。
    nyxsonsleep
        12
    nyxsonsleep  
    OP
       151 天前
    @luckyscript #11 这个是计算密集任务
    shadowyue
        13
    shadowyue  
       151 天前
    #10 @nyxsonsleep

    你如果一定要传递这个对象参数,最直接的就是把这个对象,转成符合要求的数据类型。
    比如把对象拍平,把值弄成数组传递
    nyxsonsleep
        14
    nyxsonsleep  
    OP
       151 天前
    @photon006 #9 看起来是个异步模型。我这个可能需要的是计算密集型加速方案。
    nyxsonsleep
        15
    nyxsonsleep  
    OP
       151 天前
    @shadowyue #13 无法实现,对象非常复杂。
    shadowyue
        16
    shadowyue  
       151 天前
    #15 对象可以序列化存储吗?可以的话直接写文件或者写数据库,worker 自己去读
    photon006
        17
    photon006  
       151 天前
    @nyxsonsleep

    确实,bluebird.map()适合 io 密集型任务的并行执行,计算密集型不合适。

    计算密集型需要调用 cpu 多线程,可以通过 worker_threads 实现

    主线程分配任务给 worker 线程,最简单就是传递字符串

    const tasks = [
    `
    function taskA() {
    // do something
    }
    `
    ,
    `
    function taskB() {
    // do something
    }
    `
    ]

    worker 线程接收到用 eval 语法执行 taskA 、taskB ,把执行结果返回给主线程。
    xiwh
        19
    xiwh  
       151 天前
    题主的需求大概率实现不了,Node 的 Worker 是并行用多进程实现的,那哪些对象是没法序列化的?文件句柄,线程句柄,tcp/udp 连接句柄,而这些资源在进程间都是隔离的,即便是强行序列化传过去也用不了,当然 Linux 似乎有方式实现进程间资源共享,最好的方式还是支持基于内存通信的多线程的语言 go java c++等
    yaodong0126
        20
    yaodong0126  
       151 天前
    我的天,有这么难吗,为什么一定要让线程去执行回调,线程把任务完成后通知主不可以吗?
    sinalvee
        21
    sinalvee  
       151 天前
    没实践过,不知道有没有别的坑,思路就是把函数转为字符串,worker 中再把字符串转回来

    ```js
    const { Worker, isMainThread, parentPort, workerData } = require('node:worker_threads');

    if (isMainThread) {
    const obj = {
    name: 'Foo',

    greet(other) {
    return `Hello ${this.name} and ${other}`;
    }
    }

    const objStr = JSON.stringify(obj, (key, value) => {
    if (typeof value === 'function') {
    return value.toString();
    }
    return value;
    });

    const worker = new Worker(__filename, {
    workerData: objStr,
    });
    worker.on('message', (value) => {
    console.log('Receive data from worker =>', value);
    });
    worker.on('error', console.error);
    worker.on('exit', (code) => {
    if (code !== 0)
    console.error(new Error(`Worker stopped with exit code ${code}`));
    });
    } else {
    const objStr = workerData;
    const objParsed = JSON.parse(objStr);

    const run = (obj, funcName, ...args) => {
    if (obj.hasOwnProperty(funcName)) {
    const funcStr = obj[funcName];
    // 提取函数体,忽略函数参数定义
    const funcBody = funcStr.substring(obj.greet.indexOf('{') + 1, obj.greet.lastIndexOf('}'));
    // 使用剩余参数语法来定义一个新的函数,允许接收任意数量的参数
    const funcArgs = funcStr.substring(funcStr.indexOf('(') + 1, funcStr.indexOf(')')).split(',').map(arg => arg.trim()).filter(arg => arg);
    const func = new Function(...funcArgs, funcBody);

    return func.call(obj, ...args);
    }
    }

    const result = run(objParsed, 'greet', 'Bar');

    parentPort.postMessage(result);
    }
    ```
    nyxsonsleep
        22
    nyxsonsleep  
    OP
       151 天前
    @shadowyue #16 JSON.stringfy 试过,不行,函数丢失了。这个对象有很多静态成员和方法。
    nyxsonsleep
        23
    nyxsonsleep  
    OP
       151 天前
    @yaodong0126 #20 首先这个工程的主要功能需要编译成一个单文件,由于某些原因可能不会轻易改变这种行为。
    此外由于 node 的多线程需要一个单独的 js 文件作为入口,所以我创建了一个新工程,因此这个工程无法直接导入原工程的内容。所以我尝试进行回调。
    Melting
        24
    Melting  
       151 天前
    可以用 module.exports 导出方法,在 worker_threads 里调用吧
    nyxsonsleep
        25
    nyxsonsleep  
    OP
       151 天前
    @Melting #24
    > 首先这个工程的主要功能需要编译成一个单文件,由于某些原因可能不会轻易改变这种行为。
    此外由于 node 的多线程需要一个单独的 js 文件作为入口,所以我创建了一个新工程,来创建新的 js 文件入口,因此这个工程无法直接导入原工程的内容。所以我尝试进行回调。
    okakuyang
        26
    okakuyang  
       151 天前 via iPhone
    worker 不能传函数,你可以一个任务给一个 id ,任务处理完传 id 给主线程,根据 id 执行对应回调。超级简单。
    nyxsonsleep
        27
    nyxsonsleep  
    OP
       151 天前
    @okakuyang #26 没看明白。我就是需要将任务传递给 worker ,不然这个任务怎么处理完呢?子进程根本不知道应该做什么吧。
    现在我需要并行化的就是 obj.run 函数,子进程中没有这个函数,也没有这个对象,怎么执行 obj.run 呢?
    momocraft
        28
    momocraft  
       151 天前
    重写一份不绑定对象的,参数可以序列化的 run
    EchoWhale
        29
    EchoWhale  
       151 天前 via iPhone
    别想了,自己实现一个序列化/反序列化方法吧。
    nomagick
        30
    nomagick  
       151 天前
    你就别当 node 有线程,node 相当于没有线程,先序列化再反序列化,多进程模型

    而且 node 里面的 fork, 也不是你认为的 fork, 纯就是重新再启动一个新的
    nomagick
        31
    nomagick  
       151 天前
    函数和对象引用都是不共享的,也不能传递,只能通信
    mark2025
        32
    mark2025  
       151 天前
    @nyxsonsleep 在 worker 内加载/初始化这个对象不行么?
    okakuyang
        33
    okakuyang  
       151 天前
    @nyxsonsleep 你的 worker 里面本身要有 [处理计算] 的代码,主线程只是负责把 [要处理的数据] [id] 传给 worker 线程,worker 计算完成之后只负责把计算好的数据 [字符串/基本类型/字节] [id] 发给主线程,主线程收到处理好的数据根据 [id] 再进行下一步处理 [合并数据] 。
    nyxsonsleep
        34
    nyxsonsleep  
    OP
       151 天前
    @mark2025 #32 > 首先这个工程的主要功能需要编译成一个单文件,由于某些原因可能不会轻易改变这种行为。
    此外由于 node 的多线程需要一个单独的 js 文件作为入口,所以我创建了一个新工程,来创建新的 js 文件入口,因此这个工程无法直接导入原工程的内容。

    基于以上原因,实际上 worker 里基本没办法初始化这个对象。或者能提供某种方法可以避免上述的限制吗?
    image72
        35
    image72  
       150 天前
    我看到 workerData 支持 Blob, dataview,arraybuffer 类型(The structured clone algorithm
    )[https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm#webapi_types]
    mark2025
        36
    mark2025  
       150 天前
    @nyxsonsleep 能不能就这一个 js 文件既是主主入口文件也是 worker 入口文件,然后内部根据 mian/worker 分支判断呢?
    nyxsonsleep
        37
    nyxsonsleep  
    OP
       149 天前
    @mark2025 #36 这个 js 文件有 10+M ,worker 子线程启动的时候会不会有性能问题。
    yaodong0126
        38
    yaodong0126  
       149 天前
    @nyxsonsleep 你编不编译成单文件不重要啊,你都新建一个工程了,那么你的新工程存在两个文件就不可以?文件 A 作为和主交互的入口,并在里面 require 你的单文件 B 执行,B 执行完毕通过 A 把消息传递给主,不就完了吗,下面 26 楼说的也是这个意思
    accelerator1
        39
    accelerator1  
       149 天前
    nodejs 中的 worker 是多进程,不是多线程,不能内存共享,正常情况没法传递引用。
    但是 worker 支持 transferable 对象,也就是可以直接传递引用避免进程间的数据拷贝,其实还是要自己实现序列/反序列化。

    如果你的 obj 不可序列化,那就把 obj 的实例化函数放到 worker 中,通过传递 obj 实例化的相关参数来实现。
    kyuuseiryuu
        40
    kyuuseiryuu  
       149 天前
    源码级传递 —— 你在 worker 侧实现一个一摸一样的类。这样把数据丢过去就能计算了。
    SenseHu
        41
    SenseHu  
       149 天前
    有没可能方向就错了, node 适合计算密集型任务?
    yaodong0126
        42
    yaodong0126  
       149 天前
    不懂的能不能不要乱说? worker 什么时候变成进程了?你去 ps 看看进程号不难吧?天天在那误人子弟
    yaodong0126
        43
    yaodong0126  
       149 天前
    进程间还能避免数据拷贝,说话都有逻辑吗?没事看看书吧
    zhufpy
        44
    zhufpy  
       148 天前
    想办法序列化对象吧~,或者在通过一些参数,在 worker 里实例化对象
    lmw2616
        45
    lmw2616  
       148 天前
    切片上传?
    lmw2616
        46
    lmw2616  
       148 天前
    @lmw2616 理解错了( ̄▽ ̄)"
    yaodong0126
        47
    yaodong0126  
       148 天前
    官方都说了不允许传递 function ,还在那序列化,序列化,现在程序员的平均水平是真的低
    image72
        48
    image72  
       144 天前
    @Livid 请求删除屏蔽 @yaodong0126 用户引战,群嘲不友善行为
    yaodong0126
        49
    yaodong0126  
       142 天前
    @image72 菜就多练
    accelerator1
        50
    accelerator1  
       130 天前
    @yaodong0126 看起来是在说我,虽然但是,你说的对,是进程不是线程,因为是线程隔离,无法直接传递引用,我的确误人子弟了。
    accelerator1
        51
    accelerator1  
       130 天前
    @yaodong0126 对不起,还是写反了,能理解意思就好😁
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   5391 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 31ms · UTC 08:32 · PVG 16:32 · LAX 00:32 · JFK 03:32
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.