How to run an Apache Beam (Python) pipeline on top of Flink in Kubernetes?


Introduction

My data engineering team had a complex data pipeline that had been running on Apache Beam/Dataflow for the last three years without a single traceback (miraculously); it was a bullet-proof kind of pipeline. Then the pipelines suddenly started failing :smiley: still without any tracebacks, and without tracebacks it is hard to debug the root cause. Whenever I redeployed a pipeline it would work fine for some time and then fail again and again. Not getting any traceback is bad, and yes, it is strange. I tried upgrading the Apache Beam version and many other hacks to solve the problem, but nothing worked out.

So I finally decided to migrate all the existing Apache Beam/Dataflow pipelines to a Flink cluster on Kubernetes. To get a taste of the water first, I installed minikube on my local machine and played around with the cluster.

I have a MacBook M2, but you can choose whichever operating system you like and follow along; the steps are almost identical.

Installation

To install minikube you will first have to install Docker on your local machine; make sure the machine has at least 8GB of memory and a 512GB HDD/SSD.

Here is the Docker installation link.
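If you are on macOS with Homebrew, one way to get Docker Desktop is the cask below (an assumption about your setup; the official installer from the link above works just as well).

$ brew install --cask docker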

$ brew install minikube
$ minikube start

The output should look something like this:

😄  minikube v1.28.0 on Darwin 12.5 (arm64)
✨  Using the docker driver based on existing profile
👍  Starting control plane node minikube in cluster minikube
🚜  Pulling base image ...
🎉  minikube 1.29.0 is available! Download it: https://github.com/kubernetes/minikube/releases/tag/v1.29.0
💡  To disable this notice, run: 'minikube config set WantUpdateNotification false'

🔄  Restarting existing docker container for "minikube" ...
🐳  Preparing Kubernetes v1.25.3 on Docker 20.10.20 ...
🔎  Verifying Kubernetes components...
    ▪ Using image gcr.io/k8s-minikube/storage-provisioner:v5
🌟  Enabled addons: default-storageclass, storage-provisioner
🏄  Done! kubectl is now configured to use "minikube" cluster and "default" namespace by default

Install kubectl (link), then verify that you can reach the cluster with the command below.
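On macOS with Homebrew, one way to install it is shown below (an assumption; the linked docs cover other platforms).

$ brew install kubectl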

kubectl get all

Deployment

Below are the Flink configuration files required to get the Flink cluster up and running on minikube.

flink-configuration-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender
    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO
    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size = 100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10
    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF

jobmanager-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager

jobmanager-session-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:1.14.6
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties

taskmanager-session-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:1.14.6
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999
      - name: beam-worker-pool
        image: apache/beam_python3.10_sdk:2.43.0
        args: ["--worker_pool"]
        ports:
        - containerPort: 50000
          name: pool
        livenessProbe:
          tcpSocket:
            port: 50000
          initialDelaySeconds: 30
          periodSeconds: 60
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties

Now let’s start the cluster

# Configuration and service definition
$ kubectl create -f flink-configuration-configmap.yaml
$ kubectl create -f jobmanager-service.yaml
# Create the deployments for the cluster
$ kubectl create -f jobmanager-session-deployment.yaml
$ kubectl create -f taskmanager-session-deployment.yaml

Now check that all the containers are up and running with the kubectl get all command.
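To look at just the Flink pods, you can also filter by the app=flink label used in the manifests above:

$ kubectl get pods -l app=flink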

Gotcha: If the containers keep failing or restarting, decrease the number of replicas to 1 in taskmanager-session-deployment.yaml.
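Alternatively, you can scale the already-created deployment down in place (a standard kubectl command, not specific to this setup):

$ kubectl scale deployment flink-taskmanager --replicas=1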

The beam-worker-pool is a sidecar container that runs alongside the Flink taskmanager container; it hosts the Beam Python SDK workers that execute our user code whenever we submit a Beam job.

Now it’s time to expose the Flink jobmanager so that we can check running jobs, resources, and logs in the Flink UI.

kubectl get all
kubectl port-forward <flink_jobmanager_pod> 8081:8081
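If the UI does not come up, tailing the jobmanager logs is a quick way to see what went wrong (the deployment name comes from the manifest above):

$ kubectl logs -f deployment/flink-jobmanager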

Check that the jobmanager is up and running at http://localhost:8081/.
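You can also hit the Flink REST API from the command line while the port-forward is active:

$ curl http://localhost:8081/overview

With the cluster reachable, here is a minimal word-count pipeline that submits to it via the FlinkRunner, using the port-forwarded jobmanager as flink_master and the beam-worker-pool sidecar (port 50000) as the external SDK worker environment: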

import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

def format_result(word, count):
    return '%s: %d' % (word, count)

def run():
    # FlinkRunner submits the job to the port-forwarded jobmanager;
    # the EXTERNAL environment points at the beam-worker-pool sidecar on port 50000.
    options = PipelineOptions([
        "--runner=FlinkRunner",
        "--flink_version=1.14",
        "--flink_master=localhost:8081",
        "--environment_type=EXTERNAL",
        "--environment_config=localhost:50000"
    ])

    with beam.Pipeline(options=options) as p:
        (p
            | 'Create words' >> beam.Create(['Beam on top of flink makes life easier'])
            | 'Split words' >> beam.FlatMap(lambda line: line.split(' '))
            | 'PairWithOne' >> beam.Map(lambda word: (word, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum)
            | 'Format' >> beam.MapTuple(format_result)
            | 'Write to file' >> WriteToText('test.txt')
        )

if __name__ == "__main__":
    run()

Now it’s time to run the Beam pipeline Python job.

python <beam_pipeline.py>

Now you should be able to see the submitted job in the Flink UI.
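If you prefer the command line, the same port-forward lets you list jobs through the Flink REST API:

$ curl http://localhost:8081/jobs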

Errors

If you get errors like the ones below:

RuntimeError: Service failed to start up with error 1
Job failed during the initialization of job manager
TaskManager with id 172.17.0.3:6122-7bf30b is no longer reachable.

you have to be very careful with the Beam, Flink, and Python versions.

Install Python 3.10 on your local machine (that is what I used).

Flink supports Java 8 for now, so install the same on your local machine.

Flink 1.14.6 is the most recent Flink version that Beam supports at the time of writing.

Run pip install apache-beam==2.43.0 on your local machine.
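A quick way to sanity-check the local toolchain before submitting (adjust to your environment):

$ python3 --version        # expect Python 3.10.x
$ java -version            # expect Java 8 (1.8.x)
$ pip show apache-beam     # expect Version: 2.43.0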

With these versions in place, you should be able to run the Beam job on Flink successfully.

Tear Down the Cluster

$ kubectl delete -f jobmanager-service.yaml
$ kubectl delete -f flink-configuration-configmap.yaml
$ kubectl delete -f taskmanager-session-deployment.yaml
$ kubectl delete -f jobmanager-session-deployment.yaml