Rx II – Observables: los sujetos

Hace unos días empezamos a hablar de las Reactive eXtensions. Estudiamos su contexto y las bases teóricas en las que se fundamentan. Además expusimos la fórmula creada por los Microsoft Live Labs para definirla. Y terminamos el artículo comentando que resolveríamos esta fórmula en futuras publicaciones. Hoy es el día de resolver el primer parámetro: Observables

Pero antes, vamos a ver cómo podemos instalar y referenciar las Rx en nuestro proyecto.

Instalando Rx

Existen dos formas básicas de distribución de las extensiones reactivas. La primera es desde la propia web de microsoft, mediante este enlace. Aquí te podrás descargar un paquete con los ensamblados principales, además de los específicos para cada distribución.

La segunda opción es utilizar nuget, desde donde podrás descargarte los diferentes ensamblados (en su versión estable o en la experimental) haciendo una búsqueda de "rx" en su interfaz de gestión de paquetes:

Dentro del ámbito de este artículo, bastará con crear un proyecto nuevo de consola e instalar desde nuget el paquete "Rx-Main". O si ya descargamos todos los ensamblados, deberíamos añadir la referencia a "System.Reactive".

Una vez tenemos esto, ya podemos remangarnos y empezar a trabajar:

Observables

Cuando decimos "observables" en la fórmula que define las reactive extensions, nos referimos a los objetos del patrón observable: Observador y Observable. Y dentro de la framework 4.0, en el namespace System, podremos encontrar los contratos que nos exponen su comportamiento:

Por lo tanto ya tenemos las "normas" para empezar a trabajar. Como prueba, vamos a crear una clase base genérica que implemente la interfaz IObservable:

public class Observable<T> : IObservable<T>
{
   private readonly IList<IObserver<T>> observers;

   protected Observable()
   {
      this.observers = new List<IObserver<T>>();
   }

   public IDisposable Subscribe(IObserver<T> observer)
   {
      if (!observers.Contains(observer))
      {
         observers.Add(observer);
      }
   }
}

Con el fin de implementar la interfaz IObservable, hemos creado el método de suscripción del contrato. Ahí almacenaremos en una lista privada de observadores, a todos los que se han suscrito a nuestro objeto observable. Pero para poder completar esta clase, deberíamos ser capaces de enviar notificaciones a nuestros observadores. Y ¿qué notificaciones esperará un observador? Si nos detenemos a estudiar la interfaz de IObserver, veremos que un observador estará atento a 3 tipos de sucesos:

Por lo que sabiendo esto podemos añadir una serie de métodos en nuestra clase Observable para que ya tenga implementada la notificación/llamada a todos los observadores suscritos:

public void OnNext(T value)
{
   foreach(var observer in observers)
   {
      observer.OnNext(value);
   }
}

public void OnError(Exception error)
{
   foreach(var observer in observers)
   {
      observer.OnError(error);
   }
}

public void OnCompleted()
{
   foreach(var observer in observers)
   {
      observer.OnCompleted();
   }
}

Una vez la hemos completado, nos damos cuenta de que los métodos que hemos creado satisfacen el contrato de la interfaz IObserver. Por lo que podríamos estar hablando de que para poder implementar estas interfaces necesitaremos un objeto que indirectamente sea a la vez IObservable e IObserver.

Dentro de reactive extensions se ha creado una interfaz que nos define este nuevo contrato: ISubject

public interface ISubject<T> : ISubject<T, T>
{
}

public interface ISubject<T, S> : IObservable<T>, IObserver<S>
{
}

Y además encontraremos, de serie, cuatro implementaciones base de esta interfaz:

Sujetos

Dentro de todas las traducciones que tiene Subjects en nuestro idioma, es probable que la que se nos haga más simple sea sujetos. Como decíamos un sujeto será un objeto observable, que con el fin de poder notificar de una forma simple a los observadores, implementara también la interfaz de IObserver.

La funcionalidad de un sujeto se puede resumir como que puede notificar a sus observadores, una serie de acontecimientos en forma de iteración:

ISubject<int> subject;
IObserver<int> observer; 

subject.Subscribe(observer); // se suscribe un observador 

subject.OnNext(1); // notificamos la iteración 1 
subject.OnNext(2); // notificamos la iteración 2 
// subject.OnError(new Exception()); // podemos interrumpir la iteración con una excepción 
subject.OnNext(3); // notificamos la iteración 3 
subject.OnCompleted(); // notificamos que ya no hay más iteraciones

Pero además Rx nos va a proveer de una serie de extensiones para facilitarnos la suscripción, que nos van a ayudar a crear dinámicamente observadores:

// Type: System.ObservableExtensions
// Assembly: System.Reactive, Version=1.0.10621.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35
// Assembly location: System.Reactive.dll
public static class ObservableExtensions
{
	public static IDisposable Subscribe<TSource>(this IObservable<TSource> source);

	public static IDisposable Subscribe<TSource>(this IObservable<TSource> source, Action<TSource> onNext);

	public static IDisposable Subscribe<TSource>(this IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError);

	public static IDisposable Subscribe<TSource>(this IObservable<TSource> source, Action<TSource> onNext, Action onCompleted);

	public static IDisposable Subscribe<TSource>(this IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError, Action onCompleted);
}

Que se podrán explotar mediante expresiones lambda:

subject.Subscribe(
	iterator => Console.WriteLine("Iterator: " + iterator));

subject.Subscribe(
	iterator => Console.WriteLine("Iterator: " + iterator),
	() => Console.WriteLine("Complete"));

subject.Subscribe(
	iterator => Console.WriteLine("Iterator: " + iterator),
	exception => Console.WriteLine("Exception: " + exception.Message),
	() => Console.WriteLine("Complete"));

Como vemos, el comportamiento final de un sujeto observable, resulta semejante en funcionalidad a lo que podría ser un evento. Partiendo de este código:

ISubject<string> Changed= new Subject<string>(); // un sujeto
event Action<string> Loaded; // un evento

Action<string> onNotified = s => Console.WriteLine(s); // creamos nuestro handler

Por un lado podemos suscribirnos a un sujeto o adjuntarnos a un evento:

var disposable = Changed.Subscribe(onNotified); // nos suscribimos al sujeto
Loaded += onNotified; // nos atachamos al evento

También podemos lanzar un evento o publicar en un sujeto:

Changed.OnNext("hello"); // publicamos
Loaded("hello"); // lanzamos el evento

Y para terminar, podemos desuscribirnos para no volver a tener notificaciones:

disposable.Dispose(); // "eliminamos" el resultado de la suscripción
Loaded -= onNotified; // nos des-adjuntamos del evento

 

Una vez hemos aclarado su funcionamiento, podemos ver el comportamiento de esos sujetos que encontramos en las reactive extensions.

Subject

Como decíamos anteriormente un objeto Suject<T> es la implementación base de un sujeto de Rx. Su funcionamiento es simple, notifica de todas las iteraciones una vez nos hemos suscrito. Por ejemplo, si hacemos esto:

var subject = new Subject<int>(); 

subject.OnNext(1); 

subject.Subscribe(iterator => Console.WriteLine("Iterator: " + iterator)); 

subject.OnNext(2); 
subject.OnNext(3);

Esperaremos que la salida sea:

Iterator: 2
Iterator: 3

Es decir, que no tiene ningún tipo de memoria, simplemente se dedica a notificar a los suscriptores actuales las iteraciones que ocurren en ese momento.

ReplaySubject

El tipo ReplaySubject<T> es un sujeto que recuerda las iteraciones para todos los suscriptores:

var replaySubject = new ReplaySubject<int>();
replaySubject.OnNext(1);
replaySubject.OnNext(2);

replaySubject.Subscribe(iterator => Console.WriteLine("Iterator: " + iterator));

replaySubject.OnNext(3);

En este código, a pesar de suscribirnos una vez se han ejecutado dos iteraciones, la salida por consola será esta:

Iterator: 1
Iterator: 2
Iterator: 3

BehaviorSubject

Un tipo de sujeto muy especial es el BehaviorSubject<T>, que tiene la propiedad de recordar solo la última iteración acontecida, por lo que si realizamos un ejemplo semejante al anterior:

var behaviorSubject = new BehaviorSubject<int>(0);
behaviorSubject.OnNext(1);
behaviorSubject.OnNext(2);

behaviorSubject.Subscribe(iterator => Console.WriteLine("Iterator: " + iterator));

behaviorSubject.OnNext(3);

La salida será esta:

Iterator: 2
Iterator: 3

Un detalle a tener en cuenta en este sujeto es que requiere de inicio un valor que será la primera iteración que recuerde. En el código anterior lo en el constructor le hemos pasado como parámetro le valor '0', por lo que si nos llegamos a suscribir al principio, hubiéramos recibido la línea "Iterator: 0".

AsyncSubject

El último de los sujetos que vienen con Rx es el AsyncSubject<T>. Como su nombre indica, este es un sujeto asíncrono. Esto implica que no notifica nada hasta que no se ha completado (OnComplete). Y una vez esto ocurre, solo notifica del último iterador. Si lanzamos este código:

var asyncSubject = new AsyncSubject<int>();

asyncSubject.Subscribe(iterator => Console.WriteLine("Iterator: " + iterator));

asyncSubject.OnNext(1);
asyncSubject.OnNext(2);
asyncSubject.OnNext(3);

No obtendremos ninguna información en la consola de salida. Pero en el momento que añadamos a este código la llamada:

asyncSubject.OnCompleted();

Obtendremos en la consola la notificación de la última iteración dentro del proceso total:

Iterator: 3

 

Podemos ver que es muy interesante el uso que se le puede dar a estos objetos, y como algunos nos dan pistas sobre los diferentes escenarios donde aplicarlos. Pero lo que es verdaderamente útil, es utilizar las extensiones que nos ofrece Rx para crear estos sujetos:

Creando sujetos

La forma más rápida de crear un sujeto es convertirlo desde un enumerable.

public static IObservable<TSource> ToObservable<TSource>(this IEnumerable<TSource> source);

Usando la extensión "ToObservable" podremos convertir cualquier iteración en un sujeto observable de reactive extensions, y al suscribirnos, podremos recibir la publicación de cada una de las iteraciones.

var subject = new ReplaySubject<int>();
subject.OnNext(1);
subject.OnNext(2);
subject.OnCompleted();

// esto es lo mismo que el anterior sujeto
var enumerable = new List<int> { 1, 2 };
var observableEnumeration = enumerable.ToObservable();

Esta extensión es tan solo la punta del iceberg de Rx. Existe un objeto llamado Observable que contiene extensiones y métodos que nos ayudarán a crear y operar con sujetos. Por ejemplo:

var neverObservable = Observable.Never<int>(); // var s = new Subject<int>();
var emptyObservable = Observable.Empty<int>(); // var s = new Subject<int>(); s.OnComplete();
var returnObservable = Observable.Return(1); // var s = new ReplaySubject<int>(); s.OnNext(1); s.OnComplete();
var throwObservable = Observable.Throw<int>(new Exception()); // var s = new ReplaySubject<int>(); s.OnError(new Exception());

Pero comentar todo lo que nos ofrece este objeto pertenece al siguiente artículo sobre reactive extensions, que esperamos que leáis dentro de unos días.