How to run an Apache Beam (Python) pipeline on top of Flink in Kubernetes?
Introduction
My data engineering team had a complex data pipeline that had been running on Apache Beam/Dataflow for the last three years without a single traceback (miraculously); it was a bullet-proof kind of pipeline. Then the pipelines suddenly started failing, again without any tracebacks, and without tracebacks it is hard to debug the root cause. Whenever I redeployed a pipeline it worked fine for a while and then failed again. I tried upgrading the Apache Beam version and many other hacks, but nothing worked out.
So I finally decided to migrate all existing Apache Beam/Dataflow pipelines to a Flink cluster in Kubernetes. Just to test the waters, I installed minikube on my local machine and played around with the cluster.
I used a MacBook with an M2 chip, but you can follow along on your preferred operating system; the steps are almost identical.
Installation
To install minikube you first have to install Docker on your local machine; make sure the machine has at least 8GB of memory and 512GB of HDD/SSD.
Here is the Docker installation link.
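Once Docker is installed, a quick sanity check helps before moving on; these are just standard Docker commands, nothing specific to this setup. docker info also reports how much memory Docker is allowed to use, which matters for the 8GB recommendation above.
$ docker --version
$ docker info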
$ brew install minikube
$ minikube start
The output should look something like this:
minikube v1.28.0 on Darwin 12.5 (arm64)
Using the docker driver based on existing profile
Starting control plane node minikube in cluster minikube
Pulling base image ...
minikube 1.29.0 is available! Download it: https://github.com/kubernetes/minikube/releases/tag/v1.29.0
To disable this notice, run: 'minikube config set WantUpdateNotification false'

Restarting existing docker container for "minikube" ...
Preparing Kubernetes v1.25.3 on Docker 20.10.20 ...
Verifying Kubernetes components...
  - Using image gcr.io/k8s-minikube/storage-provisioner:v5
Enabled addons: default-storageclass, storage-provisioner
Done! kubectl is now configured to use "minikube" cluster and "default" namespace by default
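If minikube comes up with too little memory for the Flink pods, you can recreate it with explicit resources. The values below are only a suggestion sized to the 8GB recommendation above, not hard requirements:
$ minikube delete
$ minikube start --driver=docker --memory=8192 --cpus=4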
Install kubectl (link) and make sure it can talk to the cluster:
kubectl get all
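If kubectl reports connection errors, first confirm that it is pointed at the minikube cluster; these are standard kubectl commands:
$ kubectl config current-context   # should print "minikube"
$ kubectl get nodes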
Deployment
Below are the Flink configuration files required to get the Flink cluster up and running on minikube.
flink-configuration-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender
    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO
    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10
    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
jobmanager-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
jobmanager-session-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:1.14.6
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
task-manager-session-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:1.14.6
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999
      - name: beam-worker-pool
        image: apache/beam_python3.10_sdk:2.43.0
        args: ["--worker_pool"]
        ports:
        - containerPort: 50000
          name: pool
        livenessProbe:
          tcpSocket:
            port: 50000
          initialDelaySeconds: 30
          periodSeconds: 60
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
Now let’s start the cluster
# Configuration and service definition
$ kubectl create -f flink-configuration-configmap.yaml
$ kubectl create -f jobmanager-service.yaml
# Create the deployments for the cluster
$ kubectl create -f jobmanager-session-deployment.yaml
$ kubectl create -f taskmanager-session-deployment.yaml
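If you prefer to block until each deployment reports ready instead of polling, kubectl can wait on the rollout; the resource names are the ones from the manifests above:
$ kubectl rollout status deployment/flink-jobmanager
$ kubectl rollout status deployment/flink-taskmanager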
Now check that all the containers are up and running with the kubectl get all command.
Gotcha: if the containers keep failing or restarting, decrease the number of replicas to 1 in task-manager-session-deployment.yaml (or scale the deployment directly, as shown below).
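Scaling down and finding out why pods restart can both be done without touching the YAML; these are plain kubectl commands, and the pod name placeholder is only illustrative:
$ kubectl scale deployment/flink-taskmanager --replicas=1
$ kubectl describe pod <flink_taskmanager_pod>   # shows restart reasons such as OOMKilled or failed probes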
The beam-worker-pool is a sidecar container that runs alongside the Flink task manager container; it hosts the Beam Python SDK workers when we submit a Beam job.
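To confirm the sidecar is actually up and listening on port 50000, look at its logs; the container name comes from task-manager-session-deployment.yaml above. You should see the worker pool start and wait for the task manager to hand it work.
$ kubectl logs deployment/flink-taskmanager -c beam-worker-pool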
Now it's time to expose the Flink JobManager so that we can check running jobs, resources, and logs in the Flink UI.
kubectl get all
kubectl port-forward <flink_jobmanager_pod> 8081:8081
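If you don't want to look up the pod name every time, you can forward the service created by jobmanager-service.yaml instead; the two approaches are interchangeable here:
$ kubectl port-forward service/flink-jobmanager 8081:8081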
Check that the JobManager is up and running at http://localhost:8081/.
Next, create a simple word-count pipeline (beam_pipeline.py) that submits to the Flink cluster through the forwarded port:
import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def format_result(word, count):
    return '%s: %d' % (word, count)


def run():
    # Submit to the port-forwarded JobManager; the EXTERNAL environment points
    # at the beam-worker-pool sidecar running next to each task manager.
    options = PipelineOptions([
        "--runner=FlinkRunner",
        "--flink_version=1.14",
        "--flink_master=localhost:8081",
        "--environment_type=EXTERNAL",
        "--environment_config=localhost:50000",
    ])

    with beam.Pipeline(options=options) as p:
        (p
         | 'Create words' >> beam.Create(['Beam on top of flink makes life easier'])
         | 'Split words' >> beam.FlatMap(lambda line: line.split(' '))
         | 'PairWithOne' >> beam.Map(lambda word: (word, 1))
         | 'GroupAndSum' >> beam.CombinePerKey(sum)
         | 'Format' >> beam.MapTuple(format_result)
         | 'Write to file' >> WriteToText('test.txt'))


if __name__ == "__main__":
    run()
Now it's time to run the Beam pipeline Python job.
python beam_pipeline.py
You should now be able to see the submitted job in the Flink UI.
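Two things worth knowing at this point. First, you can follow the task manager logs while the job runs. Second, because the pipeline uses the EXTERNAL environment, the user code (including the text sink) executes inside the beam-worker-pool container, so the output of WriteToText('test.txt') should land on that container's filesystem rather than on your laptop; with two replicas it may be in either pod. A rough way to poke around:
$ kubectl logs -f deployment/flink-taskmanager -c taskmanager
$ kubectl exec -it <flink_taskmanager_pod> -c beam-worker-pool -- /bin/sh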
Errors
If you get errors like the ones below:
RuntimeError: Service failed to start up with error 1
Job failed during initialization of JobManager
TaskManager with id 172.17.0.3:6122-7bf30b is no longer reachable.
then you have to be very careful with the Beam, Flink, and Python versions:
Install Python 3.10 on your local machine (that is what I used).
Flink supports Java 8 for now, so install the same on your local machine.
Flink 1.14.6 is the most recent Flink version Beam supports at the time of writing.
Run pip install apache-beam==2.43.0 on your local machine.
With matching versions you should be able to run the Beam job on Flink successfully; a quick way to double-check them is shown below.
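These are plain version commands, nothing Beam-specific, and the expected values simply mirror the versions listed above:
$ python3 --version        # expect 3.10.x, matching apache/beam_python3.10_sdk
$ java -version            # expect Java 8
$ pip show apache-beam     # expect Version: 2.43.0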
Tear Down the Cluster
$ kubectl delete -f jobmanager-service.yaml
$ kubectl delete -f flink-configuration-configmap.yaml
$ kubectl delete -f taskmanager-session-deployment.yaml
$ kubectl delete -f jobmanager-session-deployment.yaml
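If you also want to free the resources minikube itself is holding, stop or delete the local cluster with the standard minikube commands:
$ minikube stop
$ minikube delete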