第六章 缓存机制
在 Web 应用中,缓存是必不可少的组件。通常我们都会用 Redis 或 memcached 等缓存中间件,拦截大量奔向数据库的请求,以减轻数据库压力。作为一个重要的组件,MyBatis自然也在内部提供了相应的支持。通过在框架层面增加缓存功能,可减轻数据库的压力,同时又可以提升查询速度,可谓一举两得。MyBatis 缓存结构由一级缓存和二级缓存构成,这两级缓存均是使用 Cache
接口的实现类。因此本章将首先会向大家介绍 Cache
几种实现类
的源码,然后再分析一级和二级缓存的实现。
6.1 缓存类介绍
在MyBatis中,Cache是缓存接口,它定义了一些基本的缓存操作,所有缓存类都应该实现该接口。实现类如下
具有基本缓存功能的PerpetualCache
具有LRU
策略的缓存LruCache
可保证线程安全的缓存SynchronizedCache
具有阻塞功能的BlockingCache
此外,除了PerpetualCache
外,其他的类全都是装饰器类。类图如下
下面先看一下Cache
接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 public interface Cache { String getId () ; void putObject (Object key, Object value) ; Object getObject (Object key) ; Object removeObject (Object key) ; void clear () ; int getSize () ; ReadWriteLock getReadWriteLock () ; }
6.1.1 PerpetualCache
因为这个被修饰的Cache
逻辑比较简单,这里只给出源代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 public class PerpetualCache implements Cache { private final String id; private Map<Object, Object> cache = new HashMap<>(); public PerpetualCache (String id) { this .id = id; } @Override public String getId () { return id; } @Override public int getSize () { return cache.size(); } @Override public void putObject (Object key, Object value) { cache.put(key, value); } @Override public Object getObject (Object key) { return cache.get(key); } @Override public Object removeObject (Object key) { return cache.remove(key); } @Override public void clear () { cache.clear(); } @Override public ReadWriteLock getReadWriteLock () { return null ; } @Override public boolean equals (Object o) { if (getId() == null ) { throw new CacheException("Cache instances require an ID." ); } if (this == o) { return true ; } if (!(o instanceof Cache)) { return false ; } Cache otherCache = (Cache) o; return getId().equals(otherCache.getId()); } @Override public int hashCode () { if (getId() == null ) { throw new CacheException("Cache instances require an ID." ); } return getId().hashCode(); } }
6.1.2 LruCache
先介绍LRU
策略。LRU
策略其实很简单。我们使用的缓存是一块给定大小的区域,它不是无限的。那么当缓存区域满了之后,我们再插入时,就需要选择一个老缓存删除它。对LRU
策略来说,删除的是访问时间最早的那一条,因为它在近期是最少被访问的。因此,我们如果要实现LRU
策略,就需要维护一个顺序表结构,表中元素按照访问时间先后进行排序,并且当表中某个元素被访问之后,它就要被放到表尾。当我们需要删除一个老缓存时,则直接删除表头的缓存即可。在MyBatis中,是使用LinkedHashList
作为LRU
结构的。
下面我们直接看一看LruCache
的源代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 public class LruCache implements Cache { private final Cache delegate; private Map<Object, Object> keyMap; private Object eldestKey; public LruCache (Cache delegate) { this .delegate = delegate; setSize(1024 ); } @Override public String getId () { return delegate.getId(); } @Override public int getSize () { return delegate.getSize(); } public void setSize (final int size) { keyMap = new LinkedHashMap<Object, Object>(size, .75F , true ) { private static final long serialVersionUID = 4267176411845948333L ; @Override protected boolean removeEldestEntry (Map.Entry<Object, Object> eldest) { boolean tooBig = size() > size; if (tooBig) { eldestKey = eldest.getKey(); } return tooBig; } }; } @Override public void putObject (Object key, Object value) { delegate.putObject(key, value); cycleKeyList(key); } @Override public Object getObject (Object key) { keyMap.get(key); return delegate.getObject(key); } @Override public Object removeObject (Object key) { return delegate.removeObject(key); } @Override public void clear () { delegate.clear(); keyMap.clear(); } @Override public ReadWriteLock getReadWriteLock () { return null ; } private void cycleKeyList (Object key) { keyMap.put(key, key); if (eldestKey != null ) { delegate.removeObject(eldestKey); eldestKey = null ; } } }
6.1.3 BlockingCache
BlockingCache
实现了阻塞特性,该特性是基于 Java 重入锁实现的。同一时刻下,BlockingCache
仅允许一个线程访问指定 key
的缓存项,其他线程将会被阻塞住。下面我们来看一下 BlockingCache
的源码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 public class BlockingCache implements Cache { private long timeout; private final Cache delegate; private final ConcurrentHashMap<Object, ReentrantLock> locks; public BlockingCache (Cache delegate) { this .delegate = delegate; this .locks = new ConcurrentHashMap<>(); } @Override public void putObject (Object key, Object value) { try { delegate.putObject(key, value); } finally { releaseLock(key); } } @Override public Object getObject (Object key) { acquireLock(key); Object value = delegate.getObject(key); if (value != null ) { releaseLock(key); } return value; } @Override public Object removeObject (Object key) { releaseLock(key); return null ; } @Override public void clear () { delegate.clear(); } @Override public ReadWriteLock getReadWriteLock () { return null ; } private ReentrantLock getLockForKey (Object key) { return locks.computeIfAbsent(key, k -> new ReentrantLock()); } private void acquireLock (Object key) { Lock lock = getLockForKey(key); if (timeout > 0 ) { try { boolean acquired = lock.tryLock(timeout, TimeUnit.MILLISECONDS); if (!acquired) { throw new CacheException("Couldn't get a lock in " + timeout + " for the key " + key + " at the cache " + delegate.getId()); } } catch (InterruptedException e) { throw new CacheException("Got interrupted while trying" + "to acquire lock for key " + key, e); } } else { lock.lock(); } } private void releaseLock (Object key) { ReentrantLock lock = locks.get(key); if (lock.isHeldByCurrentThread()) { lock.unlock(); } } }
大家看到getObject
的实现可能会感觉有点奇怪。下面解释下,getObject
方法会先获取与 key
对应的锁,并加锁。若缓存命中,getObject
方法会释放锁,否则将一直锁定。getObject
方法若返回 null
,表示缓存未命中。此时 MyBatis
会向数据库发起查询请求,并调用 putObject
方法存储查询结果。此时,putObject
方法会将指定 key
对应的锁进行解锁,这样被阻塞的线程即可恢复运行。
上面的解释对应了BlockingCache
类注释中的下面这段话
It sets a lock over a cache key when the element is not found in cache. This way, other threads will wait until this element is filled instead of hitting the database.
6.2 CacheKey
CacheKey
类主要抽象了Cache
中的k-v
对中的key
。因为影响key
的因素不仅仅是SQL语句,还有运行时参数、分页参数等等因素。所以我们采用了CacheKey
这个复合对象,来涵盖可影响查询结果的因子。
下面我们来看看源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class CacheKey implements Cloneable , Serializable { private final int multiplier; private int hashcode; private long checksum; private int count; private List<Object> updateList; public CacheKey () { this .hashcode = DEFAULT_HASHCODE; this .multiplier = DEFAULT_MULTIPLYER; this .count = 0 ; this .updateList = new ArrayList<>(); } }
上面4个变量,除了multiplier
是恒定不变的之外,其他变量都会在更新操作中被修改。下面我们看一看更新操作的代码。每当修改发生时,我们就要一并更新hash
值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public void update (Object object) { int baseHashCode = object == null ? 1 : ArrayUtil.hashCode(object); count++; checksum += baseHashCode; baseHashCode *= count; hashcode = multiplier * hashcode + baseHashCode; updateList.add(object); }
此外,由于CacheKey
最终要作为键存入HashMap
,因此它需要覆盖equals
和hashCode
方法。下面看看这两个方法的实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 @Override public boolean equals (Object object) { if (this == object) { return true ; } if (!(object instanceof CacheKey)) { return false ; } final CacheKey cacheKey = (CacheKey) object; if (hashcode != cacheKey.hashcode) { return false ; } if (checksum != cacheKey.checksum) { return false ; } if (count != cacheKey.count) { return false ; } for (int i = 0 ; i < updateList.size(); i++) { Object thisObject = updateList.get(i); Object thatObject = cacheKey.updateList.get(i); if (!ArrayUtil.equals(thisObject, thatObject)) { return false ; } } return true ; } @Override public int hashCode () { return hashcode; }
6.3 一级缓存
在进行数据库查询时,MyBatis总是按二级缓存、一级缓存、数据库的顺序进行。每个SqlSession
都共享一个一级缓存。但一级缓存不可以跨SqlSession
,因此不会存在并发问题。此外,一级缓存所存储的查询结果会在MyBatis执行更新操作、提交、回滚时被清空。它通常用于一次会话中,多次查询同一个查询的情况。
一级缓存是在BaseExecutor
中被初始化的,这个缓存仅仅是一个PerpetualCache
,没有任何装饰器。下面我们来看一看相关的初始化逻辑。
1 2 3 4 5 6 7 8 9 10 protected PerpetualCache localCache;protected BaseExecutor (Configuration configuration, Transaction transaction) { this .localCache = new PerpetualCache("LocalCache" ); }
初始化非常简单,接着我们再看看访问一级缓存的逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public <E> List<E> query (MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException { BoundSql boundSql = ms.getBoundSql(parameter); CacheKey key = createCacheKey(ms, parameter, rowBounds, boundSql); return query(ms, parameter, rowBounds, resultHandler, key, boundSql); }
首先我们先从代码清单6.8
的第12行CacheKey key = createCacheKey(ms, parameter, rowBounds, boundSql);
向下,看看如何通过这些影响因素创建一个CacheKey。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 public CacheKey createCacheKey (MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) { if (closed) { throw new ExecutorException("Executor was closed." ); } CacheKey cacheKey = new CacheKey(); cacheKey.update(ms.getId()); cacheKey.update(rowBounds.getOffset()); cacheKey.update(rowBounds.getLimit()); cacheKey.update(boundSql.getSql()); List<ParameterMapping> parameterMappings = boundSql.getParameterMappings(); TypeHandlerRegistry typeHandlerRegistry = ms.getConfiguration().getTypeHandlerRegistry(); for (ParameterMapping parameterMapping : parameterMappings) { if (parameterMapping.getMode() != ParameterMode.OUT) { Object value; String propertyName = parameterMapping.getProperty(); if (boundSql.hasAdditionalParameter(propertyName)) { value = boundSql.getAdditionalParameter(propertyName); } else if (parameterObject == null ) { value = null ; } else if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) { value = parameterObject; } else { MetaObject metaObject = configuration.newMetaObject(parameterObject); value = metaObject.getValue(propertyName); } cacheKey.update(value); } } if (configuration.getEnvironment() != null ) { cacheKey.update(configuration.getEnvironment().getId()); } return cacheKey; }
通过上面的代码,我们可以看到:CacheKey
中的影响因素有:MappedStatement
的id
、SQL语句、分页参数、运行时参数、Environment
的id
。
然后我们从代码清单6.8
的15行return query(ms, parameter, rowBounds, resultHandler, key, boundSql);
看看这个重载方法做了什么
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public <E> List<E> query (MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException { List<E> list; try { queryStack++; list = resultHandler == null ? (List<E>) localCache.getObject(key) : null ; if (list != null ) { handleLocallyCachedOutputParameters(ms, key, parameter, boundSql); } else { list = queryFromDatabase( ms, parameter, rowBounds, resultHandler, key, boundSql); } } finally { queryStack--; } return list; }
上面的源码是精简版的,也就是先通过缓存查询。若缓存中没有,再调用queryFromDatabase
查询数据库。接着我们再看看如何查询数据库。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 private <E> List<E> queryFromDatabase (MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException { List<E> list; localCache.putObject(key, EXECUTION_PLACEHOLDER); try { list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql); } finally { localCache.removeObject(key); } localCache.putObject(key, list); if (ms.getStatementType() == StatementType.CALLABLE) { localOutputParameterCache.putObject(key, parameter); } return list; }
到此,一级缓存的逻辑就分析完了。因为它不用考虑并发问题,所以实现起来比较简单。下一节我们继续看二级缓存的实现。
6.4 二级缓存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 public <E> List<E> query (MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException { BoundSql boundSql = ms.getBoundSql(parameterObject); CacheKey key = createCacheKey(ms, parameterObject, rowBounds, boundSql); return query(ms, parameterObject, rowBounds, resultHandler, key, boundSql); } public <E> List<E> query (MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException { Cache cache = ms.getCache(); if (cache != null ) { flushCacheIfRequired(ms); if (ms.isUseCache() && resultHandler == null ) { ensureNoOutParams(ms, boundSql); @SuppressWarnings ("unchecked" ) List<E> list = (List<E>) tcm.getObject(cache, key); if (list == null ) { list = delegate.query( ms, parameterObject, rowBounds, resultHandler, key, boundSql); tcm.putObject(cache, key, list); } return list; } } return delegate.query( ms, parameterObject, rowBounds, resultHandler, key, boundSql); }
由上文,tcm
(TransactionCacheManager
,事务缓存管理器)负责了查询缓存和向缓存中放入对象,我们接着看看它的逻辑。
TransactionalCacheManager
内部维护了Cache
实例与TransactionalCache
实例间的映射关系,该类也仅负责维护两者的映射关系,真正做事的还是TransactionalCache
。它是一个实现了Cache
的装饰器,为被装饰的Cache
实例增加事务功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class TransactionalCacheManager { private final Map<Cache, TransactionalCache> transactionalCaches = new HashMap<>(); public void clear (Cache cache) { getTransactionalCache(cache).clear(); } public Object getObject (Cache cache, CacheKey key) { return getTransactionalCache(cache).getObject(key); } public void putObject (Cache cache, CacheKey key, Object value) { getTransactionalCache(cache).putObject(key, value); } public void commit () { for (TransactionalCache txCache : transactionalCaches.values()) { txCache.commit(); } } public void rollback () { for (TransactionalCache txCache : transactionalCaches.values()) { txCache.rollback(); } } private TransactionalCache getTransactionalCache (Cache cache) { return transactionalCaches.computeIfAbsent(cache, TransactionalCache::new ); } }
xxx
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 public class TransactionalCache implements Cache { private static final Log log = LogFactory.getLog(TransactionalCache.class); private final Cache delegate; private boolean clearOnCommit; private final Map<Object, Object> entriesToAddOnCommit; private final Set<Object> entriesMissedInCache; @Override public Object getObject (Object key) { Object object = delegate.getObject(key); if (object == null ) { entriesMissedInCache.add(key); } if (clearOnCommit) { return null ; } else { return object; } } @Override public void putObject (Object key, Object object) { entriesToAddOnCommit.put(key, object); } @Override public Object removeObject (Object key) { return null ; } @Override public void clear () { clearOnCommit = true ; entriesToAddOnCommit.clear(); } public void commit () { if (clearOnCommit) { delegate.clear(); } flushPendingEntries(); reset(); } public void rollback () { unlockMissedEntries(); reset(); } private void reset () { clearOnCommit = false ; entriesToAddOnCommit.clear(); entriesMissedInCache.clear(); } private void flushPendingEntries () { for (Map.Entry<Object, Object> entry : entriesToAddOnCommit.entrySet()) { delegate.putObject(entry.getKey(), entry.getValue()); } for (Object entry : entriesMissedInCache) { if (!entriesToAddOnCommit.containsKey(entry)) { delegate.putObject(entry, null ); } } } private void unlockMissedEntries () { for (Object entry : entriesMissedInCache) { try { delegate.removeObject(entry); } catch (Exception e) { log.warn("Unexpected exception while notifiying a rollback to the cache adapter." + "Consider upgrading your cache adapter to the latest version. Cause: " + e); } } } }
通过TransactionalCache
我们可以解决数据库事务的脏读问题。
脏读问题的解决主要与entriesToAddOnCommit
集合有关。该集合用于存储本次事务中查询的结果,那为什么要将结果保存在该集合中,而非 delegate 所表示的缓存中呢?主要是因为直接存到 delegate 会导致脏数据问题。
下面先看看,假如不采用这个集合来控制,为什么就会发生脏读。
时刻2,事务 A 对记录 A 进行了更新。
时刻3,事务 A 从数据库查询记录A,并将记录 A 写入缓存中。
时刻4,事务 B 查询记录 A,由于缓存中存在记录 A,事务B 直接从缓存中取数据。这个时候,脏数据问题就发生了。事务 B 在事务 A 未提交情况下,读取到了事务 A 所修改的记录。
这就是我们需要为每个事务引入一个独立缓存的原因:
查询数据时,仍从delegate
缓存(以下统称为共享缓存)中查询。若缓存未命中,
则查询数据库。
存储查询结果时,并不直接存储查询结果到共享缓存中,而是先存储到事务
缓存中,也就是entriesToAddOnCommit
集合。当事务提交时,再将事务缓存中的缓存项转
存到共享缓存中。
这样,事务 B 只能在事务 A 提交后,才能读取到事务 A 所做的修改,解决了脏读问题。
下面我们举个例子说明脏读问题是如何得到解决的
时刻2,事务 A 和 B 同时查询记录 A。此时共享缓存中还没没有数据,所以两个事务均会向数据库发起查询请求,并将查询结果存储到各自的事务缓存中。
时刻3,事务 A 更新记录 A,这里把更新后的记录 A 记为 A′。
时刻4,两个事务再次进行查询。此时,事务 A 读取到的记录为修改后的值,而事务 B 读取到的记录仍为原值。
时刻5,事务 A被提交,并将事务缓存 A 中的内容转存到共享缓存中。
时刻6,事务 B 再次查询记录 A,由于共享缓存中有相应的数据,所以直接取缓存数据即可。因此得到记录 A′,而非记录 A。但由于事务 A 已经提交,所以事务 B 读取到的记录 A′ 并非是脏数据。
但需要注意的时,MyBatis 缓存事务机制只能解决脏读问题,并不能解决“不可重复读”问题 。再回到上图,事务 B 在被提交前进行了三次查询。前两次查询得到的结果为记录 A,最后一次查询得到的结果为 A′。最有一次的查询结果与前两次不同,这就会导致“不可重复读”的问题。MyBatis 的缓存事务机制最高只支持“读已提交”,并不能解决“不可重复读”问题。即使数据库使用了更高的隔离级别解决了这个问题,但因 MyBatis 缓存事务机制级别较低。此时仍然会导致“不可重复读”问题的发生,这个在日常开发中需要注意一下。