My recent post on how to make memoize more flexible has sparked an interesting discussion about the actual implementation. Christophe Grand and Eugen Dück joined in to discuss almost everything Clojure gives you as a tool to handle concurrency. Don't miss the evolution of a quite simple function in a non-concurrent world into a quite complex one once multiple threads of execution are involved.
And to make it clear from the beginning: only part of the below is „grown on my droppings" (as we say in German). Eugen and in particular Christophe created large parts of this!
So let's start with my original version using Refs.
(defn memoize
  ([f] (memoize f [find (fn [c _] c)]))
  ([f [cache-lookup cache-update]]
   (let [cache (ref {})]
     (fn [& args]
       (if-let [e (cache-lookup @cache args)]
         (val e)
         (dosync
           (if-let [e (cache-lookup @cache args)]
             (val e)
             (let [result (apply f args)]
               (alter cache assoc args result)
               (alter cache cache-update args)
               result))))))))
I chose to use Refs, because one has to coordinate the changes to the memoize cache and the state of the strategy. However, as it turns out, this approach has some serious drawbacks. The biggest one is the nesting property of dosync: a nested transaction merges with the surrounding one. So in case of a retry of the outer transaction we also lose the memoisation of our function. Certainly not in our interest.
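To make the problem concrete, here is a small sketch of my own (not from the original discussion; all names are made up). Since atoms are not rolled back on a transaction retry, the counter reveals how often the function really runs:

(def invocations (atom 0)) ; not rolled back on retry

(defn expensive [x]
  (swap! invocations inc)  ; counts every real computation
  (Thread/sleep 100)
  (* x x))

(def cached (memoize expensive))
(def r (ref 0))

;; If another thread alters r while this transaction is in flight, the whole
;; transaction retries – including the merged inner dosync of `cached` – so
;; the cache update is rolled back and `expensive` runs again.
(dosync (alter r + (cached 3)))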
Another point was made by Laurent Petit: why do we need Refs at all? We can put everything into an atom. Then we don't need to coordinate changes between different entities, and we get rid of the transaction.
Let's look at Christophe's memoize3:
(defn memoize3
  ([f] (memoize3 f [{} identity (fn [mem args] mem) assoc]))
  ([f [init cached hit assoc]]
   (let [mem (atom init)]
     (fn [& args]
       (if-let [e (find (cached @mem) args)]
         (do
           (swap! mem hit args)
           (val e))
         (let [ret (apply f args)]
           (swap! mem assoc args ret)
           ret))))))
It uses an atom to hold a data structure defined by the strategy. Together with the provided helper functions we can modify the atom depending on the calls to our memoised function. So we are done, right?

Eh. No. We are not. One sort of synchronisation STM transactions provide is coordination between different Refs. However, they also provide a different sort of synchronisation: consistency of a single Ref across several accesses.

Can you spot the bug? Right: we deref the atom and find that the desired item is contained in the cache. Then we update the state with the hit function of our strategy and finally return the found value. And this is exactly the problem. The atom may change underneath our hands between the lookup and the swap! call.
| Thread 1 | Thread 2 |
|---|---|
| Lookup X – found in cache | |
| | Lookup Y – not found, causes X to be deleted from cache |
| Modifies strategy state for X – now invalid | |
Now the strategy state is inconsistent with the cache. We have to do all the work in one single call to swap!.
On to Christophe's memoize5:
(defn memoize5
  ([f] (memoize5 f [{} identity (fn [mem args] mem) assoc]))
  ([f [init cached hit assoc]]
   (let [mem (atom init)
         hit-or-assoc (fn [mem args ret]
                        (if (find (cached mem) args)
                          (hit mem args)
                          (assoc mem args ret)))]
     (fn [& args]
       (let [ret (if-let [e (find (cached @mem) args)]
                   (val e)
                   (apply f args))]
         (swap! mem hit-or-assoc args ret)
         ret)))))
We check the cache and then do a single swap!, which either updates the cache or modifies the strategy's state. Finally we return the value we found: either the cached one or the computed one. Inside the swap! we check again to guard against the atom having changed underneath our hands.

And since we compute the call to f outside the swap!, we don't have to fear costly recomputations of f because of retries, right?
Well… No. There are costly recomputations possible and in the worst case there is no memoisation at all.
user=> (defn f
         [x]
         (println "Got" x "from" (Thread/currentThread))
         (Thread/sleep 5000)
         (case x
           3 (f 2)
           2 (f 1)
           1 :done))
#'user/f
user=> (def f (memoize5 f))
#'user/f
user=> (-> #(do (f 3)
                (println "Done for" (Thread/currentThread)))
           Thread.
           .start)
(Thread/sleep 2500)
(-> #(do (f 3)
         (println "Done for" (Thread/currentThread)))
    Thread.
    .start)
Got 3 from #<Thread Thread[Thread-2,5,main]>
Got 3 from #<Thread Thread[Thread-3,5,main]>
Got 2 from #<Thread Thread[Thread-2,5,main]>
Got 2 from #<Thread Thread[Thread-3,5,main]>
Got 1 from #<Thread Thread[Thread-2,5,main]>
Got 1 from #<Thread Thread[Thread-3,5,main]>
Done for #<Thread Thread[Thread-2,5,main]>
Done for #<Thread Thread[Thread-3,5,main]>
The problem is that a missing item is first computed and only then registered in the cache. Whenever a new request for this item arrives before the previous computation has finished, the computation starts again. In the end it will find the previous result already added to the cache, and the freshly computed one is thrown away.
To solve this problem we have to control when a new computation is triggered. I proposed to use locking: we enter a short protected region, guarded by a lock, where we trigger the computation. This reduces concurrency a little because it serialises access to the cache at that point, but it saves us from the redundant computations.
(defn memoize
  ([f] (memoize f naive-strategy))
  ([f [cache-lookup cache-update]]
   (let [cache-state (atom {})]
     (fn [& args]
       (if-let [e (cache-lookup cache-state args)]
         @(val e)
         @(locking cache-state
            (if-let [e (cache-lookup cache-state args)]
              (val e)
              (let [result (future (apply f args))]
                (swap! cache-state assoc args result)
                (cache-update cache-state args)
                result))))))))
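A side note: naive-strategy is referenced here, but only defined much later in the post – and there in a different shape. A minimal sketch matching the [cache-lookup cache-update] contract of this version (my assumption, not the original definition) could be:

(def naive-strategy
  [(fn [cache-state args] (find @cache-state args)) ; lookup receives the atom itself
   (fn [cache-state args] nil)])                    ; no strategy bookkeeping needed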
So when we enter the protected region we fire off a future which computes the desired value. Then we add the future to the cache and return immediately. Upon returning, the future is dereferenced, which basically returns the computation result. Since the dereferencing happens outside the locking form, the lock is already released and other threads can still access the cache for writing. If a second thread now tries to compute the same value, it gets back the already created future, and hence the value is computed only once.

This slightly strange construct is necessary because we do not want to compute the value of f inside the protected region. That would block all users until the computation is done. Not very nice.
In my original code I used another concurrency utility provided by Clojure: promise. This creates a reference which can be filled in exactly once via deliver. All attempts to dereference a Promise block until a value is delivered. Then this value is returned without blocking. I basically reinvented future with promise.
(.start (Thread. #(deliver result (apply f args))))
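In isolation the promise/deliver mechanics look like this (a minimal illustration of the semantics, not code from the discussion):

(def p (promise))
(.start (Thread. #(do (Thread/sleep 100) (deliver p 42))))

@p ; blocks until the value is delivered, then returns 42
@p ; returns 42 immediately – a promise is filled exactly once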
The last two versions – using future or promise – work as desired. However, there is an ugly spot: we fire off a new thread for the computation while the current thread just sits there – idle – waiting for the computation to complete. Why can't we reuse this thread?
In fact we can, and it is quite embarrassing that I didn't come up with the right solution myself. The utility we need is the basic building block of my lazymap library. So I should be very familiar with delay.
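In isolation the mechanics look like this (a minimal illustration):

(def d (delay (println "computing...") 42))

(force d) ; prints "computing...", returns 42 – the body runs at most once
@d        ; returns the cached 42 without recomputing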
delay takes an expression and delays its evaluation until it is explicitly asked for via force (or by dereferencing the Delay). Together with some nice enhancements by Eugen we arrive at Christophe's memoize8:
(defn memoize8
  ([f] (memoize8 f [{} identity (fn [mem args] mem) assoc]))
  ([f [init cached hit assoc]]
   (let [mem (atom init)
         hit-or-assoc (fn [mem args]
                        (if (contains? (cached mem) args)
                          (hit mem args)
                          (assoc mem args (delay (apply f args)))))]
     (fn [& args]
       (let [m (swap! mem hit-or-assoc args)]
         (-> (cached m) (get args) deref))))))
So what happens here? We enter a swap!. If the requested item is contained in the cache, we call the hit function of the strategy. It might modify its state and return the new one, which is then saved in the atom. In case of a cache miss we create a Delay and store it in the cache.

Then there is another important point: after returning from the swap! we use the return value of swap!. Why is this important? Suppose we had a FIFO strategy with a very low limit and would simply deref the atom again.
(fn [& args]
  (swap! mem hit-or-assoc args)
  (-> (cached @mem) (get args) deref))
If we bang new requests very hard on our poor memoised function, it might happen that our Delay is already removed from the cache again between the swap! and the deref. Then we would be in trouble. In the return value of the swap!, however, the Delay is still contained.
Finally there is one last gotcha: we might create multiple Delays in a spinning swap!. However, I agree with Christophe: Delays are cheap. It doesn't matter if several Delays are created and only one is used. If you want to trade elegance karma for optimality karma, you can wrap the swap! call in a locking protected area, which ensures that the swap! succeeds immediately and only one Delay is created.
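Such a locking-wrapped variant could look like this sketch (my interpretation of the remark, replacing the inner function of memoize8):

(fn [& args]
  ;; serialising the swap! means it never has to retry,
  ;; so at most one Delay is created per cache miss
  (let [m (locking mem (swap! mem hit-or-assoc args))]
    (-> (cached m) (get args) deref)))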
Phew. Who would have thought that memoize would take us on a tour of Clojure's concurrency features? Only agents were missing… It shows that the STM, atoms, futures, etc. are no silver bullets. One has to understand what's going on, and one has to clearly think through all the consequences of choosing one option over the other.
Clojure does not provide ready-made solutions, but it provides a lot of tools. The art is to combine them correctly. And when you are done, you end up with something small and elegant. Or you did something wrong. Compare some of the temporary results with the final version!
So let's see our work in all its glory together with the example strategies from my first post.
(declare naive-strategy)

(defn memoize
  "Returns a memoized version of a referentially transparent function.
  The memoized version of the function keeps a cache of the mapping from
  arguments to results and, when calls with the same arguments are repeated
  often, has higher performance at the expense of higher memory use.

  Optionally takes a cache strategy. The strategy is provided as a map
  containing the following keys. All keys are mandatory!

    - :init   – the initial value for the cache and strategy state
    - :cache  – access function to access the cache
    - :lookup – determines whether a value is in the cache or not
    - :hit    – a function called with the cache state and the argument
                list in case of a cache hit
    - :miss   – a function called with the cache state, the argument list
                and the computation result in case of a cache miss

  The default strategy is the naive safe-all strategy."
  ([f] (memoize f naive-strategy))
  ([f strategy]
   (let [{:keys [init cache lookup hit miss]} strategy
         cache-state (atom init)
         hit-or-miss (fn [state args]
                       (if (lookup state args)
                         (hit state args)
                         (miss state args (delay (apply f args)))))]
     (fn [& args]
       (let [cs (swap! cache-state hit-or-miss args)]
         (-> cs cache (get args) deref))))))

(def #^{:doc "The naive safe-all cache strategy for memoize."}
  naive-strategy
  {:init   {}
   :cache  identity
   :lookup contains?
   :hit    (fn [state _] state)
   :miss   assoc})
(defn fifo-cache-strategy
  "Implements a first-in-first-out cache strategy. When the given limit
  is reached, items from the cache will be dropped in insertion order."
  [limit]
  {:init   {:queue (into clojure.lang.PersistentQueue/EMPTY
                         (repeat limit :dummy))
            :cache {}}
   :lookup (fn [state k] (contains? (:cache state) k))
   :cache  :cache
   :hit    (fn [state _] state)
   :miss   (fn [state args result]
             (let [k (-> state :queue peek)]
               (-> state
                   (update-in [:queue] conj args)
                   (update-in [:queue] pop)
                   (update-in [:cache] dissoc k)
                   (assoc-in  [:cache args] result))))})
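For illustration, a quick usage example of my own (the function and names are made up):

;; memoise a slow function with a FIFO cache of three argument lists
(def slow-square (memoize #(do (Thread/sleep 1000) (* % %))
                          (fifo-cache-strategy 3)))

(slow-square 2) ; takes about a second
(slow-square 2) ; answers immediately from the cache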
(defn lru-cache-strategy
  "Implements a LRU cache strategy, which drops the least recently used
  argument lists from the cache. If the given limit of items in the cache
  is reached, the longest unaccessed item is removed from the cache. In
  case there is a tie, the removal order is unspecified."
  [limit]
  {:init   {:lru   (into {} (for [x (range (- limit) 0)] [x x]))
            :tick  0
            :cache {}}
   :cache  :cache
   :lookup (fn [state k] (contains? (:cache state) k))
   :hit    (fn [state args]
             (-> state
                 (assoc-in  [:lru args] (:tick state))
                 (update-in [:tick] inc)))
   :miss   (fn [state args result]
             (let [k (apply min-key (:lru state) (keys (:lru state)))]
               (-> state
                   (update-in [:lru] dissoc k)
                   (update-in [:cache] dissoc k)
                   (assoc-in  [:lru args] (:tick state))
                   (update-in [:tick] inc)
                   (assoc-in  [:cache args] result))))})
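Again a small example of my own to show the eviction behaviour:

;; with limit 2, touching :a keeps it alive while :b gets evicted
(def lru-f (memoize identity (lru-cache-strategy 2)))

(lru-f :a) ; miss – cache: a
(lru-f :b) ; miss – cache: a, b
(lru-f :a) ; hit – :a is now more recently used than :b
(lru-f :c) ; miss – evicts :b, the least recently used entry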
(defn ttl-cache-strategy
  "Implements a time-to-live cache strategy. Upon access to the cache
  all expired items will be removed. The time to live is defined by
  the given expiry time span. Items will only be removed on function
  call. No background activity is done."
  [ttl]
  (let [dissoc-dead (fn [state now]
                      (let [ks (map key (filter #(> (- now (val %)) ttl)
                                                (:ttl state)))
                            dissoc-ks #(apply dissoc % ks)]
                        (-> state
                            (update-in [:ttl] dissoc-ks)
                            (update-in [:cache] dissoc-ks))))]
    {:init   {:ttl {} :cache {}}
     :cache  :cache
     :lookup (fn [state args]
               (when-let [t (get (:ttl state) args)]
                 (< (- (System/currentTimeMillis) t) ttl)))
     :hit    (fn [state args]
               (dissoc-dead state (System/currentTimeMillis)))
     :miss   (fn [state args result]
               (let [now (System/currentTimeMillis)]
                 (-> state
                     (dissoc-dead now)
                     (assoc-in [:ttl args] now)
                     (assoc-in [:cache args] result))))}))
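And a short example of my own showing the expiry:

;; answers are considered fresh for one second
(def slow-id (memoize #(do (Thread/sleep 200) %)
                      (ttl-cache-strategy 1000)))

(slow-id 1)          ; computed, takes ~200ms
(slow-id 1)          ; served from the cache
(Thread/sleep 1500)
(slow-id 1)          ; entry expired, computed again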
(defn lu-cache-strategy
  "Implements a least-used cache strategy. Upon access to the cache
  it will be tracked which items are requested. If the cache size reaches
  the given limit, items with the lowest usage count will be removed. In
  case of ties the removal order is unspecified."
  [limit]
  {:init   {:lu (into {} (for [x (range (- limit) 0)] [x x])) :cache {}}
   :cache  :cache
   :lookup (fn [state k] (contains? (:cache state) k))
   :hit    (fn [state args] (update-in state [:lu args] inc))
   :miss   (fn [state args result]
             (let [k (apply min-key (:lu state) (keys (:lu state)))]
               (-> state
                   (update-in [:lu] dissoc k)
                   (update-in [:cache] dissoc k)
                   (assoc-in  [:lu args] 0)
                   (assoc-in  [:cache args] result))))})
Update: As the icing on the cake, here is a version of the above using protocols and deftypes from the bleeding edge.
Update 2: Fixed some bugs with the different strategies.
Update 3: Moved things to the new deftype syntax.
(defprotocol PCachingStrategy
  "A caching strategy implements the backend for memoize. It handles the
  underlying cache and might define different strategies to remove „old"
  items from the cache."
  (retrieve [cache item] "Get the requested cache item.")
  (cached? [cache item] "Checks whether the given argument list is cached.")
  (hit [cache item] "Called in case of a cache hit.")
  (miss [cache item result] "Called in case of a cache miss."))
(declare naive-cache-strategy)

(defn memoize
  "Returns a memoized version of a referentially transparent function.
  The memoized version of the function keeps a cache of the mapping from
  arguments to results and, when calls with the same arguments are repeated
  often, has higher performance at the expense of higher memory use.
  Optionally takes a cache strategy. Default is the naive safe-all strategy."
  ([f] (memoize f (naive-cache-strategy)))
  ([f strategy]
   (let [cache-state (atom strategy)
         hit-or-miss (fn [cache item]
                       (if (cached? cache item)
                         (hit cache item)
                         (miss cache item (delay (apply f item)))))]
     (fn [& args]
       (let [cs (swap! cache-state hit-or-miss args)]
         @(retrieve cs args))))))
(deftype NaiveStrategy [cache]
  PCachingStrategy
  (retrieve [_ item]
    (get cache item))
  (cached? [_ item]
    (contains? cache item))
  (hit [this _]
    this)
  (miss [_ item result]
    (NaiveStrategy. (assoc cache item result))))

(defn naive-cache-strategy
  "The naive safe-all cache strategy for memoize."
  []
  (NaiveStrategy. {}))
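A quick check of my own that the protocol-based version behaves like before:

(def slow-inc (memoize #(do (Thread/sleep 500) (inc %))))

(slow-inc 1) ; computed after ~500ms
(slow-inc 1) ; answered immediately from the NaiveStrategy cache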
(deftype FifoStrategy [cache queue]
  PCachingStrategy
  (retrieve [_ item]
    (get cache item))
  (cached? [_ item]
    (contains? cache item))
  (hit [this _]
    this)
  (miss [_ item result]
    (let [k (peek queue)]
      (FifoStrategy. (-> cache (dissoc k) (assoc item result))
                     (-> queue pop (conj item))))))

(defn fifo-cache-strategy
  "Implements a first-in-first-out cache strategy. When the given limit
  is reached, items from the cache will be dropped in insertion order."
  [limit]
  (FifoStrategy. {} (into clojure.lang.PersistentQueue/EMPTY
                          (repeat limit :dummy))))
(deftype LruStrategy [cache lru tick]
  PCachingStrategy
  (retrieve [_ item]
    (get cache item))
  (cached? [_ item]
    (contains? cache item))
  (hit [_ item]
    (let [tick (inc tick)]
      (LruStrategy. cache (assoc lru item tick) tick)))
  (miss [_ item result]
    (let [tick (inc tick)
          k    (apply min-key lru (keys lru))]
      (LruStrategy. (-> cache (dissoc k) (assoc item result))
                    (-> lru (dissoc k) (assoc item tick))
                    tick))))

(defn lru-cache-strategy
  "Implements a LRU cache strategy, which drops the least recently used
  argument lists from the cache. If the given limit of items in the cache
  is reached, the longest unaccessed item is removed from the cache. In
  case there is a tie, the removal order is unspecified."
  [limit]
  (LruStrategy. {} (into {} (for [x (range (- limit) 0)] [x x])) 0))
(declare dissoc-dead)

(deftype TtlStrategy [cache ttl limit]
  PCachingStrategy
  (retrieve [_ item]
    (get cache item))
  (cached? [_ item]
    (when-let [t (get ttl item)]
      (< (- (System/currentTimeMillis) t) limit)))
  (hit [this _]
    this)
  (miss [this item result]
    (let [now (System/currentTimeMillis)
          ^TtlStrategy this (dissoc-dead this now)]
      (TtlStrategy. (assoc (.cache this) item result)
                    (assoc (.ttl this) item now)
                    limit))))

(defn- dissoc-dead
  ;; Note: deftype fields are not accessible via keyword lookup (unlike
  ;; defrecord), so we use direct field access here.
  [^TtlStrategy state now]
  (let [ks (map key (filter #(> (- now (val %)) (.limit state))
                            (.ttl state)))
        dissoc-ks #(apply dissoc % ks)]
    (TtlStrategy. (dissoc-ks (.cache state))
                  (dissoc-ks (.ttl state))
                  (.limit state))))
(defn ttl-cache-strategy
  "Implements a time-to-live cache strategy. Upon access to the cache
  all expired items will be removed. The time to live is defined by
  the given expiry time span. Items will only be removed on function
  call. Outdated items might be returned. No background activity is
  done."
  [limit]
  (TtlStrategy. {} {} limit))
(deftype LuStrategy [cache lu]
  PCachingStrategy
  (retrieve [_ item]
    (get cache item))
  (cached? [_ item]
    (contains? cache item))
  (hit [_ item]
    (LuStrategy. cache (update-in lu [item] inc)))
  (miss [_ item result]
    (let [k (apply min-key lu (keys lu))]
      (LuStrategy. (-> cache (dissoc k) (assoc item result))
                   (-> lu (dissoc k) (assoc item 0))))))

(defn lu-cache-strategy
  "Implements a least-used cache strategy. Upon access to the cache
  it will be tracked which items are requested. If the cache size reaches
  the given limit, items with the lowest usage count will be removed. In
  case of ties the removal order is unspecified."
  [limit]
  (LuStrategy. {} (into {} (for [x (range (- limit) 0)] [x x]))))
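As a final sanity check, a small example of my own exercising the least-used strategy with memoised recursion:

(def fib
  (memoize (fn [n] (if (< n 2) n (+ (fib (- n 1)) (fib (- n 2)))))
           (lu-cache-strategy 128)))

(fib 30) ; fast, since the recursive calls hit the cache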