Java 8 Streams and JPA

Since the JPA standard was defined before Java 8 was released, it is not surprising that the JPA API is based only on collections. There is no way to obtain a Java 8 Stream from a query object.

For those using Hibernate as their JPA provider, however, there is an interesting way to create a stream from your JPQL queries.

Hibernate Scrollable Results

Hibernate supports many more features than those offered by the JPA standard alone. Hibernate’s Query class allows us to obtain an iterator-like object for a JPQL query. This iterator-like class is named ScrollableResults.

You can see a great example of this feature in this blog post about Reading Large Result Sets with Hibernate and MySQL. I have taken the liberty of copying one of their examples:

// jpql holds the JPQL query string
Query query = session.createQuery(jpql);
query.setReadOnly(true);
query.setFetchSize(100);
ScrollableResults results = query.scroll(ScrollMode.FORWARD_ONLY);
try {
   while (results.next()) {
      Object row = results.get();
      // process the entity here
   }
} finally {
   // release the underlying cursor even if processing fails
   results.close();
}

Creating an Iterator Wrapper around Scrollable Results

Let’s take this a bit further now by implementing a Java Iterator wrapper around Hibernate’s ScrollableResults object, as shown below:

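import java.util.Iterator;
import java.util.NoSuchElementException;
import org.hibernate.ScrollableResults;

class ScrollableResultIterator<T> implements Iterator<T> {
   private final ScrollableResults results;
   private final Class<T> type;
   // null means the cursor has not yet been advanced for the upcoming element
   private Boolean hasNext;

   ScrollableResultIterator(ScrollableResults results, Class<T> type) {
      this.results = results;
      this.type = type;
   }

   @Override
   public boolean hasNext() {
      // advance the cursor at most once per element, so repeated calls skip no rows
      if (hasNext == null) {
         hasNext = results.next();
      }
      return hasNext;
   }

   @Override
   public T next() {
      if (!hasNext()) {
         throw new NoSuchElementException();
      }
      hasNext = null; // the following hasNext() call must advance the cursor again
      return type.cast(results.get(0));
   }
}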
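One subtlety when wrapping a cursor this way is that the Iterator contract allows hasNext() to be called any number of times before next(), so the wrapper should advance the cursor at most once per element. The following is a minimal, Hibernate-free sketch of that idea. The Cursor interface and the in-memory cursorOver helper are hypothetical stand-ins for ScrollableResults, introduced only so the example runs on a plain JDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-in for a forward-only cursor such as ScrollableResults:
// next() advances and reports whether a row is available, get() reads the current row.
interface Cursor<T> {
   boolean next();
   T get();
}

// Adapts a Cursor to java.util.Iterator while honouring the Iterator contract.
class CursorIterator<T> implements Iterator<T> {
   private final Cursor<T> cursor;
   // null means the cursor has not yet been advanced for the upcoming element
   private Boolean hasNext;

   CursorIterator(Cursor<T> cursor) {
      this.cursor = cursor;
   }

   @Override
   public boolean hasNext() {
      if (hasNext == null) {
         hasNext = cursor.next(); // advance at most once per element
      }
      return hasNext;
   }

   @Override
   public T next() {
      if (!hasNext()) {
         throw new NoSuchElementException();
      }
      hasNext = null; // force the following hasNext() call to advance again
      return cursor.get();
   }
}

class CursorIteratorDemo {
   // Builds an in-memory cursor over a list, purely for illustration.
   static <T> Cursor<T> cursorOver(List<T> rows) {
      return new Cursor<T>() {
         private int position = -1;
         @Override public boolean next() { return ++position < rows.size(); }
         @Override public T get() { return rows.get(position); }
      };
   }

   // Collects all elements, deliberately calling hasNext() twice per element.
   static <T> List<T> drain(Iterator<T> iterator) {
      List<T> out = new ArrayList<>();
      while (iterator.hasNext()) {
         iterator.hasNext(); // a redundant call must not skip a row
         out.add(iterator.next());
      }
      return out;
   }

   public static void main(String[] args) {
      Cursor<String> cursor = cursorOver(Arrays.asList("a", "b", "c"));
      List<String> rows = drain(new CursorIterator<>(cursor));
      System.out.println(rows); // [a, b, c]
   }
}
```

With a wrapper that advanced the cursor on every hasNext() call, the redundant call inside drain would silently drop every other row.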

Gaining Access to Hibernate Session through JPA Entity Manager

As you can see, the examples above use a Hibernate session object to obtain an instance of a Hibernate query object. Supposing we are using JPA, we typically have access to an EntityManager, but not to any particular implementation objects of the underlying provider.

To gain access to Hibernate’s Session object we can use a special method in the entity manager.

Session session = entityManager.unwrap(Session.class);

Be warned that at this point we are escaping from the safety of vendor-agnostic code. In other words, if we ever wanted to use another provider like OpenJPA or EclipseLink, we would be forced to find a different way to implement our code here, since those other providers won’t offer anything like Hibernate’s Session and ScrollableResults.

From Iterators to Spliterators to Streams

Our next step is to obtain a Java 8 stream from this iterator. Lukas Eder recently wrote an interesting article about the difficulty of obtaining a stream from an iterable object, which will come in handy here.

To achieve our goal we now need to use two utility classes in Java 8 named StreamSupport and Spliterators.

For instance, we could take an iterator like the one we defined above and transform it into a Spliterator by doing:

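private Spliterator<T> toSplitIterator(ScrollableResults scroll, Class<T> type){
   // ORDERED: rows arrive in cursor order; NONNULL: each row is expected to hold an entity.
   // CONCURRENT and IMMUTABLE are mutually exclusive, and DISTINCT cannot be
   // guaranteed for an arbitrary query, so none of them is reported here.
   return Spliterators.spliteratorUnknownSize(
      new ScrollableResultIterator<>(scroll, type),
      Spliterator.ORDERED | Spliterator.NONNULL
   );
}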

This Spliterator is an intermediate product that we can now use to create a Java 8 stream:

StreamSupport.stream(spliterator, false);
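To see the two steps working together without a database, here is a small sketch that turns an ordinary list iterator into a stream. The class name IteratorStreams and the helper streamOf are illustrative names, not part of any library, and the list simply stands in for the scrollable results.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class IteratorStreams {
   // Wraps any iterator in a sequential, lazily evaluated stream.
   static <T> Stream<T> streamOf(Iterator<T> iterator) {
      Spliterator<T> spliterator =
         Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
      return StreamSupport.stream(spliterator, false); // false = not parallel
   }

   public static void main(String[] args) {
      List<Integer> source = Arrays.asList(1, 2, 3, 4);
      List<Integer> doubled = streamOf(source.iterator())
         .map(n -> n * 2)
         .collect(Collectors.toList());
      System.out.println(doubled); // [2, 4, 6, 8]
   }
}
```

The second argument of StreamSupport.stream is false because a forward-only cursor can only be traversed sequentially.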

At this point we have built enough to create our first stream out of a JPQL query. We can do it as follows:

public Stream<T> getResultStream(
      String sql, 
      Integer fetchSize, 
      Map<String,Object> parameters) 
{
   Query query = session.createQuery(sql);
   if (fetchSize != null) {
      query.setFetchSize(fetchSize);
   }
   for (Map.Entry<String, Object> parameter : parameters.entrySet()) {
      query.setParameter(parameter.getKey(), parameter.getValue());
   }
   query.setReadOnly(true);
   ScrollableResults scroll = query.scroll(ScrollMode.FORWARD_ONLY);
   return StreamSupport.stream(toSplitIterator(scroll, type), false)
            .onClose(scroll::close);
}
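Note that a handler registered with onClose only runs when the stream itself is closed; a terminal operation does not trigger it. The sketch below illustrates this with plain in-memory streams, where the lambda adding to an event log stands in for scroll::close. The class name CloseHandlerDemo is made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class CloseHandlerDemo {
   // Returns a log of events so the ordering can be inspected.
   static List<String> run() {
      List<String> events = new ArrayList<>();

      // Case 1: a terminal operation alone does not run the close handler.
      Stream<String> first = Stream.of("a", "b")
         .onClose(() -> events.add("first closed"));
      events.add("first count=" + first.count());

      // Case 2: try-with-resources calls close(), which runs the handler.
      try (Stream<String> second = Stream.of("c")
            .onClose(() -> events.add("second closed"))) {
         events.add("second count=" + second.count());
      }
      return events;
   }

   public static void main(String[] args) {
      System.out.println(run());
      // [first count=2, second count=1, second closed]
   }
}
```

For the query stream this suggests that callers should consume it inside a try-with-resources block, so that the scrollable results are released once processing is finished.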

A JPA Stream API

We can do a little better by defining the basics of the kind of query object that is currently missing from the standard JPA API:

public interface StreamQuery<T> {
   Stream<T> getResultStream();
   StreamQuery<T> setParameter(String name, Object value);
   StreamQuery<T> setFetchSize(int fetchSize);
}

Putting together all we have learned so far, we could create this implementation of our new StreamQuery interface:

public class HibernateQueryStream<T> implements StreamQuery<T> {

   private final Session session;
   private final String sql;
   private final Class<T> type;
   private final Map<String, Object> parameters = new HashMap<>();
   private Integer fetchSize;

   public HibernateQueryStream(
      EntityManager entityManager, 
      String sql, 
      Class<T> type) 
   {
      this.session = entityManager.unwrap(Session.class);
      this.sql = sql;
      this.type = type;
   }

   @Override
   public StreamQuery<T> setParameter(String name, Object value) {
      parameters.put(name, value);
      return this;
   }
   
   @Override
   public StreamQuery<T> setFetchSize(int fetchSize) {
      this.fetchSize = fetchSize;
      return this;
   }

   @Override
   public Stream<T> getResultStream() {
      Query query = session.createQuery(sql);
      if (fetchSize != null) {
         query.setFetchSize(fetchSize);
      }
      query.setReadOnly(true);
      for (Map.Entry<String, Object> parameter : parameters.entrySet()) {
         query.setParameter(parameter.getKey(), parameter.getValue());
      }
      ScrollableResults scroll = query.scroll(ScrollMode.FORWARD_ONLY);
      return StreamSupport.stream(toSplitIterator(scroll, type), false)
               .onClose(scroll::close);
   }
   
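   private Spliterator<T> toSplitIterator(ScrollableResults scroll, Class<T> type){
      // ORDERED: rows arrive in cursor order; NONNULL: each row is expected to hold an entity.
      // CONCURRENT and IMMUTABLE are mutually exclusive, and DISTINCT cannot be
      // guaranteed for an arbitrary query, so none of them is reported here.
      return Spliterators.spliteratorUnknownSize(
         new ScrollableResultIterator<>(scroll, type),
         Spliterator.ORDERED | Spliterator.NONNULL
      );
   }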

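   private static class ScrollableResultIterator<T> implements Iterator<T> {

      private final ScrollableResults results;
      private final Class<T> type;
      // null means the cursor has not yet been advanced for the upcoming element
      private Boolean hasNext;

      ScrollableResultIterator(ScrollableResults results, Class<T> type) {
         this.results = results;
         this.type = type;
      }

      @Override
      public boolean hasNext() {
         // advance the cursor at most once per element, so repeated calls skip no rows
         if (hasNext == null) {
            hasNext = results.next();
         }
         return hasNext;
      }

      @Override
      public T next() {
         if (!hasNext()) {
            throw new NoSuchElementException(); // from java.util
         }
         hasNext = null; // the following hasNext() call must advance the cursor again
         return type.cast(results.get(0));
      }
   }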
}

The Repository Layer

On top of this component we can now build our repository layer.

@Repository
public class StreamRepository {

   @PersistenceContext(unitName="demo")
   private EntityManager entityManager;

   public Stream<Order> getOrderHistory(String email) {
      String jpql = "SELECT o FROM Order o WHERE o.customer.email=:email";
      StreamQuery<Order> query = new HibernateQueryStream<>(
          entityManager, 
          jpql, 
          Order.class
      );
      // bind the :email parameter used in the JPQL query before running it
      return query.setParameter("email", email).getResultStream();
   }
}

And from here the rest is a piece of cake:

orderRepository.getOrderHistory(email)
   .filter(order -> order.total() > 100)
   .map(Order::getOrderLines)
   .flatMap(List::stream)
   .map(OrderLine::getTotal)
   .reduce(0, (x,y) -> x + y);

It is important to highlight that the Hibernate session must be alive by the time we actually consume the stream, because it is at that point that the entities will be mapped. So make sure that, wherever you consume the stream, the Hibernate session or your entity manager context is still alive.
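The reason is that streams are lazy: building the stream reads nothing, and rows are only pulled from the cursor when a terminal operation runs. The following sketch simulates this without Hibernate. The AtomicBoolean flag is a hypothetical stand-in for the session state, and the class and method names are made up for illustration.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class SessionLifetimeDemo {
   // Simulated session-backed cursor over the rows 1, 2, 3:
   // reading fails once the "session" flag has been switched off.
   static Stream<Integer> rows(AtomicBoolean sessionOpen) {
      Iterator<Integer> cursor = new Iterator<Integer>() {
         private int next = 1;
         @Override public boolean hasNext() {
            if (!sessionOpen.get()) {
               throw new IllegalStateException("session is closed");
            }
            return next <= 3;
         }
         @Override public Integer next() {
            if (next > 3) {
               throw new NoSuchElementException();
            }
            return next++;
         }
      };
      return StreamSupport.stream(
         Spliterators.spliteratorUnknownSize(cursor, Spliterator.ORDERED), false);
   }

   // Consuming while the session is open works as expected.
   static int sumWhileOpen() {
      AtomicBoolean open = new AtomicBoolean(true);
      int sum = rows(open).mapToInt(Integer::intValue).sum();
      open.set(false);
      return sum; // 1 + 2 + 3 = 6
   }

   // Creating the stream reads nothing; the failure only shows up on consumption.
   static boolean failsAfterClose() {
      AtomicBoolean open = new AtomicBoolean(true);
      Stream<Integer> stream = rows(open); // no rows have been read yet
      open.set(false);                     // the "session" ends before consumption
      try {
         stream.collect(Collectors.toList());
         return false;
      } catch (IllegalStateException expected) {
         return true;
      }
   }

   public static void main(String[] args) {
      System.out.println(sumWhileOpen());    // 6
      System.out.println(failsAfterClose()); // true
   }
}
```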


2 thoughts on “Java 8 Streams and JPA”

  1. Very interesting, but I don’t think you magically translate the predicate order -> order.total() > 100 into the equivalent JPQL for the predicate to be applied already in the database, right? Not to mention the map and reduce, which would best be implemented with SQL GROUP BY and SUM()

    As a SQL person, it hurts to see that a simple SQL data aggregation example is implemented by materialising almost all rows from the database in order to consume and aggregate them in Java memory. 🙂

    Nonetheless, interesting approach!

    • Thanks for the comment and feedback, Lukas.

      I think you are right! I suppose my last example was not the best. To be honest, at that point I just wanted to demonstrate that the stream was functional, and did not pay too much attention to the example that I used to create it.

      But I totally agree with you, for this particular example it would have been much better to do the aggregate query directly in the database.

      I recently used this pattern of data streams to build a REST service providing a stream of results. As entities get mapped, they are converted to model objects and then immediately flushed to the client. My original intention was to write about that. I hope I can make this clearer in a future post.
