Sentinel的的部署和实践已经结束了,本篇文章是Redis的Java客户端端JedisSentinelAPI的使用和源码解读。

简单例子:

public class RedisSentinelTest {
	public static void main(String[] args) throws Exception {
		
		//Sentinel的配置
		Set<String> sentinels = new HashSet<String>();
		sentinels.add("10.10.10.126:26379");
		sentinels.add("10.10.10.126:26479");
		sentinels.add("10.10.10.126:26579");

		//JedisPool 连接池的配置
		GenericObjectPoolConfig config = new GenericObjectPoolConfig();
		config.setMaxTotal(10000);
		config.setMaxIdle(200);
		config.setTestOnBorrow(true);
		config.setMaxWaitMillis(1000);
		
		//创建JedisPool
		JedisSentinelPool pool = new JedisSentinelPool("mymaster", sentinels, config);
		
		//获取Jedis连接
		Jedis jedis = pool.getResource();

		//从Redis中获取k1的值
		String v1 = jedis.get("k1");
		System.out.println(v1);

		//关闭Jedis,返回pool中
		jedis.close();
	}
}

注释写的应该能看懂了,就不啰嗦了 ^_^^_^


线上使用的例子看下面:

1)spring配置

<bean id="jedisPoolConfig" class="org.apache.commons.pool2.impl.GenericObjectPoolConfig">
 <property name="maxTotal" value="10000" />
 <property name="maxIdle"  value="200" />
 <property name="maxWaitMillis" value="1000" />
</bean>
<bean id="redisSentinel" class="redis.clients.jedis.JedisSentinelPool">
 <constructor-arg index="0" value="mymaster" />
 <constructor-arg index="1">
  <set>  
   <value>10.10.10.126:26379</value>
   <value>10.10.10.126:26479</value>  
   <value>10.10.10.126:26579</value>    
  </set>
 </constructor-arg>
 <constructor-arg index="2" ref="jedisPoolConfig"/>
 <!--constructor-arg index="3" value="tiger"/--> <!-- password configuration-->
</bean>

2)连接基础类:RedisUtil.java

public class RedisUtil {
	private final static RedisUtil POOL_UTIL = new RedisUtil();

	public static RedisUtil getInstance() {
		return POOL_UTIL;
	}

	private RedisUtil() {
	}
	
	private JedisSentinelPool pool;

	private JedisSentinelPool getJedisPool() {
		if (pool == null) {			
			synchronized (this) {
				if (pool == null) {
					init();
				}
			}
		}
		return pool;
	}

	private void init() {
		try {
			pool = (JedisSentinelPool) Config.ctx.getBean("redisSentinel");
		} catch (Exception e) {
			e.printStackTrace();
			throw new ExceptionInInitializerError("启动系统异常,系统无法启动,请检查配置文件是否存在!");
		}
	}

	//获取Redis连接
	public Jedis getConnection() {
		Jedis jedis = null;
		try {
			jedis = getJedisPool().getResource();
			jedis.select(0);
		} catch (Exception e) {
			if (jedis != null)
				jedis.close();
			e.printStackTrace();
			throw new RuntimeException("连接Redis异常.", e);
		}
		return jedis;
	}

	//关闭Redis连接
	public void closeConnection(Jedis jedis) {
		if (jedis != null) {
			try {
				jedis.close();
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
}

3)使用例子

	public String getValue() {
		Jedis jedis = RedisUtil.getInstance().getConnection();
		try {
			return jedis.get("k100");
		} catch (Exception e) {
			;
		} finally {
			RedisUtil.getInstance().closeConnection(jedis);
		}
		return "";
	}

JedisSentinelAPI源码阅读

其实源码也比较简单,就几个类,还是看文章开头的代码例子

顺着这一句 JedisSentinelPool pool = new JedisSentinelPool("mymaster", sentinels, config); 是创建了一个Jedis的连接池,

"mymaster" 是我们配置监视Master时的名字; sentinels 则是一个Set集合,里面是所有Sentinel的信息,用IP:PORT组成字符串;config 是连接池的一些信息

JedisSentinelPool extends Pool , JedisSentinelPool 有几个构造方法,根据实际情况来使用不同,但最终会调用下面的构造方法

 public JedisSentinelPool(String masterName, Set<String> sentinels,
     final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout,
     final String password, final int database, final String clientName) {
   this.poolConfig = poolConfig;
   this.connectionTimeout = connectionTimeout;
   this.soTimeout = soTimeout;
   this.password = password;
   this.database = database;
   this.clientName = clientName;

   HostAndPort master = initSentinels(sentinels, masterName);
   initPool(master);
 }

注意红色的部分 initSentinels,其实是从Sentinel里面获取当前Redis Master信息,看源码,里面我已经加了注释


  private HostAndPort initSentinels(Set<String> sentinels, final String masterName) {

    HostAndPort master = null;          //构造一个Redis服务配置对象,里面包括ip和port
    boolean sentinelAvailable = false;  //sentinel服务是否正常

    log.info("Trying to find master from available Sentinels...");

    for (String sentinel : sentinels) {//遍历所有配置的Sentinel的信息
      final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":"))); //把配置的Sentinel的信息组合成个一个对象,我们当初配置就是 ip:port 的字符串

      log.fine("Connecting to Sentinel " + hap);

      Jedis jedis = null;
      try {
        jedis = new Jedis(hap.getHost(), hap.getPort()); //和Sentinel服务建立连接

        List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName); //这里其实是 sentinel get-master-addr-by-name mymaster  的实现,来获取Master的 ip和port 信息
        // connected to sentinel...
        sentinelAvailable = true;//执行此,则认为sentinel的服务可用。但这里只是当前连接的sentienl服务可用,并不能说明sentinel集群可用

        if (masterAddr == null || masterAddr.size() != 2) { //没有从当前sentinel服务里面获取master信息,可能sentinel的配置信息有误导致,继续访问下一个sentinel服务
          log.warning("Can not get master addr, master name: " + masterName + ". Sentinel: " + hap
              + ".");
          continue;
        }

        master = toHostAndPort(masterAddr); //构建Redis Master的配置对象
        log.fine("Found Redis master at " + master);
        break; //跳出遍历,只要从一个Sentinel来获取Master信息正常就可以了
      } catch (JedisException e) {
        // resolves #1036, it should handle JedisException there's another chance
        // of raising JedisDataException
        log.warning("Cannot get master address from sentinel running @ " + hap + ". Reason: " + e
            + ". Trying next one.");
      } finally {
        if (jedis != null) {
          jedis.close();
        }
      }
    }

    if (master == null) { // master的信息没有取到
      if (sentinelAvailable) { // sentinel服务正常,此时说明sentinel的配置有问题,没有配置监视master的信息
        // can connect to sentinel, but master name seems to not
        // monitored
        throw new JedisException("Can connect to sentinel, but " + masterName
            + " seems to be not monitored...");
      } else {
        throw new JedisConnectionException("All sentinels down, cannot determine where is "
            + masterName + " master is running...");
      }
    }

    log.info("Redis master running at " + master + ", starting Sentinel listeners...");

    for (String sentinel : sentinels) { //这里很重要! 客户端对每个Sentinel服务建立监听,当发生failover时,获取新的Master信息,重新建立pool连接
      final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":")));
      MasterListener masterListener = new MasterListener(masterName, hap.getHost(), hap.getPort());
      // whether MasterListener threads are alive or not, process can be stopped
      masterListener.setDaemon(true);
      masterListeners.add(masterListener);
      masterListener.start();
    }

    return master;
  } 

我们看下 MasterListener 这个线程类,它订阅了"+switch-master" 这个channel,当发生failover时会收到Sentinel发来的消息,消息如下

"mymaster 10.10.10.126 6379 10.10.10.118 6379"  ,格式是“监视master名称 旧Master的ip 旧Master的port 新Master的ip 新Master的port”

    public void run() {

      running.set(true); // AtomicBoolean 类型,此类提供了一些原子操作

      while (running.get()) {

        j = new Jedis(host, port); // 和Sentinel建立连接。j前面修饰符 volatile, 多线程间值可见性,禁止编译器对指令的重排序

        try {
          // double check that it is not being shutdown
          if (!running.get()) { //二次check,防止线程destroy掉
            break;
          }

          j.subscribe(new JedisPubSub() { //订阅服务 channel是"+switch-master"
            @Override
            public void onMessage(String channel, String message) {
              log.fine("Sentinel " + host + ":" + port + " published: " + message + ".");

              String[] switchMasterMsg = message.split(" "); //解析sentinel发过来的消息,以空格分隔

              if (switchMasterMsg.length > 3) { //check解析出来的消息的长度是否满足

                if (masterName.equals(switchMasterMsg[0])) { //由于sentinel可以监视多个master信息,所以这里要确认是否是本身的master
                  initPool(toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4]))); //重新初始化pool
                } else {
                  log.fine("Ignoring message on +switch-master for master name "
                      + switchMasterMsg[0] + ", our master name is " + masterName);
                }

              } else {
                log.severe("Invalid message received on Sentinel " + host + ":" + port
                    + " on channel +switch-master: " + message);
              }
            }
          }, "+switch-master");
        } catch (JedisConnectionException e) { ;} finally { j.close(); } } }