Rx III – Linq: crear objetos observables

Año nuevo artículo nuevo. Después de dos artículos cargados de conceptos teóricos y pruebas, ha llegado el momento de empezar a sacarle partido de verdad a la reactive framework. Hasta ahora hemos visto cuales son los principios en los que se fundamenta Rx, los objetos que vamos a tener que utilizar (los sujetos) e incluso introdujimos el uso de una clase llamada Observable que contiene métodos y extensiones para que todo esto sea más sencillo.

Si mediante el navegador de objetos o cualquier otra herramienta de estudio de ensamblados, abrimos el archivo "System.Reactive.dll", en el namespace "System.Reactive.Linq" encontramos una clase estática cargada de métodos y extensiones llamada Observable. Dentro de este objeto se encuentra definido el segundo parámetro de la fórmula que define las reactive extensions: Linq.

Para empezar Observable contiene extensiones con, si no todos, la mayoría de los operadores que ya conocemos de Linq. Pero, en este caso los encontraremos aplicados a objetos tipo IObservable<T>

var observable = new ReplaySubject<int>();
observable .OnNext(1);
observable .OnNext(2);
observable .OnNext(3);
observable .OnCompleted();

Como pudimos ver en el anterior artículo también encontraremos métodos para crear estos objetos IObservable<T>. Podríamos poner un ejemplo rápido con la extensión "ToObservable()", método que sirve para crear un observable a partir de un IEnumerable (como listas o colecciones). Para generar un código semejante al anterior deberíamos hacer algo así:

var enumerable = new List<int> { 1, 2, 3 };
var observable = enumerable.ToObservable();

Usando cualquiera de los dos objetos que hemos creado podríamos aplicar funciones de Linq como "Where" o "Sum":

observable
    .Where(i => i < 3)
    .Sum()
    .Subscribe(Console.WriteLine);

Con este código estaríamos filtrando las iteraciones menores de '3'. Y además sumaríamos sus valores (1+2 = 3). Para terminar escribimos en la consola el valor final.

 

A estos operadores que podríamos definir como estándares de Linq se les añaden muchos otros propios de Rx. Para poder echar un vistazo a los más importantes vamos a intentar dividirlos entre creadores y operadores. En este artículo trataremos los primeros, ya que es un tema suficientemente extenso. Y los categorizaremos en: create, rango, tiempo, asíncrono y eventos.

Creadores

Como métodos para crear objetos observables podemos encontrar las cuatro funciones que mencionamos en el artículo anterior:

var neverObservable = Observable.Never<int>(); 

// igual que
var s = new Subject<int>();

La función "Never" genera un observable en el que nunca hay una iteración ni termina y se diferencia de "Empty" en que este último si que termina la tarea:

var emptyObservable = Observable.Empty<int>(); 

// igual que
var s = new Subject<int>(); 
s.OnComplete();

Cuando usemos "Return" tendrá lugar la iteración que digamos y se terminará el proceso:

var returnObservable = Observable.Return(1); 

// igual que
var s = new ReplaySubject<int>(); 
s.OnNext(1); 
s.OnComplete();

Y dentro de estos cuatro métodos simples el último es "Throw" que lanzará un error que le especificamos:

var throwObservable = Observable.Throw<int>(new Exception()); 

// igual que
var s = new ReplaySubject<int>(); 
s.OnError(new Exception());

Create

La función por excelencia para crear cualquier tipo de objeto observable es "Create". Tiene tanta versatilidad que el resto de creadores se pueden implementar (con mayor o menor dificultad) usando esto. Un ejemplo claro sería el de la enumeración que se convierte en observable:

// un sujeto
var subject = new ReplaySubject<int>();
subject.OnNext(1);
subject.OnNext(2);
subject.OnCompleted();

// lo mismo que una enumeración a observable
var enumerable = new List<int> { 1, 2 };
var observableEnumeration = enumerable.ToObservable();

// y también podemos hacerlo con 'Create'
var created = Observable.Create<int>(observable =>
	{
		observable.OnNext(1);
		observable.OnNext(2);
		observable.OnCompleted();

		return () => { };
	});

Como podemos ver, cuando utilizamos "Create" se nos solicita una función que como parámetro de entrada usa un IObserver<T> (que no es más que la parte observadora de un sujeto) y tiene que devolver una acción. En este caso devolvemos una acción vacía, pero lo ideal sería devolver una acción que realizara la tarea de desuscripción. Para ello se nos provee del objeto Disposable.

Rango

Otra forma de crear este tipo de objetos observables es especificando un rango. Para esta tarea la función más simple que encontraremos es "Range". En este ejemplo crearemos iteraciones del 0 al 9:

var rangeObservable = Observable.Range(0, 10);

Como decíamos anteriormente, podríamos crear este mismo objeto usando el método "Create":

var createObservable = Observable.Create<int>(observable =>
										{
											for (var i = 0; i < 10; i++)
											{
												observable.OnNext(i);
											}

											observable.OnCompleted();
											return () => { };
										});

O también podríamos usar otra función como "Generate":

var forObservable = Observable.Generate(0, i => i < 10, i => i + 1, i => i);

"Generate" es una función muy parecida a un bucle "for". El primer parámetro es el valor inicial, el segundo una expresión que siempre que ocurra se irá al siguiente paso. La tercera es el método que se realiza en cada uno de los pasos y por último usaremos una expresión para devolver el objeto que itera. Lo podríamos traducir así:

// Generate(0 , i => i < 10, i => i + 1, i => i)
for(var i = 0; i < 10, i = i + 1)
{
   yield return i;
}

Tiempo

También podemos crear observables dependiendo del tiempo. Una forma básica, que nos crearía un observable con iteraciones cada una unidad específica de tiempo es "Interval":

var timeObserver = Observable.Interval(TimeSpan.FromSeconds(1));

Con este código crearemos una iteración cada un segundo. Además esta iteración será incremental: empezando desde 0, cada vez que ocurra se le sumará uno.

Otra posibilidad es usar "Timer", que a las personas que hayan trabajado con el "Timer" de "System.Threading" se les hará familiar. Aquí podremos asignar el momento en el que queremos que empiece además de su intervalo:

var timerObserver = Observable.Timer(TimeSpan.FromMinutes(1), TimeSpan.FromSeconds(1));

Aquí crearíamos un observable que empezará a iterar dentro de un minuto. Y a partir de entonces, cada segundo tendrá lugar una nueva iteración.

Asíncrono

Las reactive extensions traen una gran variedad de creadores de observables asíncronos. Por ejemplo "ToAsync" es una función que convertirá cualquier código en asíncrono y observable:

var asyncObservable = Observable.ToAsync<string, int>(this.FuncionQueTardaMucho)("parametro de entrada");

Como vemos podremos podemos incluso llamar a funciones con parámetros.

Otra forma de crear observables asíncronos será llamando a funciones que ya son asíncronas. Un ejemplo podría ser recogiendo la respuesta de una petición web:

 var asyncObservable = Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)();

En este código llamaremos a la función de "GetResponse" asíncrona y recibiremos un observable de "WebResponse", que será la respuesta de la petición de la web.

La forma más simple de crear observables asíncronos es el uso de la función "Start":

var startObservable = Observable.Start(() => ProcedimientoQueTardaMucho());

Y para terminar con este apartado, podríamos recoger tareas del TPL (Task Parallel Library) y convertirlas en observables asíncronos. Siempre y cuando usemos el namespace "System.Reactive.Threading.Tasks": 

var parallelObservable = Task.Factory.StartNew(() => ProcedimientoQueTardaMucho()).ToObservable();

Eventos

Como última subcategoría dentro de los creadores de observables, encontraríamos crearlos a partir de eventos. Es decir se adjuntan/attachan a un evento y devuelven sus publicaciones en forma de observables. Para esta tarea tendremos la función "FromEventPattern":

var clicks = Observable.FromEventPattern<RoutedEventHandler, RoutedEventArgs>(
                        routedHandler => MyButton.Click += routedHandler,
                        routedHandler => MyButton.Click -= routedHandler);

El código anterior nos mostraría como recoger las pulsaciones de un botón en WPF (o Silverlight) mediante Rx. La versatilidad de estas sería poder operar con los clicks. Por ejemplo podríamos hacer que en cada pulsación de las 10 primeras se incremente en uno el contenido:

int counter = 0;
clicks.Take(10).Subscribe(e =>
                {
                    counter++;
                    MyButton.Content = counter.ToString(); 
                });

 

La mayor parte de estas funciones para crear observables tienen sobrecargas que nos aportarán mayor funcionalidad. Además no son las únicas herramientas de las que disponemos, aunque si probablemente las más significativas. Como siempre a partir de aquí os invitamos a que exploréis que otros métodos de creación existen o incluso a ensayar con la función "Create" para encontrar diferentes resultados.

En el siguiente artículo seguiremos explicando los temas referentes a Linq y trataremos los operadores propios de reactive extensions. Además conoceremos los Marble diagrams, así que esperamos que no os lo perdáis.