第六章 缓存机制
在 Web 应用中,缓存是必不可少的组件。通常我们都会用 Redis 或 memcached 等缓存中间件,拦截大量奔向数据库的请求,以减轻数据库压力。作为一个重要的组件,MyBatis自然也在内部提供了相应的支持。通过在框架层面增加缓存功能,可减轻数据库的压力,同时又可以提升查询速度,可谓一举两得。MyBatis 缓存结构由一级缓存和二级缓存构成,这两级缓存均是使用 Cache 接口的实现类。因此本章将首先会向大家介绍 Cache 几种实现类
的源码,然后再分析一级和二级缓存的实现。
6.1 缓存类介绍
在MyBatis中,Cache是缓存接口,它定义了一些基本的缓存操作,所有缓存类都应该实现该接口。实现类如下
具有基本缓存功能的PerpetualCache
具有LRU策略的缓存LruCache
可保证线程安全的缓存SynchronizedCache
具有阻塞功能的BlockingCache
此外,除了PerpetualCache外,其他的类全都是装饰器类。类图如下
下面先看一下Cache接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 public interface Cache { String getId () ; void putObject (Object key, Object value) ; Object getObject (Object key) ; Object removeObject (Object key) ; void clear () ; int getSize () ; ReadWriteLock getReadWriteLock () ; }
6.1.1 PerpetualCache
因为这个被修饰的Cache逻辑比较简单,这里只给出源代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 public class PerpetualCache implements Cache { private final String id; private Map<Object, Object> cache = new HashMap<>(); public PerpetualCache (String id) { this .id = id; } @Override public String getId () { return id; } @Override public int getSize () { return cache.size(); } @Override public void putObject (Object key, Object value) { cache.put(key, value); } @Override public Object getObject (Object key) { return cache.get(key); } @Override public Object removeObject (Object key) { return cache.remove(key); } @Override public void clear () { cache.clear(); } @Override public ReadWriteLock getReadWriteLock () { return null ; } @Override public boolean equals (Object o) { if (getId() == null ) { throw new CacheException("Cache instances require an ID." ); } if (this == o) { return true ; } if (!(o instanceof Cache)) { return false ; } Cache otherCache = (Cache) o; return getId().equals(otherCache.getId()); } @Override public int hashCode () { if (getId() == null ) { throw new CacheException("Cache instances require an ID." ); } return getId().hashCode(); } }
6.1.2 LruCache
先介绍LRU策略。LRU策略其实很简单。我们使用的缓存是一块给定大小的区域,它不是无限的。那么当缓存区域满了之后,我们再插入时,就需要选择一个老缓存删除它。对LRU策略来说,删除的是访问时间最早的那一条,因为它在近期是最少被访问的。因此,我们如果要实现LRU策略,就需要维护一个顺序表结构,表中元素按照访问时间先后进行排序,并且当表中某个元素被访问之后,它就要被放到表尾。当我们需要删除一个老缓存时,则直接删除表头的缓存即可。在MyBatis中,是使用LinkedHashList作为LRU结构的。
下面我们直接看一看LruCache的源代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 public class LruCache implements Cache { private final Cache delegate; private Map<Object, Object> keyMap; private Object eldestKey; public LruCache (Cache delegate) { this .delegate = delegate; setSize(1024 ); } @Override public String getId () { return delegate.getId(); } @Override public int getSize () { return delegate.getSize(); } public void setSize (final int size) { keyMap = new LinkedHashMap<Object, Object>(size, .75F , true ) { private static final long serialVersionUID = 4267176411845948333L ; @Override protected boolean removeEldestEntry (Map.Entry<Object, Object> eldest) { boolean tooBig = size() > size; if (tooBig) { eldestKey = eldest.getKey(); } return tooBig; } }; } @Override public void putObject (Object key, Object value) { delegate.putObject(key, value); cycleKeyList(key); } @Override public Object getObject (Object key) { keyMap.get(key); return delegate.getObject(key); } @Override public Object removeObject (Object key) { return delegate.removeObject(key); } @Override public void clear () { delegate.clear(); keyMap.clear(); } @Override public ReadWriteLock getReadWriteLock () { return null ; } private void cycleKeyList (Object key) { keyMap.put(key, key); if (eldestKey != null ) { delegate.removeObject(eldestKey); eldestKey = null ; } } }
6.1.3 BlockingCache
BlockingCache 实现了阻塞特性,该特性是基于 Java 重入锁实现的。同一时刻下,BlockingCache 仅允许一个线程访问指定 key 的缓存项,其他线程将会被阻塞住。下面我们来看一下 BlockingCache 的源码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 public class BlockingCache implements Cache { private long timeout; private final Cache delegate; private final ConcurrentHashMap<Object, ReentrantLock> locks; public BlockingCache (Cache delegate) { this .delegate = delegate; this .locks = new ConcurrentHashMap<>(); } @Override public void putObject (Object key, Object value) { try { delegate.putObject(key, value); } finally { releaseLock(key); } } @Override public Object getObject (Object key) { acquireLock(key); Object value = delegate.getObject(key); if (value != null ) { releaseLock(key); } return value; } @Override public Object removeObject (Object key) { releaseLock(key); return null ; } @Override public void clear () { delegate.clear(); } @Override public ReadWriteLock getReadWriteLock () { return null ; } private ReentrantLock getLockForKey (Object key) { return locks.computeIfAbsent(key, k -> new ReentrantLock()); } private void acquireLock (Object key) { Lock lock = getLockForKey(key); if (timeout > 0 ) { try { boolean acquired = lock.tryLock(timeout, TimeUnit.MILLISECONDS); if (!acquired) { throw new CacheException("Couldn't get a lock in " + timeout + " for the key " + key + " at the cache " + delegate.getId()); } } catch (InterruptedException e) { throw new CacheException("Got interrupted while trying" + "to acquire lock for key " + key, e); } } else { lock.lock(); } } private void releaseLock (Object key) { ReentrantLock lock = locks.get(key); if (lock.isHeldByCurrentThread()) { lock.unlock(); } } }
大家看到getObject的实现可能会感觉有点奇怪。下面解释下,getObject 方法会先获取与 key 对应的锁,并加锁。若缓存命中,getObject 方法会释放锁,否则将一直锁定。getObject 方法若返回 null,表示缓存未命中。此时 MyBatis 会向数据库发起查询请求,并调用 putObject 方法存储查询结果。此时,putObject 方法会将指定 key 对应的锁进行解锁,这样被阻塞的线程即可恢复运行。
上面的解释对应了BlockingCache类注释中的下面这段话
It sets a lock over a cache key when the element is not found in cache. This way, other threads will wait until this element is filled instead of hitting the database.
6.2 CacheKey
CacheKey类主要抽象了Cache中的k-v对中的key。因为影响key的因素不仅仅是SQL语句,还有运行时参数、分页参数等等因素。所以我们采用了CacheKey这个复合对象,来涵盖可影响查询结果的因子。
下面我们来看看源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class CacheKey implements Cloneable , Serializable { private final int multiplier; private int hashcode; private long checksum; private int count; private List<Object> updateList; public CacheKey () { this .hashcode = DEFAULT_HASHCODE; this .multiplier = DEFAULT_MULTIPLYER; this .count = 0 ; this .updateList = new ArrayList<>(); } }
上面4个变量,除了multiplier是恒定不变的之外,其他变量都会在更新操作中被修改。下面我们看一看更新操作的代码。每当修改发生时,我们就要一并更新hash值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public void update (Object object) { int baseHashCode = object == null ? 1 : ArrayUtil.hashCode(object); count++; checksum += baseHashCode; baseHashCode *= count; hashcode = multiplier * hashcode + baseHashCode; updateList.add(object); }
此外,由于CacheKey最终要作为键存入HashMap,因此它需要覆盖equals和hashCode方法。下面看看这两个方法的实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 @Override public boolean equals (Object object) { if (this == object) { return true ; } if (!(object instanceof CacheKey)) { return false ; } final CacheKey cacheKey = (CacheKey) object; if (hashcode != cacheKey.hashcode) { return false ; } if (checksum != cacheKey.checksum) { return false ; } if (count != cacheKey.count) { return false ; } for (int i = 0 ; i < updateList.size(); i++) { Object thisObject = updateList.get(i); Object thatObject = cacheKey.updateList.get(i); if (!ArrayUtil.equals(thisObject, thatObject)) { return false ; } } return true ; } @Override public int hashCode () { return hashcode; }
6.3 一级缓存
在进行数据库查询时,MyBatis总是按二级缓存、一级缓存、数据库的顺序进行。每个SqlSession都共享一个一级缓存。但一级缓存不可以跨SqlSession,因此不会存在并发问题。此外,一级缓存所存储的查询结果会在MyBatis执行更新操作、提交、回滚时被清空。它通常用于一次会话中,多次查询同一个查询的情况。
一级缓存是在BaseExecutor中被初始化的,这个缓存仅仅是一个PerpetualCache,没有任何装饰器。下面我们来看一看相关的初始化逻辑。
1 2 3 4 5 6 7 8 9 10 protected PerpetualCache localCache;protected BaseExecutor (Configuration configuration, Transaction transaction) { this .localCache = new PerpetualCache("LocalCache" ); }
初始化非常简单,接着我们再看看访问一级缓存的逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public <E> List<E> query (MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException { BoundSql boundSql = ms.getBoundSql(parameter); CacheKey key = createCacheKey(ms, parameter, rowBounds, boundSql); return query(ms, parameter, rowBounds, resultHandler, key, boundSql); }
首先我们先从代码清单6.8的第12行CacheKey key = createCacheKey(ms, parameter, rowBounds, boundSql);向下,看看如何通过这些影响因素创建一个CacheKey。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 public CacheKey createCacheKey (MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) { if (closed) { throw new ExecutorException("Executor was closed." ); } CacheKey cacheKey = new CacheKey(); cacheKey.update(ms.getId()); cacheKey.update(rowBounds.getOffset()); cacheKey.update(rowBounds.getLimit()); cacheKey.update(boundSql.getSql()); List<ParameterMapping> parameterMappings = boundSql.getParameterMappings(); TypeHandlerRegistry typeHandlerRegistry = ms.getConfiguration().getTypeHandlerRegistry(); for (ParameterMapping parameterMapping : parameterMappings) { if (parameterMapping.getMode() != ParameterMode.OUT) { Object value; String propertyName = parameterMapping.getProperty(); if (boundSql.hasAdditionalParameter(propertyName)) { value = boundSql.getAdditionalParameter(propertyName); } else if (parameterObject == null ) { value = null ; } else if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) { value = parameterObject; } else { MetaObject metaObject = configuration.newMetaObject(parameterObject); value = metaObject.getValue(propertyName); } cacheKey.update(value); } } if (configuration.getEnvironment() != null ) { cacheKey.update(configuration.getEnvironment().getId()); } return cacheKey; }
通过上面的代码,我们可以看到:CacheKey中的影响因素有:MappedStatement的id、SQL语句、分页参数、运行时参数、Environment的id。
然后我们从代码清单6.8的15行return query(ms, parameter, rowBounds, resultHandler, key, boundSql);看看这个重载方法做了什么
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public <E> List<E> query (MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException { List<E> list; try { queryStack++; list = resultHandler == null ? (List<E>) localCache.getObject(key) : null ; if (list != null ) { handleLocallyCachedOutputParameters(ms, key, parameter, boundSql); } else { list = queryFromDatabase( ms, parameter, rowBounds, resultHandler, key, boundSql); } } finally { queryStack--; } return list; }
上面的源码是精简版的,也就是先通过缓存查询。若缓存中没有,再调用queryFromDatabase查询数据库。接着我们再看看如何查询数据库。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 private <E> List<E> queryFromDatabase (MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException { List<E> list; localCache.putObject(key, EXECUTION_PLACEHOLDER); try { list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql); } finally { localCache.removeObject(key); } localCache.putObject(key, list); if (ms.getStatementType() == StatementType.CALLABLE) { localOutputParameterCache.putObject(key, parameter); } return list; }
到此,一级缓存的逻辑就分析完了。因为它不用考虑并发问题,所以实现起来比较简单。下一节我们继续看二级缓存的实现。
6.4 二级缓存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 public <E> List<E> query (MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException { BoundSql boundSql = ms.getBoundSql(parameterObject); CacheKey key = createCacheKey(ms, parameterObject, rowBounds, boundSql); return query(ms, parameterObject, rowBounds, resultHandler, key, boundSql); } public <E> List<E> query (MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException { Cache cache = ms.getCache(); if (cache != null ) { flushCacheIfRequired(ms); if (ms.isUseCache() && resultHandler == null ) { ensureNoOutParams(ms, boundSql); @SuppressWarnings ("unchecked" ) List<E> list = (List<E>) tcm.getObject(cache, key); if (list == null ) { list = delegate.query( ms, parameterObject, rowBounds, resultHandler, key, boundSql); tcm.putObject(cache, key, list); } return list; } } return delegate.query( ms, parameterObject, rowBounds, resultHandler, key, boundSql); }
由上文,tcm(TransactionCacheManager,事务缓存管理器)负责了查询缓存和向缓存中放入对象,我们接着看看它的逻辑。
TransactionalCacheManager内部维护了Cache实例与TransactionalCache实例间的映射关系,该类也仅负责维护两者的映射关系,真正做事的还是TransactionalCache。它是一个实现了Cache的装饰器,为被装饰的Cache实例增加事务功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class TransactionalCacheManager { private final Map<Cache, TransactionalCache> transactionalCaches = new HashMap<>(); public void clear (Cache cache) { getTransactionalCache(cache).clear(); } public Object getObject (Cache cache, CacheKey key) { return getTransactionalCache(cache).getObject(key); } public void putObject (Cache cache, CacheKey key, Object value) { getTransactionalCache(cache).putObject(key, value); } public void commit () { for (TransactionalCache txCache : transactionalCaches.values()) { txCache.commit(); } } public void rollback () { for (TransactionalCache txCache : transactionalCaches.values()) { txCache.rollback(); } } private TransactionalCache getTransactionalCache (Cache cache) { return transactionalCaches.computeIfAbsent(cache, TransactionalCache::new ); } }
xxx
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 public class TransactionalCache implements Cache { private static final Log log = LogFactory.getLog(TransactionalCache.class); private final Cache delegate; private boolean clearOnCommit; private final Map<Object, Object> entriesToAddOnCommit; private final Set<Object> entriesMissedInCache; @Override public Object getObject (Object key) { Object object = delegate.getObject(key); if (object == null ) { entriesMissedInCache.add(key); } if (clearOnCommit) { return null ; } else { return object; } } @Override public void putObject (Object key, Object object) { entriesToAddOnCommit.put(key, object); } @Override public Object removeObject (Object key) { return null ; } @Override public void clear () { clearOnCommit = true ; entriesToAddOnCommit.clear(); } public void commit () { if (clearOnCommit) { delegate.clear(); } flushPendingEntries(); reset(); } public void rollback () { unlockMissedEntries(); reset(); } private void reset () { clearOnCommit = false ; entriesToAddOnCommit.clear(); entriesMissedInCache.clear(); } private void flushPendingEntries () { for (Map.Entry<Object, Object> entry : entriesToAddOnCommit.entrySet()) { delegate.putObject(entry.getKey(), entry.getValue()); } for (Object entry : entriesMissedInCache) { if (!entriesToAddOnCommit.containsKey(entry)) { delegate.putObject(entry, null ); } } } private void unlockMissedEntries () { for (Object entry : entriesMissedInCache) { try { delegate.removeObject(entry); } catch (Exception e) { log.warn("Unexpected exception while notifiying a rollback to the cache adapter." + "Consider upgrading your cache adapter to the latest version. Cause: " + e); } } } }
通过TransactionalCache我们可以解决数据库事务的脏读问题。
脏读问题的解决主要与entriesToAddOnCommit集合有关。该集合用于存储本次事务中查询的结果,那为什么要将结果保存在该集合中,而非 delegate 所表示的缓存中呢?主要是因为直接存到 delegate 会导致脏数据问题。
下面先看看,假如不采用这个集合来控制,为什么就会发生脏读。
时刻2,事务 A 对记录 A 进行了更新。
时刻3,事务 A 从数据库查询记录A,并将记录 A 写入缓存中。
时刻4,事务 B 查询记录 A,由于缓存中存在记录 A,事务B 直接从缓存中取数据。这个时候,脏数据问题就发生了。事务 B 在事务 A 未提交情况下,读取到了事务 A 所修改的记录。
这就是我们需要为每个事务引入一个独立缓存的原因:
查询数据时,仍从delegate缓存(以下统称为共享缓存)中查询。若缓存未命中,
则查询数据库。
存储查询结果时,并不直接存储查询结果到共享缓存中,而是先存储到事务
缓存中,也就是entriesToAddOnCommit集合。当事务提交时,再将事务缓存中的缓存项转
存到共享缓存中。
这样,事务 B 只能在事务 A 提交后,才能读取到事务 A 所做的修改,解决了脏读问题。
下面我们举个例子说明脏读问题是如何得到解决的
时刻2,事务 A 和 B 同时查询记录 A。此时共享缓存中还没没有数据,所以两个事务均会向数据库发起查询请求,并将查询结果存储到各自的事务缓存中。
时刻3,事务 A 更新记录 A,这里把更新后的记录 A 记为 A′。
时刻4,两个事务再次进行查询。此时,事务 A 读取到的记录为修改后的值,而事务 B 读取到的记录仍为原值。
时刻5,事务 A被提交,并将事务缓存 A 中的内容转存到共享缓存中。
时刻6,事务 B 再次查询记录 A,由于共享缓存中有相应的数据,所以直接取缓存数据即可。因此得到记录 A′,而非记录 A。但由于事务 A 已经提交,所以事务 B 读取到的记录 A′ 并非是脏数据。
但需要注意的时,MyBatis 缓存事务机制只能解决脏读问题,并不能解决“不可重复读”问题 。再回到上图,事务 B 在被提交前进行了三次查询。前两次查询得到的结果为记录 A,最后一次查询得到的结果为 A′。最有一次的查询结果与前两次不同,这就会导致“不可重复读”的问题。MyBatis 的缓存事务机制最高只支持“读已提交”,并不能解决“不可重复读”问题。即使数据库使用了更高的隔离级别解决了这个问题,但因 MyBatis 缓存事务机制级别较低。此时仍然会导致“不可重复读”问题的发生,这个在日常开发中需要注意一下。