Header Ads Widget

Ticker

6/recent/ticker-posts

Uso de subprocesos de trabajo del nodo 11.7 con RxJS observable

 Con el lanzamiento de Node 11.7, el módulo worker_threads se convierte en una característica estándar y ya no está oculto detrás del --experimental-workerconmutador. El módulo worker_threads permite a los desarrolladores ejecutar JavaScript de forma asíncrona en subprocesos livianos y aislados contenidos dentro del proceso principal del nodo. Este artículo se centrará en cómo utilizar subprocesos de trabajo para ejecutar una tarea de forma asincrónica y transmitir datos de esa tarea al resto de su aplicación Node utilizando RxJS Observables.

Con el lanzamiento de Node 11.7, el módulo worker_threads se convierte en una característica estándar y ya no está oculto detrás del --experimental-workerconmutador. El módulo worker_threads permite a los desarrolladores ejecutar JavaScript de forma asíncrona en subprocesos livianos y aislados contenidos dentro del proceso principal del nodo. Este artículo se centrará en cómo utilizar subprocesos de trabajo para ejecutar una tarea de forma asincrónica y transmitir datos de esa tarea al resto de su aplicación Node utilizando RxJS Observables.

Antes de comenzar, si desea obtener más información sobre los subprocesos de trabajo y por qué es posible que desee utilizarlos, le recomendaría leer el subproceso múltiple de Node.js: ¿Qué son los subprocesos de trabajo y por qué son importantes? de  Alberto Gimeno.  Alberto ha hecho un trabajo fantástico al explicar el propósito del módulo worker_thread, brindó algunos ejemplos sólidos de dónde tiene sentido usarlo y demostró algunas formas alternativas de construir una aplicación Node multiproceso.

¿Qué estamos construyendo?

Vamos a crear una aplicación de nodo simple que crea un subproceso de trabajo que ejecuta una tarea simulada de larga duración que informa el estado a intervalos regulares hasta que se completa o hasta que se agota el tiempo. El hilo de trabajo se incluirá en un RxJS Observable para que el resto de la aplicación pueda transmitir los mensajes devueltos desde el hilo de trabajo utilizando la poderosa biblioteca RxJS.

Si desea avanzar y ver la solución final, puede verla en GitHub en briandesousa / node-worker-thread-rxjs .

Configurando su entorno

Lo primero que debemos hacer es asegurarnos de que nuestro entorno esté listo para funcionar:

  1. Instalar Node 11.7.0+
  2. Úselo  npm initpara inicializar un nuevo paquete NPM
  3. Agregue un script de inicio simple al package.json para iniciar la aplicación: node node-parent-thread-rxjs.js
  4. Instale el paquete RxJS con npm install -s rxjs
  5. Cree node-parent-thread-rxjs.js que contendrá el código que se ejecuta en el hilo principal
  6. Cree node-worker-thread-rxjs.js que contendrá la implementación de la tarea de ejecución prolongada que se ejecuta en un hilo separado

Creando el hilo de trabajo

El hilo de trabajo tiene la lógica para simular una tarea de larga duración:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
dieciséis
17
18
19
const { workerData, parentPort } = require('worker_threads');
 
parentPort.postMessage(`starting heavy duty work from process ${process.pid} that will take ${workerData}s to complete`);
 
timeLimit = workerData;
timer = 0;
 
// simulate a long-running process with updates posted back on a regular interval
do {
    setTimeout(
        (count) => {
            parentPort.postMessage(`heavy duty work in progress...${count + 1}s`);
            if (count === timeLimit) {
                parentPort.postMessage('done heavy duty work');
            }
        },
        1000 * timer,
        timer);
} while (++timer !== timeLimit);

Analicemos un poco este script:

  • Usamos parentPortde los worker_threadsmódulos para comunicarnos con el hilo principal en 3 puntos diferentes:
    • antes de que comience la tarea
    • mientras la tarea se está ejecutando (dentro del bucle do while) para proporcionar el estado al hilo principal
    • cuando la tarea se completa


  • Usamos workerDatadel worker_threadsmódulo para pasar un límite de tiempo de cuánto tiempo (en segundos) debe ejecutarse la tarea. La tarea se completa cuando se alcanza este límite de tiempo (línea 19).

Este hilo de trabajo no hace nada particularmente útil, pero demuestra cómo un hilo puede recibir instrucciones de su padre y transmitir múltiples actualizaciones a su padre.

Creando el hilo principal

El hilo principal tiene las siguientes responsabilidades:

  • Inicie el hilo de trabajo, especificando cuánto tiempo debe ejecutarse el hilo de trabajo. Lo llamaremos WORKER_TIME.
  • Recibir actualizaciones del hilo de trabajo en una secuencia observable
  • Sal de la aplicación si el hilo de trabajo tarda demasiado. A esto lo llamaremos MAX_WAIT_TIME.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
dieciséis
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
const Rxjs = require('rxjs');
const RxjsOperators = require('rxjs/operators');
const { Worker } = require('worker_threads');
 
console.log("\nNode multi-threading demo using worker_threads module in Node 11.7.0\n");
 
const COMPLETE_SIGNAL = 'COMPLETE';
 
function runTask(workerData, completedOnTime) {
    return Rxjs.Observable.create(observer => {
        const worker = new Worker('./node-worker-thread-rxjs.js', { workerData });
        worker.on('message', message => observer.next(message));
        worker.on('error', error => observer.error(error));
        worker.on('exit', code => {
            if (code !== 0) {
                observer.error(`Worker stopped with exit code ${code}`);
            } else {
                completedOnTime();
                observer.next(COMPLETE_SIGNAL);
                observer.complete();
            }
        });
    });
}
 
const MAX_WAIT_TIME = 3;
const WORKER_TIME = 10;
 
function main() {
    completedOnTime = false;
 
    console.log(`[Main] Starting worker from process ${process.pid}`);
 
    const worker$ = runTask(WORKER_TIME, () => completedOnTime = true);
 
    // receive messages from worker until it completes but only wait for MAX_WAIT_TIME
    worker$.pipe(
        RxjsOperators.takeWhile(message => message !== COMPLETE_SIGNAL),
        RxjsOperators.takeUntil(Rxjs.timer(MAX_WAIT_TIME * 1000))
    ).subscribe(
        result => console.log(`[Main] worker says: ${result}`),
        error => console.error(`[Main] worker error: ${error}`),
        () => {
            if (!completedOnTime) {
                console.log(`[Main] worker could not complete its work in the allowed ${MAX_WAIT_TIME}s, exiting Node process`);
                process.exit(0);
            } else {
                console.log(`[Main] worker completed its work in the allowed ${WORKER_TIME}s`);
            }
        }
    );
}
 
main();

Muchas cosas están pasando aquí. Centrémonos runTask()primero en la función:

  • Usamos Observerable.create()del rxjspaquete para crear un nuevo observable. Este observable crea una instancia del hilo de trabajo y pasa algunos datos.
  • Asignamos la salida de eventos del hilo de trabajo a las funciones apropiadas en la interfaz de Observer :
    • message los eventos se devuelven como valores normales enviados al suscriptor a través de Observer.next()
    • error los eventos se asignan a Observer.error()
    • cuando exitse recibe un mensaje, verificamos el mensaje para determinar por qué salió el hilo de trabajo:
      • si se devuelve un valor distinto de cero, entonces sabemos que algo salió mal y asignamos el resultado a Observer.error()
      • Si se devuelve cero, llamamos a una función de devolución de llamada para notificar a la aplicación que la tarea se completó a tiempo, enviamos un valor final COMPLETE_SIGNAL especial y luego completamos el observable con Observer.complete()

El runTask()no tiene un nombre muy descriptivo, sin embargo ahora se puede ver que encapsula la lógica de correlación entre los eventos de los subprocesos de trabajo y la interfaz observable.

A continuación, veamos la main()función:

  • Creamos lo observable llamando runTask()Pasamos una devolución de llamada simple que establece el completedOnTimeindicador en verdadero para que podamos informar la razón por la que se completó el observable.
    • Tenga en cuenta que el observable que acabamos de crear es un observable frío . Crea el hilo de trabajo y comienza a emitir eventos solo una vez que se ha suscrito.
  • Canalizamos algunas funciones del operador RxJS en el observable:
    • takeWhile() para detener la transmisión cuando se recibe el valor especial COMPLETE_SIGNAL
    • takeUntil() para detener la transmisión cuando se acabe el tiempo


  • Nos suscribimos a lo observable y registramos los valores o errores que se reciben en la consola. Cuando el observable se completa, registramos la razón por la que se completó. Si el motivo es que se nos acabó el tiempo, salimos a la fuerza de la aplicación con process.exit(0).

Ejecutando la solución

Ejecute la solución con su npm startcomando. Suponiendo que MAX_WAIT_TIME todavía está establecido en 3 y WORKER_TIME está establecido en 10, verá el siguiente resultado:

1
2
3
4
5
6
7
8
Node multi-threading demo using worker_threads module in Node 11.7.0
 
[Main] Starting worker from process 4764
[Main] worker says: starting heavy duty work from process 4764 that will take 10s to complete
[Main] worker says: heavy duty work in progress...1s
[Main] worker says: heavy duty work in progress...2s
[Main] worker says: heavy duty work in progress...3s
[Main] worker could not complete its work in the allowed 3s, exiting Node process

El hilo de trabajo comenzó a hacer su trabajo, pero después de 3 segundos, la aplicación indicó que detuviera la transmisión. El proceso principal se salió con fuerza junto con el hilo de trabajo antes de que tuviera la oportunidad de completar su tarea.

También puede intentar ajustar la solución para ver qué sucede cuando:

  • WORKER_TIME es inferior a MAX_WAIT_TIME
  • Se runTask()generan múltiples subprocesos de trabajo a partir del subproceso principal al llamar varias veces y crear múltiples observables con diferentes configuraciones

Envolver

Solo hemos arañado la superficie de lo que es posible cuando se combina el poder de transmisión y la belleza de RxJS Observables con el módulo worker_threads. ¡Feliz enhebrado!

Publicar un comentario

0 Comentarios