Vova Bilyachat

Melbourne, Australia

How to use Cosmos DB with Spring Boot Reactive (WebFlux)

28 March 2021

Introduction

I started with Java a little over two years ago, and one of the things I miss most in Java is async/await. After a while I discovered WebFlux, a reactive API that makes non-blocking programming simpler. To make this example a bit more interesting, I decided to show how to use it with Azure Cosmos DB.

Start project

To generate the default project I use IntelliJ, but you can also use https://start.spring.io/. There are two important Maven dependencies: one for WebFlux and one for Cosmos DB.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-spring-data-cosmos</artifactId>
    <version>3.4.0</version>
</dependency>

What is Reactive?

Reactor is the reactive library of choice for Spring WebFlux. It provides the Mono(0..1) and Flux(0..N) types to work with data and a huge number of operators to support it. Reactor is a Reactive Streams library and all of its operators support non-blocking back pressure.

How to create Flux or Mono

Flux<String> seq1 = Flux.just("Vova", "love", "to", "sleep");

List<String> iterable = Arrays.asList("Once", "upon", "a", "time", "there", "was", "CosmosDb");
Flux<String> seq2 = Flux.fromIterable(iterable);

Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3); // range(start, count) emits 5, 6, 7

Mono<String> noData = Mono.empty(); 

Mono<String> data = Mono.just("foo");

Operators

var stream = Flux.just("Vova", "love", "to", "sleep")
  .log()                      // logs every Reactive Streams signal once subscribed
  .map(String::toUpperCase);

Nothing happens in the code above because nothing has subscribed to the stream yet. To make it run, either subscribe to it explicitly in your code or return it from a controller method, in which case Spring WebFlux subscribes to it for you.
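Flux and Mono implement the Reactive Streams contract (Publisher, Subscriber, Subscription), which the JDK also defines as java.util.concurrent.Flow. The sketch below is a JDK-only model, not Reactor code: the hypothetical upperCase publisher mirrors the Flux.just(...).map(String::toUpperCase) pipeline above, does no work when it is created, and produces items only after a subscriber calls request(n). It deliberately ignores parts of the Reactive Streams rules (for example, limits on recursive request calls), so treat it only as an illustration of laziness and back pressure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

class ColdPublisherDemo {
    // Counts how many items have actually been produced, across all subscriptions.
    static final AtomicInteger produced = new AtomicInteger();

    // Hypothetical cold publisher: creating it does no work. Each subscriber gets
    // its own subscription, and items are produced only when request(n) is called.
    static Flow.Publisher<String> upperCase(List<String> words) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int index = 0;
            private boolean done = false;

            @Override
            public void request(long n) {
                for (long i = 0; i < n && !done && index < words.size(); i++) {
                    produced.incrementAndGet();
                    subscriber.onNext(words.get(index++).toUpperCase());
                }
                if (!done && index == words.size()) {
                    done = true; // complete exactly once
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Subscribes and pulls items in batches of `batch`, collecting everything emitted.
    static List<String> collect(Flow.Publisher<String> publisher, long batch) {
        List<String> out = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(batch); // nothing is produced before this call
            }

            @Override
            public void onNext(String item) {
                out.add(item);
                if (out.size() % batch == 0) {
                    subscription.request(batch); // back pressure: ask for the next batch
                }
            }

            @Override
            public void onError(Throwable t) {
                throw new IllegalStateException(t);
            }

            @Override
            public void onComplete() {
            }
        });
        return out;
    }

    public static void main(String[] args) {
        Flow.Publisher<String> stream = upperCase(List.of("Vova", "love", "to", "sleep"));
        System.out.println(produced.get());     // 0: nothing has subscribed yet
        System.out.println(collect(stream, 2)); // [VOVA, LOVE, TO, SLEEP]
        System.out.println(produced.get());     // 4
    }
}
```

Reactor does the same bookkeeping for you: a Flux returned from a WebFlux controller is subscribed by the framework, which then requests data as it writes the response.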

Cosmos DB configuration

The configuration in this example is pretty simple: it reads the URI and the primary key from the properties file. In a real-world application I prefer to store sensitive information in Azure Key Vault, but that is a topic for another blog post.

package com.vob.reactive.webflux.config;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.spring.data.cosmos.config.AbstractCosmosConfiguration;
import com.azure.spring.data.cosmos.config.CosmosConfig;
import com.azure.spring.data.cosmos.repository.config.EnableReactiveCosmosRepositories;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableReactiveCosmosRepositories
public class CosmosdbConfig extends AbstractCosmosConfiguration {
    @Value("${cosmosdb.uri}")
    private String uri;
    @Value("${cosmosdb.key}")
    private String key;
    @Bean
    public CosmosClientBuilder appCosmosClientBuilder() {
        return new CosmosClientBuilder()
                .key(key)
                .endpoint(uri);
    }


    @Bean
    public CosmosConfig cosmosConfig() {
        return CosmosConfig.builder()
                .enableQueryMetrics(true)
                .build();
    }

    @Override
    protected String getDatabaseName() {
        return "webflux";
    }
}

Documents

package com.vob.reactive.webflux.repository.entity;

import com.azure.spring.data.cosmos.core.mapping.Container;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Version;

import java.util.ArrayList;

@Data
@Container(containerName = "profile")
@Builder
@NoArgsConstructor  // gives Jackson a constructor to use when documents are read back
@AllArgsConstructor // @Builder needs an all-args constructor once another constructor exists
public class Profile {
    @Id
    private String id;
    @Version
    private String _etag; // named _etag so it matches the etag Cosmos DB returns
    @Builder.Default
    private ArrayList<String> wishLists = new ArrayList<>();
}

package com.vob.reactive.webflux.repository.entity;

import com.azure.spring.data.cosmos.core.mapping.Container;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Version;

import java.util.ArrayList;

@Data
@Container(containerName = "wishlist")
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Wishlist {
    @Id
    private String id;
    private String name;
    @Version
    private String _etag;
    @Builder.Default
    private ArrayList<String> products = new ArrayList<>();
}

Let's talk about annotations

@Container

  • containerName – name of the container in Cosmos DB
  • ru (optional) – number of request units for this container
  • timeToLive – how long (in seconds) a document lives in the container before it is deleted automatically. The default is -1, which means documents never expire
  • autoCreateContainer – if true, the SDK checks at application startup whether the container exists and creates it if it does not
  • partitionKeyPath – partition key for this container

In my applications I usually set the autoCreateContainer property to false and use a Terraform template to spin up my infrastructure. This way I have more control over the indexing policy and other settings of my database.

@Id

To map my document's id field to the Cosmos DB id, I use the @Id annotation.

@PartitionKey

The other way to set a partition key for documents is to annotate a field with @PartitionKey. Since my dataset is small, I did not bother to set up a partition key at all, which is also possible.

@Version

Each time you save a document to Cosmos DB, its _etag field is updated. If you want optimistic locking, add a String field to your document and annotate it with @Version. Why bother? It depends on your use case, but suppose a document can be updated from multiple threads and we don't want them to overwrite each other's changes: the _etag lets Cosmos DB reject a save that is based on an outdated version of the document.
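To make the lost-update scenario concrete, here is a JDK-only sketch. The EtagStore, Doc and PreconditionFailedException names are hypothetical stand-ins, not the Cosmos SDK: every successful save gets a new etag, and a save that carries an outdated etag is rejected, which is roughly how Cosmos DB treats a conditional write that fails with 412. The two writers run one after another so the outcome is deterministic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.UUID;

class OptimisticLockingDemo {
    // A document snapshot: its body plus the etag it was read (or saved) with.
    record Doc(String id, String body, String etag) { }

    // Stands in for Cosmos DB answering a stale etag with 412 Precondition Failed.
    static class PreconditionFailedException extends RuntimeException {
        PreconditionFailedException(String id) {
            super("412 Precondition Failed for " + id);
        }
    }

    // Hypothetical in-memory container with If-Match semantics on replace.
    static class EtagStore {
        private final Map<String, Doc> docs = new HashMap<>();

        synchronized Doc read(String id) {
            return docs.get(id);
        }

        synchronized Doc create(String id, String body) {
            Doc doc = new Doc(id, body, UUID.randomUUID().toString());
            docs.put(id, doc);
            return doc;
        }

        // Saves only if the caller's etag still matches; every successful save gets a new etag.
        synchronized Doc replaceIfMatch(Doc changed) {
            Doc current = docs.get(changed.id());
            if (current == null) {
                throw new NoSuchElementException(changed.id());
            }
            if (!current.etag().equals(changed.etag())) {
                throw new PreconditionFailedException(changed.id());
            }
            Doc saved = new Doc(changed.id(), changed.body(), UUID.randomUUID().toString());
            docs.put(saved.id(), saved);
            return saved;
        }
    }

    // Two writers read the same version; the second save is rejected instead of
    // silently overwriting the first writer's change (a "lost update").
    static String simulateTwoWriters() {
        EtagStore store = new EtagStore();
        store.create("profile-1", "wishLists=[]");
        Doc readByA = store.read("profile-1");
        Doc readByB = store.read("profile-1");

        store.replaceIfMatch(new Doc("profile-1", "wishLists=[a]", readByA.etag()));
        try {
            store.replaceIfMatch(new Doc("profile-1", "wishLists=[b]", readByB.etag()));
            return "B overwrote A";
        } catch (PreconditionFailedException e) {
            return "B rejected, stored: " + store.read("profile-1").body();
        }
    }

    public static void main(String[] args) {
        System.out.println(simulateTwoWriters()); // B rejected, stored: wishLists=[a]
    }
}
```

Without the etag check, writer B's save would simply replace A's change, and the wish list added by A would be lost without any error.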


Reactive Repositories

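To create our repositories, we extend the ReactiveCosmosRepository interface and specify the document class and the type of its id. Methods such as findByIdIn are derived queries: Spring Data builds the query from the method name, so no implementation is needed.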

package com.vob.reactive.webflux.repository;

import com.azure.spring.data.cosmos.repository.ReactiveCosmosRepository;
import com.vob.reactive.webflux.repository.entity.Wishlist;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;

@Repository
public interface WishlistRepository extends ReactiveCosmosRepository<Wishlist, String> {
    Flux<Wishlist> findByIdIn(Iterable<String> ids);
}

package com.vob.reactive.webflux.repository;

import com.azure.spring.data.cosmos.repository.ReactiveCosmosRepository;
import com.vob.reactive.webflux.repository.entity.Profile;
import org.springframework.stereotype.Repository;

@Repository
public interface ProfileRepository extends ReactiveCosmosRepository<Profile, String> {

}

Create new list

Service

public Mono<Wishlist> createWishlist(String profileId, String name) {
    var newWishList = Wishlist.builder()
            .id(UUID.randomUUID().toString())
            .name(name)
            .build();
    return wishlistRepository.save(newWishList)
            .flatMap(wishList -> addWishlistAndSaveProfile(profileId, wishList));
}

private Mono<Wishlist> addWishlistAndSaveProfile(String profileId, Wishlist wishList) {
    return Mono.defer(() -> profileRepository
            .findById(profileId)
            .defaultIfEmpty(Profile.builder().id(profileId).build())
            .flatMap(profile -> {
                profile.getWishLists().add(wishList.getId());
                return profileRepository.save(profile);
            }))
            .retryWhen(getRetrySpecs())
            .map(p -> wishList);
}

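public static RetryBackoffSpec getRetrySpecs() {
    // up to 3 retries, exponential backoff starting at 10 ms
    return Retry.backoff(3, Duration.of(10, ChronoUnit.MILLIS))
            .doBeforeRetry(retrySignal -> log.warn("Let's retry :)"))
            // retry only optimistic-concurrency conflicts (HTTP 412 Precondition Failed);
            // getCosmosException() may be null, so check it before reading the status code
            .filter(throwable -> throwable instanceof CosmosAccessException
                    && ((CosmosAccessException) throwable).getCosmosException() != null
                    && ((CosmosAccessException) throwable).getCosmosException().getStatusCode()
                            == HttpStatus.PRECONDITION_FAILED.value());
}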

Controller

@PostMapping // assumes a class-level @RequestMapping whose path contains {profileId}
public Mono<WishlistDto> createWishlist(@PathVariable String profileId, @RequestParam String name) {
    return service.createWishlist(profileId, name)
            .map(t -> DtoMapper.convertToDto(t, false));
}

The create-list controller calls the service's createWishlist method, which returns a Mono. The first step saves the newly created list through the repository. Because the call is reactive, the thread is not blocked while the request travels to Cosmos DB and the document is saved; it is free to serve other requests in the meantime. (Cosmos DB is blazing fast: in my production application this usually takes 2-4 ms on average.) As soon as the response comes back from Cosmos DB and a thread is available, the next operator, flatMap, is called.

The next step is to add the wish list id to the profile and save the profile. First I try to load the profile with profileRepository.findById. If nothing is found, the flow falls through to the defaultIfEmpty operator, so the following flatMap receives either a new Profile object or the document loaded from Cosmos DB.

The call to profileRepository.save can fail because we use @Version and _etag: Cosmos DB rejects a save whose _etag differs from the stored one. That happens when two threads read the same document at the same time and both update it. Both hold the same _etag, but as soon as the first thread saves, the _etag changes, so the second thread's save fails.

Another beauty of reactive programming is how easy retries are: the built-in retryWhen operator takes a spec that describes how retries should be handled. However, if I put the retry on the repository's save call alone, it would only repeat the save with the same outdated _etag, which would fail again; I need to repeat everything, starting from loading the profile from Cosmos DB. This is why Mono.defer is a handy operator: it groups the complete logic to retry. Because the whole pipeline is built inside defer, every retry assembles it again, so each attempt re-reads the profile and also gets a fresh default Profile from defaultIfEmpty rather than the instance created for the previous attempt.
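The difference between retrying only the save and retrying the whole read-modify-save unit can be modelled without Cosmos DB. In this JDK-only sketch the Store, ConflictException and retry names are hypothetical stand-ins (the store rejects a save whose etag is not current, like a 412), and interfereOnce simulates another writer updating the profile just before our first save. The addWishlist method plays the role of the deferred findById/flatMap/save chain; it is a model of the idea, not the service code above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

class RetryUnitDemo {
    // A profile snapshot: wish list ids plus the version (etag) it was read at.
    record Profile(String id, List<String> wishLists, int etag) { }

    // Stands in for a 412 Precondition Failed response.
    static class ConflictException extends RuntimeException { }

    // Hypothetical in-memory store: a save succeeds only if the caller's etag is current.
    static class Store {
        final Map<String, Profile> rows = new HashMap<>();
        Runnable interfereOnce; // simulates another writer sneaking in before our next save

        Profile read(String id) {
            return rows.get(id);
        }

        Profile save(Profile p) {
            if (interfereOnce != null) {
                Runnable other = interfereOnce;
                interfereOnce = null; // run the competing write only once
                other.run();
            }
            Profile current = rows.get(p.id());
            if (current != null && current.etag() != p.etag()) {
                throw new ConflictException();
            }
            Profile saved = new Profile(p.id(), List.copyOf(p.wishLists()), p.etag() + 1);
            rows.put(p.id(), saved);
            return saved;
        }
    }

    // Retries `attempt` on ConflictException, up to maxAttempts calls in total.
    static <T> T retry(Supplier<T> attempt, int maxAttempts) {
        for (int i = 1; ; i++) {
            try {
                return attempt.get();
            } catch (ConflictException e) {
                if (i >= maxAttempts) {
                    throw e;
                }
            }
        }
    }

    // The whole read-modify-save unit: every call re-reads the latest version.
    static Profile addWishlist(Store store, String profileId, String wishlistId) {
        Profile current = store.read(profileId);
        List<String> ids = new ArrayList<>(current.wishLists());
        ids.add(wishlistId);
        return store.save(new Profile(profileId, ids, current.etag()));
    }

    // Profile "p1" where another writer adds "other" right before our first save.
    static Store storeWithRace() {
        Store store = new Store();
        store.rows.put("p1", new Profile("p1", List.of(), 0));
        store.interfereOnce = () -> addWishlist(store, "p1", "other");
        return store;
    }

    // Retrying only the save re-sends the same stale etag, so every attempt fails.
    static boolean retrySaveOnlySucceeds() {
        Store store = storeWithRace();
        Profile stale = store.read("p1");
        Profile changed = new Profile("p1", List.of("mine"), stale.etag());
        try {
            retry(() -> store.save(changed), 3);
            return true;
        } catch (ConflictException e) {
            return false;
        }
    }

    // Retrying the whole unit re-reads the profile and succeeds on the second attempt.
    static List<String> retryWholeUnit() {
        Store store = storeWithRace();
        return retry(() -> addWishlist(store, "p1", "mine"), 3).wishLists();
    }

    public static void main(String[] args) {
        System.out.println(retrySaveOnlySucceeds()); // false
        System.out.println(retryWholeUnit());        // [other, mine]
    }
}
```

Note that the successful retry keeps the other writer's "other" entry, because the second attempt starts from the latest version instead of the stale copy.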

Retry

My retry logic is pretty simple. Before each retry I log a warning (in my production applications I always use Application Insights). I don't want to retry every exception, only the Cosmos DB PRECONDITION_FAILED (412) error: https://docs.microsoft.com/en-us/rest/api/cosmos-db/http-status-codes-for-cosmosdb. Retry.backoff(3, ...) retries up to three times with exponential backoff: the base delay starts at 10 ms and doubles on each attempt (10 ms, 20 ms, then 40 ms), and Reactor adds random jitter on top by default, so the actual delays vary slightly.
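To see where those delays come from, here is a JDK-only sketch of exponential backoff with jitter. It follows the same idea as Reactor's Retry.backoff (the base delay doubles on every attempt, and a random jitter of up to jitterFactor times that delay is added or subtracted; Reactor's default jitter factor is 0.5), but it is a simplified model rather than Reactor's exact algorithm; for example, it has no maximum backoff. The backoffDelays method and its parameters are hypothetical.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

class BackoffDemo {
    // Delay before each retry: minBackoff * 2^attempt, shifted by a random jitter of up to
    // +/- jitterFactor * that value, and never shorter than minBackoff.
    static List<Duration> backoffDelays(int maxRetries, Duration minBackoff, double jitterFactor, Random random) {
        long minMillis = minBackoff.toMillis();
        List<Duration> delays = new ArrayList<>();
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            long baseMillis = minMillis << attempt; // 10, 20, 40, ... for a 10 ms minimum
            long maxJitter = (long) (baseMillis * jitterFactor);
            // uniform integer in [-maxJitter, +maxJitter]
            long jitter = (long) (random.nextDouble() * (2 * maxJitter + 1)) - maxJitter;
            delays.add(Duration.ofMillis(Math.max(minMillis, baseMillis + jitter)));
        }
        return delays;
    }

    public static void main(String[] args) {
        // Without jitter the delays are exactly 10, 20 and 40 ms
        System.out.println(backoffDelays(3, Duration.ofMillis(10), 0.0, new Random()));
        // With a jitter factor of 0.5 they fall in 10-15 ms, 10-30 ms and 20-60 ms
        System.out.println(backoffDelays(3, Duration.ofMillis(10), 0.5, new Random()));
    }
}
```

Jitter is there so that many clients that failed at the same moment don't all retry at the same moment and collide again.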

412 Precondition failure

The operation specified an eTag that is different from the version available at the server, that is, an optimistic concurrency error. Retry the request after reading the latest version of the resource and updating the eTag on the request.

Read list by id

Service

public Mono<Wishlist> getWishList(String profileId, String wishListId) {
    return Mono.zip(profileRepository.findById(profileId), wishlistRepository.findById(wishListId))
            .flatMap(this::validateProfileToWishlistPermission);
}

public Mono<Wishlist> validateProfileToWishlistPermission(Tuple2<Profile, Wishlist> objects) {
    // Check that the wish list belongs to this profile
    var wishlist = objects.getT2();
    if (objects.getT1().getWishLists().contains(wishlist.getId())) {
        return Mono.just(wishlist);
    }
    return Mono.empty();
}

Controller

@GetMapping("/{listId}") // {profileId} is assumed to come from the class-level @RequestMapping
public Mono<ResponseEntity<WishlistDto>> getList(@PathVariable String profileId, @PathVariable String listId) {
    return service.getWishList(profileId, listId)
            .map(w -> ResponseEntity.ok().body(DtoMapper.convertToDto(w, true)))
            .defaultIfEmpty(ResponseEntity.notFound().build());
}

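Mono.zip is another useful operator: it combines several Monos and passes their results, as a tuple, to the next operator. Here both findById calls are sent to Cosmos DB without waiting for each other, and the flow reaches flatMap only if both of them return a value. If either one completes empty, zip completes empty as well, and the controller's defaultIfEmpty returns a 404 Not Found. The same happens when the wish list does not belong to the profile, because validateProfileToWishlistPermission returns Mono.empty().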
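The zip-then-validate flow can be modelled with java.util.Optional. This is a JDK-only analogy, not Reactor: the Profile and Wishlist records, the in-memory maps and the zip and status helpers are hypothetical simplifications of the code above. A missing profile, a missing wish list, or a wish list that belongs to someone else all end up as an empty result, which maps to 404. Unlike Mono.zip, this model is synchronous, so it does not show the two lookups running concurrently.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;

class ZipPermissionDemo {
    // Hypothetical, simplified versions of the Profile and Wishlist documents.
    record Profile(String id, List<String> wishLists) { }
    record Wishlist(String id, String name) { }

    // Like Mono.zip: produces a combined value only if both inputs have a value.
    static <A, B, R> Optional<R> zip(Optional<A> a, Optional<B> b, BiFunction<A, B, R> combine) {
        return a.flatMap(x -> b.map(y -> combine.apply(x, y)));
    }

    // Mirrors validateProfileToWishlistPermission: keep the list only if the profile owns it.
    static Optional<Wishlist> validate(Profile profile, Wishlist wishlist) {
        return profile.wishLists().contains(wishlist.id()) ? Optional.of(wishlist) : Optional.empty();
    }

    // Mirrors getWishList: zip both lookups, then validate ownership.
    static Optional<Wishlist> getWishList(Map<String, Profile> profiles, Map<String, Wishlist> wishlists,
                                          String profileId, String wishListId) {
        Optional<Profile> profile = Optional.ofNullable(profiles.get(profileId));
        Optional<Wishlist> wishlist = Optional.ofNullable(wishlists.get(wishListId));
        return zip(profile, wishlist, ZipPermissionDemo::validate).flatMap(result -> result);
    }

    // Mirrors the controller: a value becomes 200 OK, an empty result becomes 404 Not Found.
    static int status(Optional<Wishlist> result) {
        return result.isPresent() ? 200 : 404;
    }

    public static void main(String[] args) {
        Map<String, Profile> profiles = Map.of("p1", new Profile("p1", List.of("w1")));
        Map<String, Wishlist> lists = Map.of("w1", new Wishlist("w1", "Birthday"),
                                             "w2", new Wishlist("w2", "Someone else's"));
        System.out.println(status(getWishList(profiles, lists, "p1", "w1")));      // 200
        System.out.println(status(getWishList(profiles, lists, "p1", "w2")));      // 404: not this profile's list
        System.out.println(status(getWishList(profiles, lists, "missing", "w1"))); // 404: no profile
    }
}
```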

Sum Up

Reactive, non-blocking code generally does not make an application run faster, and in some cases it can even be slightly slower, but it increases throughput: with fewer resources we can process many more requests. On the other hand, reactive programming requires some skill and understanding, and it comes with a learning curve.

GitHub

https://github.com/vovikdrg/webflux-cosmosdb