flink sql 优化

文章目录

  • 一、参数方面
  • 二、资源方面
  • 三、总结


提示:实时flink sql 参考很多网上方法与自己实践方法汇总(版本:flink1.13+)

一、参数方面

  • flink sql参数配置
//关闭详细算子链(默认为true),true后job性能会略微有提升。false则可以展示更详细的DAG图方便地位性能结点   ###有用的参数
pipeline.operator-chaining: 'true'
//指定时区  ###实用的参数
table.local-time-zone: Asia/Shanghai
//对flink sql是否要敏感大小(建议false,不区分大小写。默认为true)
table.identifier-case-sensitive: 'false'
//开启 miniBatch
table.exec.mini-batch.enabled: 'true'
//批量输出的间隔时间
table.exec.mini-batch.allow-latency: 5s
//防止 OOM 设置每个批次最多缓存数据的条数
table.exec.mini-batch.size: '500'
//提交批次数据大小
batchSize: '127108864'
//刷数据间隔
flushIntervalMs: '60000'
//几个flush线程
numFlushThreads: '5'
// 写odps时压缩 :https://help.aliyun.com/zh/flink/developer-reference/maxcompute-connector
compressAlgorithm: snappy
//开启异步状态后端
state.backend.async: 'true'
//状态后端开启增量(默认就是true 增量)
state.backend.incremental: 'true'
//作业链与处理槽共享组(默认为false),开启后在针对某个操作算子增加并行度和cu等资源时,不与其他槽位共享资源,单独增加额外资源  ###有用的参数
table.exec.split-slot-sharing-group-per-vertex: 'true'
//Checkpoint间隔时间,单位为毫秒 默认180秒 ###如果作业量大,可以适当调大间隔时间。性能方便略有提升
execution.checkpointing.interval: 180s
//State数据的生命周期,单位为毫秒。默认36小时
table.exec.state.ttl: 129600000
//Checkpoint生成超时时间(默认值10分钟),当Checkpoint生成时间超过10分钟,flink会把创建生成的Checkpoint杀掉,重新再创建生成Checkpoint。如果观察自己的job生成时间过长减少被杀死Checkpoint可以调大下面时间   ###有用的参数
execution.checkpointing.timeout	:10min
  • datastream代码配置
// 初始化 table environment
TableEnvironment tEnv = ...
// 获取 tableEnv 的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 设置参数:
// 开启 miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量输出的间隔时间
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止 OOM 设置每个批次最多缓存数据的条数,可以设为 2 万条
configuration.setString("table.exec.mini-batch.size", "20000");
// 开启 LocalGlobal(job有聚合函数使用)
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
// 开启 Split Distinct (job有聚合函数使用)
configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
// 第一层打散的 bucket 数目 (job有聚合函数使用)
configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024");
// TopN 的缓存条数 (job有分组top使用)
configuration.setString("table.exec.topn.cache-size", "200000");
// 指定时区
configuration.setString("table.local-time-zone", "Asia/Shanghai");
  • flink sql 简单作业优化实验截图
    1).调大checkpoint生成时间
    在这里插入图片描述
    在这里插入图片描述
    2).去掉参数:pipeline.operator-chaining: ‘false’
    在这里插入图片描述
    在这里插入图片描述
    3).加攒批参数
    在这里插入图片描述
    在这里插入图片描述
    4).由于full GC导致job性能过差(排查)
    在这里插入图片描述
    查看gc日志:
    在这里插入图片描述
    在这里插入图片描述
    解决方案:对taskmanager增加内存(jobmanager略,因为它很少会出现频繁full gc)。

5).全量Checkpoint与增量Checkpoint的大小一致,是否正常?
如果您在使用Flink的情况下,观察到全量Checkpoint与增量Checkpoint的大小一致:

  • 检查增量快照是否正常配置并生效。
  • 是否为特定情况。在特定情况下,这种现象是正常的,例如:
    a.在数据注入前(18:29之前),作业没有处理任何数据,此时Checkpoint只包含了初始化的源(Source)状态信息。由于没有其他状态数据,此时的Checkpoint实际上是一个全量Checkpoint。
    b.在18:29时注入了100万条数据。假设数据在接下来的Checkpoint间隔时间(3分钟)内被完全处理,并且期间没有其他数据注入,此时发生的第一个增量Checkpoint将会包含这100万条数据产生的所有状态信息。
    在这种情况下,全量Checkpoint和增量Checkpoint的大小一致是符合预期的。因为第一个增量Checkpoint需要包含全量数据状态,以确保能够从该点恢复整个状态,导致它实际上也是一个全量Checkpoint。

增量Checkpoint通常是从第二个Checkpoint开始体现出来的,在数据稳定输入且没有大规模的状态变更时,后续的增量Checkpoint应该显示出大小上的差异,表明系统正常地只对状态的增量部分进行快照。如果仍然一致,则需要进一步审查系统状态和行为,确认是否存在问题。

二、资源方面

当上面添加配置性能还是不行是,可以增加资源。

  • 添加cu
    一般对taskmanager添加cup,默认给1的整数倍,例如1,2,3等。jobmanager 基本不咋干活(业务数据处理),不用添加资源,之前给很少cup即可
  • 添加内存
    一般默认每个taskmanager给4G内存,后面再对它增加资源。jobmanager不用增加内存
  • 槽位(slot)
    每个 TaskManager Slot 数给1个( TaskManager 只能同时执行一个 Subtask),性能比较好(一般简单的job没有大量的回撤流的情况下)。
    A.如果开3个并行度,每个taskmanager1个槽位:1个槽位 乘 3个并行度 乘 每个taskmanager分配的资源+job manager资源=job的总使用资源
    B.如果开3个并行度,每个taskmanager3个槽位:1个槽位 乘 每个taskmanager分配的资源+job manager资源=job的总使用资源
  • 并行度
    在 TaskManager Slot 数给1个情况下(此方案性能比较好),增加并行度可以提升处理性能。但taskmanager资源(内存和cpu)也会成倍增加

*上面只是建议给taskmanager 1cup,4Gb内存起,原因现在很多平台大多是云虚拟资源,这样分配性能较好,同时也是养成良好习惯。

三、总结

不是所有job资源越堆越多好。有时作业的复杂或数据的特殊情况(外部系统性能除外,例如写数据库),增加资源只会让job性能越来越差或报错(亲身经历job性能差,特别痛苦,一直加资源性能还是差或运行报错)。需要不断找根源问题,多使用不同方法测试才能找到适合job的处理性能。

  • 如果优化很多次后job性能还是很差(资源给的很多性能还是不理想)(略增加一些资源)
    可以将一个job拆分两个job(将占用比较多的业务数据(50%更好)在新的job单独处理)
  • 性能优化一直无法提升,要么看业务要么看job的性能瓶颈业务(业务牺牲)
  • 要么flink只做业务写表,离线负责处理业务写其他表(时效牺牲)

  • 调优举例(真实案例,折腾了很久):
    背景:(flink 双流join) 默认资源配置(taskmanager 1cpu,4Bb内存,1个槽位,1个并行度)
    数据有堆积,且越堆越多,写入性能弱(每秒十几条写入),CP(checkpoint有时失败,但很大,生成很慢),业务处理简单,单日数量在1700万条数据。
  • 后面开始对此job加资源,加并行度,加各种优化配置,增加CP生成时间等等。
    开启job运行后生成CP一直失败(生成CP更大,之前200多兆,改后生成700-800兆还没有生成,生成变慢,生成时间变成)
    在这里插入图片描述
    即使加大CP生产时间和CP校验时间,CP依旧是失败。
    CP一直失败导致处理性能极差(CP在生成时整个job几乎都在停止),如下截图
    在这里插入图片描述
    后面是各种调优尝试都不能,发现问题是flink在双流join时,有大量的回撤流.如果撤回数据较多的话 , 就会造成这个节点的state大 从而导致SinkMaterializer节点压力大(自己结合UI监控图观察得到)。
  • 后面经过很多次调优将并行度改为3,每个 TaskManager Slot 数 给3个,其他不变,性能有提升,CP生成也变快了
    在这里插入图片描述
  • 又做调整将taskmanager 给3cup,内存给15Gb,开5个并行度,每个 TaskManager Slot 数 给5个。
    目的:将5个并行度放到一个槽位,资源也没有使用多少。
    测试后发现CP比上面3个并发的增量存储要大(意料之中),CP生成特别快,已将数据堆压十几个小时的数据全部追上。

在这里插入图片描述
*上面案例思想:
A.减少CP生成时间。flink才能快速处理数据(提交完已处理的偏移量数据,快速进行下一轮的新数据)。
B.在有回撤流,需要状态(自己观察在一个并发时CP较大几百兆,一般join情况出现的比较多)将多个并发尽量放到一个slot,减少数据传输和交换(一个槽位共享状态)。其他简单的job没有或很少回撤流的情况下可以只建一个槽位。
C.增加并行度会导致CP增大。原因之前一个线程一个CP,现在是多个线程有自己的状态(可能会有重复数据状态),多个状态合在一起CP就大了。
在这里插入图片描述

参考:文档1

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/594691.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

4. HBuilderX中的插件商城

前言 在HBuilderX中有一个插件市场,这个和VSCode的插件库不太像,硬要做个简单类比的话,个人认为HBuilderX中的插件市场更像是npm库,它里面有许多其他开发者开发的插件,这些插件更多的是为uniapp服务的,比如…

第23章 微内核架构软件测试(下午题)

一、微内核架构概述 (一)概念 1、微内核架构 微内核:精简的内核 宏内核:中央集权控制中心 核心系统 能运行的最小模块插件模块 专业处理,额外特性的独立组件增加/扩展核心系统的业务逻辑能力连接方式 OSGI、消息机…

springAI框架学习总结

springAI 1.springAI基本介绍 springAI是一个AI工程应用框架,其目标是将 Spring 生态系统设计原则(例如可移植性和模块化设计)应用于 AI 领域,并推广使用 POJO 作为 AI 领域应用程序的构建块。 2.特性 灵活的AIP支持chat,text…

WPF之绑定属性值转换

1&#xff0c;使用Binding.Format属性简易设置绑定的属性数据显示格式。 <TextBox Grid.Row"2" Grid.Column"1"><TextBox.Text><Binding Path"UnitCost" StringFormat"{}{0:C3}" > …

[论文笔记]Longformer: The Long-Document Transformer

引言 今天带来论文Longformer: The Long-Document Transformer的笔记。 基于Transformer的模型由于其自注意力操作而无法处理长序列&#xff0c;该操作随着序列长度呈二次扩展。为了解决这一限制&#xff0c;本篇工作提出了Longformer&#xff0c;其注意力机制随着序列长度呈…

Edge扩展应用程序的上架流程

前言 在软件开发的生命周期中&#xff0c;发布流程是将产品推向市场并交付给用户的关键阶段。它不仅标志着一个项目从开发阶段到用户手中的转变&#xff0c;也是确保软件质量和用户体验的重要环节。那么一个清晰、高效且可重复的发布流程对于任何软件项目的成功至关重要&#…

CMakeLists.txt语法规则:提供信息的变量说明一

一. 简介 前面几篇文章学习了 CMakeLists.txt语法中 部分常用命令。 接下来学习CMakeLists.txt语法中部分常用变量&#xff0c;变量也是 cmake 中的一个重头戏&#xff0c;cmake 提供了很多内置变量。每一个变量都有它自己的含义&#xff0c;可以通过如下链接地址查询到所有…

环保设备在线监控系统

随着环保意识的日益提升&#xff0c;对环境污染的监控与管理成为了我们不可忽视的重要任务。在这个背景下&#xff0c;HiWoo Cloud平台凭借其强大的环保设备在线监控系统&#xff0c;为环保事业注入了新的活力&#xff0c;助力我们共同迈向绿色未来。 一、环保设备在线监控系统…

jsPDF + html2canvas + Vue3 + ts项目内,分页导出当前页面为PDF、A 页面内导出 B 页面的内容为PDF,隐藏导出按钮等多余元素

jsPDF html2canvas Vue3 ts Arco Design项目&#xff0c;分页导出当前页面为PDF、A 页面内导出 B 页面的内容为PDF&#xff0c;隐藏导出按钮等多余元素… 1.下载所需依赖 pnpm install --save html2canvaspnpm install --save jspdf引入依赖 <script setup lang"…

JavaScript —— APIs(五)

一、Window对象 1. BOM&#xff08;浏览器对象模型&#xff09; 2. 定时器-延时函数 ①、定义 ②、定时器比较 ③、【案例】 3. JS执行机制 4. location对象 注意&#xff1a;hash应用 不点击页面刷新号&#xff0c;点击刷新按钮也可以实现页面刷新 【案例】 5. navig…

WAF防火墙可以给您解决什么问题?哪些情况下使用WAF最适合?

一、什么是WAF&#xff1f; Web应用防护系统&#xff08;也称为&#xff1a;网站应用级入侵防御系统。英文&#xff1a;Web Application Firewall&#xff0c;简称&#xff1a;WAF&#xff09;。利用国际上公认的一种说法&#xff1a;Web应用防火墙是通过执行一系列针对HTTP/H…

Occlum原理解析及使用说明

目录 一、设计初衷 二、背景知识 1.什么是可信计算 2.什么是TEE 3.传统SGX SDK的问题 三、Occlum 1.如何使用 2.特点 3.如何使用 1.Docker部署 1 硬件支持 2 环境 3 拉取镜像创建虚机 4 简单验证 4.Occlum中gcc编译 1 交叉编译 2 初始化Occlum实例 3 Occlum构…

如何更好的使用cpm

nvidia发布了RAFT库&#xff0c;支持向量数据库的底层计算优化&#xff0c;RAFT 也使用CMake Package Manager( CPM )和rapids-cmake管理项目&#xff0c;可以方便快捷的下载到需要的对应版本的thirdparty的依赖库&#xff0c;但是&#xff0c;一般情况下&#xff0c;项目是直接…

数据库复习1

1.试述数据、数据库、数据库管理系统、数据库系统的概念 1.数据(Data): 数据是关于事物的符号表示或描述。它可以是任何事实、观察或者测量的结果&#xff0c;如数字、字符、声音、图像等。数据在没有上下文的情况下可能没有明确的意义。 2.数据库(Database): 数据库是一个持…

面试题:集合篇

说说 List, Set, Queue, Map 四者的区别&#xff1f; List(对付顺序的好帮手): 存储的元素是有序的、可重复的。Set(注重独一无二的性质): 存储的元素是无序的、不可重复的。Queue(实现排队功能的叫号机): 按特定的排队规则来确定先后顺序&#xff0c;存储的元素是有序的、可重…

传输层协议 TCP UDP协议 解析(二)

文章目录 UDP&#xff1a;用户数据报协议UDP报文格式TCP与UDP的区别 UDP&#xff1a;用户数据报协议 UDP是一种面向无连接的传输层协议&#xff08;数据一直发送&#xff0c;没有ack&#xff0c;所以不需要考虑ack&#xff09;&#xff0c;传输可靠性没有保证。 UDP不提供重传…

Rust 实战thiserror+自定义错误消息体

导航 一、背景二、实践1、导入thiserror2、自定义错误消息体&#xff08;1&#xff09;创建ErrMsg.rs和创建自定义结构体&#xff08;2&#xff09;lib.rs添加ErrMsg&#xff08;3&#xff09;main函数&#xff08;4&#xff09;完整代码 一、背景 开发中遇到需要通用、能够满…

Note-backbone预训练权重对模型收敛速度的影响和mmlab实验测试

简介 在训练一些复杂模型时候&#xff0c;通常会考虑读取backbone的预训练权重&#xff0c;这种方法有以下好处&#xff1a; 初始化网络参数&#xff1a;在深度学习模型训练过程中&#xff0c;通常需要随机初始化神经网络的参数。然而&#xff0c;如果采用Backbone预训练权重进…

拼多多不花钱推广能做起来吗

拼多多推广可以使用3an推客。3an推客&#xff08;CPS模式&#xff09;给商家提供的营销工具&#xff0c;由商家自主设置佣金比例&#xff0c;激励推广者去帮助商家推广商品链接&#xff0c;按最终有效交易金额支付佣金&#xff0c;不成交不扣费。是商家破零、积累基础销量的重要…

背景音乐广播系统解决方案

背景音乐广播系统解决方案18123651365 在公共广播背景音乐系统虽然是一个小小分支&#xff0c;但是却与人们的生活质量直接挂钩&#xff0c;如早晨时间&#xff0c;可以通过播放一些愉快的音乐&#xff0c;使得住宅居名、上班一族和晨运一族有一个愉快的心情&#xff0c;精神抖…
最新文章