Rx I – Qué son las Reactive eXtensions

12 Dec 2011 · 5 mins. de lectura

Dentro de las librerías de moda para .net la, posiblemente, más difícil de explicar tiene el nombre de Reactive eXtensions o Rx. Una idea que nace en el contexto actual de aplicaciones distribuidas. Donde la información se encuentra en la nube, y mediante llamadas asíncronas podemos procesarla usando clientes que interaccionan con el usuario basándose en eventos.

Encontramos ejemplos de esto en cualquiera de las aplicaciones más comunes que usamos. Por ejemplo el cliente de Whatsapp de nuestro teléfono móvil, el TweetDeck para ver el twitter o el widget del escritorio que nos informa del tiempo.

Todas estas aplicaciones recogen datos de una serie de servicios que se encuentran en internet. Y mediante la interacción con el usuario los tratan. Además, como en la mayor parte de las tecnologías y APIs para interfaces gráficas, para entender que quiere hacer el usuario, se basan en eventos como un "onClick" del ratón sobre un botón concreto.

Entonces llegamos, al igual que un físico en busca de una teoría unificada, al origen de las reactive extensions: la creación de un lenguaje común para gestionar llamadas asíncronas a un servicio y los eventos que ocurren en la interfaz gráfica.

Es posible que esto que estamos hablando a muchos os suene, ya que gracias a Second Nug hicimos un WebCast sobre el tema hace unos meses. A lo largo de la serie de artículos, con la que comenzamos hoy, intentaremos extender esa charla, además de abordar los diferentes temas con más calma y perspectiva.

Programación reactiva

Si estudiamos detenidamente el nombre de la librería que nos ocupa, encontramos dos palabras:

Reactive programming es un paradigma de programación basado en los flujos de datos y la propagación de los cambios. Esto significa que deberíamos poder crear flujos de datos con facilidad (usando el propio lenguaje de programación), y nuestro propio entorno de ejecución debería ser capaz de propagarse a través de los cambios que se producen en estos flujos de datos.

Ahora parémonos un momento a coger aire... Vamos a ver un ejemplo práctico para entender todo esto: imaginemos un contexto de programación secuencial tradicional.

int a, b, c;
b = 10;
c = 2;
a = b + c; // a = 12
c = 4; // a = 12
b = 3; // a = 12

Como vemos, una vez asignamos el valor de la variable 'a' como una suma de 'b' más 'c', se evalúa esa suma. Y aunque cambiemos el valor de 'b' o 'c', el valor de 'a' ya ha sido evaluado y sigue siendo el mismo que la primera vez (12).

La programación reactiva propone un comportamiento diferente. Algo parecido a una tabla de excel donde nosotros asignamos a la celda 'A1' para que calcule el valor de 'B1' más 'C1'. Aquí, si cambiamos el valor de la columna 'B1' o 'C1' el valor de 'A1' se actualiza (evalúa) automáticamente.

Ahora ya podemos decir que Rx es ese conjunto de herramientas que extienden el lenguaje secuencial tradicional que conocemos, para poder crear código de programación reactiva.

Vamos a ver cómo lo hace :).

Reactive Framework

La Reactive Framework ha sido desarrollada en los Microsoft Live Labs y es otra de las creaciones de Erik Meijer, el padre de Linq.

Como hemos visto el objetivo de Rx es unir y simplificar la programación basada en eventos complejos y la asíncrona proporcionándonos un nuevo modelo de programación para estos escenarios. Para conseguirlo, se propuso un principio fundamental: crear la dualidad entre el patrón iterator y el patrón observer.

La propia gente de los Microsoft Live Labs la define así:

Donde observables hace referencia al patrón observer. Linq lo usamos para gestionar de una forma sencilla el patrón iterator. Y schedulers se refiere a la planificación de estas tareas en diferentes contextos de ejecución.

Para empezar, en este artículo trataremos los dos patrones base de la fórmula para en futuras publicaciones poder dar una explicación de cada uno de sus parámetros.

Iterator

Comenzamos con los conceptos base refrescando el patrón iterator, que nos provee de una forma de acceso a los elementos de un objeto agregado, de forma secuencial, sin exponer su representación interna.

En todas las versiones de la framework de .net se implementa este patrón usando el contrato IEnumerable. Y normalmente cada una de las iteraciones de un enumerable la gestionamos con el bucle foreach:

var iterable = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
foreach (var iterator in iterable)
{
   if (iterator < 10)
   {
      System.Console.WriteLine(iterator);
   }
}

System.Console.ReadLine();

Gracias a Linq se ha encontrado una sintaxis cómoda y rápida para trabajar con este tipo de objetos. Por lo que una forma de hacer lo mismo mediante esta tecnología sería:

var query = iterable.Where(iterator => iterator < 10).Select(iterator => { System.Console.WriteLine(iterator); return iterator; });

// tengo que 'compilar' la query para que se ejecute.
query.ToArray();

System.Console.ReadLine();

Observer

El otro concepto base es el patrón observador, también conocido como publicación/subscripción o modelo-patrón. Este surge de una serie de sencillos conceptos: Un objeto observable puede ser vigilado por objetos observadores. De esta forma, un observable almacena referencias sus observadores y tendrá la capacidad de notificarles cambios.

Gracias a este patrón podemos crear una especie de referencias débiles entre objetos. Ya que los observadores no guardan ninguna relación con el observado.

Para ver un ejemplo de como funciona este patrón, vamos a crear una clase que observará el estado del stock de un almacén. La llamaremos StockObserver:

public class StockObserver
{
   public string Name { get; set; }

   public void Notify(Stock sender, int units)
   {
      Console.WriteLine("[{0}] The stock {1} has now {2} units", this.Name, sender.Name, units);
   }
}

Esta clase nos notificará de los cambios en el stock. Pero no vale nada si no creamos objetos observables a los que suscribirse. En este caso haremos una clase para cada tipo de stock que sea observable y nos podamos suscribir a ella:

public class Stock
{
   private readonly IList<StockObserver> observers;

   private int units;

   public Stock()
   {
      this.observers = new List<StockObserver>();
   }

   public string Name { get; set; }

   public int Units
   {
      get
      {
         return this.units;
      }

      set
      {
         if (this.units != value)
         {
            this.units = value;
            this.NotifyObservers();
         }
      }
   }

   public void Subscribe(StockObserver observer)
   {
      System.Console.WriteLine(observer.Name + " suscrito a " + this.Name);
      this.observers.Add(observer);
   }

   public void Unsubscribe(StockObserver observer)
   {
      if (this.observers.Contains(observer))
      {
         this.observers.Remove(observer);
      }
   }

   private void NotifyObservers()
   {
      foreach (var observer in this.observers)
      {
         observer.Notify(this, this.units);
      }
   }
}

Una vez tenemos nuestros objetos observador y observable, podemos crear un pequeño programa que los utilice:

// creamos stocks
var manzanas = new Stock { Name = "Manzanas" };
var peras = new Stock { Name = "Peras" };
var armarios = new Stock { Name = "Armarios" };

// creamos observadores
var fruteria = new StockObserver { Name = "Frutería" };
var almacen = new StockObserver { Name = "Almacén" };

manzanas.Subscribe(fruteria);
manzanas.Subscribe(almacen);

peras.Subscribe(fruteria);
peras.Subscribe(almacen);

armarios.Subscribe(almacen);

Console.WriteLine("Inicializa manzanas y peras a 20 unidades");
manzanas.Units = 20;
peras.Units = 20;

for (int i = 0; i < 5; i++)
{
	if (i % 2 == 0)
	{
		Console.WriteLine("Resto una manzana");
		manzanas.Units--;
	}
	else
	{
		Console.WriteLine("Resto una pera");
		peras.Units--;
	}

	Thread.Sleep(1000);
}

Console.WriteLine("Inicializa armarios a 3 unidades");
armarios.Units = 3;

Thread.Sleep(1000);

Console.WriteLine("Suma un armario");
armarios.Units = 4;

System.Console.WriteLine("pulsa intro...");
System.Console.ReadLine();

En este pequeño programa veremos como creamos diferentes stocks de algunas frutas y armarios. Entonces crearemos dos observadores uno que observa el stock de las frutas y otro que observa el stock de todo el almacén (incluidas las frutas). Suscribiremos los stocks a los observadores que proceda y jugamos con los valores para ver como son los observadores quienes nos información de los cambios en su zona de actuación.

 

Y hasta aquí este capítulo introductorio a las reactive extensions. En próximas publicaciones intentaremos hablar más de ellas y resolver uno a uno todos los operadores que forman parte de la fórmula que la define. Por lo que espero que no os perdáis el siguiente capítulo de Observables (Subjects).

buy me a beer