Java 8 Streams and JPA

Since the JPA standard was defined before Java 8 was released, it is not surprising that the JPA API is based only on collections. There is no way to obtain a Java 8 Stream from a query object.

For those out there using Hibernate as their JPA provider there is an interesting way to create a stream for your JPQL queries.

Hibernate Scrollable Results

Hibernate supports many more features than the JPA standard offers. Hibernate’s Query class allows us to obtain an iterator-like object for a JPQL query. This iterator-like class is named ScrollableResults.

You can see a great example of this feature in this blog post about Reading Large Result Sets with Hibernate and MySQL. I have taken the liberty of copying one of their examples:

Query query = session.createQuery(jpql); // jpql holds the JPQL query string
query.setReadOnly(true);
query.setFetchSize(100);
ScrollableResults results = query.scroll(ScrollMode.FORWARD_ONLY);
while (results.next()) {
   Object row = results.get();
   // process the entity here
}
results.close();

Creating an Iterator Wrapper around Scrollable Results

Let’s take this a bit further now by implementing a Java Iterator wrapper around Hibernate’s scrollable results object, as shown below:

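import java.util.Iterator;
import java.util.NoSuchElementException;

class ScrollableResultIterator<T> implements Iterator<T> {
   private final ScrollableResults results;
   private final Class<T> type;
   // True when hasNext() has moved the cursor but the row is not yet consumed.
   private boolean advanced;
   private boolean hasRow;

   ScrollableResultIterator(ScrollableResults results, Class<T> type) {
      this.results = results;
      this.type = type;
   }

   @Override
   public boolean hasNext() {
      // results.next() moves the cursor, so advance at most once per element;
      // otherwise repeated hasNext() calls would skip rows.
      if (!advanced) {
         hasRow = results.next();
         advanced = true;
      }
      return hasRow;
   }

   @Override
   public T next() {
      if (!hasNext()) {
         throw new NoSuchElementException();
      }
      advanced = false;
      return type.cast(results.get(0));
   }
}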

Gaining Access to Hibernate Session through JPA Entity Manager

As you can see, the examples above use a Hibernate Session object to obtain a Hibernate Query object. When using JPA, we typically have access to an EntityManager, but not to the implementation objects of the underlying provider.

To gain access to Hibernate’s Session object we can use the unwrap method of the entity manager:

Session session = entityManager.unwrap(Session.class);

Be warned that at this point we are leaving the safety of vendor-agnostic code. In other words, if we ever wanted to use another provider like OpenJPA or EclipseLink, we would have to find a different way to implement this code, since those providers do not offer Hibernate’s Session and ScrollableResults.

From Iterators to Spliterators to Streams

Our next step is to obtain a Java 8 stream from this iterator. Lukas Eder recently wrote an interesting article about the difficulty of obtaining a stream from an iterable object, which comes in handy here.

To achieve our goal we now need to use two utility classes in Java 8 named StreamSupport and Spliterators.

For instance, we could take an iterator like the one we defined above and transform it into a Spliterator as follows:

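private Spliterator<T> toSplitIterator(ScrollableResults scroll, Class<T> type){
   // CONCURRENT and IMMUTABLE are mutually exclusive, and DISTINCT is not
   // guaranteed for arbitrary queries, so only claim what a forward-only
   // cursor provides (assuming the query never returns null rows).
   return Spliterators.spliteratorUnknownSize(
      new ScrollableResultIterator<>(scroll, type),
      Spliterator.ORDERED | Spliterator.NONNULL
   );
}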

This Spliterator is an intermediate product that we can now turn into a Java 8 stream:

StreamSupport.stream(spliterator, false);
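
The same two steps can be tried without a database. The following is only a sketch: the class name `IteratorStreams` and its `stream` helper are made up for illustration, and a plain list iterator stands in for the scrollable results. It claims only the `ORDERED` characteristic, since nothing else is known about an arbitrary iterator.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical helper, not part of the JDK or Hibernate.
class IteratorStreams {

   // Wraps any iterator in a sequential stream of unknown size.
   static <T> Stream<T> stream(Iterator<T> iterator) {
      Spliterator<T> spliterator =
         Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
      return StreamSupport.stream(spliterator, false); // false = sequential
   }

   public static void main(String[] args) {
      // A list iterator stands in for the ScrollableResults wrapper.
      Iterator<String> names = Arrays.asList("ann", "bob", "cy").iterator();
      List<String> upper = stream(names)
         .map(String::toUpperCase)
         .collect(Collectors.toList());
      System.out.println(upper); // [ANN, BOB, CY]
   }
}
```

The stream is lazy: the iterator is only advanced once a terminal operation such as `collect` runs.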

At this point we have built enough to create our first stream out of a JPQL query. We can do it as follows:

public Stream<T> getResultStream(
      String sql, 
      Integer fetchSize, 
      Map<String,Object> parameters) 
{
   Query query = session.createQuery(sql);
   if (fetchSize != null) {
      query.setFetchSize(fetchSize);
   }
   for (Map.Entry<String, Object> parameter : parameters.entrySet()) {
      query.setParameter(parameter.getKey(), parameter.getValue());
   }
   query.setReadOnly(true);
   ScrollableResults scroll = query.scroll(ScrollMode.FORWARD_ONLY);
   return StreamSupport.stream(toSplitIterator(scroll, type), false)
            .onClose(scroll::close);
}

A JPA Stream API

We can do a little better by defining the basics of the query object that the JPA standard API is currently missing:

public interface StreamQuery<T> {
   Stream<T> getResultStream();
   StreamQuery<T> setParameter(String name, Object value);
   StreamQuery<T> setFetchSize(int fetchSize);
}

Putting together all we have learned so far, we could create this implementation of our new StreamQuery interface:

public class HibernateQueryStream<T> implements StreamQuery<T> {

   private final Session session;
   private final String sql;
   private final Class<T> type;
   private final Map<String, Object> parameters = new HashMap<>();
   private Integer fetchSize;

   public HibernateQueryStream(
      EntityManager entityManager, 
      String sql, 
      Class<T> type) 
   {
      this.session = entityManager.unwrap(Session.class);
      this.sql = sql;
      this.type = type;
   }

   @Override
   public StreamQuery<T> setParameter(String name, Object value) {
      parameters.put(name, value);
      return this;
   }
   
   @Override
   public StreamQuery<T> setFetchSize(int fetchSize) {
      this.fetchSize = fetchSize;
      return this;
   }

   @Override
   public Stream<T> getResultStream() {
      Query query = session.createQuery(sql);
      if (fetchSize != null) {
         query.setFetchSize(fetchSize);
      }
      query.setReadOnly(true);
      for (Map.Entry<String, Object> parameter : parameters.entrySet()) {
         query.setParameter(parameter.getKey(), parameter.getValue());
      }
      ScrollableResults scroll = query.scroll(ScrollMode.FORWARD_ONLY);
      return StreamSupport.stream(toSplitIterator(scroll, type), false)
               .onClose(scroll::close);
   }
   
   private Spliterator<T> toSplitIterator(ScrollableResults scroll, Class<T> type){
      // CONCURRENT and IMMUTABLE are mutually exclusive, and DISTINCT is not
      // guaranteed for arbitrary queries (assumes no null rows are returned).
      return Spliterators.spliteratorUnknownSize(
         new ScrollableResultIterator<>(scroll, type),
         Spliterator.ORDERED | Spliterator.NONNULL
      );
   }

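   private static class ScrollableResultIterator<T> implements Iterator<T> {

      private final ScrollableResults results;
      private final Class<T> type;
      // True when hasNext() has moved the cursor but the row is not yet consumed.
      private boolean advanced;
      private boolean hasRow;
      
      ScrollableResultIterator(ScrollableResults results, Class<T> type) {
         this.results = results;
         this.type = type;
      }
      
      @Override
      public boolean hasNext() {
         // Advance the cursor at most once per element so that repeated
         // hasNext() calls do not skip rows.
         if (!advanced) {
            hasRow = results.next();
            advanced = true;
         }
         return hasRow;
      }
      
      @Override
      public T next() {
         if (!hasNext()) {
            throw new NoSuchElementException(); // from java.util
         }
         advanced = false;
         return type.cast(results.get(0));
      }
   }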
}

The Repository Layer

On top of this component we can now build our repository layer.

@Repository
public class StreamRepository {

   @PersistenceContext(unitName="demo")
   private EntityManager entityManager;

   public Stream<Order> getOrderHistory(String email) {
      String jpql = "SELECT o FROM Order o WHERE o.customer.email=:email";
      StreamQuery<Order> query = new HibernateQueryStream<>(
          entityManager, 
          jpql, 
          Order.class
      );
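      // Bind the :email parameter declared in the JPQL before running the query.
      return query.setParameter("email", email).getResultStream();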
   }
}

And from here the rest is a piece of cake:

orderRepository.getOrderHistory(email)
   .filter(order -> order.total() > 100)
   .map(Order::getOrderLines)
   .flatMap(List::stream)
   .map(OrderLine::getTotal)
   .reduce(0, (x,y) -> x + y);

It is important to highlight that the Hibernate session must still be open when we actually consume the stream, because that is when the entities are mapped. So make sure that wherever you consume the stream, the Hibernate session or your entity manager context is still alive.
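
A related point is closing. The stream registers `scroll::close` through `onClose`, and such handlers only run when the stream itself is closed; terminal operations like `reduce` do not close it. One way to handle this is try-with-resources. The sketch below is illustrative only: `StreamClosing` and `sumAndClose` are made-up names, and an in-memory stream with a flag stands in for the query result and its cursor.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

// Hypothetical example class, not part of the code above.
class StreamClosing {

   // Sums the values, then closes the stream so its onClose handlers run.
   static int sumAndClose(Stream<Integer> totals) {
      try (Stream<Integer> s = totals) {
         return s.reduce(0, Integer::sum);
      }
   }

   public static void main(String[] args) {
      AtomicBoolean closed = new AtomicBoolean(false);
      // The onClose handler stands in for scroll::close.
      Stream<Integer> totals =
         Stream.of(120, 80, 250).onClose(() -> closed.set(true));
      int sum = sumAndClose(totals);
      System.out.println(sum + " " + closed.get()); // 450 true
   }
}
```

In the repository example, the caller of `getOrderHistory` would wrap the returned stream in the same kind of try-with-resources block while the session is still open.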

Further Reading