数据处理服务在 Kubernetes 上基于 keda 的弹性伸缩实践

数据处理服务在 Kubernetes 上基于 keda 的弹性伸缩实践

背景

我们在阿里云托管的 Kubernetes 集群上部署了一套用于工商、司法等公开数据结构化处理的数据链路。该数据处理服务包含了几十个数据类型,每个数据类型作为一个 Deployment 部署,其主要功能是监听 Kafka 队列消息并做 ETL 处理后索引到 Elasticsearch。通常我们会将 Deployment 的副本数设置为其对应的 Kafka 队列的分区数量以保障数据处理的及时性。目前已经部署了 80 多个 Deployment,按照上述部署方式会占用大量的内存资源,但并不是所有的数据类型都会一直有新消息待处理,造成了极大的资源浪费。我们想通过 Kubernetes 的水平自动伸缩器(Horizontal Pod Autoscaler 以下简称 HPA)来根据 Kafka 队列积压的消息数量自动弹性伸缩 Deployment 副本数量,让持续没有新消息的 Deployment 副本数缩容到最小,并在有消息开始积压的时候自动扩容。

方案调研

Custom Metrics

Kubernetes 原生自带了基于 CPU/Memory 指标的 HPA,显然我们这个场景用不上。也支持基于自定义指标的 HPA,对我们这个场景似乎有些用处。

于是找了下阿里云上已有的 custom metrics adapter 发现了 alibaba-cloud-metrics-adapter 这个项目,它支持 Ingress、SLB、CMS、AHAS Sentinel 以及 ARMS Prometheus custom metrics,而我们在用的 Kubernetes 集群已经接入了 ARMS Prometheus 并且通过 kafka-exporter 收集了 Kafka 队列消息积压的指标数据,理论上可以使用。在尝试了 alibaba-cloud-metrics-adapter 之后感觉它真的不是很好用:

  1. 首先,文档写得不是很清楚,3.modify configuration 这个步骤都不知道在哪里更新这个配置 image.png
  2. 对于我们的场景,需要配置很多 Prometheus 的规则对应到每个 Kafka 队列的消息积压数量,很麻烦

于是放弃了使用 alibaba-cloud-metrics-adapter 这个方案,同时也否决了类似的 k8s-prometheus-adpater 项目因为也是要配置非常多的 Prometheus 规则,并且对于我们这些没有经常使用 Prometheus 查询的同学而言 Prometheus 的查询语法有些难掌握,难以维护。

keda

偶然在 GitHub Explore 里面发现了 keda 这个项目,它是由 Microsoft 和 Read Hat 合作开发的一个基于事件触发的 Kubernetes 自动伸缩器,目前是 CNCF sandbox 项目,已经发布到了 2.0 版本。看到它 Scalers 列表中支持 Apache Kafka 便尝试了一下,在 k8s 集群中安装好 keda 之后,对于想要做 HPA 的 Deployment 只需要关联一个 ScaledObject 自定义资源即可。

比如说我们有个 Deployment 叫 brm-index-basic,它运行的应用程序使用的 Kafka 队列 topic 叫 basic,消费者 consumer group 叫 basic,则可以定义如下 ScaledObject 实现 HPA

hpa.yaml

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: kafka-scaledobject
namespace: default
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: brm-index-basic
pollingInterval: 10
minReplicaCount: 1
maxReplicaCount: 20
triggers:
- type: kafka
metadata:
bootstrapServers: kafka-server:9092
consumerGroup: basic
topic: basic
lagThreshold: "100"
offsetResetPolicy: latest
  1. 其中,scaleTargetRef 定义了 HPA 要伸缩的对象这里即 brm-index-basic 这个 Deployment
  2. pollingInterval 定义了 keda-operator 刷新 triggers 这里即 Kafka 队列消息积压数量的频率,单位为秒,这个数字决定了 HPA 的灵敏度
  3. minReplicaCountmaxReplicaCount 分别定义了要伸缩的对象的最小和最大副本数量,minReplicaCount 可以为 0 即缩容到没有副本,比方说 Kafka 队列一直没有新消息就可以完全缩容,到有新消息进来的时候 keda 又会自动扩容,上述的 pollingInterval 就会影响这里从 0 到扩容的时间间隔,maxReplicaCount 我们会设置为 Kafka 队列 topic 的分区数量
  4. triggers 中则配置了 Kafka 集群的连接信息和关联的 topic/consumer group 名字以及消息积压阈值 lagThresholdlagThreshold 越小 HPA 伸缩时越激进,越大越保守
  5. offsetResetPolicy 主要是为 Kafka 消息队列还没有过消费者的时候设计的,一般设置成 latest 即可,具体可以参考官方文档:https://keda.sh/docs/2.0/scalers/apache-kafka/#new-consumers-and-offset-reset-policy

kubectl apply -f hpa.yaml 后即为 brm-index-basic Deployment 增加了基于 keda 的 HPA,通过 kubectl get hpa 可以查看 HPA 的信息:

$ kubectl get hpa
NAME REFERENCE TARGETS MINPODS MAXPODS REPLICAS AGE
keda-hpa-kafka-scaledobject-basic Deployment/brm-index-basic 39500m/100 (avg) 1 20 2 5d22h

这里需要注意的是 HPA TARGETS 里面的带 m 结尾的数字单位不是百万而是千分之一,即这里平均积压数量是 39.5,参考文档:https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale-walkthrough/#appendix-quantities

总结

最终我们给大部分数据索引部署都增加了基于 keda 的自动水平伸缩,通过监控 Kafka 队列未消费消息数量自动伸缩索引任务容器数量,没有索引消息的时候自动缩容到最少 0 个容器,大大降低日常 Kubernetes 集群 CPU/内存资源占用,同时也建立了在大量索引任务进来的时候快速扩容的能力。

Some rights reserved
Except where otherwise noted, content on this page is licensed under a Creative Commons Attribution-NonCommercial 4.0 International license.