Reactive extensions - Introduction to reactive programming
Topics of this article: Rx, reactive programming, declarative programming, IoT systems, IObservable, IObserver
About the author: Bogdan Dobrea is an experienced .NET developer with a great curiosity towards discovering new technologies.
Most of us learned imperative programming in school, a paradigm that has the advantage of being simple to explain and implement. It lets us write code that describes, step by step, what we want to do. Hence, the result is entirely predictable: what happens is exactly what we told the program to do.
Let’s try to exemplify this using a real situation: an online store, where, through the browser, any user can look at products and place an order.
As the administrator of this online store, we would like to create an algorithm that returns the top (“rich”) customers: for example, we want to find out which customers have products in their basket that total at least 1000 RON.
In the code, our client will be represented by the Customer class. We only need 2 properties: the customer’s name (Name) and the total of the shopping cart, expressed by the OrderAmount property. For simplicity, we will use Romanian lei (RON) when we refer to the total of the basket.
The following examples are written in C#, but the subject of the article is not limited to this language, as we will find out later.
Our premise is that at one point, in our private virtual space there are 5 clients, each having a name as well as a basket of a certain value:
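A minimal sketch of such a list, assuming C# 9 or later (top-level statements). Only Felicia, Robert and Fred are named in this article; the other two names and all the amounts are illustrative assumptions:

```csharp
using System;
using System.Collections.Generic;

// "Ana" and "Mihai" and every amount below are hypothetical values.
var customersList = new List<Customer>
{
    new Customer { Name = "Ana",     OrderAmount = 250m  },
    new Customer { Name = "Mihai",   OrderAmount = 780m  },
    new Customer { Name = "Felicia", OrderAmount = 1000m },
    new Customer { Name = "Robert",  OrderAmount = 1450m },
    new Customer { Name = "Fred",    OrderAmount = 3200m },
};

Console.WriteLine(customersList.Count); // prints "5"

// A customer of the online store; OrderAmount is the basket total in RON.
public class Customer
{
    public string Name { get; set; } = "";
    public decimal OrderAmount { get; set; }
}
```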
Our imperative algorithm, with which we want to find out which of the 5 clients have spent at least 1000 RON, could look like this:
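A possible version of this loop is sketched below. It assumes the Customer class described earlier; the first two customers and all the amounts are hypothetical, chosen only so that the last three customers pass the threshold:

```csharp
using System;
using System.Collections.Generic;

var customersList = new List<Customer>
{
    new Customer { Name = "Ana",     OrderAmount = 250m  }, // hypothetical
    new Customer { Name = "Mihai",   OrderAmount = 780m  }, // hypothetical
    new Customer { Name = "Felicia", OrderAmount = 1000m },
    new Customer { Name = "Robert",  OrderAmount = 1450m },
    new Customer { Name = "Fred",    OrderAmount = 3200m },
};

// Step by step: create the result list, walk the customers one by one,
// and keep the name of each customer whose basket is worth at least 1000 RON.
var topSpendersList = new List<string>();
foreach (var customer in customersList)
{
    if (customer.OrderAmount >= 1000m)
    {
        topSpendersList.Add(customer.Name);
    }
}

Console.WriteLine(string.Join(", ", topSpendersList)); // prints "Felicia, Robert, Fred"

public class Customer
{
    public string Name { get; set; } = "";
    public decimal OrderAmount { get; set; }
}
```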
In line with the definition of the imperative programming style mentioned at the beginning of the article, I described on each line, step by step, what I wanted to happen.
First, we created a new list called topSpendersList, then we went through the list of customers item by item, keeping only the names of the people whose shopping basket is worth at least 1000 RON. At the end of the execution of this algorithm, we will notice that topSpendersList has 3 members (“Felicia”, “Robert” and “Fred”), because each of the 3 has spent at least 1000 RON.
Everything is fine so far: we found out what we needed. Now let’s see how we could rewrite this algorithm using the declarative programming model.
Declarative programming, in contrast to the imperative one, is a programming style in which the logic of a calculation is described without spelling out how it is executed. Those familiar with .NET (C#, VB) already use this programming style when writing code using LINQ:
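As an illustration, the same query could be written with LINQ roughly as follows. The customer data is the same hypothetical sample as before (only Felicia, Robert and Fred are named in this article):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var customersList = new List<Customer>
{
    new Customer { Name = "Ana",     OrderAmount = 250m  }, // hypothetical
    new Customer { Name = "Mihai",   OrderAmount = 780m  }, // hypothetical
    new Customer { Name = "Felicia", OrderAmount = 1000m },
    new Customer { Name = "Robert",  OrderAmount = 1450m },
    new Customer { Name = "Fred",    OrderAmount = 3200m },
};

// We state what we want (filter, project, materialize), not how to loop.
List<string> topSpendersList = customersList
    .Where(customer => customer.OrderAmount >= 1000m)
    .Select(customer => customer.Name)
    .ToList();

Console.WriteLine(string.Join(", ", topSpendersList)); // prints "Felicia, Robert, Fred"

public class Customer
{
    public string Name { get; set; } = "";
    public decimal OrderAmount { get; set; }
}
```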
In the lines of code above, the logic of a calculation is expressed (it selects from the customers list only the customers who have a value of the shopping cart at least equal to 1000 Ron, then using their name creates a new list), without having to write the action step by step. The source code for the Where(), Select() and ToList() functions is in the .NET Framework library.
The result of the execution of this algorithm will be the same as for the previous one. Therefore, the topSpendersList list will contain the same 3 members (“Felicia”, “Robert” and “Fred”).
What does reactive mean? A common dictionary tells us that it is about a reaction to a stimulus.
To explain it in more familiar terms, I will use the well-known Excel spreadsheet, where we can have static cells as well as dynamic cells (for example, a formula). In the screenshot below, in column A we enter static values, and the first 2 cells in column D hold formulas: the sum and, respectively, the average of the first 10 values in column A. As a bonus, we added a 2D chart that plots the values in column A.
Each time we add, modify, or delete a value in a cell in column A, we notice that the sum, the average, and the 2D chart “react” to these changes.
Returning to our example, suppose that every time a new client appears on the site, we want the topSpendersList list to update itself with the name of the new client if it meets the condition imposed by us.
The examples written above will no longer work, because they depend on a fixed customer list, while we now have a customer flow.
This is where Reactive Extensions come in: a free library with which we can rewrite our algorithm so that it “reacts” every time we have a new client.
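One way such a reactive version might look is sketched below. It assumes the System.Reactive NuGet package and C# 9 or later; the customer names other than Felicia, Robert and Fred, and all the amounts, are hypothetical:

```csharp
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;
using System.Linq;
using System.Reactive.Linq;

var customersList = new ObservableCollection<Customer>();
var topSpendersList = new List<string>();

IDisposable subscription = Observable
    // Turn the CollectionChanged event into a stream of event notifications.
    .FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
        handler => customersList.CollectionChanged += handler,
        handler => customersList.CollectionChanged -= handler)
    // Flatten each notification into the customers that were just added.
    .SelectMany(change =>
        change.EventArgs.NewItems?.Cast<Customer>() ?? Enumerable.Empty<Customer>())
    // Keep only the customers who spent at least 1000 RON.
    .Where(customer => customer.OrderAmount >= 1000m)
    // React to every matching customer.
    .Subscribe(customer => topSpendersList.Add(customer.Name));

// Nothing has happened yet; the pipeline runs once per added customer.
customersList.Add(new Customer { Name = "Ana",     OrderAmount = 250m  }); // hypothetical
customersList.Add(new Customer { Name = "Mihai",   OrderAmount = 780m  }); // hypothetical
customersList.Add(new Customer { Name = "Felicia", OrderAmount = 1000m });
customersList.Add(new Customer { Name = "Robert",  OrderAmount = 1450m });
customersList.Add(new Customer { Name = "Fred",    OrderAmount = 3200m });

Console.WriteLine(string.Join(", ", topSpendersList)); // prints "Felicia, Robert, Fred"
subscription.Dispose();

public class Customer
{
    public string Name { get; set; } = "";
    public decimal OrderAmount { get; set; }
}
```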
Although there are more lines of code written than in the previous examples, we will find out that they are easy to understand, and we will use this pattern often in the future.
Let’s take them one at a time:
- customersList is now of type ObservableCollection<Customer>. We need this because this type of list raises an event called CollectionChanged every time we change the elements inside it;
- FromEventPattern creates a data flow from the CollectionChanged event that it listens to, and emits a new value each time we add a new client to the list;
- SelectMany returns the customers extracted from the NewItems property of NotifyCollectionChangedEventArgs;
- Where is already known to us: it filters the new customers and returns only those who have spent at least 1000 RON;
- To consume this data flow, we must subscribe to it using the Subscribe method. In the body of this method, we decide what to do with each new client who meets our condition: we add the client’s name to topSpendersList.
If we run the new algorithm, without having any element in the customersList, we will see that nothing happens. As we add new elements, the algorithm will run and after its execution, topSpendersList will be populated.
After adding the 5 initial customers, in the topSpendersList we will find the names of the last 3 customers.
But now customersList is no longer a fixed list, so after we add the next 5 customers, our algorithm will “react” to this action and add 3 more people (Phil, Anastacia and Aurora) to topSpendersList:
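The effect of the two batches can be sketched as follows, again assuming the System.Reactive package. The names Ana, Mihai, Ioana and Victor, the order of the customers and all the amounts are hypothetical:

```csharp
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;
using System.Linq;
using System.Reactive.Linq;

var customersList = new ObservableCollection<Customer>();
var topSpendersList = new List<string>();

using IDisposable subscription = Observable
    .FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
        handler => customersList.CollectionChanged += handler,
        handler => customersList.CollectionChanged -= handler)
    .SelectMany(change =>
        change.EventArgs.NewItems?.Cast<Customer>() ?? Enumerable.Empty<Customer>())
    .Where(customer => customer.OrderAmount >= 1000m)
    .Subscribe(customer => topSpendersList.Add(customer.Name));

var firstBatch = new[]
{
    ("Ana", 250m), ("Mihai", 780m), ("Felicia", 1000m), ("Robert", 1450m), ("Fred", 3200m),
};
var secondBatch = new[]
{
    ("Phil", 2100m), ("Ioana", 300m), ("Anastacia", 1200m), ("Victor", 999m), ("Aurora", 5000m),
};

foreach (var (name, amount) in firstBatch)
{
    customersList.Add(new Customer { Name = name, OrderAmount = amount });
}
Console.WriteLine(topSpendersList.Count); // prints "3"

// The same subscription keeps reacting to customers added later.
foreach (var (name, amount) in secondBatch)
{
    customersList.Add(new Customer { Name = name, OrderAmount = amount });
}
Console.WriteLine(string.Join(", ", topSpendersList));
// prints "Felicia, Robert, Fred, Phil, Anastacia, Aurora"

public class Customer
{
    public string Name { get; set; } = "";
    public decimal OrderAmount { get; set; }
}
```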
Therefore, we can say that reactive programming is a model (paradigm) of declarative programming that focuses on data flows and the propagation of change.
The new syntax, which uses the Observable class and the Subscribe method, becomes available after we add the NuGet package called System.Reactive.
IObservable and IObserver
To better understand the example of reactive code, we need to talk a little about these 2 interfaces. Their definition has been part of the .NET Framework since version 4.0. However, the implementations used in our example are not in the framework, but in the System.Reactive library.
The easiest way to explain things is to make an analogy with a real-life situation. This is what we did with the online store, so we will do the same here. Let’s look at IObservable as a conveyor belt, which receives products at one end and transports them to the other end. The products can be of different sizes and colors and can reach the belt at different time intervals. If the belt is running and there is no one at the other end, the products will fall on the floor and thus be lost.
To prevent this from happening, we need a human operator to pick up those packages. In other words, the operator “subscribes” to the flow of packages on the belt. The operator (or observer) implements the IObserver interface, which contains 3 methods: OnNext, OnError, and OnCompleted.
From the moment it has a subscriber, the observable (the belt) will use these methods as follows:
- Whenever a new product arrives on the belt, it will call the OnNext method, passing the product itself as the parameter;
- If the belt fails, it will call the OnError method. From this moment, the connection between the belt and the operator is severed, and even if new products are placed on the belt, the operator will not know about them;
- After all packages are sent, the belt will call OnCompleted, and again the connection between the belt and the operator is severed. Even if products appear again, just as with OnError, the operator will not know about them.
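The three calls above can be sketched with a hand-written observer. In this sketch the belt is played by a Subject<string> from System.Reactive, which is an assumption made for illustration; the Operator class and the product names are hypothetical:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

var belt = new Subject<string>();   // stands in for the belt (the observable)
var worker = new Operator();        // the observer at the end of the belt
IDisposable subscription = belt.Subscribe(worker);

belt.OnNext("small red box");
belt.OnNext("large blue box");
belt.OnCompleted();                 // all packages were sent
belt.OnNext("late box");            // ignored: the sequence has already completed

Console.WriteLine(string.Join(" | ", worker.Log));
// prints "received small red box | received large blue box | belt stopped"
subscription.Dispose();

// Hypothetical observer that records everything the belt tells it.
class Operator : IObserver<string>
{
    public List<string> Log { get; } = new();

    public void OnNext(string product) => Log.Add($"received {product}");
    public void OnError(Exception error) => Log.Add($"belt failure: {error.Message}");
    public void OnCompleted() => Log.Add("belt stopped");
}
```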
We notice that there is a close connection between IObservable and IObserver. Moreover, when we call Subscribe, IObservable returns an IDisposable, which is very useful: when we no longer need the subscription, we simply call Dispose() on that instance. The System.Reactive library also comes with a helper class called CompositeDisposable, to which we can add each subscription separately using the Add() method; in the end, by calling Clear() on the CompositeDisposable instance, we dispose of and remove all subscriptions, which helps ensure good memory management.
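A small sketch of this pattern, assuming the System.Reactive package; the Subject<int> source and the values pushed through it are hypothetical:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Subjects;

var source = new Subject<int>();
var received = new List<int>();
var subscriptions = new CompositeDisposable();

// Each Subscribe call returns an IDisposable that we collect in one place.
subscriptions.Add(source.Subscribe(x => received.Add(x)));
subscriptions.Add(source.Subscribe(x => received.Add(x * 10)));

source.OnNext(1);          // both subscriptions are active
subscriptions.Clear();     // disposes and removes both subscriptions
source.OnNext(2);          // nobody is listening any more

Console.WriteLine(string.Join(", ", received)); // prints "1, 10"
Console.WriteLine(subscriptions.Count);         // prints "0"
```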
If we now return to the reactive programming algorithm, it will make more sense. For simplicity, in the call to the Subscribe method we passed only a lambda expression that handles new clients (OnNext). We did not handle errors (OnError), nor the situation in which the observable calls OnCompleted because there are no more items.
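For comparison, a subscription that handles all three notifications might look like the sketch below. It assumes the System.Reactive package; the Subject<Customer> source, the customer data and the error message are hypothetical:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var customers = new Subject<Customer>();
var log = new List<string>();

using IDisposable subscription = customers
    .Where(customer => customer.OrderAmount >= 1000m)
    .Subscribe(
        customer => log.Add(customer.Name),             // OnNext
        error => log.Add("error: " + error.Message),    // OnError
        () => log.Add("completed"));                    // OnCompleted

customers.OnNext(new Customer { Name = "Felicia", OrderAmount = 1000m });
customers.OnNext(new Customer { Name = "Ana", OrderAmount = 250m });
customers.OnError(new InvalidOperationException("connection lost"));
customers.OnNext(new Customer { Name = "Fred", OrderAmount = 3200m }); // ignored after the error

Console.WriteLine(string.Join(" | ", log)); // prints "Felicia | error: connection lost"

public class Customer
{
    public string Name { get; set; } = "";
    public decimal OrderAmount { get; set; }
}
```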
Reactive extensions are also available for other programming languages, not just C#:
- Java (RxJava)
- C++ (RxCpp)
- Ruby (Rx.rb)
- Python (RxPY)
- PHP (RxPHP)
- Swift (RxSwift)
- Dart (RxDart)
See the official ReactiveX site (reactivex.io) for the full list.
This was, in short, an introduction to reactive programming. Reactive extensions can help us write easy-to-understand, scalable algorithms. These provide us with a helping hand in memory management (via CompositeDisposable). Last but not least, they can be a real help in asynchronous programming. The examples presented are the basis of this reactive thinking.
Reactive extensions also provide us with some very useful operators. Here is their short definition:
- Skip() – ignores the first n elements of an observable sequence and emits the rest
- Merge() – combines 2 or more observable streams into a single stream
- Throttle() – emits an item only if a certain time has elapsed without another item being emitted
- Map() – transforms each element into another one by applying a function to it (in Rx.NET this operator is called Select())
- Timer() – emits a value after a given delay, optionally repeating at a fixed interval; a much easier alternative to the classic .NET timers
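A brief sketch of three of these operators, assuming the System.Reactive package. The time-based operators (Throttle and Timer) are left out because their output depends on timing; the subjects and values are hypothetical:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var numbers = new Subject<int>();
var letters = new Subject<string>();
var output = new List<string>();

// Skip(2) drops the first two numbers; Select (Rx.NET's name for Map)
// turns every remaining number into a string.
IObservable<string> texts = numbers.Skip(2).Select(n => $"n{n}");

// Merge forwards items from both streams in the order they arrive.
using IDisposable subscription = texts.Merge(letters).Subscribe(item => output.Add(item));

numbers.OnNext(1);   // skipped
numbers.OnNext(2);   // skipped
numbers.OnNext(3);   // emitted as "n3"
letters.OnNext("a"); // emitted as "a"
numbers.OnNext(4);   // emitted as "n4"

Console.WriteLine(string.Join(", ", output)); // prints "n3, a, n4"
```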