ObjectBox Data Observers and Reactive Extensions

ObjectBox makes it easy for your app to react to data changes by providing:

  • data observers,
  • reactive extensions,
  • and an optional library to work with RxJava.

This makes setting up data flows easy while taking care of threading details.

Reactive Observers: A First Example

Let’s start with an example to demonstrate what you can do with reactive data observers:

The first line creates a regular query to get Task objects where task.complete == false. The second line connects an observer to the query. This is what happens:

  • the query is executed in the background
  • once the query finishes the observer gets the result data
  • whenever changes are made to Task objects in the future, the query will be executed again
  • once updated query results are in, they are propagated to the observer
  • the observer is called on Android’s main thread

So it’s just two lines of code – but a lot of stuff happening behind the scenes. Now, let’s dive into the details.

Data Observers Basics

When objects change, ObjectBox notifies subscribed data observers. They can either subscribe to changes of certain object types (via BoxStore) or to query results. To create a data observer you need to implement the generic io.objectbox.reactive.DataObserver interface:

This observer will be called by ObjectBox when necessary: typically shortly after subscribing and when data changes.

Note: onData() is called asynchronously and decoupled from the thread causing the data change (like the thread that committed a transaction).

Observing General Changes

The BoxStore allows a DataObserver to subscribe to object types. Let’s say you want to observe Task objects for a to-do list app:

Now, if some unrelated code in your app stores more tasks, ObjectBox will call taskObserver.onData(Task.class) to notify you about the changes.

Note: there is also subscribe() which takes no arguments. It subscribes the observer to receive changes for all available object classes.

Observing Queries

ObjectBox lets you build queries to find the objects matching certain criteria. Queries are an essential part of ObjectBox: whenever you need a specific set of data, you will probably use a query.

Combining queries and observers results in a convenient and powerful tool: query observers will automatically deliver fresh results whenever relevant data has changed. Let’s say you display a list of to-do tasks in your app. You can use a DataObserver to get all tasks that are not yet completed and pass them to a method updateUi() (note that we are using lambda syntax here):

So when is our observer lambda called? Immediately when an observer is subscribed, the query will be run in a separate thread. Once the query result is available, it will be passed to the observer. This is the first call to the observer.

Now let’s say a task gets changed and stored in ObjectBox. It does not matter where and how; it might be the user who marked a task as completed, or some backend thread putting additional tasks during synchronization with a server. In any case, the query will notify all observers with updated query results.

Note that this pattern can greatly simplify your code: there is a single place where your data comes in to update your user interface. There is no separate initialization code, no wiring of events, no rerunning queries, etc.
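The flow described in this section can be modelled in plain Java without ObjectBox. The following is only a sketch under stated assumptions: TaskStore, Task, subscribeOpenTasks() and markComplete() are hypothetical names and not part of the ObjectBox API, and delivery is synchronous to keep the example deterministic, whereas ObjectBox runs the query on a separate thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model of a query observer; all names are hypothetical, not the ObjectBox API. */
final class QueryObserverSketch {

    /** Minimal task entity for the to-do example. */
    static final class Task {
        final String text;
        boolean complete;

        Task(String text, boolean complete) {
            this.text = text;
            this.complete = complete;
        }
    }

    static final class TaskStore {
        private final List<Task> tasks = new ArrayList<>();
        private final List<Consumer<List<Task>>> observers = new ArrayList<>();

        /** The "query": all tasks that are not yet completed. */
        List<Task> findOpenTasks() {
            List<Task> open = new ArrayList<>();
            for (Task task : tasks) {
                if (!task.complete) open.add(task);
            }
            return open;
        }

        /** Subscribing delivers the current results right away (the first call). */
        void subscribeOpenTasks(Consumer<List<Task>> observer) {
            observers.add(observer);
            observer.accept(findOpenTasks());
        }

        /** Storing a task re-runs the query and notifies every observer. */
        void put(Task task) {
            tasks.add(task);
            publish();
        }

        /** Completing a task is a change too, so observers get fresh results. */
        void markComplete(Task task) {
            task.complete = true;
            publish();
        }

        private void publish() {
            for (Consumer<List<Task>> observer : observers) {
                observer.accept(findOpenTasks());
            }
        }
    }

    /** Runs the scenario and returns the size of each result list the observer received. */
    static List<Integer> demo() {
        TaskStore store = new TaskStore();
        Task milk = new Task("Buy milk", false);
        store.put(milk);                                   // no observer yet
        List<Integer> sizes = new ArrayList<>();
        store.subscribeOpenTasks(open -> sizes.add(open.size())); // initial results
        store.put(new Task("Call Ann", false));            // change: new task
        store.markComplete(milk);                          // change: task completed
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The observer lambda is the single place where results arrive, for the initial delivery as well as for every later change.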

Observers and Transactions

Observer notifications occur after a transaction is committed. For some scenarios it is especially important to know transaction bounds. If you call box.put() or remove() individually, an implicit transaction is started and committed. For example, this code fragment would trigger data observers on User.class twice:

There are several ways to combine multiple operations into one transaction, for example using one of the runInTx() or callInTx() methods in the BoxStore class. For our simple example, we can simply use an overload of put() accepting multiple objects:

This results in a single transaction and thus in a single DataObserver notification.
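To make the relation between commits and notifications concrete, here is a small plain-Java model. It is a sketch, not ObjectBox code: UserBox is a hypothetical stand-in that counts one notification per committed transaction, and its runInTx() only imitates the idea of grouping several operations into one commit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of "one notification per commit"; hypothetical, not the ObjectBox API. */
final class TransactionNotifySketch {

    static final class UserBox {
        private final List<String> users = new ArrayList<>();
        private boolean inTx;
        private int notifications;

        /** Outside an explicit transaction, every call commits on its own. */
        void put(String... names) {
            users.addAll(Arrays.asList(names));
            if (!inTx) commit();
        }

        /** Groups all operations done by work into a single commit. */
        void runInTx(Runnable work) {
            inTx = true;
            try {
                work.run();
            } finally {
                inTx = false;
            }
            commit(); // only reached when work completed without throwing
        }

        /** Observers would be notified here, once per commit. */
        private void commit() {
            notifications++;
        }

        int notifications() {
            return notifications;
        }
    }

    /** Two separate put() calls: two implicit transactions. */
    static int individualPuts() {
        UserBox box = new UserBox();
        box.put("user1");
        box.put("user2");
        return box.notifications();
    }

    /** One put() with several objects: one transaction. */
    static int batchedPut() {
        UserBox box = new UserBox();
        box.put("user1", "user2");
        return box.notifications();
    }

    /** Several put() calls inside one explicit transaction: one commit. */
    static int explicitTx() {
        UserBox box = new UserBox();
        box.runInTx(() -> {
            box.put("user1");
            box.put("user2");
        });
        return box.notifications();
    }

    public static void main(String[] args) {
        System.out.println(individualPuts() + " " + batchedPut() + " " + explicitTx());
    }
}
```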

Subscriptions and their Cancellation

When you call observer(), it returns a subscription object implementing the io.objectbox.reactive.DataSubscription interface:

So, if you plan to unsubscribe your DataObserver later, it is a good idea to hold on to the DataSubscription. Call cancel() on it to let ObjectBox know that the observer should not be notified anymore:

Note: On Android, you would typically create the subscription in one of the onCreate()/onStart()/onResume() lifecycle methods and cancel it in its counterpart onDestroy()/onStop()/onPause().
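The subscribe-then-cancel life cycle can be illustrated with a few lines of plain Java. This is a minimal sketch with hypothetical types (Publisher and a local Subscription interface); it only mirrors the idea of holding on to a subscription handle and is not the ObjectBox implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Plain-Java model of a cancellable subscription; hypothetical, not the ObjectBox API. */
final class SubscriptionSketch {

    /** Stand-in for a subscription handle. */
    interface Subscription {
        void cancel();

        boolean isCanceled();
    }

    static final class Publisher<T> {
        private final List<Consumer<T>> observers = new CopyOnWriteArrayList<>();

        /** Registers the observer and returns the handle needed to unsubscribe later. */
        Subscription subscribe(Consumer<T> observer) {
            observers.add(observer);
            return new Subscription() {
                private volatile boolean canceled;

                @Override
                public void cancel() {
                    canceled = true;
                    observers.remove(observer);
                }

                @Override
                public boolean isCanceled() {
                    return canceled;
                }
            };
        }

        void publish(T data) {
            for (Consumer<T> observer : observers) {
                observer.accept(data);
            }
        }
    }

    /** Returns everything the observer received; data published after cancel() is missing. */
    static List<String> demo() {
        Publisher<String> publisher = new Publisher<>();
        List<String> received = new ArrayList<>();
        Subscription subscription = publisher.subscribe(received::add); // e.g. in onStart()
        publisher.publish("first");
        subscription.cancel();                                          // e.g. in onStop()
        publisher.publish("second");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```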

Reactive Extensions

In the first part you saw how data observers can help you keep your app state up to date. But there is more: ObjectBox comes with simple and convenient reactive extensions for typical tasks. While most of these are inspired by RxJava, they are not actually based on RxJava. ObjectBox brings its own features because not all developers are familiar with RxJava (for the RxJava ObjectBox library see below). We do not want to impose the complexity (Rx is almost like a new language to learn) and size of RxJava (~10k methods) on everyone. So, let’s keep it simple and neat for now.

Thread Scheduling

On Android, UI updates must occur on the main thread only. Luckily, ObjectBox lets you switch the observer from a background thread over to the main thread. Let’s take a look at a revised version of the to-do task example from above:

Where is the difference? The additional on() call is all that is needed to tell where we want our observer to be called. AndroidScheduler.mainThread() is a built-in scheduler implementation. Alternatively, you can create an AndroidScheduler using a custom Looper, or build a fully custom scheduler by implementing the io.objectbox.reactive.Scheduler interface.
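What a scheduler does can be shown without Android. In the sketch below, a single-thread executor stands in for the main thread, and a hypothetical Scheduler interface hands each observer call over to it. The interface shape and all names are assumptions made for illustration; they are not the ObjectBox Scheduler or AndroidScheduler.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Plain-Java model of observer thread scheduling; hypothetical, not the ObjectBox API. */
final class SchedulerSketch {

    /** Hypothetical scheduler: decides on which thread an observer call runs. */
    interface Scheduler {
        <T> void run(Consumer<T> observer, T data);
    }

    /** Delivers one result via the scheduler and returns the name of the observer's thread. */
    static String deliverOn(String threadName) {
        // A single named thread stands in for Android's main thread.
        ExecutorService targetThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
        Scheduler scheduler = new Scheduler() {
            @Override
            public <T> void run(Consumer<T> observer, T data) {
                targetThread.execute(() -> observer.accept(data));
            }
        };

        AtomicReference<String> seenOn = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        Consumer<String> observer = result -> {
            seenOn.set(Thread.currentThread().getName());
            done.countDown();
        };

        // The result is produced on the calling thread; the scheduler hands it over.
        scheduler.run(observer, "query result");
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("observer was not called");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            targetThread.shutdown();
        }
        return seenOn.get();
    }

    public static void main(String[] args) {
        System.out.println(deliverOn("main-thread-stand-in"));
    }
}
```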

Transforming Data

Maybe you want to transform the data before you hand it over to an observer. Let’s say, you want to keep track of the count of all stored objects for each type. The BoxStore subscription gives you the classes of the objects, and this example shows you how to transform them into actual object counts:

Note that the transform operation takes a Class object and returns a Long number. Thus the DataObserver receives the object count as a Long parameter in onData().

While the lambda syntax is nice and brief, let’s look at the io.objectbox.reactive.Transformer interface for clarification of what the transform() method expects as a parameter:

Some additional notes on transformers:

  • Transforms are not required to actually “transform” any data. Technically, it is fine to return the same data that is received and just do some processing with (or without) it.
  • Transformers are always executed asynchronously. It is fine to perform long lasting operations.
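The class-to-count transformation described above can be sketched in plain Java. The local Transformer interface only mirrors the shape described in the text (a source goes in, a result comes out, exceptions are allowed), the map of counts is a hypothetical stand-in for asking a store how many objects it holds, and the transform runs synchronously here rather than asynchronously.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Plain-Java model of transforming data before it reaches an observer; not the ObjectBox API. */
final class TransformSketch {

    /** Stand-in with the shape described in the text: may throw any exception. */
    interface Transformer<FROM, TO> {
        TO transform(FROM source) throws Exception;
    }

    /** Wraps an observer so that it receives transformed data instead of the source data. */
    static <FROM, TO> Consumer<FROM> transformed(Transformer<FROM, TO> transformer,
                                                 Consumer<TO> observer) {
        return source -> {
            try {
                observer.accept(transformer.transform(source));
            } catch (Exception e) {
                // This sketch has no error channel, so failures are simply rethrown.
                throw new IllegalStateException("transform failed", e);
            }
        };
    }

    /** Feeds two class notifications through the transformer and returns the received counts. */
    static List<Long> demo() {
        // Hypothetical object counts per type, standing in for a count lookup in a store.
        Map<Class<?>, Long> counts = new HashMap<>();
        counts.put(String.class, 3L);
        counts.put(Integer.class, 7L);

        List<Long> received = new ArrayList<>();
        Transformer<Class<?>, Long> toCount = type -> counts.get(type);
        Consumer<Long> countObserver = received::add;
        Consumer<Class<?>> classObserver = transformed(toCount, countObserver);

        classObserver.accept(String.class);  // "objects of this type changed"
        classObserver.accept(Integer.class);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The observer never sees a Class object; it is typed on the transformer's output (Long), which matches the note above about onData() receiving the count.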

ErrorObserver

Maybe you noticed that a transformer may throw any type of exception. Also, a DataObserver might throw a RuntimeException. In both cases, you can provide an ErrorObserver to be notified about an exception that occurred. The io.objectbox.reactive.ErrorObserver is straightforward:

To specify your ErrorObserver, simply call the onError() method after subscribe().
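The routing of exceptions to an error observer can be modelled as follows. This is a sketch with locally defined stand-in interfaces; deliver() is a hypothetical helper and the real library may wire this differently. It shows the idea that failures in either the transformer or the data observer end up in one callback instead of propagating to the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model of error delivery to a dedicated observer; not the ObjectBox API. */
final class ErrorObserverSketch {

    /** Stand-in transformer that may throw any exception. */
    interface Transformer<FROM, TO> {
        TO transform(FROM source) throws Exception;
    }

    /** Stand-in error callback. */
    interface ErrorObserver {
        void onError(Throwable th);
    }

    /** Transforms and delivers data; any exception on the way goes to the error observer. */
    static <FROM, TO> void deliver(FROM source,
                                   Transformer<FROM, TO> transformer,
                                   Consumer<TO> observer,
                                   ErrorObserver errorObserver) {
        try {
            observer.accept(transformer.transform(source));
        } catch (Exception e) {
            // Covers checked exceptions from the transformer and
            // RuntimeExceptions thrown by the data observer.
            errorObserver.onError(e);
        }
    }

    /** One successful delivery followed by one failing transformation. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Transformer<String, Integer> parse = text -> Integer.parseInt(text);
        Consumer<Integer> observer = value -> log.add("data:" + value);
        ErrorObserver errors = th -> log.add("error:" + th.getClass().getSimpleName());

        deliver("42", parse, observer, errors);
        deliver("not a number", parse, observer, errors);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```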

Single Notifications vs. Only-Changes

When you subscribe to a query, the DataObserver gets both of the following by default:

  • Initial query results (right after subscribing)
  • Updated query results (underlying data was changed)

Sometimes you may be interested in only one of those. This is what the methods single() and onlyChanges() are for (call them after subscribe()). Single subscriptions are special in that they are cancelled automatically once the observer is notified. You can still cancel them manually to ensure no call to the observer is made at a certain point.
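The difference between the three notification modes can be made visible with a small plain-Java model. It is only a sketch: the Mode enum and QueryPublisher are hypothetical, an integer version number stands in for query results, and everything runs synchronously.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model of default, single and only-changes delivery; not the ObjectBox API. */
final class NotificationModeSketch {

    enum Mode { DEFAULT, SINGLE, ONLY_CHANGES }

    static final class QueryPublisher {
        private int version; // stands in for the current query results
        private final List<Consumer<Integer>> observers = new ArrayList<>();

        void subscribe(Mode mode, Consumer<Integer> observer) {
            if (mode != Mode.ONLY_CHANGES) {
                observer.accept(version);   // initial results right after subscribing
            }
            if (mode != Mode.SINGLE) {
                observers.add(observer);    // single: finished after the first call
            }
        }

        /** Underlying data changed: every remaining observer gets updated results. */
        void change() {
            version++;
            for (Consumer<Integer> observer : observers) {
                observer.accept(version);
            }
        }
    }

    /** Subscribes in the given mode, applies two changes, returns what was delivered. */
    static List<Integer> deliveriesFor(Mode mode) {
        QueryPublisher publisher = new QueryPublisher();
        List<Integer> received = new ArrayList<>();
        publisher.subscribe(mode, received::add);
        publisher.change();
        publisher.change();
        return received;
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            System.out.println(mode + " " + deliveriesFor(mode));
        }
    }
}
```

In this model the default mode delivers the initial value and both updates, the single mode delivers only the initial value, and the only-changes mode delivers only the two updates.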

Weak References

Sometimes it may be nice to have a weak reference to data observers. Note that for the sake of a deterministic flow, it is advisable to cancel subscriptions explicitly whenever possible. If that does not scare you off, use weak() after subscribe().

Threading Overview

To summarize threading as discussed earlier:

  • Query execution runs on a background thread (exclusive for this task)
  • DataTransformer runs on a background thread (exclusive for this task)
  • DataObserver and ErrorObserver run on a background thread unless a scheduler is specified via the on() method.

ObjectBox RxJava Extension Library

By design, there are zero dependencies on any Rx libraries in the core of ObjectBox. As you saw before, ObjectBox gives you simple means for data transformation, asynchronous processing, thread scheduling, and one-time (single) notifications. However, you still might want to integrate with the mighty RxJava 2 (we have no plans to support RxJava 1). For this purpose we created the ObjectBox RxJava extension library. It requires an additional Gradle dependency (make sure to check for the latest version, this might not be it):

With that you can use the classes RxQuery and RxBoxStore. Both offer static methods to subscribe using RxJava means.

For general object changes, you can use RxBoxStore to create an Observable. RxQuery lets you subscribe to query objects using:

  • Flowable
  • Observable
  • Single

Example usage:

The extension library is managed as a separate open source project on GitHub.

 

 
