「javastorm程序」storm代码

博主:adminadmin 2023-01-08 03:21:06 440

今天给各位分享javastorm程序的知识,其中也会对storm代码进行解释,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!

本文目录一览:

storm基本概念

流式计算中,各个中间件产品对计算过程中的角色的抽象都不尽相同,实现方式也是千差万别。本文针对storm中间件在进行流式计算中的几个概念做个概括总结。

storm分布式计算结构称为topology(拓扑)由stream,spout,bolt组成。

spout代表一个storm拓扑中的数据入口,连接到数据源,将数据转化为一个个tuple,并发射tuple

stream是由无限制个tuple组成的序列。tuple为storm的核心数据结构,是包含了一个或多个键值对的列表。

bolt可以理解为计算程序中的运算或者函数,bolt的上游是输入流,经过bolt实施运算后,可输出一个或者多个输出流。

bolt可以订阅多个由spout或者其他bolt发射的数据流,用以构建复杂的数据流转换网络。

上述即为storm最基本的组成元素,无论storm如何运行,都是以stream,spout,bolt做为最基本的运行单元。而这三者则是共同构成了一个storm拓扑topology。

首先需要明确一个概念,bolt,spout实例,都属于任务,spout产生数据流,并发射,bolt消费数据流,进行计算,并进行落地或再发射,他们的存在以及运行过程都需要消耗资源,而storm集群是一个提供了资源的集群,我们要做的就是将spout/boult实例合理分配到storm集群提供的计算资源上,这样就可以让spout/bolt得以执行。

worker为JVM进程,一个topology会分配到一个或者多个worker上运行。

executor是worker内的java线程,是具体执行bolt/spout实例用的。下篇文章在介绍如何提供storm并行计算能力时会介绍worker以及executor的配置。

在storm中,worker是由supervisor进程创建,并进行监控的。storm集群遵循主从模式,主为nimbus,从为supervisor,storm集群由一个主节点(确实有单点问题),和多个工作节点(supervisor)组成,并使用zookeeper来协调集群中的状态信息,比如任务分配情况,worker状态,supervisor的拓扑度量。

通过配置可指定supervisor上可运行多少worker。一个worker代表一个slot。

nimbus守护进程的主要职责是管理,协调和监控在集群上运行的topology.包括topology的发布,任务指派,事件处理失败时重新指派任务。

supervisor守护进程等待nimbus分配任务后生成并监控workers执行任务。supervosior和worker都是运行在不同的JVM进程上。

了解了集群模式下,storm大致的分布概念,下面结合笔者做的一个实例,了解一下如何发布计算资源到storm集群上。

笔者定义了一个spout,两个bolt 运算过程如下:

其中streamMaking是一个不断生成随机数(5~30)的spout实例,Step1Bolt会过滤掉15以下的随机数(过滤),15以上的随机数会乘以16(计算),再将结果向后发射。Step2Bolt订阅Step1Bolt发射的数据,接收数据后,打印输出。流程结束。

笔者在定义spout/bolt实例时,配置了spout,bolt的并行执行数。其中

streamMaking:4   Step1Bolt:2  Step2Bolt 1

这样,发布成功后,storm会根据我的配置,分配足够的计算资源给予spout/bolt进行执行。

发布:

发布时,spout和bolt都是在一起以jar的形式发布到nimbus上的,分配后,内部定义的spout和bolt将以组件的形式被nimbus分配至worker进程中执行。

其中worker都是由supervisor创建的,创建出来的worker进程与supervisor是分开的不同进程。一个supervisor可创建多少worker可通过修改storm安装目录下的storm.yaml进行配置。

task是执行的最小单元。spout/bolt实例在定义中指定了,要起多少task,以及多少executor。也即一个topology发布之前已经定义了task总量,和需要多少资源来执行我的task总量。nimbus将根据已有的计算资源进行分配。

下图中:  nimbus左边代表着计算任务量,和所需计算配置

nimbus右边代表着计算资源

nimbus将根据计算资源信息,合理的分发计算任务量。

发布成功后,通过storm自带的UI功能,可以查看你发布的topology运行以及其中每个组件的分布执行情况。

监控图像中清晰的显示了,目前部署的topology,以及topology中每个组件所分配的计算资源所在host,以及每个组件发射了多少tuple,接收了多少tuple,以及有多少个executor在并行执行。

本文讲述了storm内的基本元素以及基本概念,后续将讲述storm的重点配置信息,以及如何提高并发计算能力,窗口概念等高级特性,后续会进行源码分析,以及与其他实时计算中间件的比较。

storm分布式计算与问题connection refuse排查。

由于项目需要,需要用到storm做分布式计算与数据处理,storm的原理和相关介绍就不在此赘叙了。

项目中storm下发的bolt有2层:

首先编写一个topology:

public class HomeBandToplogy {

private static final String TOPOLOGY_NAME = "HomeBandToplogy";

private static final String KAFKA_SPOUT = "kafkaSpout";

private static final String KAFKA_BOLT = "kafkaBolt";

private static final String ANYNASIS_BOLT = "AnynasisBolt";

private static final Log log = LogFactory.getLog(HomeBandToplogy.class);

}

然后编写一个kafkabolt和一个AnynasisBolt,如下:

kafkabolt:

public class KafkaBolt extends BaseRichBolt {

OutputCollector collector;

Log logger;

}

AnynasisBolt:

public class AnynasisBolt extends BaseRichBolt {

private OutputCollector collector;

Log logger;

}

工厂类:

public class BoxFactory {

}

接口类:

public interface BoxService extends Serializable {

}

抽象类:

**

@override

public Boolean executeRedis( return null;);

@override

public Boolean executeHbase( return null;);

}

storm程序启动以后,小批量数据运行正常。

继续加大数据测试,数据量达到几十万的时候,出现异常,异常如下:

[ERROR] connection attempt 9 to Netty-Client-node5/172.16.1.100:6700 failed: java.net.ConnectException: Connection refused: node5/172.16.1.100:6700

2018-11-16 17:46:11.533 o.a.s.u.StormBoundedExponentialBackoffRetry client-boss-1 [WARN] WILL SLEEP FOR 420ms (MAX)

同时storm程序大量ack失败。

开始以后是线程数过多,以及环境资源紧张导致此种异常。

后来经过重重排查,将接口去掉,将抽象类中的方法变为抽象方法后,程序运行正常。

为何出现这样的异常呢?

原因在于,storm在处理的时候只会处理当前进程下的任务,跨进程的调度是无法实现的,故产生这样的故障。

请教storm本地模式启动的问题

一、介绍

storm提供了两种运行模式:本地模式和分布式模式。本地模式针对开发调试storm topologies非常有用。

Storm has two modes of operation: local mode and distributed mode. In local mode, Storm executes completely in process by simulating worker nodes with threads. Local mode is useful for testing and development of topologies

因为多数程序开发者都是使用windows系统进行程序开发,如果在本机不安装storm环境的情况下,开发、调试storm程序。如果你正在为此问题而烦恼,请使用本文提供的方法。

二、实施步骤

如何基于eclipse+maven调试storm程序,步骤如下:

1.搭建好开发环境(eclipse+maven,本人使用的是eclipse Kepler 与maven3.1.1)

2.创建maven项目,并修改pom.xml,内容如pom.xml(机器联网,下载所需的依赖jar)

Github上的pom.xml,引入的依赖太多,有些不需要,

3. 编写storm程序,指定为本地模式运行。本文提供的程序是wordcount

重要的是LocalCluster cluster = new LocalCluster();这一句

Config conf = new Config();

conf.setDebug(true);

conf.setNumWorkers(2);

LocalCluster cluster = new LocalCluster();

cluster.submitTopology("test", conf, builder.createTopology());

Utils.sleep(10000);

cluster.killTopology("test");

cluster.shutdown();

pom.xml文件

project xmlns="" xmlns:xsi=""

xsi:schemaLocation=" "

4.0.0

storm.starter

storm-starter

0.0.1-SNAPSHOT

jar

UTF-8

github-releases

clojars.org

junit

junit

4.11

test

storm

storm

0.9.0.1

provided

commons-collections

commons-collections

3.2.1

storm程序

package storm.starter;

import java.util.HashMap;

import java.util.Map;

import storm.starter.spout.RandomSentenceSpout;

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.StormSubmitter;

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

/**

* This topology demonstrates Storm's stream groupings and multilang

* capabilities.

*/

public class WordCountTopology {

public static class SplitSentence extends BaseBasicBolt {

@Override

public void execute(Tuple input, BasicOutputCollector collector) {

try {

String msg = input.getString(0);

System.out.println(msg + "-------------------");

if (msg != null) {

String[] s = msg.split(" ");

for (String string : s) {

collector.emit(new Values(string));

}

}

} catch (Exception e) {

e.printStackTrace();

}

}

@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("word"));

}

}

public static class WordCount extends BaseBasicBolt {

Map counts = new HashMap();

@Override

public void execute(Tuple tuple, BasicOutputCollector collector) {

String word = tuple.getString(0);

Integer count = counts.get(word);

if (count == null)

count = 0;

count++;

counts.put(word, count);

collector.emit(new Values(word, count));

}

@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("word", "count"));

}

}

public static void main(String[] args) throws Exception {

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("spout", new RandomSentenceSpout(), 5);

builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping(

"spout");

builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split",

new Fields("word"));

Config conf = new Config();

conf.setDebug(true);

if (args != null args.length 0) {

conf.setNumWorkers(3);

StormSubmitter.submitTopology(args[0], conf,

builder.createTopology());

} else {

conf.setMaxTaskParallelism(3);

LocalCluster cluster = new LocalCluster();

cluster.submitTopology("word-count", conf, builder.createTopology());

Thread.sleep(10000);

cluster.shutdown();

}

}

}

package storm.starter.spout;

import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseRichSpout;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Values;

import backtype.storm.utils.Utils;

import java.util.Map;

import java.util.Random;

public class RandomSentenceSpout extends BaseRichSpout {

SpoutOutputCollector _collector;

Random _rand;

@Override

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {

_collector = collector;

_rand = new Random();

}

@Override

public void nextTuple() {

Utils.sleep(100);

String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",

"four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };

String sentence = sentences[_rand.nextInt(sentences.length)];

_collector.emit(new Values(sentence));

}

@Override

public void ack(Object id) {

}

@Override

public void fail(Object id) {

}

@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("word"));

}

}

关于javastorm程序和storm代码的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。