Vova Bilyachat

Melbourne, Australia

How to use Cosmos DB in Java with a non-blocking API

28 March 2021

Introduction

I started working with Java a bit more than two years ago, and one of the things I miss most is async/await. After some time I found WebFlux, whose reactive APIs make non-blocking code simpler. To make this example a bit more interesting, I decided to show how to use Azure Cosmos DB with Java.

Start project

To generate the default project I am using IntelliJ, but it's also possible to use https://start.spring.io/. There are two important dependencies: one for WebFlux and another for Cosmos DB.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-spring-data-cosmos</artifactId>
    <version>3.4.0</version>
</dependency>

What is Reactive?

Reactor is the reactive library of choice for Spring WebFlux. It provides the Mono (0..1 elements) and Flux (0..N elements) types, along with a large set of operators for combining and transforming data. Reactor is a Reactive Streams library and, therefore, all of its operators support non-blocking back pressure.
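To make "back pressure" concrete, here is a minimal sketch in plain Reactor (no Spring or Cosmos DB). It uses Reactor's BaseSubscriber to build a subscriber that requests one element at a time, so the publisher never sends more than has been asked for. The class name OneByOneSubscriber is made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class BackpressureDemo {
    // Hypothetical subscriber that signals demand for one element at a time
    // instead of the unbounded request a plain subscribe() would make.
    static class OneByOneSubscriber extends BaseSubscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        int requests = 0;

        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            requests++;
            request(1); // ask for the first element only
        }

        @Override
        protected void hookOnNext(Integer value) {
            received.add(value);
            requests++;
            request(1); // ask for the next element only after handling this one
        }
    }

    static OneByOneSubscriber run() {
        OneByOneSubscriber subscriber = new OneByOneSubscriber();
        // Flux.range emits synchronously, so everything has been delivered when subscribe returns.
        Flux.range(1, 5).subscribe(subscriber);
        return subscriber;
    }

    public static void main(String[] args) {
        OneByOneSubscriber s = run();
        System.out.println(s.received + " after " + s.requests + " request(1) calls");
        // prints [1, 2, 3, 4, 5] after 6 request(1) calls
    }
}
```

The publisher only emits when the subscriber has signalled demand; a WebFlux controller gets the same behaviour when the client or the framework reads slowly.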

How to create Flux or Mono

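import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// A Flux emits 0..N elements
Flux<String> seq1 = Flux.just("Vova", "love", "to", "sleep");

List<String> iterable = Arrays.asList("Once", "upon", "a", "time", "there", "was", "CosmosDb");
Flux<String> seq2 = Flux.fromIterable(iterable);

// Flux.range(start, count): emits 5, 6, 7
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);

// A Mono emits 0..1 elements
Mono<String> noData = Mono.empty();

Mono<String> data = Mono.just("foo");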

Operators

// Assemble a pipeline: log every signal, then upper-case each element
var stream = Flux.just("Vova", "love", "to", "sleep")
        .log()
        .map(String::toUpperCase);

In the code above nothing will happen, because I did not subscribe to the stream. To make it work, we either subscribe to the stream ourselves or return it (for example from a controller) so that Spring Boot subscribes to it.
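A minimal sketch of this laziness in plain Reactor, without Spring or Cosmos DB. The `seen` list and the `doOnNext` hook are only there to make the side effect observable; in a WebFlux controller you would return the Flux instead of calling `subscribe()` yourself.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class LazyFluxDemo {
    public static List<String> run() {
        List<String> seen = new ArrayList<>();

        // Assembling the pipeline only describes the work; no element flows yet.
        Flux<String> stream = Flux.just("Vova", "love", "to", "sleep")
                .map(String::toUpperCase)
                .doOnNext(seen::add);

        if (!seen.isEmpty()) {
            throw new IllegalStateException("nothing should run before subscribe()");
        }

        // Subscribing triggers the flow. Flux.just emits synchronously on this thread,
        // so `seen` is filled by the time subscribe() returns.
        stream.subscribe();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [VOVA, LOVE, TO, SLEEP]
    }
}
```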

Cosmosdb configuration

Configuration is pretty simple: it reads the URI and key from the properties file. In a real-world application I prefer to use Azure Key Vault to store that kind of sensitive information, but that is a topic for another blog post.

package com.vob.reactive.webflux.config;

import com.azure.cosmos.CosmosClientBuilder;
import com.azure.spring.data.cosmos.config.AbstractCosmosConfiguration;
import com.azure.spring.data.cosmos.config.CosmosConfig;
import com.azure.spring.data.cosmos.repository.config.EnableReactiveCosmosRepositories;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableReactiveCosmosRepositories
public class CosmosdbConfig extends AbstractCosmosConfiguration {
    @Value("${cosmosdb.uri}")
    private String uri;
    @Value("${cosmosdb.key}")
    private String key;
    @Bean
    public CosmosClientBuilder appCosmosClientBuilder() {
        return new CosmosClientBuilder()
                .key(key)
                .endpoint(uri);
    }


    @Bean
    public CosmosConfig cosmosConfig() {
        return CosmosConfig.builder()
                .enableQueryMetrics(true)
                .build();
    }

    @Override
    protected String getDatabaseName() {
        return "webflux";
    }
}

Documents

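package com.vob.reactive.webflux.repository.entity;

import com.azure.spring.data.cosmos.core.mapping.Container;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Version;

import java.util.ArrayList;

@Data
@Container(containerName = "profile")
@Builder
@NoArgsConstructor // lets the document be deserialized when it is read back from Cosmos DB
@AllArgsConstructor // @Builder needs an all-args constructor once another constructor exists
public class Profile {
    @Id
    private String id;
    @Version
    private String _etag; // optimistic-locking token, filled from the _etag system property
    @Builder.Default
    private ArrayList<String> wishLists = new ArrayList<>();
}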
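
package com.vob.reactive.webflux.repository.entity;

import com.azure.spring.data.cosmos.core.mapping.Container;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Version;

import java.util.ArrayList;

@Data
@Container(containerName = "wishlist")
@Builder
@NoArgsConstructor // lets the document be deserialized when it is read back from Cosmos DB
@AllArgsConstructor // @Builder needs an all-args constructor once another constructor exists
public class Wishlist {
    @Id
    private String id;
    private String name;
    @Version
    private String _etag; // optimistic-locking token, filled from the _etag system property
    @Builder.Default
    private ArrayList<String> products = new ArrayList<>();
}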

Let's talk about annotations

@Container

  • containerName: sets the name of the container in Cosmos DB
  • ru (optional): request units to provision for that container
  • timeToLive: how long (in seconds) a document lives inside the container; the default of -1 means documents never expire
  • autoCreateContainer: by default, when the application starts, the SDK checks whether the container exists and creates it if it does not
  • partitionKeyPath: like the previous option, it is used when the SDK creates a new container, to set the container's partition key

In my real applications I set autoCreateContainer=false and usually have a Terraform template that spins up the infrastructure; that way I have more control over indexes and so on.

@Id

To map my document's id field to the Cosmos DB id, I use the @Id annotation.

@PartitionKey

The other way to set the partition key for documents is the separate @PartitionKey annotation. Since my dataset is small, I did not bother to set up a partition key at all, and that is also possible.

@Version

Each time you save a document into Cosmos DB, its _etag field is updated. If you want optimistic locking, this is the way: add a String field to the document and annotate it with @Version. Why bother? It depends on the use case, but suppose a document can be updated from multiple threads and we don't want them to overwrite each other's changes; that is where _etag helps.
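To illustrate the idea behind @Version, here is a minimal in-memory sketch of etag-based optimistic locking in plain Java. It is not how the Cosmos DB SDK is implemented: EtagStore, Versioned and PreconditionFailedException are hypothetical names, and a random UUID stands in for the server-generated _etag.

```java
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class EtagStore {
    // Hypothetical stand-in for the HTTP 412 Precondition Failed that Cosmos DB returns on an etag mismatch.
    public static class PreconditionFailedException extends RuntimeException {
        public PreconditionFailedException(String id) {
            super("etag mismatch for document " + id);
        }
    }

    // A stored value together with the etag that identifies this exact version of it.
    public static final class Versioned {
        public final String value;
        public final String etag;

        Versioned(String value, String etag) {
            this.value = value;
            this.etag = etag;
        }
    }

    private final Map<String, Versioned> documents = new ConcurrentHashMap<>();

    public Versioned read(String id) {
        return documents.get(id);
    }

    // Write only if the caller's etag still matches the stored one (null means "must not exist yet").
    // Every successful write gets a fresh etag, so anyone still holding the old one is rejected.
    public Versioned write(String id, String newValue, String expectedEtag) {
        return documents.compute(id, (key, current) -> {
            String currentEtag = current == null ? null : current.etag;
            if (!Objects.equals(currentEtag, expectedEtag)) {
                throw new PreconditionFailedException(key); // mapping is left unchanged
            }
            return new Versioned(newValue, UUID.randomUUID().toString());
        });
    }

    public static void main(String[] args) {
        EtagStore store = new EtagStore();
        store.write("p1", "wishLists=[]", null);

        // Two "threads" read the same version, so both hold the same etag.
        Versioned readByA = store.read("p1");
        Versioned readByB = store.read("p1");

        store.write("p1", "wishLists=[a]", readByA.etag); // A saves first; the etag changes
        try {
            store.write("p1", "wishLists=[b]", readByB.etag); // B's etag is now stale
        } catch (PreconditionFailedException e) {
            System.out.println("B has to re-read and retry: " + e.getMessage());
        }
        System.out.println(store.read("p1").value); // wishLists=[a]
    }
}
```

The losing writer is not silently overwritten; it gets an error and must re-read the latest version before trying again.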


Reactive Repositories

To create our repositories, we extend the ReactiveCosmosRepository interface and specify the document class and its id type.

package com.vob.reactive.webflux.repository;

import com.azure.spring.data.cosmos.repository.ReactiveCosmosRepository;
import com.vob.reactive.webflux.repository.entity.Wishlist;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;

@Repository
public interface WishlistRepository extends ReactiveCosmosRepository<Wishlist, String> {
    Flux<Wishlist> findByIdIn(Iterable<String> ids);
}

package com.vob.reactive.webflux.repository;

import com.azure.spring.data.cosmos.repository.ReactiveCosmosRepository;
import com.vob.reactive.webflux.repository.entity.Profile;
import org.springframework.stereotype.Repository;

@Repository
public interface ProfileRepository extends ReactiveCosmosRepository<Profile, String> {

}

Create new list

Service

public Mono<Wishlist> createWishlist(String profileId, String name) {
    var newWishList = Wishlist.builder()
            .id(UUID.randomUUID().toString())
            .name(name)
            .build();
    return wishlistRepository.save(newWishList)
            .flatMap(wishList -> addWishlistAndSaveProfile(profileId, wishList));
}

private Mono<Wishlist> addWishlistAndSaveProfile(String profileId, Wishlist wishList) {
    return Mono.defer(() -> profileRepository
            .findById(profileId)
            .defaultIfEmpty(Profile.builder().id(profileId).build())
            .flatMap(profile -> {
                profile.getWishLists().add(wishList.getId());
                return profileRepository.save(profile);
            }))
            .retryWhen(getRetrySpecs())
            .map(p -> wishList);
}

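// Retry at most 3 times with exponential backoff starting at 10 ms,
// but only for optimistic-locking conflicts (HTTP 412 Precondition Failed)
public static RetryBackoffSpec getRetrySpecs() {
    return Retry.backoff(3, Duration.of(10, ChronoUnit.MILLIS))
            .doBeforeRetry(retrySignal -> log.warn("Lets retry :)"))
            .filter(throwable -> {
                if (!(throwable instanceof CosmosAccessException)) {
                    return false;
                }
                // getCosmosException() is null when the underlying cause was not a CosmosException
                CosmosException cosmosException = ((CosmosAccessException) throwable).getCosmosException();
                return cosmosException != null
                        && cosmosException.getStatusCode() == HttpStatus.PRECONDITION_FAILED.value();
            });
}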

Controller

@PostMapping
public Mono<WishlistDto> addProfile(@PathVariable String profileId, @RequestParam String name) {
    return service.createWishlist(profileId, name)
            .map(t -> DtoMapper.convertToDto(t, false));
}

The create-list controller calls the createWishlist method, which returns a Mono. In the first step we call the repository to save the newly created list. Because the call is reactive, the thread is not blocked while the request travels to Cosmos DB and the document is saved; it is free to serve other requests in the meantime. (Cosmos DB is fast: in my production application a write usually takes 2-4 ms on average.) As soon as the response comes back from Cosmos DB and a thread is available, the next operator, flatMap, is called.

The next step is to add the wishlist id to the profile and save the profile back. First I check whether the profile already exists by calling profileRepository.findById. If nothing is found, the flow falls through to the defaultIfEmpty operator, so inside flatMap we have either the document loaded from Cosmos DB or a new Profile object.

The call to profileRepository.save can fail, because we are using @Version and _etag: the save is rejected if the object's _etag differs from the one stored on the server. This happens when two threads read the same document at the same time and both update it. Both hold the same _etag value, but as soon as one thread saves the document its _etag changes, so the second thread fails to save.

Another beauty of reactive programming is how easy it is to retry operations: the built-in retryWhen operator takes a spec that describes how retries should be handled. However, if I put the retry on the repository save call, it would only repeat the save of the same stale document, which would not help; I need to repeat everything, starting from loading the profile from Cosmos DB. This is where Mono.defer is handy: it groups the complete read-modify-write logic so that each retry builds and runs it again from scratch.
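A minimal sketch of the retry-around-defer pattern, with Cosmos DB replaced by a hypothetical in-memory store: findById, save, StoredProfile and ConflictException are all made up for this illustration, and an int version plays the role of _etag. The store's first save always loses a race against a simulated concurrent writer. It uses Retry.max(3) without backoff to keep the demo deterministic; the article's real code uses the backoff spec.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

public class DeferRetryDemo {
    // Hypothetical stand-in for the 412 Precondition Failed that Cosmos DB returns on an etag mismatch.
    static class ConflictException extends RuntimeException {}

    // Hypothetical stored profile: wishlist ids plus a version number playing the role of _etag.
    static final class StoredProfile {
        final List<String> wishLists;
        final int version;

        StoredProfile(List<String> wishLists, int version) {
            this.wishLists = wishLists;
            this.version = version;
        }
    }

    final Map<String, StoredProfile> store = new ConcurrentHashMap<>();
    final AtomicInteger loads = new AtomicInteger();
    private boolean concurrentWriterPending = true;

    // Cold Mono: the lookup runs on every subscription, not when the Mono is created.
    Mono<StoredProfile> findById(String id) {
        return Mono.fromCallable(() -> {
            loads.incrementAndGet();
            return store.get(id); // null makes fromCallable complete empty
        });
    }

    // Saves only if the version is unchanged. The first call simulates another writer
    // bumping the version between our read and our write, so it always conflicts.
    Mono<StoredProfile> save(String id, List<String> wishLists, int expectedVersion) {
        return Mono.fromCallable(() -> {
            if (concurrentWriterPending) {
                concurrentWriterPending = false;
                store.computeIfPresent(id, (k, p) -> new StoredProfile(p.wishLists, p.version + 1));
            }
            StoredProfile current = store.get(id);
            if (current == null || current.version != expectedVersion) {
                throw new ConflictException();
            }
            StoredProfile saved = new StoredProfile(wishLists, current.version + 1);
            store.put(id, saved);
            return saved;
        });
    }

    // The retry wraps the whole deferred read-modify-write, so every attempt
    // re-reads the latest version instead of re-sending a stale one.
    Mono<List<String>> addWishlist(String profileId, String wishlistId) {
        return Mono.defer(() -> findById(profileId)
                        .flatMap(profile -> {
                            List<String> updated = new ArrayList<>(profile.wishLists);
                            updated.add(wishlistId);
                            return save(profileId, updated, profile.version);
                        }))
                .retryWhen(Retry.max(3).filter(ConflictException.class::isInstance))
                .map(saved -> saved.wishLists);
    }

    public static void main(String[] args) {
        DeferRetryDemo demo = new DeferRetryDemo();
        demo.store.put("p1", new StoredProfile(List.of("w0"), 1));
        List<String> result = demo.addWishlist("p1", "w1").block();
        System.out.println(result + " after " + demo.loads.get() + " loads");
        // prints [w0, w1] after 2 loads
    }
}
```

Retrying only save(...) would resend version 1 every time and keep failing. A side benefit of defer in the article's version is that the default Profile passed to defaultIfEmpty is built inside the supplier, so each retry starts with a fresh object rather than one already mutated by a previous attempt.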

Retry

The logic in my retry spec is simple. Before each retry I want to log (in my production applications I always use Application Insights). I don't want to retry every exception, only the Cosmos DB PRECONDITION_FAILED (412) error: https://docs.microsoft.com/en-us/rest/api/cosmos-db/http-status-codes-for-cosmosdb. Retry.backoff(3, ...) makes at most three retries with exponential backoff starting at 10 ms: the nominal delay doubles on every attempt (10 ms, 20 ms, 40 ms), and Reactor adds random jitter of up to 50% of the delay by default.
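A small plain-Java sketch that computes the nominal schedule for Retry.backoff(3, 10 ms). It reflects my understanding of Reactor's formula (minBackoff * 2^attempt, capped at maxBackoff) and deliberately leaves out jitter, so real delays will vary around these values. BackoffSchedule and nominalDelays are hypothetical names, and the 1-second cap in main is an assumption for the example (Reactor lets you set one with maxBackoff(...)).

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class BackoffSchedule {
    // Nominal exponential backoff: retry n (0-based) waits minBackoff * 2^n, capped at maxBackoff.
    // Jitter is intentionally not modelled here.
    public static List<Duration> nominalDelays(int maxRetries, Duration minBackoff, Duration maxBackoff) {
        List<Duration> delays = new ArrayList<>();
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            Duration next = minBackoff.multipliedBy(1L << attempt);
            delays.add(next.compareTo(maxBackoff) > 0 ? maxBackoff : next);
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println(nominalDelays(3, Duration.ofMillis(10), Duration.ofSeconds(1)));
        // prints [PT0.01S, PT0.02S, PT0.04S]
    }
}
```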

412 Precondition failure

The operation specified an eTag that is different from the version available at the server, that is, an optimistic concurrency error. Retry the request after reading the latest version of the resource and updating the eTag on the request.

Read list by id

Service

public Mono<Wishlist> getWishList(String profileId, String wishListId) {
    return Mono.zip(profileRepository.findById(profileId), wishlistRepository.findById(wishListId))
            .flatMap(this::validateProfileToWishlistPermission);
}

public Mono<Wishlist> validateProfileToWishlistPermission(Tuple2<Profile, Wishlist> objects) {
    // Only return the wishlist if it belongs to this profile
    var wishlist = objects.getT2();
    if (objects.getT1().getWishLists().contains(wishlist.getId())) {
        return Mono.just(wishlist);
    }
    return Mono.empty();
}

Controller

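// listId must be part of the path; profileId is assumed to come from a class-level @RequestMapping (not shown)
@GetMapping("/{listId}")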
public Mono<ResponseEntity<WishlistDto>> getList(@PathVariable String profileId, @PathVariable String listId) {
    return service.getWishList(profileId, listId)
            .map(w -> ResponseEntity.ok().body(DtoMapper.convertToDto(w, true)))
            .defaultIfEmpty(ResponseEntity.notFound().build());
}

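Mono.zip is another useful operator: it combines several calls and passes their results to the next operator as a tuple. Here both findById calls are sent to Cosmos DB without waiting for each other, and flatMap is reached only if both return a value. If either one is empty, zip completes empty and the controller returns a 404 Not Found status. The same happens when the wishlist does not belong to the profile, because validateProfileToWishlistPermission returns Mono.empty().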
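A small Reactor-only sketch of the zip-then-default pattern. Plain strings stand in for the Profile and Wishlist documents, and the string "404 Not Found" stands in for ResponseEntity.notFound(); ZipDemo and describe are hypothetical names.

```java
import reactor.core.publisher.Mono;

public class ZipDemo {
    // Combine two lookups; the result is empty as soon as either source completes empty.
    static Mono<String> describe(Mono<String> profile, Mono<String> wishlist) {
        return Mono.zip(profile, wishlist)
                .map(tuple -> tuple.getT1() + " owns " + tuple.getT2())
                .defaultIfEmpty("404 Not Found");
    }

    public static void main(String[] args) {
        System.out.println(describe(Mono.just("p1"), Mono.just("w1")).block()); // p1 owns w1
        System.out.println(describe(Mono.just("p1"), Mono.empty()).block());    // 404 Not Found
    }
}
```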

Sum Up

Reactive, non-blocking code generally does not make applications run faster, and in some cases it can even slow them down slightly, but it increases throughput: with the same resources we can process many more requests. On the other hand, reactive programming requires some skill and understanding, and there is a bit of a learning curve.

GitHub

https://github.com/vovikdrg/webflux-cosmosdb