「java调用flink」java调用flink实现实时

博主:adminadmin 2023-03-20 03:24:10 319

今天给各位分享java调用flink的知识,其中也会对java调用flink实现实时进行解释,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!

本文目录一览:

使用下面其中一个命令来创建Flink Java工程

1、使用Maven archetypes:

$ mvn archetype:generate \

-DarchetypeGroupId=org.apache.flink \

-DarchetypeArtifactId=flink-quickstart-java \

-DarchetypeVersion=1.0.0

2、运行quickstart脚本

$ curl | bash

Flink内存管理

java所有数据类型对应的字节大小

java对象的组成 : 对象头,实例数据,对齐部分

jvm 序列化缺点

上面图为TaskManager内存模型,左边为细分的内存模型,右边为整体内存模型,该图摘自Flink官网

heap内存在jvm启动的时候申请的一块不变的内存区域,该内存实际上是Flink和task公用的一块区域,在flink层面通过控制来区分框架使用和task内存,heap内存管理起来是比较容易的,实际上non-heap的内存是难管理的一块,如果管理不当或者使用不当可能造成内存泄漏或者内存无限增长等问题

内存参数配置

在flink中对内存进行了抽象成了MemorySegment,�默认情况下,一个 MemorySegment 对应着一个 32KB 大小的内存块,这块内存既可以是堆上内存( byte数组) ,也可以是堆外内存(nio的ByteBufferr ) .

同时MemorySegment也提供了对二进制数据的操作方法,以及读取字节数组序列化以及序列化字节数组的方法等

下面是类继承图,该类有两MemorySegment实现类有两个分别为使用heap的以及混合的即有heap和non-heap,对于内存的访问有子类具体的实现

MemorySemgent是flink内存分配的最小单元了,对于数据夸MemorySemgent保存,那么对于上层的使用者来说,需要考虑考虑所有的细节,由于过于繁琐,所以在MemorySemgent上又抽象了一层内存也,内存也是在MemorySemgent数据访问上的视图,对数据输入和输出分别抽象为DataInputView/DataOutputView,有了这一层,上层使用者无需关心跨MemorySemgent的细节问题,内存也对自动处理跨MemorySemgent的内存操作

DataInputView

DataInputView继承DataInput,DataInputView是对MemorySemgent读取的抽象视图,提供一系列读取二进制数据不同类型的方法,AbstractPageInputView是DataInputView的一个抽象实现类,并且基本所有InputView都实现了该类,即所有实现该类的InputView都支持Page

InputView持有了多个MemorySemgent的引用(可以基于数组,list,deque等),这些MemorySemgent被视为一个内存页,可以顺序,随机等方式读取数据,要基于不同的实现类,实现类不同读取方式不同

方法图

DataOutputView

与DataInputView相对应,继承Output,并有一个拥有Page功能的抽象类(AbstractPagedOutputView),其大部outputView的实现都是继承自该抽象类,对一组MemorySemgent提供一个基于页的写入功能

方法图

类继承图

用于网络io数据的包装,每个buffer持有一个MemorySegment的引用,resultPartition写数据的时候,会向LocalBufferPool申请Buffer,会返回BufferBuilder,通过BufferBuilder想Buffe r实际写入的是MemorySegment 写数据

BufferBuilder是在上游Task中,负责想Buffer写入数据,BufferConsumer位于下游,与BufferBuilder相对应,用于消费Buffer的数据,每个bufferBuilder对应一个bufferConsumer

常用参数介绍

buffer申请

buffer回收

当buffer用完之后需要进行回收比如在netty的clientHandler收到响应之后进行处理就会把buffer回收掉,buffer回收之后并不会释放memorySegment,而是放回池中,变为可用内存,反复使用

flink托管的内存,托管内存使用堆外内存,用于批处理缓存排序等以及提供rocksDB内存

NetworkBufferPool是一个固定大小的MemorySegment实例吃,用于网络栈中,NettyBufferPool会为每个ResultPartition创建属于自己的LocalBufferPool,NettyBufferPool会作为全局的pool来提供内存,LocalBufferPool会通过限制来控制自己内存的申请,防止过多申请

LocalBufferPool继承关系,实现了bufferRecycler的接口,用于回收自己持有的buffer

在数据接收的时候会将数据封装成NettyBuffer,在数据发送的时候会通过BufferBilder向MemorySegment写入数据,然后通过BufferConsumer读取MemorySegment的数据

BufferManager主要用于为RemoteInputChannel提供buffer的,bufferManager在启动的时候会向全局bufferPool请求自己的独有buffer,当bufferManager的buffer不够的时候,则会向localBufferPool请求buffer,此时请求的buffer为浮动buffer

实际上提供的buffer是

Flink的类加载器解析

在运行 Flink 应用程序时,JVM 会随着时间的推移加载各种类。 这些类可以根据它们的来源分为三组:

作为一般规则,无论何时您先启动 Flink 进程然后再提交作业,作业的类都会动态加载。 如果 Flink 进程与作业/应用程序一起启动,或者如果应用程序产生 Flink 组件(JobManager、TaskManager 等),那么所有作业的类都在 Java 类路径中。

插件组件中的代码由每个插件的专用类加载器动态加载一次。

以下是有关不同部署模式的更多详细信息:

当作为独立会话启动 Flink 集群时,JobManagers 和 TaskManagers 使用 Java 类路径中的 Flink 框架类启动。 针对会话(通过 REST / CLI)提交的所有作业/应用程序中的类都是动态加载的。

Docker / Kubernetes 设置首先启动一组 JobManagers / TaskManagers,然后通过 REST 或 CLI 提交作业/应用程序,其行为类似于独立会话:Flink 的代码位于 Java 类路径中,插件组件和作业代码在启动时动态加载。

YARN 类加载在单个作业部署和会话之间有所不同:

当直接向 YARN 提交 Flink 作业/应用程序时(通过 bin/flink run -m yarn-cluster ...),将为该作业启动专用的 TaskManager 和 JobManager。 这些 JVM 在 Java 类路径中具有用户代码类。 这意味着在这种情况下,作业不涉及动态类加载。

当启动一个 YARN 会话时,JobManagers 和 TaskManagers 是用 classpath 中的 Flink 框架类启动的。 针对会话提交的所有作业的类都是动态加载的。

在涉及动态类加载的设置中(插件组件、会话设置中的 Flink 作业),通常有两个类加载器的层次结构:(1)Java 的应用程序类加载器,它包含类路径中的所有类,以及(2)动态插件/ 用户代码类加载器。 用于从插件或用户代码 jar 加载类。 动态 ClassLoader 将应用程序类加载器作为其父级。

默认情况下,Flink 反转类加载顺序,这意味着它首先查看动态类加载器,如果类不是动态加载代码的一部分,则仅查看父类(应用程序类加载器)。

反向类加载的好处是插件和作业可以使用与 Flink 核心本身不同的库版本,这在不同版本的库不兼容时非常有用。 该机制有助于避免常见的依赖冲突错误,如 IllegalAccessError 或 NoSuchMethodError。 代码的不同部分只是具有单独的类副本(Flink 的核心或其依赖项之一可以使用与用户代码或插件代码不同的副本)。 在大多数情况下,这运行良好,不需要用户进行额外配置。

但是,在某些情况下,反向类加载会导致问题(请参阅下文,“X cannot be cast to X”)。 对于用户代码类加载,您可以通过在 Flink 配置中通过 classloader.resolve-order 将 ClassLoader 解析顺序配置为 parent-first(从 Flink 的默认 child-first)来恢复到 Java 的默认模式。

请注意,某些类总是以父级优先的方式解析(首先通过父类加载器),因为它们在 Flink 的核心和插件/用户代码或面向插件/用户代码的 API 之间共享。 这些类的包是通过 classloader.parent-first-patterns-default 和 classloader.parent-first-patterns-additional 配置的。 要添加父级优先加载的新包,请设置 classloader.parent-first-patterns-additional 配置选项。

所有组件(JobManger、TaskManager、Client、ApplicationMaster 等)在启动时记录它们的类路径设置。 它们可以作为日志开头的环境信息的一部分找到。

当运行 JobManager 和 TaskManagers 专用于一项特定作业的设置时,可以将用户代码 JAR 文件直接放入 /lib 文件夹中,以确保它们是类路径的一部分而不是动态加载。

通常将作业的 JAR 文件放入 /lib 目录中。 JAR 将成为类路径(AppClassLoader)和动态类加载器(FlinkUserCodeClassLoader)的一部分。 因为 AppClassLoader 是 FlinkUserCodeClassLoader 的父级(并且 Java 加载父级,默认情况下),这应该导致类只加载一次。

对于无法将作业的 JAR 文件放入 /lib 文件夹的设置(例如因为安装程序是由多个作业使用的会话),仍然可以将公共库放入 /lib 文件夹,并避免动态为那些类进行加载。

在某些情况下,转换函数、源或接收器需要手动加载类(通过反射动态加载)。 为此,它需要能够访问作业类的类加载器。

在这种情况下,函数(或源或接收器)可以成为 RichFunction(例如 RichMapFunction 或 RichWindowFunction)并通过 getRuntimeContext().getUserCodeClassLoader() 访问用户代码类加载器。

在使用动态类加载的设置中,您可能会看到 com.foo.X cannot be cast to com.foo.X 样式中的异常。 这意味着 com.foo.X 类的多个版本已被不同的类加载器加载,并且该类的类型试图相互分配。

一个常见的原因是库与 Flink 的反向类加载方法不兼容。 您可以关闭反向类加载来验证这一点(在 Flink 配置中设置 classloader.resolve-order: parent-first)或从反向类加载中排除库(在 Flink 配置中设置 classloader.parent-first-patterns-additional)。

另一个原因可能是缓存对象实例,如 Apache Avro 之类的某些库或通过注册(例如通过 Guava 的 Interners)生成的对象实例。 这里的解决方案是要么在没有任何动态类加载的情况下进行设置,要么确保相应的库完全是动态加载代码的一部分。 后者意味着该库不能被添加到 Flink 的 /lib 文件夹中,而必须是应用程序的 fat-jar/uber-jar 的一部分

所有涉及动态用户代码类加载(会话)的场景都依赖于再次卸载类。 类卸载意味着垃圾收集器发现类中不存在任何对象,因此删除该类(代码、静态变量、元数据等)。

每当 TaskManager 启动(或重新启动)一个任务时,它将加载该特定任务的代码。 除非可以卸载类,否则这将成为内存泄漏,因为加载了新版本的类,并且加载的类总数会随着时间的推移而累积。 这通常通过 OutOfMemoryError: Metaspace 表现出来。

类泄漏的常见原因和建议的修复:

卸载动态加载类的一个有用工具是用户代码类加载器释放钩子。 这些是在卸载类加载器之前执行的钩子。 通常建议关闭和卸载资源作为常规函数生命周期的一部分(通常是 close() 方法)。 但在某些情况下(例如对于静态字段),最好在不再需要类加载器时卸载。

类加载器释放钩子可以通过 RuntimeContext.registerUserCodeClassLoaderReleaseHookIfAbsent() 方法注册。

从应用程序开发人员的角度解决依赖冲突的一种方法是通过隐藏它们来避免暴露依赖关系。

Apache Maven 提供了 maven-shade-plugin,它允许在编译后更改类的包(因此您编写的代码不受阴影影响)。 例如,如果您的用户代码 jar 中有来自 aws sdk 的 com.amazonaws 包,则 shade 插件会将它们重新定位到 org.myorg.shaded.com.amazonaws 包中,以便您的代码调用您的 aws sdk 版本。

注意 Flink 的大部分依赖,比如 guava、netty、jackson 等,都被 Flink 的维护者屏蔽掉了,所以用户通常不用担心。

操作符将一个或多个数据流转换为新的数据流。程序可以将多个转换组合成复杂的数据流拓扑。

本节描述了基本转换、应用这些转换后的有效物理分区以及对Flink的操作符chaining的理解。

通过匿名模式匹配从元组、case类和集合中提取,如下所示:

API不支持开箱即用。要使用这个特性,您应该使用 Scala API扩展

元组的数据流可以进行以下转换:

JAVA

Flink还通过以下函数对转换后的流分区进行低级控制(如果需要的话)。

链接两个后续转换意味着将它们放在同一个线程中以获得更好的性能。如果可能的话,Flink默认情况下是链操作符(例如,两个后续的map转换)。如果需要,该API提供了对链接的细粒度控制:

如果想在整个作业中禁用链接,请使用 StreamExecutionEnvironment.disableOperatorChaining() 。对于更细粒度的控制,可以使用以下函数。请注意,这些函数只能在DataStream转换之后使用,因为它们引用前一个转换。例如,您可以使用 someStream.map(…). startnewchain() ,但不能使用 someStream.startNewChain() 。

资源组是Flink中的插槽,参见 插槽 。如果需要,可以手动将操作符隔离在不同的插槽中。

怎么在java的flink中调用python程序?

一、在java类中直接执行python语句

import org.python.util.PythonInterpreter;

public class FirstJavaScript {

public static void main(String args[]) {

PythonInterpreter interpreter = new PythonInterpreter();

interpreter.exec("days=('mod','Tue','Wed','Thu','Fri','Sat','Sun'); ");

interpreter.exec("print days[1];");

}// main

}

调用的结果是Tue,在控制台显示出来,这是直接进行调用的。

二、在java中调用本机python脚本中的函数

首先建立一个python脚本,名字为:my_utils.py

def adder(a, b):

return a + b

然后建立一个java类,用来测试,

java类代码 FirstJavaScript:

import org.python.core.PyFunction;

import org.python.core.PyInteger;

import org.python.core.PyObject;

import org.python.util.PythonInterpreter;

public class FirstJavaScript {

public static void main(String args[]) {

PythonInterpreter interpreter = new PythonInterpreter();

interpreter.execfile("C:\\Python27\\programs\\my_utils.py");

PyFunction func = (PyFunction) interpreter.get("adder",

PyFunction.class);

int a = 2010, b = 2;

PyObject pyobj = func.__call__(new PyInteger(a), new PyInteger(b));

System.out.println("anwser = " + pyobj.toString());

}// main

}

得到的结果是:anwser = 2012

三、使用java直接执行python脚本

建立脚本inputpy

#open files

print 'hello'

number=[3,5,2,0,6]

print number

number.sort()

print number

number.append(0)

print number

print number.count(0)

print number.index(5)

建立java类,调用这个脚本:

import org.python.util.PythonInterpreter;

public class FirstJavaScript {

public static void main(String args[]) {

PythonInterpreter interpreter = new PythonInterpreter();

interpreter.execfile("C:\\Python27\\programs\\input.py");

}// main

}

得到的结果是:

hello

[3, 5, 2, 0, 6]

[0, 2, 3, 5, 6]

[0, 2, 3, 5, 6, 0]

2

3

Flink性能调优(一)

Flink是依赖内存计算,计算过程中内存不够对Flink的执行效率影响很大。可以通过监控GC(Garbage Collection),评估内存使用及剩余情况来判断内存是否变成性能瓶颈,并根据情况优化。

监控节点进程的YARN的Container GC日志,如果频繁出现Full GC,需要优化GC。

GC的配置:在客户端的“conf/flink-conf.yaml”配置文件中,在“env.java.opts”配置项中添加参数:“

此处默认已经添加GC日志。

任务的并行度可以通过以下四种层次(按优先级从高到低排列)指定,用户可以根据实际的内存、CPU、数据以及应用程序逻辑的情况调整并行度参数。

您可以根据实际任务数量的多少,为JobManager设置一个合适的内存。

•在使用yarn-session命令时,添加“-jm MEM”参数设置内存。

•在使用yarn-cluster命令时,添加“-yjm MEM”参数设置内存。

每个TaskManager每个核同时能跑一个task,所以增加了TaskManager的个数相当于增大了任务的并发度。在资源充足的情况下,可以相应增加TaskManager的个数,以提高运行效率。

•在使用yarn-session命令时,添加“-n NUM”参数设置TaskManager个数。

•在使用yarn-cluster命令时,添加“-yn NUM”参数设置TaskManager个数。

每个TaskManager多个核同时能跑多个task,相当于增大了任务的并发度。但是由于所有核共用TaskManager的内存,所以要在内存和核数之间做好平衡。

•在使用yarn-session命令时,添加“-s NUM”参数设置SLOT数。

•在使用yarn-cluster命令时,添加“-ys NUM”参数设置SLOT数。

TaskManager的内存主要用于任务执行、通信等。当一个任务很大的时候,可能需要较多资源,因而内存也可以做相应的增加。

•将在使用yarn-sesion命令时,添加“-tm MEM”参数设置内存。

•将在使用yarn-cluster命令时,添加“-ytm MEM”参数设置内存。

java调用flink的介绍就聊到这里吧,感谢你花时间阅读本站内容,更多关于java调用flink实现实时、java调用flink的信息别忘了在本站进行查找喔。