Kettle Beam update 0.5.0

Dear Kettle friends,

As you may or may not know, the integration with Apache Beam has been moving along slowly but steadily. Apache Beam in short:

Implement batch and streaming data processing jobs that run on any execution engine.

quoting beam.apache.org

So for a tool like Kettle, this is very enticing: if we can make a Kettle transformation run on one of these execution engines, we can run it on all of them. That is what the Kettle Beam project is all about. For an overview, check out the presentation I did on the subject:

http://beam.kettle.be

There is also a video of the presentation I gave at PLUG, hosted at Skills Matter, a few weeks ago:

I’m happy to say that with the release of version 0.5.0 of the Kettle Beam project, we now support four very interesting engines on Apache Beam 2.10.0: the Direct runner, Google Cloud Dataflow, Apache Spark and Apache Flink.

If you want to get started trying out Kettle Beam, I recommend my friend Diethard Steiner’s blog post on the subject.

The new thing in 0.5.0 is the addition of Apache Flink as an execution engine. Flink in short (Quote from the website):

Stateful Computations over Data Streams

quoting flink.apache.org

Trying out Kettle on Apache Flink is surprisingly easy. You can actually just run on Flink in Spoon by using the server shortcut [local] in a Beam Job Configuration. This allows you to test your transformation locally, just like with the Direct Runner.

If you want to run it on a real Flink server, you first need to get an instance of Flink running. This is really easy as well: download a recent version (we picked the latest, 1.7.2; any 1.5/1.6/1.7 version should work with Kettle Beam 0.5.0) and follow the instructions to set up a local cluster. It basically boils down to unzipping the archive and running start-cluster.sh, very similar to Kettle: I approve!
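For reference, setting up that local cluster boils down to just a few commands. The exact download URL and archive name below depend on the Flink version and Scala build you pick, so treat this as a sketch:

# Download and unpack Flink (archive name/URL depend on the version and Scala build you choose)
wget https://archive.apache.org/dist/flink/flink-1.7.2/flink-1.7.2-bin-scala_2.11.tgz
tar xzf flink-1.7.2-bin-scala_2.11.tgz
cd flink-1.7.2

# Start the local cluster (a job manager and a task manager on this machine)
./bin/start-cluster.sh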
Browse to the configured server on port 8081:

Seeing the Flink dashboard there is a good sign that you did something right. Please note that if you want to run with a higher parallelism (more task slots), consider giving the Flink job manager and/or task manager a bit more memory. I changed conf/flink-conf.yaml in the following ways:

# Give the job manager and the task manager a bit more memory
jobmanager.heap.size: 1g
taskmanager.heap.size: 2g

# Allow 6 parallel task slots and use that parallelism by default
taskmanager.numberOfTaskSlots: 6
parallelism.default: 6
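
Note that flink-conf.yaml is only read when the cluster starts, so if your local cluster was already running when you changed it, restart it to pick up the new settings:

# Restart the local cluster so the new memory and parallelism settings take effect
./bin/stop-cluster.sh
./bin/start-cluster.sh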

I also created input/, tmp/ and output/ folders somewhere, matching the parameters you set in the “Flink server” job configuration, and put the sample data from the kettle-beam-examples project in there.
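That amounts to something like the following; the folder names here are illustrative, so use whatever you configured:

# Create the folders referenced by the "Flink server" Beam job configuration
mkdir -p /home/kettle/input /home/kettle/tmp /home/kettle/output

# Then copy the sample data from your checkout of the kettle-beam-examples project
# into the input folder.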
You can load up these examples and set up a new Beam Job Configuration:

Then you can run one of the examples; let’s take the toughest challenge: complex.ktr.


Now we’re going to do a few one-off things:

  • Create a “fat” jar which contains all the libraries in the Kettle Beam ecosystem. Use the Beam menu: “Generate a Spark/Flink fat jar”. The file ends up in /tmp/kettle-beam-fat.jar (sorry, the location is hardcoded for now; changes are coming in this area).
  • Export all your objects from the metastore(s) you have active in Spoon. Use the Spoon Beam menu: “Export current metastore”. The file will end up in /tmp/metastore.json
  • Copy these 2 files to the machine where you’re running Flink (remix in my case).
  • Create a startup script on the server. Use this as a base: https://github.com/mattcasters/kettle-beam/blob/master/flink-notes-matt.txt (a minimal sketch follows below).
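
To give you an idea, such a startup script essentially calls bin/flink run on the fat jar, passing the transformation, the exported metastore, the Beam job configuration name and the plugin class lists as arguments (you can see them echoed back in the log below). The main class and paths in this minimal sketch are placeholders; take the real values from flink-notes-matt.txt:

# run-kettle-flink.sh -- a minimal sketch; flink-notes-matt.txt is the authoritative version
MAIN_CLASS="<Kettle Beam main class, see flink-notes-matt.txt>"
FAT_JAR="/home/kettle/kettle-beam-fat.jar"   # wherever you copied the fat jar

./bin/flink run -c "$MAIN_CLASS" "$FAT_JAR" \
  "$1" \
  "file:///home/kettle/metadata/metastore.json" \
  "Flink server" \
  "<comma-separated step plugin classes>" \
  "<XP plugin classes>"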

Now we’re ready to execute a transformation. We can copy our transformation(s) to the server and then run our script:

kettle@remix:~/flink-1.7.2$ sh run-kettle-flink.sh ../metadata/complex.ktr  
Starting execution of program
Starting clustered transformation execution on environment: 'Apache Flink'
Transformation ktr / args[0] : ../metadata/complex.ktr
MetaStore JSON     / args[1] : file:///home/kettle/metadata/metastore.json
Beam Job Config    / args[2] : Flink server
Step plugins       / args[3] : org.kettle.beam.steps.io.BeamInputMeta,org.kettle.beam.steps.bq.BeamBQOutputMeta,org.kettle.beam.steps.pubsub.BeamPublishMeta,org.kettle.beam.
steps.pubsub.BeamSubscribeMeta,org.kettle.beam.steps.window.BeamTimestampMeta,org.kettle.beam.steps.io.BeamOutputMeta,org.kettle.beam.steps.window.BeamWindowMeta,org.kettle.
beam.steps.bq.BeamBQInputMeta
XP plugins         / args[4] : org.kettle.beam.xp.RunBeamTransExecutionPoint
>>>>>> Initializing Kettle runtime (8 step classes, 1 XP classes)
>>>>>> Loading transformation metadata
>>>>>> Loading Kettle Beam Job Config 'Flink server'
>>>>>> HADOOP_CONF_DIR='null'
>>>>>> Building Apache Beam Kettle Pipeline...
>>>>>> Found Beam Input step plugin class loader
>>>>>> Found Beam Input step plugin class loader
>>>>>> Pipeline executing starting...
Warning: couldn't find plugins folder: ./plugins/kettle-beam
2019/02/24 20:14:28 - General - Created pipeline job with name 'complex'
2019/02/24 20:14:29 - General - Handled step (INPUT) : State data
2019/02/24 20:14:29 - General - Handled step (INPUT) : Customer data
2019/02/24 20:14:29 - General - Handled step (STEP) : uppercase state, gets data from 1 previous step(s), targets=0, infos=0
2019/02/24 20:14:29 - General - Handled Merge Join (STEP) : Merge join
2019/02/24 20:14:29 - General - Handled Group By (STEP) : countPerState, gets data from 1 previous step(s)
2019/02/24 20:14:29 - General - Handled step (STEP) : SLU nrPerState, gets data from 1 previous step(s), targets=0, infos=1
2019/02/24 20:14:29 - General - Handled step (STEP) : name<n, gets data from 1 previous step(s), targets=2, infos=0
2019/02/24 20:14:29 - General - Step Label : N-Z reading from previous step targeting this one using : name<n - TARGET - Label : N-Z
2019/02/24 20:14:29 - General - Handled step (STEP) : Label : N-Z, gets data from 1 previous step(s), targets=0, infos=0
2019/02/24 20:14:29 - General - Step Label : A-M reading from previous step targeting this one using : name<n - TARGET - Label : A-M
2019/02/24 20:14:29 - General - Handled step (STEP) : Label : A-M, gets data from 1 previous step(s), targets=0, infos=0
2019/02/24 20:14:29 - General - Handled step (STEP) : Switch / case, gets data from 2 previous step(s), targets=4, infos=0
2019/02/24 20:14:29 - General - Step Default reading from previous step targeting this one using : Switch / case - TARGET - Default
2019/02/24 20:14:29 - General - Handled step (STEP) : Default, gets data from 1 previous step(s), targets=0, infos=0
2019/02/24 20:14:29 - General - Step NY reading from previous step targeting this one using : Switch / case - TARGET - NY
2019/02/24 20:14:29 - General - Handled step (STEP) : NY, gets data from 1 previous step(s), targets=0, infos=0
2019/02/24 20:14:29 - General - Step FL reading from previous step targeting this one using : Switch / case - TARGET - FL
2019/02/24 20:14:29 - General - Handled step (STEP) : FL, gets data from 1 previous step(s), targets=0, infos=0
2019/02/24 20:14:29 - General - Step CA reading from previous step targeting this one using : Switch / case - TARGET - CA
2019/02/24 20:14:29 - General - Handled step (STEP) : CA, gets data from 1 previous step(s), targets=0, infos=0
2019/02/24 20:14:29 - General - Handled step (STEP) : Collect, gets data from 4 previous step(s), targets=0, infos=0
2019/02/24 20:14:30 - General - Handled step (OUTPUT) : complex, gets data from Collect
...

What this does is the following:

  • Convert the Kettle transformation metadata into an Apache Beam pipeline.
  • Execute the pipeline using the Apache Beam Flink Runner.
  • The Flink Runner in turn translates the pipeline into Flink-specific code.

We can now follow what’s going on on the Flink console:

Finally, when the job is done, you can check out the output folder (/home/kettle/output in our case). You can use the “wc -l” command to check the number of records in the output:

kettle@remix:~/output$ ls -lrta 
total 131844
drwxr-xr-x 15 kettle kettle     4096 Feb 24 12:29 ..
-rw-rw-r--  1 kettle kettle 11386532 Feb 24 15:12 complex-00005-of-00006.csv
-rw-rw-r--  1 kettle kettle 15583507 Feb 24 15:12 complex-00002-of-00006.csv
-rw-rw-r--  1 kettle kettle 21058047 Feb 24 15:13 complex-00004-of-00006.csv
-rw-rw-r--  1 kettle kettle 20470419 Feb 24 15:13 complex-00001-of-00006.csv
-rw-rw-r--  1 kettle kettle 29839895 Feb 24 15:13 complex-00003-of-00006.csv
-rw-rw-r--  1 kettle kettle 36649557 Feb 24 15:13 complex-00000-of-00006.csv
drwxrwxr-x  2 kettle kettle     4096 Feb 24 15:13 .
kettle@remix:~/output$ wc -l *
  271249 complex-00000-of-00006.csv
  152837 complex-00001-of-00006.csv
  118266 complex-00002-of-00006.csv
  219854 complex-00003-of-00006.csv
  153025 complex-00004-of-00006.csv
   84769 complex-00005-of-00006.csv
 1000000 total

Well, there you go. Once you have this set up, you can simply copy the transformation metadata to your Flink server and run it, no further action required. All of a sudden we have a way of visually developing complex transformation and ETL logic and then running it on DataFlow/Spark/Flink.
Obviously, this last manual step is something I plan to get rid of, so you can run directly on a Flink/Spark server from within Spoon, just like you already can with DataFlow.
I really hope this gives you some idea of what’s involved, and that you get a chance to experience how cool it is to use the latest and greatest execution engines on the planet, all thanks to the wonderful Apache Beam developers. It is most certainly one of the coolest projects of the moment.

Cheers,
Matt
