Wednesday 8 July 2015

CompletableFuture VS ParallelStream

CompletableFuture can be a complex concept when taken in isolation, so I will use a simple example built around one business requirement to show how to use it, when to use it, and how it differs from parallelStream.

The requirement is as follows :

Build a list of promotion Strings. For each one you need to send an email to all the customers with the promotion. You also need to send an email to the Boss saying that promotions have been applied today.

The Boss email and the customer emails are independent of each other, and neither should wait for the other to complete.

The base algorithm is this one  :

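// Imports needed by the snippets in this post
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public static void main(String[] args) throws InterruptedException {
 List<String> promotions = Arrays.asList("Pizza Discount : ",
                                         "Beer Discount : ",
                                         "Coke Discount : ",
                                         "Service Discount : ");
 old(promotions);
}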

public static void old(List<String> promotions) {
 for (String promotion : promotions) {
  promotion = promotion + "30%";
  promotion = promotion + " Valid only Today";
  sendEmailToCustomers(promotion);
 }

 sendEmailToTheBoss("Today discount was 30%");
}



Let's assume that sending an email takes 1 second :

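public static void sendEmailToCustomers(String promotion) {
 try {
  Thread.sleep(1000); // simulate the time needed to send the email
  System.out.println("Email sent to customer for promotion : " + promotion);
 } catch (InterruptedException e) {
  Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
 }
}

public static void sendEmailToTheBoss(String promotion) {
 try {
  Thread.sleep(1000); // simulate the time needed to send the email
  System.out.println("Email sent to Boss for promotion : " + promotion);
 } catch (InterruptedException e) {
  Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
 }
}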
 
For each promotion we append the discount percentage, then the period during which the discount is valid, and finally we send the resulting email to all the customers. Once every promotion has been processed, we send the email to the Boss.

We can speed up the algorithm by parallelizing the sending of the customer emails with parallelStream :

public static void ps(List<String> promotions) {
 promotions.parallelStream()
           .map(promotion -> promotion + "30%")
           .map(promotion -> promotion + " Valid only Today")
           // forEach blocks the calling thread until every email has been sent
           .forEach(promotion -> sendEmailToCustomers(promotion));

 // Runs only after all the customer emails are done
 sendEmailToTheBoss("Today discount was 30%");
}

Fantastic, now the promotions are sent in parallel, so the whole run takes much less time.
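To make the behaviour of forEach on a parallel stream concrete, here is a minimal sketch. The class ParallelStreamBlocking, the fakeSend helper and the 100 ms delay are assumptions made for illustration and are not part of the program above. Events are recorded in a queue instead of being printed, which shows that the statement after forEach runs only once every element has been processed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class ParallelStreamBlocking {

    // Hypothetical stand-in for sendEmailToCustomers: records the event instead of printing it.
    static void fakeSend(String promotion, Queue<String> log) {
        try {
            Thread.sleep(100); // assumed delay, shorter than the 1 second used in the post
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        log.add("customer:" + promotion);
    }

    static List<String> run(List<String> promotions) {
        Queue<String> log = new ConcurrentLinkedQueue<>();
        // forEach is a terminal operation: it returns only after all elements are processed.
        promotions.parallelStream().forEach(p -> fakeSend(p, log));
        log.add("boss"); // recorded after every customer entry
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        List<String> events = run(Arrays.asList("Pizza", "Beer", "Coke", "Service"));
        // Customer entries appear in varying order; "boss" is the last entry.
        System.out.println(events);
    }
}
```

The customer entries can appear in any order, but the "boss" entry is always last, which is exactly the waiting that the requirement asks us to avoid.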

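But we still have a requirement to fulfill: the customers and the Boss should not have to wait on each other to receive their emails. With parallelStream, forEach returns only once every customer email has been sent, so the Boss email still waits for all of them.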

This is where CompletableFuture comes in handy :

public static void cf(List<String> promotions) {
 // One asynchronous chain per promotion: build the text, then send the email
 CompletableFuture<?>[] futures = promotions.stream()
       .map(promotion -> CompletableFuture.supplyAsync(() -> promotion + "30%"))
       .map(future -> future.thenApplyAsync(promotion -> promotion + " Valid only Today"))
       .map(future -> future.thenAcceptAsync(promotion -> sendEmailToCustomers(promotion)))
       .toArray(CompletableFuture[]::new);

 // Runs on the calling thread while the customer emails are still in flight
 sendEmailToTheBoss("Today discount was 30%");

 // Wait for every customer email before returning
 CompletableFuture.allOf(futures).join();
}

We composed our CompletableFutures from the steps needed to build the promotion.
The last step of each chain sends the email to the customers.

Everything is asynchronous here, and the futures array holds the composed CompletableFutures.
When the JVM reaches the line sendEmailToTheBoss("Today discount was 30%"); , some of the customer emails may already have been sent and some may still be pending, but we don't wait for them: we just want to notify the Boss that the promotions were generated.
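The "some done, some still pending" state can be observed directly with isDone(). The sketch below is only an illustration: the class PendingFuturesSketch, the pause helper and the 200 ms delay are assumptions, and the tasks just sleep instead of sending anything. It counts how many futures are complete right after they are created and again after join().

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class PendingFuturesSketch {

    // Hypothetical helper that simulates slow work.
    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns {futures done right after submission, futures done after join()}.
    static long[] countDone(List<String> promotions) {
        CompletableFuture<?>[] futures = promotions.stream()
                .map(p -> CompletableFuture.runAsync(() -> pause(200)))
                .toArray(CompletableFuture[]::new);

        // The calling thread is not blocked here, so it can inspect the futures immediately.
        long before = Arrays.stream(futures).filter(CompletableFuture::isDone).count();

        CompletableFuture.allOf(futures).join(); // block until every task has finished
        long after = Arrays.stream(futures).filter(CompletableFuture::isDone).count();

        return new long[] {before, after};
    }

    public static void main(String[] args) {
        long[] counts = countDone(Arrays.asList("Pizza", "Beer", "Coke", "Service"));
        System.out.println("done before join: " + counts[0] + ", done after join: " + counts[1]);
    }
}
```

The first count depends on timing and is typically lower than the second, while the second always equals the number of promotions because join() has returned by then.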

The last line, CompletableFuture.allOf(futures).join(); , is there because we want to wait for all the customer emails to be sent before leaving this method.
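One design choice worth considering: the Async methods used above run on the common ForkJoinPool by default, which is shared with parallel streams and is not sized for blocking work such as sending emails. The following is a minimal sketch, under the assumption that a dedicated pool is acceptable for this job, using the overloads that take an Executor. The class DedicatedPoolSketch and the sendAll method are hypothetical names, and the emails are collected in a queue rather than actually sent.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DedicatedPoolSketch {

    // Builds every promotion on a dedicated pool and returns the texts that were "sent".
    static List<String> sendAll(List<String> promotions) {
        // Assumption: one thread per promotion is fine for a small, fixed list.
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, promotions.size()));
        Queue<String> sent = new ConcurrentLinkedQueue<>();
        try {
            CompletableFuture<?>[] futures = promotions.stream()
                    .map(p -> CompletableFuture.supplyAsync(() -> p + "30%", pool))
                    .map(f -> f.thenApplyAsync(p -> p + " Valid only Today", pool))
                    .map(f -> f.thenAcceptAsync(sent::add, pool)) // stand-in for sendEmailToCustomers
                    .toArray(CompletableFuture[]::new);
            CompletableFuture.allOf(futures).join();
        } finally {
            pool.shutdown(); // let the JVM exit once the tasks are done
        }
        return new ArrayList<>(sent);
    }

    public static void main(String[] args) {
        sendAll(Arrays.asList("Pizza Discount : ", "Beer Discount : ")).forEach(System.out::println);
    }
}
```

The chain is the same as in cf; only the thread pool that runs each stage changes.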

So, running all the methods above :

System.out.println("-------OLD-------");
old(promotions);
System.out.println("-------PARALLEL STREAM-------");
ps(promotions);
System.out.println("-------COMPLETABLE FUTURE-------");
cf(promotions);

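One possible output is shown below; the order of the lines printed by the parallel and asynchronous versions can vary between runs :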

-------OLD-------
Email sent to customer for promotion : Pizza Discount : 30% Valid only Today
Email sent to customer for promotion : Beer Discount : 30% Valid only Today
Email sent to customer for promotion : Coke Discount : 30% Valid only Today
Email sent to customer for promotion : Service Discount : 30% Valid only Today
Email sent to Boss for promotion : Today discount was 30%
-------PARALLEL STREAM-------
Email sent to customer for promotion : Service Discount : 30% Valid only Today
Email sent to customer for promotion : Coke Discount : 30% Valid only Today
Email sent to customer for promotion : Beer Discount : 30% Valid only Today
Email sent to customer for promotion : Pizza Discount : 30% Valid only Today
Email sent to Boss for promotion : Today discount was 30%
-------COMPLETABLE FUTURE-------
Email sent to customer for promotion : Pizza Discount : 30% Valid only Today
Email sent to customer for promotion : Beer Discount : 30% Valid only Today
Email sent to Boss for promotion : Today discount was 30%
Email sent to customer for promotion : Coke Discount : 30% Valid only Today
Email sent to customer for promotion : Service Discount : 30% Valid only Today
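The output shows the ordering but not the elapsed time. To compare the three approaches on your own machine, a rough timing harness along these lines could be used. It is a sketch under assumptions: the class TimingSketch, its method names and the 50 ms delay are made up for illustration, and the measured numbers depend on the number of available cores.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class TimingSketch {

    static final long SEND_MILLIS = 50; // assumed cost of one email

    // Hypothetical stand-in for both sendEmail methods.
    static void fakeSend() {
        try {
            Thread.sleep(SEND_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static long elapsedMillis(Runnable task) {
        long start = System.nanoTime();
        task.run();
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Same shape as old(): customers one by one, then the Boss.
    static void sequential(List<String> promotions) {
        for (String p : promotions) {
            fakeSend();
        }
        fakeSend();
    }

    // Same shape as ps(): customers in parallel, then the Boss.
    static void parallel(List<String> promotions) {
        promotions.parallelStream().forEach(p -> fakeSend());
        fakeSend();
    }

    // Same shape as cf(): customers asynchronously, the Boss on the calling thread.
    static void async(List<String> promotions) {
        CompletableFuture<?>[] futures = promotions.stream()
                .map(p -> CompletableFuture.runAsync(TimingSketch::fakeSend))
                .toArray(CompletableFuture[]::new);
        fakeSend();
        CompletableFuture.allOf(futures).join();
    }

    public static void main(String[] args) {
        List<String> promotions = Arrays.asList("Pizza", "Beer", "Coke", "Service");
        System.out.println("old: " + elapsedMillis(() -> sequential(promotions)) + " ms");
        System.out.println("ps:  " + elapsedMillis(() -> parallel(promotions)) + " ms");
        System.out.println("cf:  " + elapsedMillis(() -> async(promotions)) + " ms");
    }
}
```

The sequential version needs roughly five times the cost of one email; how much the other two save depends on how many threads the common pool provides.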