Con el lanzamiento de Node 11.7, el módulo worker_threads se convierte en una característica estándar y ya no está oculto detrás del conmutador. El módulo worker_threads permite a los desarrolladores ejecutar JavaScript de forma asíncrona en subprocesos livianos y aislados contenidos dentro del proceso principal del nodo. Este artículo se centrará en cómo utilizar subprocesos de trabajo para ejecutar una tarea de forma asincrónica y transmitir datos de esa tarea al resto de su aplicación Node utilizando RxJS Observables.
Con el lanzamiento de Node 11.7, el módulo worker_threads se convierte en una característica estándar y ya no está oculto detrás del conmutador. El módulo worker_threads permite a los desarrolladores ejecutar JavaScript de forma asíncrona en subprocesos livianos y aislados contenidos dentro del proceso principal del nodo. Este artículo se centrará en cómo utilizar subprocesos de trabajo para ejecutar una tarea de forma asincrónica y transmitir datos de esa tarea al resto de su aplicación Node utilizando RxJS Observables.
Antes de comenzar, si desea obtener más información sobre los subprocesos de trabajo y por qué es posible que desee utilizarlos, le recomendaría leer el subproceso múltiple de Node.js: ¿Qué son los subprocesos de trabajo y por qué son importantes? de Alberto Gimeno. Alberto ha hecho un trabajo fantástico al explicar el propósito del módulo worker_thread, brindó algunos ejemplos sólidos de dónde tiene sentido usarlo y demostró algunas formas alternativas de construir una aplicación Node multiproceso.
¿Qué estamos construyendo?
Vamos a crear una aplicación de nodo simple que crea un subproceso de trabajo que ejecuta una tarea simulada de larga duración que informa el estado a intervalos regulares hasta que se completa o hasta que se agota el tiempo. El hilo de trabajo se incluirá en un RxJS Observable para que el resto de la aplicación pueda transmitir los mensajes devueltos desde el hilo de trabajo utilizando la poderosa biblioteca RxJS.
Si desea avanzar y ver la solución final, puede verla en GitHub en briandesousa / node-worker-thread-rxjs .
Configurando su entorno
Lo primero que debemos hacer es asegurarnos de que nuestro entorno esté listo para funcionar:
- Instalar Node 11.7.0+
- Úselo para inicializar un nuevo paquete NPM
- Agregue un script de inicio simple al package.json para iniciar la aplicación:
- Instale el paquete RxJS con
- Cree node-parent-thread-rxjs.js que contendrá el código que se ejecuta en el hilo principal
- Cree node-worker-thread-rxjs.js que contendrá la implementación de la tarea de ejecución prolongada que se ejecuta en un hilo separado
Creando el hilo de trabajo
El hilo de trabajo tiene la lógica para simular una tarea de larga duración:
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 dieciséis 17 18 19 | const { workerData, parentPort } = require( 'worker_threads' ); parentPort.postMessage(`starting heavy duty work from process ${process.pid} that will take ${workerData}s to complete`); timeLimit = workerData; timer = 0; // simulate a long-running process with updates posted back on a regular interval do { setTimeout( (count) => { parentPort.postMessage(`heavy duty work in progress...${count + 1}s`); if (count === timeLimit) { parentPort.postMessage( 'done heavy duty work' ); } }, 1000 * timer, timer); } while (++timer !== timeLimit); |
Analicemos un poco este script:
- Usamos de los módulos para comunicarnos con el hilo principal en 3 puntos diferentes:
- antes de que comience la tarea
- mientras la tarea se está ejecutando (dentro del bucle do while) para proporcionar el estado al hilo principal
- cuando la tarea se completa
- Usamos del módulo para pasar un límite de tiempo de cuánto tiempo (en segundos) debe ejecutarse la tarea. La tarea se completa cuando se alcanza este límite de tiempo (línea 19).
Este hilo de trabajo no hace nada particularmente útil, pero demuestra cómo un hilo puede recibir instrucciones de su padre y transmitir múltiples actualizaciones a su padre.
Creando el hilo principal
El hilo principal tiene las siguientes responsabilidades:
- Inicie el hilo de trabajo, especificando cuánto tiempo debe ejecutarse el hilo de trabajo. Lo llamaremos WORKER_TIME.
- Recibir actualizaciones del hilo de trabajo en una secuencia observable
- Sal de la aplicación si el hilo de trabajo tarda demasiado. A esto lo llamaremos MAX_WAIT_TIME.
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 dieciséis 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 | const Rxjs = require( 'rxjs' ); const RxjsOperators = require( 'rxjs/operators' ); const { Worker } = require( 'worker_threads' ); console.log( "\nNode multi-threading demo using worker_threads module in Node 11.7.0\n" ); const COMPLETE_SIGNAL = 'COMPLETE' ; function runTask(workerData, completedOnTime) { return Rxjs.Observable.create(observer => { const worker = new Worker( './node-worker-thread-rxjs.js' , { workerData }); worker.on( 'message' , message => observer.next(message)); worker.on( 'error' , error => observer.error(error)); worker.on( 'exit' , code => { if (code !== 0 ) { observer.error(`Worker stopped with exit code ${code}`); } else { completedOnTime(); observer.next(COMPLETE_SIGNAL); observer.complete(); } }); }); } const MAX_WAIT_TIME = 3 ; const WORKER_TIME = 10 ; function main() { completedOnTime = false ; console.log(`[Main] Starting worker from process ${process.pid}`); const worker$ = runTask(WORKER_TIME, () => completedOnTime = true ); // receive messages from worker until it completes but only wait for MAX_WAIT_TIME worker$.pipe( RxjsOperators.takeWhile(message => message !== COMPLETE_SIGNAL), RxjsOperators.takeUntil(Rxjs.timer(MAX_WAIT_TIME * 1000 )) ).subscribe( result => console.log(`[Main] worker says: ${result}`), error => console.error(`[Main] worker error: ${error}`), () => { if (!completedOnTime) { console.log(`[Main] worker could not complete its work in the allowed ${MAX_WAIT_TIME}s, exiting Node process`); process.exit( 0 ); } else { console.log(`[Main] worker completed its work in the allowed ${WORKER_TIME}s`); } } ); } main(); |
Muchas cosas están pasando aquí. Centrémonos primero en la función:
- Usamos del paquete para crear un nuevo observable. Este observable crea una instancia del hilo de trabajo y pasa algunos datos.
- Asignamos la salida de eventos del hilo de trabajo a las funciones apropiadas en la interfaz de Observer :
- los eventos se devuelven como valores normales enviados al suscriptor a través de
- los eventos se asignan a
- cuando se recibe un mensaje, verificamos el mensaje para determinar por qué salió el hilo de trabajo:
- si se devuelve un valor distinto de cero, entonces sabemos que algo salió mal y asignamos el resultado a
- Si se devuelve cero, llamamos a una función de devolución de llamada para notificar a la aplicación que la tarea se completó a tiempo, enviamos un valor final COMPLETE_SIGNAL especial y luego completamos el observable con
El no tiene un nombre muy descriptivo, sin embargo ahora se puede ver que encapsula la lógica de correlación entre los eventos de los subprocesos de trabajo y la interfaz observable.
A continuación, veamos la función:
- Creamos lo observable llamando . Pasamos una devolución de llamada simple que establece el indicador en verdadero para que podamos informar la razón por la que se completó el observable.
- Tenga en cuenta que el observable que acabamos de crear es un observable frío . Crea el hilo de trabajo y comienza a emitir eventos solo una vez que se ha suscrito.
- Tenga en cuenta que el observable que acabamos de crear es un observable frío . Crea el hilo de trabajo y comienza a emitir eventos solo una vez que se ha suscrito.
- Canalizamos algunas funciones del operador RxJS en el observable:
- para detener la transmisión cuando se recibe el valor especial COMPLETE_SIGNAL
- para detener la transmisión cuando se acabe el tiempo
- Nos suscribimos a lo observable y registramos los valores o errores que se reciben en la consola. Cuando el observable se completa, registramos la razón por la que se completó. Si el motivo es que se nos acabó el tiempo, salimos a la fuerza de la aplicación con .
Ejecutando la solución
Ejecute la solución con su comando. Suponiendo que MAX_WAIT_TIME todavía está establecido en 3 y WORKER_TIME está establecido en 10, verá el siguiente resultado:
1 2 3 4 5 6 7 8 | Node multi-threading demo using worker_threads module in Node 11.7.0 [Main] Starting worker from process 4764 [Main] worker says: starting heavy duty work from process 4764 that will take 10s to complete [Main] worker says: heavy duty work in progress...1s [Main] worker says: heavy duty work in progress...2s [Main] worker says: heavy duty work in progress...3s [Main] worker could not complete its work in the allowed 3s, exiting Node process |
El hilo de trabajo comenzó a hacer su trabajo, pero después de 3 segundos, la aplicación indicó que detuviera la transmisión. El proceso principal se salió con fuerza junto con el hilo de trabajo antes de que tuviera la oportunidad de completar su tarea.
También puede intentar ajustar la solución para ver qué sucede cuando:
- WORKER_TIME es inferior a MAX_WAIT_TIME
- Se generan múltiples subprocesos de trabajo a partir del subproceso principal al llamar varias veces y crear múltiples observables con diferentes configuraciones
Envolver
Solo hemos arañado la superficie de lo que es posible cuando se combina el poder de transmisión y la belleza de RxJS Observables con el módulo worker_threads. ¡Feliz enhebrado!
0 Comentarios
Dejanos tu comentario para seguir mejorando!