本文介绍如何使用阿里云 E-MapReduce(以下简称 EMR) 部署 Storm 集群和 Kafka 集群,并运行 Storm 作业消费 Kafka 数据。
环境准备
本文选择在杭州 Region 进行测试,版本选择 EMR-3.8.0,本次测试需要的组件版本有:
- Kafka:2.11_1.0.0
- Storm: 1.0.1
本文使用阿里云 EMR 服务自动化搭建Kafka集群,详细过程请参考创建集群。
- 创建 Hadoop 集群
- 创建 Kafka 集群
说明
- 如果使用经典网络,请注意将 Hadoop 集群和Kafka集群放置在同一个安全组下面,这样可以省去配置安全组,避免网络不通的问题。
- 如果使用 VPC 网络,请注意将 Hadoop 集群和 Kafka 集群放置在同一个 VPC/VSwitch 以及安全组下面,这样同样省去配置网路和安全组,避免网络不通。
- 如果您熟悉ECS的网络和安全组,可以按需配置。
- 配置 Storm 环境
如果我们想在 Storm 上运行作业消费 Kafka 的话,集群初始环境下是会失败的,因为 Storm 运行环境缺少了必须的依赖包,如下:
- curator-client
- curator-framework
- curator-recipes
- json-simple
- metrics-core
- scala-library
- zookeeper
- commons-cli
- commons-collections
- commons-configuration
- htrace-core
- jcl-over-slf4j
- protobuf-java
- guava
- hadoop-common
- kafka-clients
- kafka
- storm-hdfs
- storm-kafka
以上版本依赖包经过测试可用,如果你再测试过程中引入了其他依赖,也一同添加在 Storm lib 中,具体操作如下:
上述操作需要在 Hadoop 集群的每台机器执行一遍。执行完在 E-MapReduce 控制台重启 Storm 服务,如下:
查看操作历史,待 Storm 重启完毕:
开发Storm和Kafka作业
EMR 已经提供了现成的示例代码,直接使用即可,地址如下:
- e-mapreduce-demo
- e-mapreduce-sdk
- Topic 数据准备
- 登录到 Kafka 集群
- 创建一个test topic,分区数10,副本数2
/usr/lib/kafka-current/bin/kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper emr-header-1:/kafka-1.0.0 --topic test --create
- 向 test topic 写入100条数据
/usr/lib/kafka-current/bin/kafka-producer-perf-test.sh --num-records 100 --throughput 10000 --record-size 1024 --producer-props bootstrap.servers=emr-worker-1:9092 --topic test
说明 以上命令在 kafka 集群的emr-header-1节点执行,当然也可以客户端机器上执行。
- 运行 Storm 作业
登录到 Hadoop 集群,将编译得到的
/target/shaded目录下的
examples-1.1-shaded.jar拷贝到集群 emr-header-1上,这里以放在 root 根目录下面为例提交作业:/usr/lib/storm-current/bin/storm jar examples-1.1-shaded.jar com.aliyun.emr.example.storm.StormKafkaSample test aaa.bbb.ccc.ddd hdfs://emr-header-1:9000 sample
- 查看作业运行
- 查看 Storm 运行状态
查看集群上服务的 WebUI 有2种方式:
- 通过 Knox 方式,参考文档 Knox 使用说明
- SSH 隧道,参考文档SSH 登录集群
本文选择使用 SSH 隧道方式,访问地址:
http://localhost:9999/index.html。可以看到我们刚刚提交的 Topology。点进去可以看到执行详情: - 查看 HDFS 输出
- 查看 HDFS 文件输出
[root@emr-header-1 ~]# hadoop fs -ls /foo/-rw-r--r-- 3 root hadoop 615000 2018-02-11 13:37 /foo/bolt-2-0-1518327393692.txt-rw-r--r-- 3 root hadoop 205000 2018-02-11 13:37 /foo/bolt-2-0-1518327441777.txt[root@emr-header-1 ~]# hadoop fs -cat /foo/bolt-2-0-1518327441777.txt | wc -l200
- 向 Kafka写120条数据
[root@emr-header-1 ~]# /usr/lib/kafka-current/bin/kafka-producer-perf-test.sh --num-records 120 --throughput 10000 --record-size 1024 --producer-props bootstrap.servers=emr-worker-1:9092 --topic test120 records sent, 816.326531 records/sec (0.80 MB/sec), 35.37 ms avg latency, 134.00 ms max latency, 35 ms 50th, 39 ms 95th, 41 ms 99th, 134 ms 99.9th.
- 查看 HDFS 文件输出
[root@emr-header-1 ~]# hadoop fs -cat /foo/bolt-2-0-1518327441777.txt | wc -l320
- 查看 HDFS 文件输出
- 查看 Storm 运行状态
总结
至此,我们成功实现了在 E-MapReduce 上部署一套 Storm 集群和一套 Kafka 集群,并运行 Storm 作业消费 Kafka 数据。当然,E-MapReduce 也支持 Spark Streaming 和 Flink 组件,同样可以方便在 Hadoop 集群上运行,处理 Kafka 数据。
说明
由于 E-MapReduce 没有单独的 Storm 集群类别,所以我们是创建的 Hadoop 集群,并安装了 Storm 组件。如果你在使用过程中用不到其他组件,可以很方便地在 E-MapReduce 管理控制台将那些组件停掉。这样,可以将 Hadoop 集群作为一个纯粹的 Storm 集群使用。
原创文章,作者:网友投稿,如若转载,请注明出处:https://www.cloudads.cn/archives/33362.html