Publishing Committed Rdb Transactions to Kafka With JCC LogMiner Loader V3.6

Introduction

JCC LogMiner Loader V3.6 (released February 2019) supports replicating Rdb committed transactions to an Apache Kafka server with the separately licensable Kafka Option. The JCC LogMiner Loader Kafka Option supports publishing rows in three formats: XML, JSON, and Avro (using Confluent Schema Registration). Once a message has been published to a Kafka server, it can be consumed by a variety of clients, and by more than one client simultaneously. With Kafka as a JCC LogMiner Loader target, Rdb transactional data can be integrated into a data lake and/or evaluated with streaming analytical methods using a wide variety of existing software and tools.

In this example, we focus on publishing committed transactions to Kafka, and mostly ignore the Kafka Consumer(s):

Rdb -> JCC LogMiner Loader -> Kafka -> <Kafka Consumer(s)> -> <ultimate target>

This blog includes examples of publishing data in JSON and Avro formats.

Before getting into the details, we will briefly describe each of those pieces of the architecture.

The JCC LogMiner Loader Kafka Option documentation provides additional details on the capabilities and how to use them.

What is JCC LogMiner Loader?

JCC LogMiner Loader uses the Oracle Rdb Continuous LogMiner (RMU/Unload/After/Continuous) to extract committed changes from the Rdb After Image Journal (transaction log) and replicate those changes to a variety of targets. With the JCC LogMiner Loader V3.6 release, the following targets are supported:

  • Oracle Rdb
  • Oracle OCI
  • JDBC drivers
  • Tuxedo
  • Files
  • API
  • Kafka

The JCC LogMiner Loader supports a variety of methods for transforming the data it replicates. All of these methods are available for transforming the data before it is published to Kafka.

This blog uses JCC LogMiner Loader V3.6.0, which adds support for Kafka on IA64. Note that Kafka requires that Java 1.8 be installed on OpenVMS IA64:

$ java -version
java version "1.8.0.11-OpenVMS"
Java(TM) SE Runtime Environment (build 1.8.0.11-vms-b1)
Java HotSpot(TM) 64-Bit Server VM (build 25.11-b1, mixed mode)
$

For the source database, we are using Oracle Rdb V7.3-300 on OpenVMS IA64 V8.4-2L1.

What is Apache Kafka?

Kafka is an open-source message broker that uses a Publish-Subscribe (Pub-Sub) model. Software (in this case the JCC LogMiner Loader) publishes messages to a Kafka server. Those messages can be consumed by one or more subscribers. Messages are published to one or more topics. Topics can be created in advance or created the first time a message is published to them.

By default, the same message can be consumed by multiple subscribers. It is possible to configure a Kafka topic so that each message can be consumed only once, but this example does not do that.
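
For reference, the following minimal Java consumer sketch illustrates the Pub-Sub model. The broker address, topic, and group name are placeholders; each distinct group.id receives its own copy of every message, while consumers sharing a group.id divide the work:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka01:9092");   // placeholder broker
        props.put("group.id", "example-group-1");         // each group sees every message
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("RegTest.DETAILS"));
            while (true) {
                // poll(long millis) is the overload available in the Kafka 1.1 clients
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> r : records) {
                    System.out.println(r.value());
                }
            }
        }
    }
}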

There are a number of Kafka distributions, including Apache Kafka itself and the Confluent distributions. The Confluent Kafka distributions support Schema Registration and Avro.

A Kafka installation can be configured as a cluster using ZooKeeper to coordinate the nodes participating in the cluster. The details of possible Kafka configurations are beyond the scope of this blog.

Messages can be published to Kafka servers either in plaintext or encrypted using TLS.

In these examples, we are using the following configurations:

Message Format   Kafka Distribution and Version        Encryption   OS and Version   Number of Nodes
JSON             Apache Kafka V1.1.0                   Plaintext    CentOS V7.6      1
Avro             Confluent V4.1.2 (Kafka V1.1.1-cp1)   SSL          CentOS V7.6      3

JCC LogMiner Loader Configuration

Two steps are needed to configure the JCC LogMiner Loader to work with Kafka:

  • Copy the Apache Kafka Jar files to OpenVMS
  • Specify JCC LogMiner Loader configuration parameters

Copying the Kafka Jar files to OpenVMS

Kafka is built with Java, with all of the routines packaged in Java Jar files. These Jar files are not included in the JCC LogMiner Loader installation kit. Instead, the "Kafka Option" documentation describes how to migrate the appropriate Jar files from the Kafka server installation to OpenVMS.

If you need assistance in migrating the appropriate Kafka Jar files to OpenVMS, please contact us.

Apache Kafka on Linux

For replicating Rdb committed transactions in JSON or XML format, the libraries from an Apache Kafka distribution are sufficient. By default, the Java libraries from an Apache Kafka distribution installed on a Linux system will be in the folder:

  • /opt/kafka/libs

The files in this folder can be copied (using FTP or some other file transfer utility) to an appropriate directory tree on OpenVMS. Note that these files must be transferred using binary mode.

When Java Jar files are transferred to OpenVMS, the file characteristics must be reset with a set file command:

$ set file/attr=(rfm:stmlf,rat:cr,lrl:0,mr:0) -
    DISK$STATIC:[kafka.1-1-0.libs]*.jar

In this example, the Java libraries from an Apache Kafka 1.1.0 installation have been transferred to the OpenVMS directory DISK$STATIC:[kafka.1-1-0.libs].

Confluent Kafka on Linux

The Avro Kafka Option takes advantage of the Confluent Avro Schema Registration and Serialization/De-serialization (SerDe) routines that are available only with the Confluent Kafka distribution. On a Confluent Open Source Kafka installation on a Linux server, the required libraries are in the following folders:

  • /usr/share/java/confluent-common
  • /usr/share/java/kafka
  • /usr/share/java/kafka-serde-tools

The files in these folders can be copied (using FTP or some other file transfer utility) to an appropriate directory tree on OpenVMS. Note that these files must be transferred using binary mode.

The associated Confluent license files are in the following folders:

  • /usr/share/doc/confluent-common
  • /usr/share/doc/kafka
  • /usr/share/doc/kafka-serde-tools

While the license files are not strictly necessary, it is good practice to keep them with the binary files. Note that the license files must be transferred using text mode.

When Java Jar files are transferred to OpenVMS, the file characteristics must be reset with a set file command:

$ set file/attr=(rfm:stmlf,rat:cr,lrl:0,mr:0) -
    DISK$STATIC:[kafka.CONFLUENT-4-1-2...]*.jar

In this example, the Java libraries from a Confluent 4.1.2 installation have been transferred to sub-directories in the OpenVMS directory DISK$STATIC:[kafka.CONFLUENT-4-1-2].

JCC LogMiner Loader Configuration Parameters

The JCC LogMiner Loader Configuration files have several distinct sections that describe the:

  • Target
  • Data received from the RMU/Unload/After command
  • Data sent to the target
  • Filters and transformations

Only the target directives differ from one target to another, so the following examples focus on the directives specific to Kafka.

Publishing as JSON

The following example publishes data in JSON format to a Kafka V1.1.0 server running on the CentOS 7 node kafka01. This Kafka server accepts plaintext (as opposed to SSL) connections on port 9092:

!
! Define the Kafka JSON Specific stuff
!
!
! routine to expand wildcards does not work on unix-style names.
!
output~kafka~synch~kafka01:9092~record~json
!
kafka~classpath~DISK$STATIC:[KAFKA.1-1-0.LIBS]*.jar
!
JSON~NULL~EXPLICIT
JSON~SCHEMA~RegTest
!
kafka~Topic~'RegTest.',Table_Name
kafka~Model~ExactlyOnce

The Kafka Java libraries are located in the OpenVMS directory DISK$STATIC:[KAFKA.1-1-0.LIBS].

JSON~NULL~EXPLICIT causes source columns that are Null to be explicitly set to Null in the JSON message:

{...
  "ADDRESS_DATA_2": null,
 ...}

With the default NULL~IMPLICIT, Null columns are not included in the output JSON message. This example uses explicit Nulls to meet the requirements for a particular Kafka consumer.
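
This choice affects how a consumer must test for Null. The following is a minimal sketch using the Jackson JSON library; the message layout follows the JSON example later in this blog, and the helper name is illustrative:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class NullCheck {
    // True if the column is Null, whether the loader published it
    // explicitly (a JSON null) or implicitly (field absent from the message).
    static boolean isColumnNull(String message, String column) throws Exception {
        JsonNode columns = new ObjectMapper().readTree(message)
                                             .path("row")
                                             .path("column_values");
        JsonNode value = columns.path(column);   // MissingNode if the field is absent
        return value.isNull() || value.isMissingNode();
    }
}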

JSON~SCHEMA~RegTest causes a line to be added to the JSON message preamble listing RegTest as the schema_name.

kafka~Topic~'RegTest.',Table_Name causes messages to be posted to Kafka topics where the topic name is made up of "RegTest." concatenated with the name of the output table. For example, updates for the table DETAILS will be published to a topic named "RegTest.DETAILS" while updates for the table PEOPLE will be published to topic "RegTest.PEOPLE". The kafka~topic options support a rich variety of topic naming.
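
Because the topic names share a common prefix, a Kafka consumer can subscribe to all of the loader's topics with a regular-expression pattern instead of listing each table. A brief fragment, reusing the consumer setup from the earlier sketch:

import java.util.regex.Pattern;

// Subscribe to every topic published under the RegTest. prefix;
// pattern subscription also picks up topics created later.
consumer.subscribe(Pattern.compile("RegTest\\..*"));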

kafka~Model supports two options: ExactlyOnce and Transaction. The correct choice depends on what the Kafka consumer is expecting. If the consumer can handle potentially receiving multiple copies of a particular message, Transaction provides slightly better performance.

JSON Example

The following example is a JSON document from one of the JCC LogMiner Loader regression tests. This example is formatted for human readability.  There is some duplication between columns in the preamble and columns in the actual record, "action" and "JCCLML_ACTION" for example.

{
  "row": {
    "source_name": "KAFKA_JSON",
    "schema_name": "RegTest",
    "name": "DETAILS",
    "action": "M",
    "loader_sequence_number": 300490,
    "column_values": {
      "DETAILS_ID": 593600,
      "PROGNAME": "DETAILS",
      "PROGINDEX": "009",
      "SEQNUM": 1,
      "AMOUNT_F": 6.659906769e+01,
      "AMOUNT_G": 6.6599067687988281e+01,
      "AMOUNT_N": 66.00,
      "DATE_VMS": "2082-04-17 14:53:15.8301326",
      "DATE_ANSI": "2002-05-17 00:00:00.0000000",
      "TIMESTAMP_A": "2082-05-04 16:55:17.3000000",
      "TIMESTAMP_A_CHAR": "2082-05-04 16:55:17.30",
      "UUID": "9771E9427E5BE9118A8AD89D67F5352A",
      "JCCLML_ACTION": "M",
      "LOADER_SEQUENCE_NUMBER": 300490,
      "LOADERNAME": "KAFKA_JSON",
      "LOADER_VERSION": "V03.06.00",
      "LOADER_LINK_DATE_TIME": "2019-02-21 13:29:36.5800000",
      "TRANSACTION_COMMIT_TIME": "2019-04-10 10:27:21.6086346",
      "TRANSACTION_START_TIME": "2019-04-10 10:27:21.5836346",
      "JCCLML_READ_TIME": "2019-04-10 10:49:02.7093458",
      "JCCLML_AERCP": "1-28-8580-2975846-504123690-504123690",
      "TRANSMISSION_DATE_TIME": "2019-04-10 10:49:06.4332738",
      "ORIGINATING_DBKEY": 29273399457087501
    }
  }
}

Many of these columns were added using the JCC LogMiner Loader virtual column or MapResult capabilities.

The column UUID was added with the MapResult directive:

MapResult~details~UUID~cast(translate(sys_guid() using rdb$hex) as char(32) character set -11)

The Rdb function sys_guid generates a 128-bit UUID. The translate and cast combination converts the 128-bit UUID into a 32-character hexadecimal string.

By default, Rdb date-time datatypes are stored with seven digits of precision in the fractional seconds. While Rdb date-time operations default to two digits of precision in the fractional seconds, applications can generate more precision, so the JCC LogMiner Loader defaults to the maximum precision possible with the underlying OpenVMS date-time datatype. The column TIMESTAMP_A_CHAR was added with the MapResult directive:

MapResult~details~timestamp_a_char~cast(cast(timestamp_a as timestamp(2)) as char(22))

Because of the sequence of casts, the resulting value has two digits of precision in the fractional seconds.

Publishing as Avro

Publishing Kafka messages as Avro has at least two advantages:

  1. The record structure of the messages is registered using the Confluent Schema Registration capabilities. This allows the Kafka consumer to read the record structure and parse the messages based on that structure. If the message structure changes (a new column is added, for example), the Kafka consumer can read the new structure.
  2. The Avro messages are compressed using the Confluent Avro serializer and deserializer routines (SerDe).

Because of the Confluent Schema Registration service, Avro requires the Java libraries from the Confluent Kafka distribution.
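
On the consumer side, Confluent's KafkaAvroDeserializer fetches each message's schema from the Schema Registry automatically, so records can be read without a compiled schema class. A minimal sketch; the listener address, registry URL, group name, and topic are placeholders:

import java.util.Collections;
import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AvroConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "cnflnt4-02:9092");   // placeholder listener
        props.put("group.id", "regtest-avro");               // placeholder group
        props.put("key.deserializer",
                  "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("value.deserializer",
                  "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        // The deserializer looks up each message's schema by id here.
        props.put("schema.registry.url", "http://cnflnt4-02:8081");

        try (KafkaConsumer<Object, GenericRecord> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("RegTest.DETAILS"));
            while (true) {
                for (ConsumerRecord<Object, GenericRecord> r : consumer.poll(1000)) {
                    GenericRecord row = r.value();
                    // Fields can be read by name without a compiled schema class.
                    System.out.println(row.get("DETAILS_ID") + " " + row.get("JCCLML_ACTION"));
                }
            }
        }
    }
}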

The following example publishes to a three-node Confluent Kafka cluster (nodes cnflnt4-02, cnflnt4-03, and cnflnt4-04) using an SSL (TLS) connection on port 9093.

The Avro SchemaRegistry is using an HTTP connection on node cnflnt4-02, port 8081.

!
! Define the Kafka Avro Specific stuff
!
output~kafka~synch~connect~record~Avro
!
kafka~connect~cnflnt4-04:9093,cnflnt4-03:9093,cnflnt4-02:9093
kafka~avro~SchemaRegistry~http://cnflnt4-02:8081
!
kafka~avro~namespace~RegTest.avro
!
! routine to expand wildcards does not work on unix-style names.
!
kafka~classpath~disk$static:[kafka.CONFLUENT-4-1-2.kafka-serde-tools]*.jar
kafka~classpath~disk$static:[kafka.CONFLUENT-4-1-2.confluent-common]*.jar
kafka~classpath~disk$static:[kafka.CONFLUENT-4-1-2.kafka]*.jar
!
kafka~Topic~'RegTest.',Table_Name
!
kafka~Model~Transaction
!
! External Properties File
!
java~properties~/control_files/kafka_avro.properties
!

The Confluent Kafka distribution includes Java libraries in three OpenVMS directories so three classpath statements are needed.

In the java~properties directive, control_files is an OpenVMS logical name that points to the directory that contains the file kafka_avro.properties. Kafka_avro.properties contains the SSL information:

#
# SSL Configuration
#
security.protocol=ssl
ssl.truststore.location=/regtest_ssl/kafka_truststore.jks
ssl.truststore.password=<your password 1>
ssl.keystore.location=/regtest_ssl/kafka_keystore.jks
ssl.keystore.password=<your password 2>
ssl.key.password=<your password 3>
#

Regtest_ssl is another OpenVMS logical name that points to the OpenVMS directory that contains the truststore, keystore, and certificate that were provided by the person who manages our Kafka servers. This information allows this JCC LogMiner Loader Kafka Option session to make an encrypted SSL (TLS V1.2) connection to the Kafka server.
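
A Kafka consumer connecting to the same SSL listener needs matching client-side settings. One convenient approach, sketched below, is to load the same style of properties file before adding the connection details; the file path shown is a placeholder:

import java.io.FileInputStream;
import java.util.Properties;

public class SslClientConfig {
    static Properties load(String path) throws Exception {
        Properties props = new Properties();
        // The file holds the standard Kafka SSL keys shown above:
        // security.protocol, ssl.truststore.location, ssl.truststore.password, ...
        try (FileInputStream in = new FileInputStream(path)) {
            props.load(in);
        }
        props.put("bootstrap.servers",
                  "cnflnt4-02:9093,cnflnt4-03:9093,cnflnt4-04:9093");
        return props;
    }
}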

Avro Example

With Avro, the message is compressed when it is published and decompressed when it is consumed. The following example is a decompressed message viewed as a human readable JSON document. Avro documents differ from JSON documents in several important ways:

  • There is no preamble
  • Data is represented using the Avro Type and Logical Type specifications as defined in the Apache Avro™ 1.8.0 Specification. The difference is most obvious in the columns AMOUNT_N (source BIGINT(2)) and the date-time columns.

{
  "ORIGINATING_DBKEY": 29273399270309913,
  "DETAILS_ID": 473912,
  "PROGNAME": "DETAILS                        ",
  "PROGINDEX": "008",
  "SEQNUM": 1,
  "AMOUNT_F": 59.263252,
  "AMOUNT_G": 59.26325225830078,
  "AMOUNT_N": {
    "bytes": "\u0017\f"
  },
  "DATE_VMS": 1999834870435758,
  "DATE_ANSI": 12059,
  "TIMESTAMP_A": 2624897135220000,
  "TIMESTAMP_A_CHAR": "2053-03-06 18:05:35.22",
  "UUID": "CF9F1E55675BE9118F7FD89D67F5352A",
  "JCCLML_ACTION": "M",
  "LOADER_SEQUENCE_NUMBER": 275156,
  "LOADERNAME": "KAFKA_AVRO",
  "LOADER_VERSION": "V03.06.00",
  "LOADER_LINK_DATE_TIME": 1550755776580000,
  "TRANSACTION_COMMIT_TIME": 1554882210943797,
  "TRANSACTION_START_TIME": 1554882210940797,
  "JCCLML_READ_TIME": 1554883497927023,
  "JCCLML_AERCP": "1-28-8575-2948497-503695762-503695762",
  "TRANSMISSION_DATE_TIME": 1554883497963023
}

Rdb date-time datatypes are represented as Avro {"type":"long","logicalType":"timestamp-micros"} types. The Avro libraries provide functions for converting between the Avro representations and human readable formats.
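
A consumer can also convert these long values directly. A minimal sketch, assuming the value is microseconds since the Unix epoch as the timestamp-micros logical type specifies (whether the encoded value reflects UTC or local time depends on the source data):

import java.time.Instant;

public class AvroTimestamps {
    // Convert an Avro timestamp-micros value to a java.time.Instant.
    static Instant fromMicros(long micros) {
        return Instant.ofEpochSecond(micros / 1_000_000L,
                                     (micros % 1_000_000L) * 1_000L);
    }

    public static void main(String[] args) {
        // TRANSACTION_COMMIT_TIME from the Avro example above
        System.out.println(fromMicros(1554882210943797L));
    }
}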

For more details about how Rdb types are represented in Avro, see "Comparison of Rdb and Avro Datatypes" in the JCC LogMiner Loader Kafka Option documentation.

Monitoring JCC LogMiner Loader Performance

The JCC LogMiner Loader includes a performance monitoring utility, JCC_LML_STATISTICS. By default, JCC_LML_STATISTICS displays information to the local terminal. It also supports outputting data in CSV and T4 format. The following examples use the default display-to-screen mode.

The utility is started with the command:

$ jcc_lml_statistics kafka_avro 2 d

  • Kafka_avro is the loader name
  • Display is refreshed every 2 seconds
  • Display is Details

With the Details display, the initial screen is:

Rate:   2.00                       KAFKA_AVRO            12-APR-2019 16:56:49.00
================================================================================
   Input: 12-APR-2019 16:53:02.29                 KAFKA: (none)
--[Trail:    0.00]---------------             ---[Trail:    0.00]---------------
Transactions                 4012             Checkpoints                     67
Records                      8960                Timeout                       0
   Modify                    4898                BufferLimit(  2160)           0
   Delete                      50                NoWork                        0
   Commit                    4012             Records(   4)                 7570
 Discarded                                       Messages(  N/A )            N/A
   Filtered                     0                Filtered                   2625
   Excluded                     0             Failure                          0
   Unknown                      0             Timeout                          0
   Restart                      0        - Current ---------------- Ave/Second -
   NoWork                     391        Checkpoints             2          1.71
   Heartbeat                    0        Records               241        205.81
Timeout                        34        Rate                 6.58%
--- Restart Context ------               - Latency(sec) ------ LML detail ------
M|AIJ#               8665 |              CLM    3.8m |  Inpt   0.5%  Cnvt   8.6%
Q|VBN               21303 |              ------------   Sort   0.1%  Trgt  90.7%
P|TSN           510114551 |              LML    0.85    Sync   0.0%  Ckpt   0.2%
 CTSN           510114551 |        - Loaders - 0123 ----------------------------
  LSN                   0 |        - States  - >>>>

JCC LogMiner Loader V3.6 provides a number of additional display options for this utility. Entering "?" or "h" will list the following options:

Interactive Key Help
  Key   Description
  ?,h   This help screen
   b    Switch to the 'b'rief display
   d    Switch to the 'd'etail display
   s    Switch to the 's'tates display
   f    Switch to the 'f'ull display
   r    If on 'f'ull display, show rates rather than totals
   t    If on 'f'ull display, show totals rather than rates
 ctrl-t Print runtime information
 ctrl-w If in display mode, clear screen
 ctrl-m Switch between display mode and scroll mode
 ctrl-c |
 ctrl-y | Exit the jcc_lml_statistics utility
 ctrl-z |

The states ('s') display shows the status of each of the LML threads:

Rate:   2.00                       KAFKA_AVRO            12-APR-2019 16:57:06.00
================================================================================
 In: 12-APR-2019 16:53:02.84 [   0.00]  KAFKA: (none)                  [   0.00]
                                         Chkpt             3  Recs           366
                                         CLM    4.0m |  Inpt   0.5%  Cnvt   8.2%
 0123 ----------------------------       ------------   Sort   0.0%  Trgt  91.2%
 >>>>                                    LML    0.80    Sync   0.0%  Ckpt   0.1%
 [  0.08] 0 commitTrans[recs:120]
 [  0.96] 1 Write->KAFKA
 [  1.53] 2 Write->KAFKA
 [  1.3m] 3 initTransactions

In this example, there are four loader threads publishing data to the Kafka server:

  • Thread 0 has been executing a Kafka commitTransaction call for 0.08 seconds. It is committing 120 records (messages).
  • Thread 1 has been writing to Kafka for 0.96 seconds
  • Thread 2 has been writing to Kafka for 1.53 seconds
  • Thread 3 has been executing a Kafka initTransactions call for 1.3 minutes

In our regression test environment, we observe that the first initTransactions call in a process can take well over a minute. This delay takes place in the Kafka code and communication, and we have not identified its cause. Once the first initTransactions call has completed, performance is good.

The following example is from a few seconds later, after thread 3 had completed its first initTransactions call:

Rate:   2.00                       KAFKA_AVRO            12-APR-2019 16:57:40.00
================================================================================
 In: 12-APR-2019 16:53:05.10 [   4.6m]  KAFKA: 12-APR-2019 16:53:04.95 [   4.6m]
                                         Chkpt             1  Recs           418
                                         CLM    4.5m |  Inpt   0.1%  Cnvt  78.3%
 0123 ----------------------------       ------------   Sort   0.0%  Trgt  21.6%
 >>>>                                    LML    3.81    Sync   0.0%  Ckpt   0.0%
 [  0.30] 0 commitTrans[recs:121]
 [  1.03] 1 commitTrans[recs:129]
 [  0.74] 2 commitTrans[recs:121]
 [  0.47] 3 send(RegTest.PEOPLE(Commit TAD: 12-APR-2019 16:53:05.09 Read TAD

In this example, threads 0, 1, and 2 are executing Kafka commitTrans calls while thread 3 is publishing a message to the topic RegTest.PEOPLE.
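
The thread states shown in these displays (initTransactions, send, commitTrans) correspond to the standard Kafka transactional producer calls. The following is a minimal sketch of that call sequence, not the Loader's internal code; the broker, topic, and transactional.id values are placeholders:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TxProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka01:9092");     // placeholder broker
        props.put("transactional.id", "example-tx-1");      // placeholder id
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();    // the call that can take a while on first use
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("RegTest.DETAILS", "{...}"));
            producer.commitTransaction();   // appears as commitTrans in the display
        }
    }
}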

Kafka Consumer

Because of the wide variety of potential requirements for consuming Kafka messages, the JCC LogMiner Loader Kafka Option does not include a Kafka Consumer.

Within the JCC LogMiner Loader regression test environment, we use custom Java programs to consume JSON and Avro messages. In this test environment, the messages are parsed and applied to a target database using a JDBC driver. This allows us to use existing tools to compare the target data with the source data.

In an environment that is using Kafka to integrate multiple data sources into a data lake, tools may already exist to consume Kafka messages.

Existing products (SQLStream for example) can be used to consume and process Kafka messages. Oracle has discussed (but not yet released) tools to allow Kafka messages to be mapped as Oracle external tables.

If you need assistance in setting up Kafka consumers in your environment, please contact us.

Ultimate Target

In the JCC LogMiner Loader test environment, the Kafka consumers apply the data to a target database using JDBC drivers. This allows us to compare the target data with the source data using our existing testing tools, but it is not the most likely architecture for a production data replication environment.

The use case that provided the impetus for the JCC LogMiner Loader Kafka Option was integrating Rdb data into a corporate data lake that contains information from multiple corporate sources. The goal of the data lake is to provide integrated input for larger data analysis projects.

Kafka could also be used to provide input for a management dashboard.

Once data is available in Kafka, there are a wide variety of tools and resources that can utilize the data.

Resources

The JCC LogMiner Loader Kafka Option documentation provides more information about publishing Rdb committed transactions to Kafka servers.

The JCC LogMiner Loader V3.6 Release Notes and JCC LogMiner Loader V3.6 Documentation provide additional information about JCC LogMiner Loader capabilities.