JedisCluster 是 jedis 连接 redis 集群操作的一个类,封装了很多操作方法和具体的类,我们先来看 JedisCluster 体系关系图:
其实从这里就可以看出,整个体系结构根据是否为二进制操作分为两类:BinaryJedisCluster 实现了集群的二进制操作,而 JedisCluster 直接继承了 BinaryJedisCluster 类,在其基础上实现了各种封装类型的方法操作。
下面我们先看下 JedisCluster 构造方法:
/**
 * Creates a cluster client from a single seed node using the default timeout.
 * The node is wrapped in a singleton set and passed to the set-based overload.
 */
public JedisCluster(HostAndPort node) {
  this(Collections.singleton(node));
}

/** Single seed node with an explicit timeout. */
public JedisCluster(HostAndPort node, int timeout) {
  this(Collections.singleton(node), timeout);
}

/** Single seed node with an explicit timeout and maximum number of attempts. */
public JedisCluster(HostAndPort node, int timeout, int maxAttempts) {
  this(Collections.singleton(node), timeout, maxAttempts);
}

/** Single seed node with a caller-supplied pool configuration. */
public JedisCluster(HostAndPort node, final GenericObjectPoolConfig poolConfig) {
  this(Collections.singleton(node), poolConfig);
}

/** Single seed node with an explicit timeout and pool configuration. */
public JedisCluster(HostAndPort node, int timeout, final GenericObjectPoolConfig poolConfig) {
  this(Collections.singleton(node), timeout, poolConfig);
}

/** Single seed node with an explicit timeout, maximum attempts and pool configuration. */
public JedisCluster(HostAndPort node, int timeout, int maxAttempts,
    final GenericObjectPoolConfig poolConfig) {
  this(Collections.singleton(node), timeout, maxAttempts, poolConfig);
}

/** Single seed node with separate connection and socket timeouts. */
public JedisCluster(HostAndPort node, int connectionTimeout, int soTimeout,
    int maxAttempts, final GenericObjectPoolConfig poolConfig) {
  this(Collections.singleton(node), connectionTimeout, soTimeout, maxAttempts, poolConfig);
}

/** Single seed node with separate timeouts and a password. */
public JedisCluster(HostAndPort node, int connectionTimeout, int soTimeout,
    int maxAttempts, String password, final GenericObjectPoolConfig poolConfig) {
  this(Collections.singleton(node), connectionTimeout, soTimeout, maxAttempts, password,
      poolConfig);
}

/** Single seed node with separate timeouts, a password and a client name. */
public JedisCluster(HostAndPort node, int connectionTimeout, int soTimeout,
    int maxAttempts, String password, String clientName,
    final GenericObjectPoolConfig poolConfig) {
  this(Collections.singleton(node), connectionTimeout, soTimeout, maxAttempts, password,
      clientName, poolConfig);
}

/** Set of seed nodes; uses {@code DEFAULT_TIMEOUT}. */
public JedisCluster(Set<HostAndPort> nodes) {
  this(nodes, DEFAULT_TIMEOUT);
}

/** Set of seed nodes with an explicit timeout; uses {@code DEFAULT_MAX_REDIRECTIONS}. */
public JedisCluster(Set<HostAndPort> nodes, int timeout) {
  this(nodes, timeout, DEFAULT_MAX_REDIRECTIONS);
}

/** Set of seed nodes with explicit timeout and attempts; uses a fresh default pool config. */
public JedisCluster(Set<HostAndPort> nodes, int timeout, int maxAttempts) {
  this(nodes, timeout, maxAttempts, new GenericObjectPoolConfig());
}

/** Set of seed nodes with a pool config; uses the default timeout and redirection limit. */
public JedisCluster(Set<HostAndPort> nodes, final GenericObjectPoolConfig poolConfig) {
  this(nodes, DEFAULT_TIMEOUT, DEFAULT_MAX_REDIRECTIONS, poolConfig);
}

/** Set of seed nodes with a timeout and pool config; uses the default redirection limit. */
public JedisCluster(Set<HostAndPort> nodes, int timeout,
    final GenericObjectPoolConfig poolConfig) {
  this(nodes, timeout, DEFAULT_MAX_REDIRECTIONS, poolConfig);
}

/**
 * Passes the node set, timeout, maximum attempts and pool configuration straight to the
 * superclass constructor.
 */
public JedisCluster(Set<HostAndPort> jedisClusterNode, int timeout, int maxAttempts,
    final GenericObjectPoolConfig poolConfig) {
  super(jedisClusterNode, timeout, maxAttempts, poolConfig);
}

/** Same as above, with separate connection and socket timeouts. */
public JedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout, int soTimeout,
    int maxAttempts, final GenericObjectPoolConfig poolConfig) {
  super(jedisClusterNode, connectionTimeout, soTimeout, maxAttempts, poolConfig);
}

/** Same as above, additionally forwarding a password. */
public JedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout, int soTimeout,
    int maxAttempts, String password, final GenericObjectPoolConfig poolConfig) {
  super(jedisClusterNode, connectionTimeout, soTimeout, maxAttempts, password, poolConfig);
}

/** Same as above, additionally forwarding a password and a client name. */
public JedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout, int soTimeout,
    int maxAttempts, String password, String clientName,
    final GenericObjectPoolConfig poolConfig) {
  super(jedisClusterNode, connectionTimeout, soTimeout, maxAttempts, password, clientName,
      poolConfig);
}
可以看到,这些构造方法本身并没有什么逻辑,只是接收集群节点等信息,然后直接调用父类的构造方法把这些信息传递过去。下面看下父类 BinaryJedisCluster 的构造方法:
/** Number of hash slots used by this client. */
public static final short HASHSLOTS = 16384;

/** Timeout applied when the caller does not pass one. */
protected static final int DEFAULT_TIMEOUT = 2000;

/** Attempt limit applied when the caller does not pass one. */
protected static final int DEFAULT_MAX_REDIRECTIONS = 5;

/** Maximum number of attempts handed to each cluster command. */
protected int maxAttempts;

/** Handler that supplies connections for cluster commands. */
protected JedisClusterConnectionHandler connectionHandler;

/** Uses {@code DEFAULT_MAX_REDIRECTIONS} and a fresh default pool configuration. */
public BinaryJedisCluster(Set<HostAndPort> nodes, int timeout) {
  this(nodes, timeout, DEFAULT_MAX_REDIRECTIONS, new GenericObjectPoolConfig());
}

/** Uses {@code DEFAULT_TIMEOUT}. */
public BinaryJedisCluster(Set<HostAndPort> nodes) {
  this(nodes, DEFAULT_TIMEOUT);
}

/** Creates a slot-based connection handler with a single timeout value. */
public BinaryJedisCluster(Set<HostAndPort> jedisClusterNode, int timeout, int maxAttempts,
    final GenericObjectPoolConfig poolConfig) {
  this.connectionHandler =
      new JedisSlotBasedConnectionHandler(jedisClusterNode, poolConfig, timeout);
  this.maxAttempts = maxAttempts;
}

/** Creates a slot-based connection handler with separate connection and socket timeouts. */
public BinaryJedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout,
    int soTimeout, int maxAttempts, final GenericObjectPoolConfig poolConfig) {
  this.connectionHandler = new JedisSlotBasedConnectionHandler(jedisClusterNode, poolConfig,
      connectionTimeout, soTimeout);
  this.maxAttempts = maxAttempts;
}

/** Creates a slot-based connection handler with separate timeouts and a password. */
public BinaryJedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout,
    int soTimeout, int maxAttempts, String password, GenericObjectPoolConfig poolConfig) {
  this.connectionHandler = new JedisSlotBasedConnectionHandler(jedisClusterNode, poolConfig,
      connectionTimeout, soTimeout, password);
  this.maxAttempts = maxAttempts;
}

/** Creates a slot-based connection handler with timeouts, a password and a client name. */
public BinaryJedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout,
    int soTimeout, int maxAttempts, String password, String clientName,
    GenericObjectPoolConfig poolConfig) {
  this.connectionHandler = new JedisSlotBasedConnectionHandler(jedisClusterNode, poolConfig,
      connectionTimeout, soTimeout, password, clientName);
  this.maxAttempts = maxAttempts;
}
可以看到,连接管理是交给 JedisClusterConnectionHandler 来处理的,这里实际创建的是它的子类 JedisSlotBasedConnectionHandler,下面看下其内部处理:
public class JedisSlotBasedConnectionHandler extends JedisClusterConnectionHandler {public JedisSlotBasedConnectionHandler(Set nodes,final GenericObjectPoolConfig poolConfig, int timeout) {this(nodes, poolConfig, timeout, timeout);}public JedisSlotBasedConnectionHandler(Set nodes,final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout) {super(nodes, poolConfig, connectionTimeout, soTimeout, null);}public JedisSlotBasedConnectionHandler(Set nodes, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password) {super(nodes, poolConfig, connectionTimeout, soTimeout, password);}public JedisSlotBasedConnectionHandler(Set nodes, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password, String clientName) {super(nodes, poolConfig, connectionTimeout, soTimeout, password, clientName);}@Overridepublic Jedis getConnection() {// In antirez's redis-rb-cluster implementation,// getRandomConnection always return valid connection (able to// ping-pong)// or exception if all connections are invalidList pools = cache.getShuffledNodesPool();for (JedisPool pool : pools) {Jedis jedis = null;try {jedis = pool.getResource();if (jedis == null) {continue;}String result = jedis.ping();if (result.equalsIgnoreCase("pong")) return jedis;jedis.close();} catch (JedisException ex) {if (jedis != null) {jedis.close();}}}throw new JedisNoReachableClusterNodeException("No reachable node in cluster");}@Overridepublic Jedis getConnectionFromSlot(int slot) {JedisPool connectionPool = cache.getSlotPool(slot);if (connectionPool != null) {// It can't guaranteed to get valid connection because of node// assignmentreturn connectionPool.getResource();} else {renewSlotCache(); //It's abnormal situation for cluster mode, that we have just nothing for slot, try to rediscover stateconnectionPool = cache.getSlotPool(slot);if (connectionPool != null) {return connectionPool.getResource();} else {//no choice, fallback to new connection 
to random nodereturn getConnection();}}}}
接下来看下 JedisCluster 中操作数据的方法,就以 set 为例,其它的方法和这个类似:
/**
 * Sets {@code key} to {@code value}.
 *
 * <p>The work is wrapped in an anonymous {@code JedisClusterCommand<String>} whose
 * {@code run(key)} supplies the connection on which the plain {@code Jedis.set} call is
 * executed.
 *
 * @param key the key to set
 * @param value the value to store
 * @return the reply returned by {@code connection.set(key, value)}
 */
@Override
public String set(final String key, final String value) {
  return new JedisClusterCommand<String>(connectionHandler, maxAttempts) {
    @Override
    public String execute(Jedis connection) {
      return connection.set(key, value);
    }
  }.run(key);
}
可以看到,这里直接创建了 JedisClusterCommand 的匿名子类并调用其 run() 方法来执行操作。下面看下 JedisClusterCommand 类的方法:
public abstract class JedisClusterCommand {private final JedisClusterConnectionHandler connectionHandler;private final int maxAttempts;private final ThreadLocal askConnection = new ThreadLocal();public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler, int maxAttempts) {this.connectionHandler = connectionHandler;this.maxAttempts = maxAttempts;}public abstract T execute(Jedis connection);public T run(String key) {if (key == null) {throw new JedisClusterException("No way to dispatch this command to Redis Cluster.");}return runWithRetries(JedisClusterCRC16.getSlot(key), this.maxAttempts, false, false);}public T run(int keyCount, String... keys) {if (keys == null || keys.length == 0) {throw new JedisClusterException("No way to dispatch this command to Redis Cluster.");}// For multiple keys, only execute if they all share the same connection slot.int slot = JedisClusterCRC16.getSlot(keys[0]);if (keys.length > 1) {for (int i = 1; i 1) {for (int i = 1; i < keyCount; i++) {int nextSlot = JedisClusterCRC16.getSlot(keys[i]);if (slot != nextSlot) {throw new JedisClusterException("No way to dispatch this command to Redis Cluster "+ "because keys have different slots.");}}}return runWithRetries(slot, this.maxAttempts, false, false);}public T runWithAnyNode() {Jedis connection = null;try {connection = connectionHandler.getConnection();return execute(connection);} catch (JedisConnectionException e) {throw e;} finally {releaseConnection(connection);}}private T runWithRetries(final int slot, int attempts, boolean tryRandomNode, boolean asking) {if (attempts <= 0) {throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?");}Jedis connection = null;try {if (asking) {// TODO: Pipeline asking with the original command to make it// faster....connection = askConnection.get();connection.asking();// if asking success, reset asking flagasking = false;} else {if (tryRandomNode) {connection = connectionHandler.getConnection();} else {connection = 
connectionHandler.getConnectionFromSlot(slot);}}return execute(connection);} catch (JedisNoReachableClusterNodeException jnrcne) {throw jnrcne;} catch (JedisConnectionException jce) {// release current connection before recursionreleaseConnection(connection);connection = null;if (attempts <= 1) {//We need this because if node is not reachable anymore - we need to finally initiate slots renewing,//or we can stuck with cluster state without one node in opposite case.//But now if maxAttempts = 1 or 2 we will do it too often. For each time-outed request.//TODO make tracking of successful/unsuccessful operations for node - do renewing only//if there were no successful responses from this node last few secondsthis.connectionHandler.renewSlotCache();}return runWithRetries(slot, attempts - 1, tryRandomNode, asking);} catch (JedisRedirectionException jre) {// if MOVED redirection occurred,if (jre instanceof JedisMovedDataException) {// it rebuilds cluster's slot cache// recommended by Redis cluster specificationthis.connectionHandler.renewSlotCache(connection);}// release current connection before recursion or renewingreleaseConnection(connection);connection = null;if (jre instanceof JedisAskDataException) {asking = true;askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode()));} else if (jre instanceof JedisMovedDataException) {} else {throw new JedisClusterException(jre);}return runWithRetries(slot, attempts - 1, false, asking);} finally {releaseConnection(connection);}}private void releaseConnection(Jedis connection) {if (connection != null) {connection.close();}}}
从上面的执行流程可以看到,这里其实就是先获取到 jedis 连接,再调用 execute() 方法执行,最终使用的还是 jedis 里面封装的方法。
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END