I. Building the Storm Cluster
Write the docker-compose YAML file for Storm and ZooKeeper.
The Storm YAML file is as follows:
version: '2'
services:
  zookeeper1:
    image: registry.aliyuncs.com/denverdino/zookeeper:3.4.8
    container_name: zk1.cloud
    environment:
      - SERVER_ID=1
      - ADDITIONAL_ZOOKEEPER_1=server.1=0.0.0.0:2888:3888
      - ADDITIONAL_ZOOKEEPER_2=server.2=zk2.cloud:2888:3888
      - ADDITIONAL_ZOOKEEPER_3=server.3=zk3.cloud:2888:3888
  zookeeper2:
    image: registry.aliyuncs.com/denverdino/zookeeper:3.4.8
    container_name: zk2.cloud
    environment:
      - SERVER_ID=2
      - ADDITIONAL_ZOOKEEPER_1=server.1=zk1.cloud:2888:3888
      - ADDITIONAL_ZOOKEEPER_2=server.2=0.0.0.0:2888:3888
      - ADDITIONAL_ZOOKEEPER_3=server.3=zk3.cloud:2888:3888
  zookeeper3:
    image: registry.aliyuncs.com/denverdino/zookeeper:3.4.8
    container_name: zk3.cloud
    environment:
      - SERVER_ID=3
      - ADDITIONAL_ZOOKEEPER_1=server.1=zk1.cloud:2888:3888
      - ADDITIONAL_ZOOKEEPER_2=server.2=zk2.cloud:2888:3888
      - ADDITIONAL_ZOOKEEPER_3=server.3=0.0.0.0:2888:3888
  ui:
    image: registry.aliyuncs.com/denverdino/baqend-storm:1.0.0
    command: ui -c nimbus.host=nimbus
    environment:
      - STORM_ZOOKEEPER_SERVERS=zk1.cloud,zk2.cloud,zk3.cloud
    restart: always
    container_name: ui
    ports:
      - 8080:8080
    depends_on:
      - nimbus
  nimbus:
    image: registry.aliyuncs.com/denverdino/baqend-storm:1.0.0
    command: nimbus -c nimbus.host=nimbus
    restart: always
    environment:
      - STORM_ZOOKEEPER_SERVERS=zk1.cloud,zk2.cloud,zk3.cloud
    container_name: nimbus
    ports:
      - 6627:6627
  supervisor:
    image: registry.aliyuncs.com/denverdino/baqend-storm:1.0.0
    command: supervisor -c nimbus.host=nimbus -c supervisor.slots.ports=[6700,6701,6702,6703]
    restart: always
    environment:
      - affinity:role!=supervisor
      - STORM_ZOOKEEPER_SERVERS=zk1.cloud,zk2.cloud,zk3.cloud
    depends_on:
      - nimbus
networks:
  default:
    external:
      name: zk-net
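Note that the compose file attaches all services to an external network named zk-net. docker-compose does not create external networks itself, so if it does not exist yet, create it first with something like:
docker network create zk-net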
Pull the images needed for the Storm setup; here I use zookeeper:3.4.8 and storm:1.0.0.
Run the following commands:
docker pull zookeeper:3.4.8
docker pull storm:1.0.0
(Screenshot: pulling the Storm images)
Build the cluster with docker-compose.
Run the following command in PowerShell:
docker-compose -f storm.yml up -d
(Screenshot: docker-compose bringing up the cluster)
Open localhost:8080 in a browser to see the details of the Storm cluster.
(Screenshot: the Storm UI)
II. Storm Statistics Job
Compute statistics over stock trades: the trade volume and the total trade amount (the data is stored in a CSV file).
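Judging from how the bolts in Section III index the columns (column 0 is parsed as a timestamp, column 2 is the stock type, and the amount is column 3 × column 4, i.e. price × quantity), a data row presumably looks something like the purely illustrative line below; the actual file layout may differ:
2019-03-01 09:30:15,600000,bank,11.25,200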
Write the DataSourceSpout class.
(Screenshot: the DataSourceSpout class)
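The spout only appears as a screenshot in the original post; below is a minimal sketch of what it needs to do. The package name and the CSV path (stock.csv) are placeholders; the only requirement the bolts in Section III impose is that each CSV row is emitted under the field name "line".
package spout;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

@SuppressWarnings("serial")
public class DataSourceSpout extends BaseRichSpout {

    SpoutOutputCollector collector;
    BufferedReader reader;

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        try {
            // path to the CSV data file -- placeholder, adjust to your environment
            this.reader = new BufferedReader(new FileReader("stock.csv"));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void nextTuple() {
        try {
            String line = reader.readLine();
            if (line != null) {
                // emit one CSV row at a time under the field name "line"
                collector.emit(new Values(line));
            } else {
                // no more data; back off instead of busy-looping
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}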
Write the bolt classes (the core counting bolts are listed in Section III).
Write the topology class.
Note that the Storm Java API offers a local mode and a remote (production) mode.
Debugging in local mode does not depend on a cluster environment, so it is convenient for simple debugging.
To use production mode, you need to:
1. Write the spout and bolt classes for your own business logic and package them into a jar.
2. Put that jar anywhere the client code can read it.
3. Define and submit a topology, as sketched below.
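A minimal sketch of such a topology class, assuming the spout and bolts shown in this post; the class name, component names, and the convention of choosing the mode by command-line argument are my assumptions, not taken from the original:
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

import bolt.TimeCountBolt;
import bolt.TypeCountBolt;
import spout.DataSourceSpout;

public class StockTopology {

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // one spout feeding both counting bolts
        builder.setSpout("data-source", new DataSourceSpout());
        builder.setBolt("type-count", new TypeCountBolt()).shuffleGrouping("data-source");
        builder.setBolt("time-count", new TimeCountBolt()).shuffleGrouping("data-source");

        Config config = new Config();

        if (args != null && args.length > 0) {
            // production mode: submit the packaged jar to the cluster
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        } else {
            // local mode: run an in-process cluster for debugging
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("stock-topology", config, builder.createTopology());
        }
    }
}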
Demo results:
Local-mode debugging:
(Screenshot: the job running)
(Screenshot: statistics by hour of day)
(Screenshot: statistics by stock type)
Production mode:
(Screenshot: submitting the topology to the cluster)
III. Code of the Core Counting Bolts
1. Count the trade volume and total trade amount per stock type:
package bolt;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

@SuppressWarnings("serial")
public class TypeCountBolt extends BaseRichBolt {

    OutputCollector collector;
    // trade volume per stock type
    Map<String, Integer> map = new HashMap<String, Integer>();
    // total trade amount per stock type
    Map<String, Float> map2 = new HashMap<String, Float>();

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        // each tuple carries one CSV row; split it into columns
        String line = input.getStringByField("line");
        String[] data = line.split(",");

        Integer count = map.get(data[2]);
        Float totalAmount = map2.get(data[2]);
        if (count == null) {
            count = 0;
        }
        if (totalAmount == null) {
            totalAmount = 0.0f;
        }
        // volume +1, amount += price * quantity
        count++;
        totalAmount += Float.parseFloat(data[3]) * Integer.parseInt(data[4]);
        map.put(data[2], count);
        map2.put(data[2], totalAmount);

        System.out.println("~~~~~~~~~~~~~~~~~~~~~~~");
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            System.out.println("Trade volume:");
            System.out.println(entry);
        }
        System.out.println();
        for (Map.Entry<String, Float> entry : map2.entrySet()) {
            System.out.println("Total trade amount:");
            System.out.println(entry);
        }

        // acknowledge the tuple so it is not replayed
        collector.ack(input);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: nothing to emit
    }
}
2. Count the trade volume and total trade amount per hour:
package bolt;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

@SuppressWarnings("serial")
public class TimeCountBolt extends BaseRichBolt {

    OutputCollector collector;
    // trade volume per hour of day (0-23)
    Map<Integer, Integer> map = new HashMap<Integer, Integer>();
    // total trade amount per hour of day
    Map<Integer, Float> map2 = new HashMap<Integer, Float>();

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        String line = input.getStringByField("line");
        String[] data = line.split(",");

        // parse the timestamp column; HH (24-hour clock) so hours range over 0-23
        Date date = new Date();
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        try {
            date = dateFormat.parse(data[0]);
        } catch (ParseException e) {
            e.printStackTrace();
        }
        // Date.getHours() is deprecated, so read the hour through Calendar
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(date);
        Integer hour = calendar.get(Calendar.HOUR_OF_DAY);

        Integer count = map.get(hour);
        Float totalAmount = map2.get(hour);
        if (count == null) {
            count = 0;
        }
        if (totalAmount == null) {
            totalAmount = 0.0f;
        }
        count++;
        totalAmount += Float.parseFloat(data[3]) * Integer.parseInt(data[4]);
        map.put(hour, count);
        map2.put(hour, totalAmount);

        System.out.println("~~~~~~~~~~~~~~~~~~~~~~~");
        for (Map.Entry<Integer, Integer> entry : map.entrySet()) {
            System.out.println("Trade volume:");
            System.out.println(entry);
        }
        System.out.println();
        for (Map.Entry<Integer, Float> entry : map2.entrySet()) {
            System.out.println("Total trade amount:");
            System.out.println(entry);
        }

        // acknowledge the tuple so it is not replayed
        collector.ack(input);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: nothing to emit
    }
}