Programación reactiva con RxJs

RxJS es la adaptación a JS de la librería ReactiveX, esta librería no es exclusiva de Angular o del lenguaje JS, está desarrollada para diversos lenguajes como C++ (RxCpp), C# (Rx.NET), o PHP (RxPHP) entre otros.

¿Qué es RxJS?:

"Una API para el manejo de flujos de datos (streams) asíncronos (callback based code) y la propagación del cambio".

Como rezan en su propia Web es una combinación de las mejores ideas de los patrones Observador, Iterador y la programación funcional.

Conceptos básicos de RxJS que resuelve la gestión asíncrona de eventos:

  • Observable: Representa la idea de una colección de futuros valores o eventos.
  • Observador: Colección de retrollamadas (callbacks) que saben como escuchar a los valores producidos por el observable.
  • Suscripción: Representa la ejecución de un observable, es de útil para posteriormente cancelar la suscripción.
  • Operadores: Son funciones para tratar las colecciones de datos con operaciones como map, filter, concat, reduce, etc,.
  • Sujeto: Es la forma de hacer multicasting de un valor a multiples observadores.
  • Planificador: Repartidor centralizado para controlar la concurrencia y la coordinación.

Programación reactiva

En la programación reactiva cualquier flujo de información es un stream, lo mismo eventos que se producen en el UI (User Interface) como eventos del ratón, un array de datos o una API de comunicaciones.

En realidad esta idea no es nueva, los buses de eventos (por ejemplo los clics de ratón) son ejemplos de eventos asíncronos, la reactividad lleva esa misma idea más allá, se pueden crear flujos de datos de prácticamente cualquier cosa, la reactividad está en auge en gran medida por la popularidad de los microservicios y las aplicaciones altamente escalables en la actualidad.

En 2014 se publica el manifiesto reactivo que trata de condensar algunas de estas ideas para crear aplicaciones altamente escalables y ligeramente acopladas (loosely-coupled), esto las hace mas fáciles de desarrollar y susceptibles de cambiar.

Un sistema reactivo se caracteriza por ser:

  • Responsive: Es la capacidad de completar la tarea en un tiempo determinado.
  • Resilient: Es la capacidad de soportar y recuperarse ante errores. Se implementa mediante la replicación (ejecutar un componente de forma simultanea en diferentes sitios) , aislamiento y delegación. Los fallos son contenidos en cada componente del sistema aislándolos sin comprometer el sistema completo.
  • Elastic: Los sistemas reactivos pueden aumentar o disminuir los recursos reservados para adaptarse en el cambio de las entradas.
  • Message Driven: Basados en un sistema de intercambio de mensajes de forma asíncrona entre componentes.

Patrón observador

El patrón observador define una relación del tipo uno a muchos entre objetos, cuando el estado de un objeto cambia este notifica su cambio al resto de objetos dependientes. Este patrón también es conocido como publicación-suscriptor (o consumidor), la idea básica del patrón es que un objeto contiene atributos o métodos observables a los cuales otros objetos se pueden suscribir pasando una referencia a si mismos. El objeto observable mantiene una lista de referencias a sus observadores y es capaz de notificarles cambios que sufre.

El patrón observable aún no es parte de la implementación oficial, está propuesta por el grupo TC39 para el estándar ECMAScript así que es muy probable que se introduzca de la mano de RxJS en el futuro. Para leer más sobre el tema dejo este enlace "Interesting ECMAScript 2017 proposals that weren’t adopted".

Crear un observable de un contador

Creo una nueva aplicación:

ng new rxjs-ej01
cd rxjs-ej01
ng serve --open

Voy a crear un observable de un contador en [src/app/app.component.ts]. En la cabecera del fichero añado el operador interval de RxJS, la función emite una secuencia de enteros espaciados por un intervalo de tiempo definido en ms (hemos definido un segundo de espera para el ejemplo).

import { Component } from "@angular/core";
import { interval } from "rxjs";

@Component({
  selector: "app-root",
  templateUrl: "./app.component.html",
  styleUrls: ["./app.component.css"]
})
export class AppComponent {
  title = "rxjs-ej01";

  ngOnInit() {
    const secondsCounter = interval(1000);

    secondsCounter.subscribe(n =>
      console.log(`It's been ${n} seconds since subscribing!`)
    );
  }
}

Suscribimos un mensaje de consola que muestra el número de segundos transcurridos desde la suscripción.

Crear un observable desde un evento de ratón

Añado en la cabecera la función fromEvent de RxJS:

import { fromEvent } from "rxjs";

He añadido un atributo id="my-element" al elemento raíz <div> en [src/app/app.component.html], con document​.get​Element​ById obtengo una referencia a un objeto Element por su ID.

La función fromEvent es observable

  ngOnInit() {
    const el = document.getElementById("my-element");
    const mouseMoves = fromEvent(el, "mousemove");

    const subscription = mouseMoves.subscribe((evt: MouseEvent) => {
      console.log(`Coords: ${evt.clientX} X ${evt.clientY}`);
    });
  }

La forma tradicional de registrar un manejador de un evento:

document.addEventListener("click", () => console.log("Clicked!"));

En RxJS creamos un observable en su lugar:

fromEvent(document, "click").subscribe(() => console.log("Clicked!"));

Anulando la suscripción

Siguiendo con el ejemplo de arriba he anulado la suscripción, transcurridos diez segundos deja de mostrar las coordenadas del ratón por consola:

setTimeout(() => {
  subscription.unsubscribe();
}, 10000);

Operadores

Los operadores de RxJS son funciones que pueden ser encadenadas en lo que llamamos la cadena o pipeline de operadores y que se sitúan entre medias del Observable (productor de la información) y el Observer (consumidor de la misma) con el objetivo de filtrar, transformar o combinar los valores del Observable/Observables.

import { map } from "rxjs/operators";
import { of } from "rxjs";

En el ejemplo inferior usamos el operador map que aplica una función a cada valor emitido.

const nums = of(1, 2, 3);

const squareValues = map((val: number) => val * val);
const squaredNums = squareValues(nums);

squaredNums.subscribe(x => console.log(x));

Pull VS Push

Pull y Push representan diferentes protocolos que describen como se relaciona el productor de datos con el consumidor.

¿Qué es un sistema Pull? En un sistema Pull el consumidor determina cuando quiere recibir datos del productor. El productor en si mismo de despreocupa de cuando se enviarán los datos al consumidor. Por ejemplo todas las funciones JS con de tipo Pull, las funciones son productoras de datos y el código que la llama a la función consumiendo el retorno.

Tipo Productor Consumidor
Pull Pasivo: Produce los datos cuando se le solicita Activo: Decide cuando necesita los datos
Push Activo: Produce los datos a su propio ritmo Pasivo: Reacciona a los datos recibidos

¿Qué es Push? El productor determina cuando enviar los datos al consumidor, un ejemplo típico es un servidor de chat que envía los mensajes a medida que llegan a los usuarios.

Observable como generalización de una función

El siguiente ejemplo es una clase Observable,

Suscribirse a un observable es similar a invocar a una función. Los observables son capaces de despachar valores de forma síncrona o asíncrona. En el ejemplo de abajo las llamadas se suceden de forma síncrona.

const foo = new Observable(subscriber => {
  console.log("Hello");
  subscriber.next(42);
});

foo.subscribe(x => {
  console.log(x);
});
foo.subscribe(y => {
  console.log(y);
});

Genera la siguiente salida:

"Hello";
42;
"Hello";
42;

¿Cual es la diferencia principal entre un observable y una función? Los observables pueden devolver múltiples valores a lo largo del tiempo cosa que no pueden las funciones:

function foo() {
  console.log("Hello");
  return 42;
  return 100; // dead code. will never happen
}

Sin embargo con un observable podemos hacer esto:

const foo = new Observable(subscriber => {
  console.log("Hello");
  subscriber.next(42);
  subscriber.next(100);
});

También podemos "retornar" los valores de forma asíncrona:

const foo = new Observable(subscriber => {
  console.log("Hello");
  subscriber.next(42);
  subscriber.next(100);
  subscriber.next(200);
  setTimeout(() => {
    subscriber.next(300); // happens asynchronously
  }, 1000);
});

console.log("before");
foo.subscribe(x => {
  console.log(x);
});
console.log("after");

Produce la siguiente salida:

"before";
"Hello";
42;
100;
200;
"after";
300;

Como resumen observable.subscribe() significa dame cualquier cantidad de valores, de forma síncrona o asíncrona.

Patrón de diseño iterador

Un patrón de diseño define una interfaz que declara los métodos necesarios para acceder e forma secuencial a un grupo de objetos de una colección, permite recorrer una estructura de datos sin que sea necesario conocer la estructura interna de la misma.

Usamos el iterador next para recorrer un array

const simpleIterator = data => {
  let cursor = 0;
  return {
    next: () => (cursor < data.length ? data[cursor++] : false)
  };
};

var consumer = simpleIterator(["simple", "data", "iterator"]);
console.log(consumer.next()); // 'simple'
console.log(consumer.next()); // 'data'
console.log(consumer.next()); // 'iterator'
console.log(consumer.next()); // false

Código fuente

Enlaces externos