Creating HA Clients for Common Big Data Components (Java Edition)
Table of Contents
- Creating HA Clients for Common Big Data Components (Java Edition)
- Elasticsearch Client HA
- ClickHouse Client Advanced HA
- HBase Client HA
- HDFS Client HA
- Hive Client HA
- Kafka Client HA
- Yarn Client HA
After big data components are deployed as clusters with HA enabled on the server side, the client connections to those services can also be made highly available.
Elasticsearch Client HA
Gradle dependencies:
dependencies {
    // elasticsearch.client
    implementation "org.elasticsearch.client:elasticsearch-rest-high-level-client:6.7.2"
    implementation "org.elasticsearch.client:transport:6.7.2"
    implementation "org.elasticsearch.client:elasticsearch-rest-client:6.7.2"
    implementation "org.elasticsearch.client:elasticsearch-rest-client-sniffer:6.7.2"
}
If the ES cluster is not deployed behind a VIP-style load balancer, then when creating either a TransportClient or a RestClient you should pass in as many of the cluster's nodes as possible to increase the chance that the initial connection succeeds. All nodes in the cluster should listen on the same port.
The ES client also provides a sniffer that automatically discovers the nodes of a running cluster. Besides configuring the sniffer when the client starts, sniffing can be triggered on failure: after each failure the node list is refreshed immediately, rather than waiting for the next regular sniffing round. For example:
/**
 * @param ip       ES node IPs, comma separated, e.g. 192.168.1.1,192.168.1.2,192.168.1.3
 * @param port     HTTP port
 * @param username user name, if security is enabled
 * @param password password, if security is enabled
 */
private RestHighLevelClient initSecurityRestHighLevelClientByIp(String ip, Integer port, String username, String password) {
    final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
    credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
    String[] ips = ip.split(",");
    HttpHost[] httpHosts = new HttpHost[ips.length];
    for (int i = 0; i < ips.length; i++) {
        httpHosts[i] = new HttpHost(ips[i], port);
    }
    // Enable sniffing on failure
    SniffOnFailureListener sniffOnFailureListener = new SniffOnFailureListener();
    RestHighLevelClient highLevelClient = new RestHighLevelClient(
            RestClient.builder(httpHosts).setFailureListener(sniffOnFailureListener)
                    .setHttpClientConfigCallback(httpClientBuilder -> {
                        httpClientBuilder.disableAuthCaching();
                        return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    }));
    // The listener refreshes the node list immediately after a failure; the next sniff round
    // then runs 1 minute later. Regular sniffing can be tuned with setSniffIntervalMillis.
    Sniffer sniffer = Sniffer.builder(highLevelClient.getLowLevelClient())
            .setSniffAfterFailureDelayMillis(60000).build();
    sniffOnFailureListener.setSniffer(sniffer);
    return highLevelClient;
}

/**
 * @param ip          ES node IPs, comma separated, e.g. 192.168.1.1,192.168.1.2,192.168.1.3
 * @param port        TCP port
 * @param clusterName cluster name
 */
private TransportClient initTransportClientByIp(String ip, Integer port, String clusterName) {
    TransportClient client = null;
    try {
        Settings settings = Settings.builder()
                .put("cluster.name", clusterName)
                .put("client.transport.sniff", true)
                .build();
        final PreBuiltTransportClient transportClient = new PreBuiltTransportClient(settings);
        String[] ips = ip.split(",");
        TransportAddress[] transportAddresses = new TransportAddress[ips.length];
        for (int i = 0; i < ips.length; i++) {
            // Resolve each node's own address
            transportAddresses[i] = new TransportAddress(InetAddress.getByName(ips[i]), port);
        }
        client = transportClient.addTransportAddresses(transportAddresses);
    } catch (Exception e) {
        LOGGER.error("Failed to initialize the ES client", e);
    }
    return client;
}
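As a complement to the failure-triggered sniffing above, here is a minimal, self-contained sketch of the periodic variant (setSniffIntervalMillis). The node addresses and interval are placeholders; note that the sniffer should be closed before the client it watches.

import org.apache.http.HttpHost;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.sniff.Sniffer;

public class EsHaClientDemo {
    public static void main(String[] args) throws Exception {
        // Placeholder node list; as long as one node answers, the client comes up
        RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(
                new HttpHost("192.168.1.1", 9200),
                new HttpHost("192.168.1.2", 9200),
                new HttpHost("192.168.1.3", 9200)));
        // Periodic sniffing (every 60s) keeps the node list in sync with the live cluster
        Sniffer sniffer = Sniffer.builder(client.getLowLevelClient())
                .setSniffIntervalMillis(60000).build();
        try {
            System.out.println("cluster reachable: " + client.ping(RequestOptions.DEFAULT));
        } finally {
            // Close the sniffer first, then the client
            sniffer.close();
            client.close();
        }
    }
}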
ClickHouse Client Advanced HA
Gradle dependencies:
dependencies {
    // clickhouse-jdbc
    implementation("com.clickhouse:clickhouse-jdbc:0.4.0:http")
}
When accessing ClickHouse through the JDBC driver over HTTP, the driver randomly picks one of the listed endpoints to connect to and fails over to another endpoint when a connection problem occurs. This is how client-side HA is achieved.
public static Connection getConnection() throws SQLException, ClassNotFoundException, InstantiationException, IllegalAccessException {
    // Load the ClickHouse driver
    Driver driver = (Driver) Class.forName("com.clickhouse.jdbc.ClickHouseDriver").newInstance();
    DriverManager.registerDriver(driver);
    Properties props = new Properties();
    // Randomly pick one of the endpoints; on connection problems, fail over to another one,
    // which gives the client connection HA.
    String url = "jdbc:ch://(http://192.168.1.1:8123),(http://192.168.1.2:8123),(http://192.168.1.3:8123)/default?failover=1&load_balancing_policy=random";
    return DriverManager.getConnection(url, props);
}
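For reference, a minimal usage sketch that reuses the same multi-endpoint URL. It assumes the default ClickHouse user needs no password; adjust credentials as needed, and the query is only a smoke test.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ClickHouseHaDemo {
    public static void main(String[] args) throws Exception {
        // Same multi-endpoint URL as above; the driver fails over between the three nodes
        String url = "jdbc:ch://(http://192.168.1.1:8123),(http://192.168.1.2:8123),(http://192.168.1.3:8123)/default?failover=1&load_balancing_policy=random";
        try (Connection conn = DriverManager.getConnection(url);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT version()")) {
            while (rs.next()) {
                System.out.println("Connected to ClickHouse " + rs.getString(1));
            }
        }
    }
}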
HBase Client HA
Gradle dependencies:
dependencies {
    // hbase
    implementation "org.apache.hbase:hbase-client:1.1.2"
}
HBase client HA is achieved through ZooKeeper: the client connects to the ZooKeeper quorum rather than to any fixed HBase node.
public static Connection getConnection() {
    // Build the configuration
    Configuration config = HBaseConfiguration.create();
    config.set("hbase.zookeeper.quorum", "192.168.1.1,192.168.1.2,192.168.1.3");
    config.set("zookeeper.znode.parent", "/hbase-unsecure");
    config.set("hbase.zookeeper.property.clientPort", "2181");
    // Create the connection
    Connection conn = null;
    try {
        UserGroupInformation userGroupInformation = UserGroupInformation.createRemoteUser("hbase");
        conn = ConnectionFactory.createConnection(config, User.create(userGroupInformation));
    } catch (IOException e) {
        LOGGER.error("HBaseClientHelper initConnect Exception.", e);
    }
    return conn;
}
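A minimal usage sketch, assuming the helper above lives in a class named HBaseClientHelper and that a table demo_table with a row key row-1 exists (both names are hypothetical):

import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseHaDemo {
    public static void main(String[] args) throws IOException {
        // The connection is discovered via ZooKeeper, not tied to a single HBase node
        try (Connection conn = HBaseClientHelper.getConnection();
             Table table = conn.getTable(TableName.valueOf("demo_table"))) {
            Result result = table.get(new Get(Bytes.toBytes("row-1")));
            System.out.println("row exists: " + !result.isEmpty());
        }
    }
}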
HDFS Client HA
Gradle dependencies:
dependencies {
    // hadoop-client
    implementation("org.apache.hadoop:hadoop-client:2.7.3")
}
The client HDFS needs is simply the Hadoop client. Client-side HA uses the same configuration as the HA inside the Hadoop cluster itself (taken from core-site.xml and hdfs-site.xml), for example:
public static FileSystem fs = null;

public static void init() {
    System.setProperty("HADOOP_USER_NAME", MonitorSupport.HADOOP_USER_NAME);
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://mycluster");
    conf.set("dfs.nameservices", "mycluster");
    conf.set("dfs.ha.namenodes.mycluster", "n1,n2");
    conf.set("dfs.namenode.rpc-address.mycluster.n1", "192.168.1.1:8020");
    conf.set("dfs.namenode.rpc-address.mycluster.n2", "192.168.1.2:8020");
    conf.set("dfs.namenode.http-address.mycluster.n1", "192.168.1.1:50070");
    conf.set("dfs.namenode.http-address.mycluster.n2", "192.168.1.2:50070");
    conf.set("dfs.client.failover.proxy.provider.mycluster",
            "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, false);
    conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
    try {
        fs = FileSystem.get(conf);
    } catch (IOException e) {
        LOGGER.error("Failed to initialize the HDFS client.", e);
    }
}
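A brief usage sketch, written as if it sits in the same class as the init() method and fs field above; the listing goes through whichever NameNode (n1 or n2) is currently active:

// Requires: import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path;
public static void main(String[] args) throws Exception {
    init();  // builds the HA-aware FileSystem defined above
    // List the root directory; NameNode failover is transparent to this call
    for (FileStatus status : fs.listStatus(new Path("/"))) {
        System.out.println(status.getPath());
    }
    fs.close();
}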
Hive Client HA
Gradle dependencies:
dependencies {
    // hadoop-client
    implementation("org.apache.hadoop:hadoop-client:2.7.3")
    // hive-jdbc
    implementation "org.apache.hive:hive-jdbc:1.2.1"
}
Hive itself also uses the Hadoop client, so the hadoop-client dependency is required as well. Hive HA consists of HiveServer2 HA and Metastore HA; applications access Hive through HiveServer2, whose HA is implemented via ZooKeeper. A Java client therefore achieves HA by connecting over JDBC to the HiveServer2 HA namespace registered in ZooKeeper.
public static Connection getConnection() throws SQLException, ClassNotFoundException, InstantiationException, IllegalAccessException {
    // Load the Hive driver
    Driver driver = (Driver) Class.forName("org.apache.hive.jdbc.HiveDriver").newInstance();
    DriverManager.registerDriver(driver);
    Properties props = new Properties();
    props.put("user", "hdfs");
    // The driver resolves HiveServer2 instances registered under the ZooKeeper namespace
    // and connects to one of them; if it is unavailable, another instance is used.
    String url = "jdbc:hive2://192.168.1.1:2181,192.168.1.2:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2_zk";
    return DriverManager.getConnection(url, props);
}
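A short, self-contained usage sketch built on the same ZooKeeper-discovery URL; the user name and the SHOW DATABASES smoke test are only illustrative assumptions:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveHaDemo {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        // The ZooKeeper ensemble hands out a live HiveServer2 instance under the namespace
        String url = "jdbc:hive2://192.168.1.1:2181,192.168.1.2:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2_zk";
        try (Connection conn = DriverManager.getConnection(url, "hdfs", "");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SHOW DATABASES")) {
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        }
    }
}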
Kafka Client HA
Gradle dependencies:
dependencies {
    // kafka
    implementation "org.apache.kafka:kafka-clients:1.0.0"
}
Kafka clients connect through brokers, so to keep the client connection highly available you should list several broker addresses in bootstrap.servers. This works the same way for AdminClient, KafkaProducer, and KafkaConsumer.
// AdminClient
Properties adminProps = new Properties();
adminProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092");
adminProps.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, 30000);
AdminClient kafkaAdminClient = AdminClient.create(adminProps);

// KafkaConsumer
Properties consumerProps = new Properties();
consumerProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092");
consumerProps.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, 30000);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "ConsumerAnalysisGroup-1");
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "monitor-consumer-client-1");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);

// KafkaProducer
Properties producerProps = new Properties();
producerProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092");
producerProps.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, 30000);
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "monitor-producer-client-1");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
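A minimal smoke-test sketch using the AdminClient with the same placeholder broker list: as long as one bootstrap broker is reachable, the metadata for the whole cluster is discovered from it.

import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;

public class KafkaHaDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Any reachable broker in this list is enough to bootstrap the client
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // List topics as a simple connectivity check
            System.out.println("topics: " + admin.listTopics().names().get());
        }
    }
}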
Yarn Client HA
Gradle dependencies:
dependencies {
    // hadoop-client
    implementation("org.apache.hadoop:hadoop-client:2.7.3")
}
The YARN client uses the same HA configuration as the YARN cluster itself (taken from yarn-site.xml), for example:
YarnClient client = YarnClient.createYarnClient();
Configuration conf = new Configuration();
conf.set("yarn.resourcemanager.ha.enabled", "true");
conf.set("yarn.resourcemanager.ha.rm-ids", "rm1,rm2");
conf.set("yarn.resourcemanager.hostname.rm1", "192.168.1.1");
conf.set("yarn.resourcemanager.hostname.rm2", "192.168.1.2");
client.init(conf);
client.start();
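For reference, a self-contained sketch that lists running applications; the ResourceManager hostnames are the same placeholders as above, and requests go to whichever ResourceManager is currently active:

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;

public class YarnHaDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Same HA settings as above
        conf.set("yarn.resourcemanager.ha.enabled", "true");
        conf.set("yarn.resourcemanager.ha.rm-ids", "rm1,rm2");
        conf.set("yarn.resourcemanager.hostname.rm1", "192.168.1.1");
        conf.set("yarn.resourcemanager.hostname.rm2", "192.168.1.2");
        YarnClient client = YarnClient.createYarnClient();
        client.init(conf);
        client.start();
        try {
            // Query the active ResourceManager for the application list
            List<ApplicationReport> apps = client.getApplications();
            System.out.println("applications: " + apps.size());
        } finally {
            client.stop();
        }
    }
}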