memoize done right

My recent post on how to make memoize more flexible sparked an interesting discussion about the actual implementation. Christophe Grand and Eugen Dück joined in, and together we touched almost everything Clojure gives you as a tool to handle concurrency. Don't miss the evolution from a quite simple function in a non-concurrent world to a quite complex one once multiple threads of execution are involved.

Clarification

And to make it clear from the beginning: only part of the below is „grown on my droppings“ (as we say in German). Eugen and in particular Christophe created large parts of this!

The STM Version

So let's start with my original version using Refs.

(defn memoize
  ([f] (memoize f [find (fn [c _] c)]))
  ([f [cache-lookup cache-update]]
   (let [cache (ref {})]
     (fn [& args]
       (if-let [e (cache-lookup @cache args)]
         (val e)
         (dosync
           (if-let [e (cache-lookup @cache args)]
             (val e)
             (let [result (apply f args)]
               (alter cache assoc args result)
               (alter cache cache-update args)
               result))))))))

I chose to use Refs because one has to coordinate the changes to the memoize cache with the state of the strategy. However, as it turns out, this approach has some serious drawbacks. The biggest impact comes from the nesting property of dosync: a nested transaction merges with the surrounding one. So in case the outer transaction retries, we also lose the memoisation of our function. Certainly not in our interest.
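To make the failure mode concrete, here is a small hypothetical sketch (counter and slow-inc are made up for illustration):

(def counter (ref 0))

; memoised with the Ref-based version above
(def slow-inc (memoize (fn [x] (Thread/sleep 1000) (inc x))))

; The dosync hidden inside slow-inc merges with this outer transaction.
; If another thread changes counter concurrently, the outer transaction
; retries – rolling back the cache update and forcing slow-inc to
; recompute its result on every attempt.
(dosync
  (alter counter + (slow-inc 1)))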

Another point, made by Laurent Petit: why do we need Refs at all? We can put everything in an atom. Then we don't need to coordinate changes between different entities and can get rid of the transaction.

The Atom Version

Let's look at Christophe's memoize3:

(defn memoize3
  ([f] (memoize3 f [{} identity (fn [mem args] mem) assoc]))
  ([f [init cached hit assoc]]
   (let [mem (atom init)]
     (fn [& args]
       (if-let [e (find (cached @mem) args)]
         (do
           (swap! mem hit args)
           (val e))
         (let [ret (apply f args)]
           (swap! mem assoc args ret)
           ret))))))

It uses an atom to hold a data structure defined by the strategy. Together with the provided helpers we can modify the atom depending on the calls to our memoised function. So we are done, right?

Eh. No. We are not. One kind of synchronisation the STM transactions provide is between different Refs. But they also provide a different kind: between successive accesses to a single Ref.

Can you spot the bug? Right: we deref the atom and find that the desired item is contained in the cache. Then we update the state with the hit function of our strategy and finally return the found value. And exactly there lies the problem: the atom may change out from under us between the lookup and the swap! call. Consider the following interleaving:

  Thread 1: Lookup X – found in cache
  Thread 2: Lookup Y – not found, causes X to be deleted from the cache
  Thread 1: Modifies strategy state for X – now invalid

Now the strategy state is inconsistent with the cache. We have to do all the work in one single call to swap!.

On to Christophe's memoize5:

(defn memoize5
  ([f] (memoize5 f [{} identity (fn [mem args] mem) assoc]))
  ([f [init cached hit assoc]]
   (let [mem          (atom init)
         hit-or-assoc (fn [mem args ret]
                        (if (find (cached mem) args)
                          (hit mem args)
                          (assoc mem args ret)))]
     (fn [& args]
       (let [ret (if-let [e (find (cached @mem) args)]
                   (val e)
                   (apply f args))]
         (swap! mem hit-or-assoc args ret)
         ret)))))

We check the cache and then do a single swap!, which either updates the cache or modifies the strategy's state. Finally we return the value we found: either the cached one or the computed one. Inside the swap! we check again, to guard against the atom having changed out from under us.

And since we compute the call to f outside the swap!, we don't have to fear costly recomputations of f due to retries, right?

The Locking Version

Well… no. Costly recomputations are still possible, and in the worst case there is no memoisation at all.

user=> (defn f
         [x]
         (println "Got" x "from" (Thread/currentThread))
         (Thread/sleep 5000)
         (case x
           3 (f 2)
           2 (f 1)
           1 :done))
#'user/f
user=> (def f (memoize5 f))
#'user/f
user=> (-> #(do (f 3)
                (println "Done for" (Thread/currentThread)))
           Thread.
           .start)
       (Thread/sleep 2500)
       (-> #(do (f 3)
                (println "Done for" (Thread/currentThread)))
           Thread.
           .start)
Got 3 from #<Thread Thread[Thread-2,5,main]>
Got 3 from #<Thread Thread[Thread-3,5,main]>
Got 2 from #<Thread Thread[Thread-2,5,main]>
Got 2 from #<Thread Thread[Thread-3,5,main]>
Got 1 from #<Thread Thread[Thread-2,5,main]>
Got 1 from #<Thread Thread[Thread-3,5,main]>
Done for #<Thread Thread[Thread-2,5,main]>
Done for #<Thread Thread[Thread-3,5,main]>

The problem is that a missing item is first computed and only afterwards registered in the cache. Whenever a new request for the item arrives before the previous computation has finished, the computation starts again. In the end the latecomer finds the previous result already added to the cache, and its freshly computed value is thrown away.

To solve this problem we have to control when a new computation is triggered. I proposed to use locking: we enter a short protected region, guarded by a lock, in which we trigger the computation. This reduces concurrency a little, because it serialises access to the cache at that point, but it saves us from the redundant computations.

(defn memoize
  ([f] (memoize f naive-strategy))
  ([f [cache-lookup cache-update]]
   (let [cache-state (atom {})]
     (fn [& args]
       (if-let [e (cache-lookup cache-state args)]
         @(val e)
         @(locking cache-state
            (if-let [e (cache-lookup cache-state args)]
              (val e)
              (let [result (future (apply f args))]
                (swap! cache-state assoc args result)
                (cache-update cache-state args)
                result))))))))

So when we enter the protected region, we fire off a future which computes the desired value. Then we add the future to the cache and return immediately. Upon returning, the future is dereferenced, which blocks until the computation result is available. Since we have left the protected region at that point, the lock is released and other threads can still access the cache, even for writing. If a second thread now tries to compute the same value, it gets back the already created future, and hence the value is computed only once.

This slightly strange construct is necessary because we do not want to compute the value of f inside the protected region. That would block all other users of the cache until the computation is done. Not very nice.
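As a quick reminder of the future semantics we rely on here, a hypothetical REPL session:

user=> (def result (future (Thread/sleep 5000) 42))
#'user/result
user=> @result   ; blocks until the computation is done…
42
user=> @result   ; …but further derefs return immediately
42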

A short detour: `promise`

In my original code I used another concurrency utility provided by Clojure: promise. It creates a reference which can be filled in exactly once via deliver. All attempts to dereference a promise block until a value is delivered; from then on the value is returned without blocking. So I basically reinvented future with promise.

(.start (Thread. #(deliver result (apply f args))))
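
For completeness, a minimal self-contained sketch of the promise mechanics:

user=> (def result (promise))
#'user/result
user=> (.start (Thread. #(deliver result 42)))
nil
user=> @result   ; blocks until the other thread delivers the value
42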

Deutsche Bahn Version (or: „Delay is our core competence“)

The last two versions – using future or promise – work as desired. However, there is an ugly spot: we fire off a new thread for the computation while the current thread just sits there – idle – waiting for the computation to complete. Why can't we reuse this thread?

In fact we can, and it is quite embarrassing that I didn't come up with the right solution myself. The utility we need is the basic building block of my lazymap library, so I should be very familiar with it: delay.

delay takes an expression and delays its evaluation until it is explicitly called for via force (or by dereferencing the Delay).
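A quick hypothetical REPL session to illustrate:

user=> (def d (delay (println "Computing...") 42))
#'user/d
user=> (force d)   ; the body runs on the first force or deref…
Computing...
42
user=> @d          ; …afterwards the cached value is returned
42

So together with some nice enhancements by Eugen we arrive at Christophe's memoize8: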

(defn memoize8
  ([f] (memoize8 f [{} identity (fn [mem args] mem) assoc]))
  ([f [init cached hit assoc]]
   (let [mem          (atom init)
         hit-or-assoc (fn [mem args]
                        (if (contains? (cached mem) args)
                          (hit mem args)
                          (assoc mem args (delay (apply f args)))))]
     (fn [& args]
       (let [m (swap! mem hit-or-assoc args)]
         (-> (cached m) (get args) deref))))))

So what happens here? We enter a swap!. If the requested item is contained in the cache, we call the hit function of the strategy. It may modify the strategy's state and returns the new state, which is saved in the atom. In case of a cache miss we create a Delay and store it in the cache.

Then there is another important point: after the swap! we work with its return value rather than dereferencing the atom again. Why is this important? Suppose we had a fifo strategy with a very low limit and simply deref'd the atom again:

(fn [& args]
  (swap! mem hit-or-assoc args)
  (-> (cached @mem) (get args) deref))

If we bang new requests very hard against our poor memoised function, it might happen that our Delay is already removed from the cache again between the swap! and the deref. Then we would be in trouble. In the return value of the swap!, however, the Delay is still contained.

Finally there is one last gotcha: we might create multiple Delays in a spinning swap!. However, I agree with Christophe: Delays are cheap. It doesn't matter if we create several Delays and use only one. If you want to trade elegance karma for optimality karma, you can wrap the swap! call in a locking-protected area, which ensures that the swap! succeeds immediately and only one Delay is created.
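Such a locking variant could look like this – a sketch against memoize8, not part of the original code:

(fn [& args]
  ; The lock serialises all swap! calls on mem, so the swap! never
  ; has to retry and at most one Delay is created per cache miss.
  (let [m (locking mem (swap! mem hit-or-assoc args))]
    (-> (cached m) (get args) deref)))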

Summary

Phew. Who would have thought that memoize would take us on a tour of Clojure's concurrency features? Only agents were missing… It shows that the STM, atoms, futures, etc. are no silver bullets. One has to understand what's going on, and one has to think through all the consequences of choosing one option over the other.

Clojure does not provide ready-made solutions, but it provides a lot of tools. The art is to combine them correctly. And when you are done, you end up with something small and elegant. Or you did something wrong. Compare some of the intermediate results with the final version!

  • small footprint
  • only a single check for cache hit or miss
  • complete flexibility by virtue of pluggable strategies
  • just a single call to update the cache and/or strategy state
  • same behaviour for cache miss or cache hit – no special cases
  • self-contained, no interference with wrapping transactions or the like

So let's see our work in all its glory together with the example strategies from my first post.

(declare naive-strategy)

(defn memoize
  "Returns a memoized version of a referentially transparent function.
  The memoized version of the function keeps a cache of the mapping from
  arguments to results and, when calls with the same arguments are repeated
  often, has higher performance at the expense of higher memory use.

  Optionally takes a cache strategy. The strategy is provided as a map
  containing the following keys. All keys are mandatory!

    - :init   – the initial value for the cache and strategy state
    - :cache  – access function to access the cache
    - :lookup – determines whether a value is in the cache or not
    - :hit    – a function called with the cache state and the argument
                list in case of a cache hit
    - :miss   – a function called with the cache state, the argument list
                and the computation result in case of a cache miss

  The default strategy is the naive safe-all strategy."
  ([f] (memoize f naive-strategy))
  ([f strategy]
   (let [{:keys [init cache lookup hit miss]} strategy
         cache-state (atom init)
         hit-or-miss (fn [state args]
                       (if (lookup state args)
                         (hit state args)
                         (miss state args (delay (apply f args)))))]
     (fn [& args]
       (let [cs (swap! cache-state hit-or-miss args)]
         (-> cs cache (get args) deref))))))

(def #^{:doc "The naive safe-all cache strategy for memoize."}
  naive-strategy
  {:init   {}
   :cache  identity
   :lookup contains?
   :hit    (fn [state _] state)
   :miss   assoc})

(defn fifo-cache-strategy
  "Implements a first-in-first-out cache strategy. When the given limit
  is reached, items from the cache will be dropped in insertion order."
  [limit]
  {:init   {:queue (into clojure.lang.PersistentQueue/EMPTY
                         (repeat limit :dummy))
            :cache {}}
   :lookup (fn [state k] (contains? (:cache state) k))
   :cache  :cache
   :hit    (fn [state _] state)
   :miss   (fn [state args result]
             (let [k (-> state :queue peek)]
               (-> state
                 (update-in [:queue] conj args)
                 (update-in [:queue] pop)
                 (update-in [:cache] dissoc k)
                  (assoc-in  [:cache args] result))))})
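
To see how the pieces fit together, a hypothetical REPL session using the fifo strategy (slow-double is made up for illustration):

user=> (def slow-double
         (memoize #(do (Thread/sleep 1000) (* 2 %))
                  (fifo-cache-strategy 3)))
#'user/slow-double
user=> (time (slow-double 21))   ; the first call computes the value
"Elapsed time: 1001.2 msecs"
42
user=> (time (slow-double 21))   ; the second call hits the cache
"Elapsed time: 0.1 msecs"
42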

(defn lru-cache-strategy
  "Implements a LRU cache strategy, which drops the least recently used
  argument lists from the cache. If the given limit of items in the cache
  is reached, the longest unaccessed item is removed from the cache. In
  case there is a tie, the removal order is unspecified."
  [limit]
  {:init   {:lru   (into {} (for [x (range (- limit) 0)] [x x]))
            :tick  0
            :cache {}}
   :cache  :cache
   :lookup (fn [state k] (contains? (:cache state) k))
   :hit    (fn [state args]
             (-> state
                (assoc-in  [:lru args] (:tick state))
               (update-in [:tick] inc)))
   :miss   (fn [state args result]
             (let [k (apply min-key (:lru state) (keys (:lru state)))]
               (-> state
                 (update-in [:lru]   dissoc k)
                 (update-in [:cache] dissoc k)
                  (assoc-in  [:lru args]   (:tick state))
                  (update-in [:tick]  inc)
                  (assoc-in  [:cache args] result))))})
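
A quick sketch of the eviction behaviour, counting the actual computations via a side effect (for illustration only – normally f should be referentially transparent):

(def calls (atom 0))
(def g (memoize (fn [x] (swap! calls inc) x)
                (lru-cache-strategy 2)))

(g 1) (g 2)   ; two misses fill the cache          – @calls => 2
(g 1)         ; hit, marks 1 as recently used      – @calls => 2
(g 3)         ; miss, evicts the least recently
              ; used entry 2                       – @calls => 3
(g 1)         ; hit, 1 survived                    – @calls => 3
(g 2)         ; miss, 2 must be recomputed         – @calls => 4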

(defn ttl-cache-strategy
  "Implements a time-to-live cache strategy. Upon access to the cache
  all expired items will be removed. The time to live is defined by
  the given expiry time span. Items will only be removed on function
  call. No background activity is done."
  [ttl]
  (let [dissoc-dead (fn [state now]
                      (let [ks (map key (filter #(> (- now (val %)) ttl)
                                                (:ttl state)))
                            dissoc-ks #(apply dissoc % ks)]
                        (-> state
                          (update-in [:ttl]   dissoc-ks)
                          (update-in [:cache] dissoc-ks))))]
    {:init   {:ttl {} :cache {}}
     :cache  :cache
     :lookup (fn [state args]
               (when-let [t (get (:ttl state) args)]
                 (< (- (System/currentTimeMillis) t) ttl)))
     :hit    (fn [state args]
               (dissoc-dead state (System/currentTimeMillis)))
     :miss   (fn [state args result]
               (let [now (System/currentTimeMillis)]
                 (-> state
                   (dissoc-dead now)
                    (assoc-in  [:ttl args]   now)
                    (assoc-in  [:cache args] result))))}))

(defn lu-cache-strategy
  "Implements a least-used cache strategy. Upon access to the cache
  it will be tracked which items are requested. If the cache size reaches
  the given limit, items with the lowest usage count will be removed. In
  case of ties the removal order is unspecified."
  [limit]
  {:init   {:lu (into {} (for [x (range (- limit) 0)] [x x])) :cache {}}
   :cache  :cache
   :lookup (fn [state k] (contains? (:cache state) k))
   :hit    (fn [state args] (update-in state [:lu args] inc))
   :miss   (fn [state args result]
             (let [k (apply min-key (:lu state) (keys (:lu state)))]
               (-> state
                 (update-in [:lu]    dissoc k)
                 (update-in [:cache] dissoc k)
                  (assoc-in  [:lu args]    0)
                  (assoc-in  [:cache args] result))))})

Post Scriptum: Protocols and Types

Update: As the icing on the cake, here is a version of the above using protocols and deftypes from the bleeding edge.

Update 2: Fixed some bugs in the different strategies.

Update 3: Moved things to the new deftype syntax.

(defprotocol PCachingStrategy
  "A caching strategy implements the backend for memoize. It handles the
  underlying cache and might define different strategies to remove „old“
  items from the cache."
  (retrieve [cache item] "Get the requested cache item.")
  (cached?  [cache item] "Checks whether the given argument list is cached.")
  (hit      [cache item] "Called in case of a cache hit.")
  (miss     [cache item result] "Called in case of a cache miss."))

(declare naive-cache-strategy)

(defn memoize
  "Returns a memoized version of a referentially transparent function.
  The memoized version of the function keeps a cache of the mapping from
  arguments to results and, when calls with the same arguments are repeated
  often, has higher performance at the expense of higher memory use.
  Optionally takes a cache strategy. Default is the naive safe all strategy."
  ([f] (memoize f (naive-cache-strategy)))
  ([f strategy]
   (let [cache-state (atom strategy)
         hit-or-miss (fn [cache item]
                       (if (cached? cache item)
                         (hit cache item)
                         (miss cache item (delay (apply f item)))))]
     (fn [& args]
       (let [cs (swap! cache-state hit-or-miss args)]
         @(retrieve cs args))))))

(deftype NaiveStrategy [cache]
  PCachingStrategy
  (retrieve
    [_ item]
    (get cache item))
  (cached?
    [_ item]
    (contains? cache item))
  (hit
    [this _]
    this)
  (miss
    [_ item result]
    (NaiveStrategy. (assoc cache item result))))

(defn naive-cache-strategy
  "The naive safe-all cache strategy for memoize."
  []
  (NaiveStrategy. {}))
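
The interface stays exactly the same; only the strategy argument is now a deftype instance. A quick hypothetical check:

(def cached-upper
  (memoize (fn [s] (Thread/sleep 1000) (.toUpperCase s))
           (naive-cache-strategy)))

(cached-upper "clojure")   ;=> "CLOJURE" – slow on the first call
(cached-upper "clojure")   ;=> "CLOJURE" – now served from the cache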

(deftype FifoStrategy [cache queue]
  PCachingStrategy
  (retrieve
    [_ item]
    (get cache item))
  (cached?
    [_ item]
    (contains? cache item))
  (hit
    [this _]
    this)
  (miss
    [_ item result]
    (let [k (peek queue)]
      (FifoStrategy. (-> cache (dissoc k) (assoc item result))
                     (-> queue pop (conj item))))))

(defn fifo-cache-strategy
  "Implements a first-in-first-out cache strategy. When the given limit
  is reached, items from the cache will be dropped in insertion order."
  [limit]
  (FifoStrategy. {} (into clojure.lang.PersistentQueue/EMPTY
                          (repeat limit :dummy))))

(deftype LruStrategy [cache lru tick]
  PCachingStrategy
  (retrieve
    [_ item]
    (get cache item))
  (cached?
    [_ item]
    (contains? cache item))
  (hit
    [_ item]
    (let [tick (inc tick)]
      (LruStrategy. cache (assoc lru item tick) tick)))
  (miss
    [_ item result]
    (let [tick (inc tick)
          k    (apply min-key lru (keys lru))]
      (LruStrategy. (-> cache (dissoc k) (assoc item result))
                    (-> lru   (dissoc k) (assoc item tick))
                    tick))))

(defn lru-cache-strategy
  "Implements a LRU cache strategy, which drops the least recently used
  argument lists from the cache. If the given limit of items in the cache
  is reached, the longest unaccessed item is removed from the cache. In
  case there is a tie, the removal order is unspecified."
  [limit]
  (LruStrategy. {} (into {} (for [x (range (- limit) 0)] [x x])) 0))

(declare dissoc-dead)

(deftype TtlStrategy [cache ttl limit]
  PCachingStrategy
  (retrieve
    [_ item]
    (get cache item))
  (cached?
    [_ item]
    (when-let [t (get ttl item)]
      (< (- (System/currentTimeMillis) t) limit)))
  (hit
    [this _]
    this)
  (miss
    [_ item result]
    (let [now         (System/currentTimeMillis)
          [cache ttl] (dissoc-dead cache ttl limit now)]
      (TtlStrategy. (assoc cache item result)
                    (assoc ttl item now)
                    limit))))

(defn- dissoc-dead
  "Removes all items whose time-to-live has expired. Works on the plain
  maps – deftype fields are not accessible via keywords – and returns
  the updated [cache ttl] pair."
  [cache ttl limit now]
  (let [ks        (map key (filter #(> (- now (val %)) limit) ttl))
        dissoc-ks #(apply dissoc % ks)]
    [(dissoc-ks cache) (dissoc-ks ttl)]))

(defn ttl-cache-strategy
  "Implements a time-to-live cache strategy. Upon access to the cache
  all expired items will be removed. The time to live is defined by
  the given expiry time span. Items will only be removed on function
  call. Outdated items might be returned. No background activity is
  done."
  [limit]
  (TtlStrategy. {} {} limit))

(deftype LuStrategy [cache lu]
  PCachingStrategy
  (retrieve
    [_ item]
    (get cache item))
  (cached?
    [_ item]
    (contains? cache item))
  (hit
    [_ item]
    (LuStrategy. cache (update-in lu [item] inc)))
  (miss
    [_ item result]
    (let [k (apply min-key lu (keys lu))]
      (LuStrategy. (-> cache (dissoc k) (assoc item result))
                   (-> lu (dissoc k) (assoc item 0))))))

(defn lu-cache-strategy
  "Implements a least-used cache strategy. Upon access to the cache
  it will be tracked which items are requested. If the cache size reaches
  the given limit, items with the lowest usage count will be removed. In
  case of ties the removal order is unspecified."
  [limit]
  (LuStrategy. {} (into {} (for [x (range (- limit) 0)] [x x]))))

Published by Meikel Brandmeyer.