Using the JCC LogMiner Loader, Kafka, and SQLStream to Replicate Rdb Committed Transactions to SQL Server

Introduction

JCC Consulting, Inc. is developing an enhanced version of the JCC LogMiner Loader that supports replicating Rdb committed transactions to an Apache Kafka server. The current development version publishes each committed row to a Kafka server as an XML document, but we plan to also support JSON. Kafka provides a reliable message transport. Once a message has been published to a Kafka server, it can be consumed by a variety of clients and by more than one client simultaneously. With Kafka as a JCC LogMiner Loader target, Rdb transactional data can be integrated into a data lake and/or evaluated with streaming analytical methods using a wide variety of existing software and tools.

In this example, we use SQLStream Blaze to consume the Kafka messages and apply them to a Microsoft SQL Server 2016 database. The data path is:

Rdb -> JCC LogMiner Loader -> Kafka -> SQLStream Blaze -> SQL Server

This example includes only one consumer. It would be possible to configure multiple consumers to accept the same messages from a Kafka server concurrently and process them independently.

Before getting into details, we will briefly describe each of those pieces of the architecture.


What is JCC LogMiner Loader?

The JCC LogMiner Loader uses the Oracle Rdb Continuous LogMiner (RMU/Unload/After/Continuous) to extract committed changes from the Rdb After Image Journal (transaction log) and replicate those changes to a variety of targets. The current release version of the JCC LogMiner Loader is V3.5.1, which supports the following targets:

  • Oracle Rdb
  • Oracle OCI
  • JDBC drivers
  • Tuxedo
  • Files
  • API

The JCC LogMiner Loader supports a variety of methods for transforming the data it replicates. These methods are all available for transforming the data before it is published to Kafka.

This blog uses a JCC LogMiner Loader release after V3.5.1 that adds support for Kafka. Note that this version requires that Java 1.8 be installed on OpenVMS:
$ java -version
java version "1.8.0.03-OpenVMS"
Java(TM) SE Runtime Environment (build 1.8.0.03-vms-rc1)
Java HotSpot(TM) 64-Bit Server VM (build 25.51-b02, mixed mode)
$

For the source database, we are using Oracle Rdb V7.3-210 on OpenVMS IA64 V8.4-2L1.

What is Apache Kafka?

Kafka is an open-source message broker that uses a Publish – Subscribe (Pub-Sub) model. Software (in this case the JCC LogMiner Loader) publishes messages to a Kafka server. Those messages can be consumed by one or more subscribers. Messages are published to one or more topics. Topics can be created in advance or created the first time a message is published to that topic.

By default, the same message can be consumed by multiple subscribers. Kafka also supports consumer groups: consumers that share a group divide a topic's messages among themselves so that each message is processed only once within that group. This example does not use consumer groups in that way.
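
The following is a minimal sketch of the difference, using the console consumer shipped with Kafka; it assumes the kafka01:9092 broker and /opt/kafka installation path described later in this blog, and the group names are illustrative:

# Two consumers started with the same group.id split the topic's messages between them
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka01:9092 \
    --topic EMPLOYEES --consumer-property group.id=reporting

# A consumer in a different group receives its own copy of every message
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka01:9092 \
    --topic EMPLOYEES --consumer-property group.id=audit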

There are a number of Kafka distributions available.

A Kafka installation can be configured as a cluster using Zookeeper to coordinate the nodes participating in the cluster. The details of possible Kafka configurations are beyond the scope of this blog.

In this example, we are using an Apache Kafka V1.0.0 server running on a single CentOS 7.4 virtual system.

What is SQLStream Blaze?

SQLStream Blaze (http://sqlstream.com/) supports capturing streaming data from a variety of sources, including Kafka, and acting on that data. In this example, we used SQLStream Blaze to consume the XML documents published to Kafka by the JCC LogMiner Loader, parse the XML, and write the data to an SQL Server database. SQLStream Blaze supports a variety of targets as well as providing a variety of analytics on data streams.

In this example, we used SQLstream s-Server V5.2.3 running on a CentOS 7.4 virtual server.

Target Database

In this example, we are using SQLStream to write data to a Microsoft SQL Server 2016 database. SQLStream supports a variety of JDBC targets, so the choice of SQL Server as the target was somewhat arbitrary.

JCC LogMiner Loader Configuration

Two steps are needed to configure the JCC LogMiner Loader to work with Kafka:

  • Copy the Apache Kafka Jar files to OpenVMS
  • Specify JCC LogMiner Loader configuration parameters

Copying the Apache Kafka Jar files to OpenVMS

To publish data to Kafka, the JCC LogMiner Loader needs the Kafka client libraries. Kafka provides client libraries in a variety of languages; the JCC LogMiner Loader uses the Kafka Java libraries to publish messages. There is no Kafka installation kit specific to OpenVMS, so transferring the Kafka Java libraries to an OpenVMS server requires some manual steps:

  1. Download the Kafka installation to a Windows system. This file has a name such as:
    kafka_2.11-1.1.0.tgz
  2. Use 7zip (or some other utility) to extract kafka_2.11-1.1.0.tar from kafka_2.11-1.1.0.tgz
  3. Use 7zip (or some other utility) to extract the Kafka folder tree from kafka_2.11-1.1.0.tar
    In this example, the expansion resulted in a folder tree that includes the folder:
    C:\Kafka\kafka_2.11-1.1.0\libs
  4. FTP the contents of the libs folder (which contains the Kafka Jar files) to a directory on the OpenVMS server. In this example, we copied the contents of the libs folder to JCC_ROOT:[KAFKA-1-1-0.LIBS]
  5. Reset the Jar file attributes with the command:
    $ set file/attr=(rfm:stmlf,rat:cr,lrl:0,mr:0) JCC_ROOT:[KAFKA-1-1-0.LIBS]*.jar

Multiple JCC LogMiner Loader sessions can use the same Kafka Jar files, so this step only needs to be done once.

Note that this example uses the Apache Kafka V1.1.0 Jar files to publish messages to an Apache Kafka V1.0.0 server. We are still reviewing how much version skew can be supported.

JCC LogMiner Loader Configuration for Kafka

The JCC LogMiner Loader configuration file needs to specify the Kafka classpath, the Kafka server, and a few other parameters. In this example, the primary configuration file is:


MF_PERSONNEL_KAFKA.INI
!
logging~initialization
logging~statistics~runtime,timer,commit
logging~input~ignore_unknown_tables,log_restart
!
sort~by_record
input~ipc
!
! Shut down after 10 consecutive failures.
!
input_failure~10
!
! Note that the Kafka classpath *must* be specified in VMS format
! with the wildcard
!
output~kafka~synch~kafka01:9092~record~xml
kafka~classpath~JCC_ROOT:[KAFKA-1-1-0.LIBS]*.jar
checkpoint~1~lml_internal~asynch~loader_kafka_chkpt
parallel~1~1~constrained
!
xml~header~
date_format~|!Y4-!MN0-!D0 !H04:!M0:!S0.!C2|
!
! The remainder of this file is standard Loader "stuff"
!
output_failure~1~10
thread~startup~0.1
thread~shutdown~10
!
include_file~mf_personnel_table.ini
include_file~mf_personnel_maptable.ini

The following comments highlight some specific pieces of the configuration file:

Output

output~kafka~synch~kafka01:9092~record~xml

  • Output is Kafka
  • Kafka server is called kafka01 and uses port 9092 (the default)
  • Record format of the output is XML

Classpath

kafka~classpath~JCC_ROOT:[KAFKA-1-1-0.LIBS]*.jar

  • The Kafka classpath *must* be specified in VMS format with the wildcard
  • This is the folder to which we FTPed the Jar files

Parallel threads

parallel~1~1~constrained

  • This example uses only a single JCC LogMiner Loader thread. There are performance benefits in having parallel threads. A full discussion of parallel threads with Kafka is beyond the scope of this blog.

XML Header

xml~header~

  • By default, the XML messages include two additional pieces, the “prolog” and the “document type declaration”. Specifying the xml~header~ directive with no additional syntax suppresses these pieces.

Date Format

date_format~|!Y4-!MN0-!D0 !H04:!M0:!S0.!C2|

  • This string causes dates to be published in the ISO 8601 format “yyyy-mm-dd hh:mm:ss.cc”. This format is understood by a variety of tools that do not understand the default OpenVMS date format.

What is published?

The JCC LogMiner Loader publishes each row from a committed change as an XML document. The following four messages resulted from this SQL:
Update salary_history set salary_amount = salary_amount
  where employee_id = '00164';
commit;

<pkt><row name='SALARY_HISTORY' actn='M'><col name='EMPLOYEE_ID' type='str' len='5' val='00164'/><col name='SALARY_AMOUNT' type='num' val='26291.00'/><col name='SALARY_START' type='dt' val='1980-07-05 00:00:00.00'/><col name='SALARY_END' type='dt' val='1981-03-02 00:00:00.00'/><col name='ORIGINATING_DBKEY' type='num' val='25614222880800769'/></row></pkt>
<pkt><row name='SALARY_HISTORY' actn='M'><col name='EMPLOYEE_ID' type='str' len='5' val='00164'/><col name='SALARY_AMOUNT' type='num' val='51712.00'/><col name='SALARY_START' type='dt' val='1983-01-14 00:00:00.00'/><col name='SALARY_END' type='dt' null='Y'/><col name='ORIGINATING_DBKEY' type='num' val='25614222880800770'/></row></pkt>
<pkt><row name='SALARY_HISTORY' actn='M'><col name='EMPLOYEE_ID' type='str' len='5' val='00164'/><col name='SALARY_AMOUNT' type='num' val='26291.00'/><col name='SALARY_START' type='dt' val='1981-03-02 00:00:00.00'/><col name='SALARY_END' type='dt' val='1981-09-21 00:00:00.00'/><col name='ORIGINATING_DBKEY' type='num' val='25614222880800771'/></row></pkt>
<pkt><row name='SALARY_HISTORY' actn='M'><col name='EMPLOYEE_ID' type='str' len='5' val='00164'/><col name='SALARY_AMOUNT' type='num' val='50000.00'/><col name='SALARY_START' type='dt' val='1981-09-21 00:00:00.00'/><col name='SALARY_END' type='dt' val='1983-01-14 00:00:00.00'/><col name='ORIGINATING_DBKEY' type='num' val='25614222880800772'/></row></pkt>

While messages in the format above can be parsed by a program, the following two messages have been reformatted to be human readable:

<pkt>
    <row name='SALARY_HISTORY' actn='M'>
        <col name='EMPLOYEE_ID' type='str' len='5' val='00164'/>
        <col name='SALARY_AMOUNT' type='num' val='51712.00'/>
        <col name='SALARY_START' type='dt' val='1983-01-14 00:00:00.00'/>
        <col name='SALARY_END' type='dt' null='Y'/>
        <col name='ORIGINATING_DBKEY' type='num' val='25614222880800770'/>
    </row>
</pkt>
<pkt>
    <row name='SALARY_HISTORY' actn='M'>
        <col name='EMPLOYEE_ID' type='str' len='5' val='00164'/>
        <col name='SALARY_AMOUNT' type='num' val='26291.00'/>
        <col name='SALARY_START' type='dt' val='1981-03-02 00:00:00.00'/>
        <col name='SALARY_END' type='dt' val='1981-09-21 00:00:00.00'/>
        <col name='ORIGINATING_DBKEY' type='num' val='25614222880800771'/>
    </row>
</pkt>

Several points:

  • XML is verbose
  • Each message contains the row name
  • The action (actn) value can be M (modify) or D (delete). Rdb uses M for both inserts and updates in the source database; D indicates that a row was deleted from the source database.
  • For each column, the column name is specified by “name=” and the value of the column by “val=”
  • Columns that were NULL in the source data contain “null='Y'”

Null Values

By default, the XML message that is published identifies NULL columns explicitly with the attribute null='Y':
<pkt>
    <row name='SALARY_HISTORY' actn='M'>
        <col name='EMPLOYEE_ID' type='str' len='5' val='00164'/>
        <col name='SALARY_AMOUNT' type='num' val='51712.00'/>
        <col name='SALARY_START' type='dt' val='1983-01-14 00:00:00.00'/>
        <col name='SALARY_END' type='dt' null='Y'/>
        <col name='ORIGINATING_DBKEY' type='num' val='25614222880800770'/>
    </row>
</pkt>

Some tools for parsing XML messages may not be able to easily parse the NULL flag. To support such tools, the following directive can be added to the JCC LogMiner Loader configuration file:

xml~null~implicit

With implicit NULLs, the output XML document omits NULL columns completely. In the following example, the SALARY_END column was NULL, so it is omitted from the XML document:
<pkt>
  <row name="SALARY_HISTORY" actn="M">
    <col name="EMPLOYEE_ID" type="str" len="5" val="00164"/>
    <col name="SALARY_AMOUNT" type="num" val="51712.00"/>
    <col name="SALARY_START" type="dt" val="1983-01-14 00:00:00.00"/>
    <col name="ORIGINATING_DBKEY" type="num" val="25614222880800770"/>
  </row>
</pkt>

If the xml~null directive is not included, the default is:

XML~Null~Explicit

Kafka

Kafka relies on Zookeeper for its metadata management and for clustering. We installed Zookeeper and Kafka on a CentOS 7.4 (Linux) system. The installations are a bit do-it-yourself. We created the following service files so that Zookeeper and Kafka can be started and stopped using the Linux systemctl command:

/etc/systemd/system/zookeeper.service

[Unit]
Description=Apache Zookeeper server
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=forking
User=zookeeper
Group=zookeeper
ExecStart=/opt/zookeeper/bin/zkServer.sh start
ExecStop=/opt/zookeeper/bin/zkServer.sh stop
ExecReload=/opt/zookeeper/bin/zkServer.sh restart
WorkingDirectory=/var/lib/zookeeper

[Install]
WantedBy=multi-user.target

/etc/systemd/system/kafka.service

[Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target zookeeper.service

[Service]
Type=simple
User=kafka
Group=kafka
Environment=JAVA_HOME=/etc/alternatives/jre
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh

[Install]
WantedBy=multi-user.target
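
With these unit files in place, the two services can be managed like any other systemd service. The following is a minimal sketch of the commands we would expect to use, assuming an account with sudo rights:

sudo systemctl daemon-reload              # pick up the new unit files
sudo systemctl enable zookeeper kafka     # start both services at boot
sudo systemctl start zookeeper
sudo systemctl start kafka
systemctl status kafka                    # confirm the broker is running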

The actual installation and configuration of Kafka on the CentOS 7.4 server are beyond the scope of this document.

Once Kafka has been installed and configured, it accepts messages from publishers and returns them to consumers.

Kafka Topics

By default, a Kafka topic is created when a message is published to a topic name. It is possible to configure a Kafka server so that topics have to be explicitly created before messages can be published to a topic. The JCC LogMiner Loader currently expects topics to be automatically created.
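
If a Kafka server is configured to disallow automatic topic creation (auto.create.topics.enable set to false in the broker configuration), topics would have to be created ahead of time. The following sketch uses the kafka-topics.sh script on the Kafka server itself; the partition and replication counts are illustrative:

/opt/kafka/bin/kafka-topics.sh --zookeeper kafka01:2181 --create \
    --topic EMPLOYEES --partitions 1 --replication-factor 1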

Useful OpenVMS Command Procedures

Because OpenVMS supports Java 8 and the Kafka command-line tools are implemented in Java, it is possible to create DCL command procedures that execute Kafka commands.

The following OpenVMS command procedures are useful:

KAFKA_CONSUMER.COM
$!
$! Start up a Kafka Consumer on OpenVMS
$! Server Kafka01
$!
$ @SYS$COMMON:[java$80.com]java$setup.com
$ java -version
$ set process/parse_style=extended
$!
$ kafka_topic = "''p1'"
$!
$!
$! define classpath "/jcc_root/Kafka/libs/*"
$ define classpath "/jcc_root/Kafka-1-1-0/libs/*"
$!
$ java -server kafka.tools.ConsoleConsumer -
      --bootstrap-server kafka01:9092 --topic 'kafka_topic'

KAFKA_TOPICS.COM
$!
$! List Kafka Topics
$!
$ @SYS$COMMON:[java$80.com]java$setup.com
$ java -version
$ set process/parse_style=extended
$!
$! define classpath "/jcc_root/Kafka/libs/*"
$ define classpath "/jcc_root/Kafka-1-1-0/libs/*"
$!
$!
$ java -server kafka.admin.TopicCommand --list --zookeeper kafka01:2181

These procedures could be enhanced with parameter validation and error handling, but they are sufficient for this example.

The following example uses kafka_topics.com to list topics that have been published to the Kafka server kafka01. It then uses the kafka_consumer.com procedure to listen for messages published to the topic “COLLEGES”.

JCC/ares ~dba~Dev~Builder> @kafka_topics.com
java version "1.8.0.03-OpenVMS"
Java(TM) SE Runtime Environment (build 1.8.0.03-vms-rc1)
Java HotSpot(TM) 64-Bit Server VM (build 25.51-b02, mixed mode)
%DCL-I-SUPERSEDE, previous value of CLASSPATH has been superseded
ALL_DATATYPES_TABLE
COLLEGES
DEGREES
EMPLOYEES
SALARY_HISTORY
TIME_TURNER
__consumer_offsets
employees
rdb.testtopic

JCC/ares ~dba~Dev~Builder> @kafka_consumer.com COLLEGES
java version "1.8.0.03-OpenVMS"
Java(TM) SE Runtime Environment (build 1.8.0.03-vms-rc1)
Java HotSpot(TM) 64-Bit Server VM (build 25.51-b02, mixed mode)
%DCL-I-SUPERSEDE, previous value of CLASSPATH has been superseded
<pkt><row name='COLLEGES' actn='M'><col name='COLLEGE_CODE' type='str' len='4' val='YALE'/><col name='COLLEGE_NAME' type='str' len='25' val='Yale University'/><col name='CITY' type='str' len='20' val='New Haven'/><col name='STATE' type='str' len='2' val='CT'/><col name='POSTAL_CODE' type='str' len='5' val='06520'/></row></pkt>

KAFKA_CONSUMER.COM will wait for messages to be published until you press Ctrl-C.
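
To replay all of the messages still retained on a topic instead of waiting for new ones, the console consumer accepts a --from-beginning option. The following sketch runs directly on the Kafka server; the same option could be added to KAFKA_CONSUMER.COM:

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka01:9092 \
    --topic COLLEGES --from-beginning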

Other Kafka Commands

Once Java 8 has been set up in a process and the classpath logical has been defined, Kafka commands can be executed without creating a command procedure.

Create Partitions for a topic

The following example alters the Kafka topic EMPLOYEES to have five partitions:

ares > java  kafka.admin.TopicCommand  --zookeeper kafka01:2181 --alter --topic EMPLOYEES --partitions 5
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
ares >

When a message is published to a topic, it can optionally have a key. If a key is provided, by default Kafka uses a hash of the key to determine which partition is used for the message. Partitioning a topic may have performance benefits as it allows the Kafka server to do more in parallel.

The JCC LogMiner Loader publishes messages with the text-format of the source DBKey as the Kafka message key. This will tend to distribute messages across the partitions while messages for the same DBKey will be in the same partition. Message order is only guaranteed on a partition, not across partitions or topics.
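
To see the DBKey keys that the Loader supplies, the console consumer can be asked to print the message key along with the value. The following sketch uses the standard print.key and key.separator formatter properties:

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka01:9092 \
    --topic SALARY_HISTORY --from-beginning \
    --property print.key=true --property key.separator=' | '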

When a Kafka server is configured, it is possible to set a default number of partitions for new topics (the num.partitions setting in the broker configuration).

Topic Details

The following example shows the details of a topic, including its partitions, leaders, replicas, and in-sync replicas (Isr):

ares > java kafka.admin.TopicCommand --zookeeper kafka01:2181 --topic EMPLOYEES --describe
Topic:EMPLOYEES PartitionCount:5        ReplicationFactor:1     Configs:
        Topic: EMPLOYEES        Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: EMPLOYEES        Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: EMPLOYEES        Partition: 2    Leader: 0       Replicas: 0     Isr: 0
        Topic: EMPLOYEES        Partition: 3    Leader: 0       Replicas: 0     Isr: 0
        Topic: EMPLOYEES        Partition: 4    Leader: 0       Replicas: 0     Isr: 0
ares >

In this example, the EMPLOYEES topic partitions are not replicated within Kafka. This could result in data loss if the Kafka server fails.

Delete Topic

The following example describes a topic and then deletes it:

ares > java kafka.admin.TopicCommand --zookeeper kafka01:2181 --topic TIME_TURNER --describe
Topic:TIME_TURNER       PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: TIME_TURNER      Partition: 0    Leader: 0       Replicas: 0     Isr: 0
ares > java kafka.admin.TopicCommand --zookeeper kafka01:2181 --topic TIME_TURNER --delete
Topic TIME_TURNER is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
ares >


Topic Retention

The following example sets the retention period for the topic DETAILS_AUDIT to 720,000 milliseconds (12 minutes), as confirmed by the describe output:

JCC/ares ~dba~Dev> java  kafka.admin.TopicCommand --zookeeper kafka01:2181 -
_JCC/ares ~dba~Dev> --describe --topic DETAILS_AUDIT
Topic:DETAILS_AUDIT     PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: DETAILS_AUDIT    Partition: 0    Leader: 0       Replicas: 0     Isr: 0
JCC/ares ~dba~Dev> java  kafka.admin.TopicCommand --zookeeper kafka01:2181 -
_JCC/ares ~dba~Dev> --topic DETAILS_AUDIT --alter --config retention.ms=720000
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
         Going forward, please use kafka-configs.sh for this functionality
Updated config for topic "DETAILS_AUDIT".
JCC/ares ~dba~Dev> java  kafka.admin.TopicCommand --zookeeper kafka01:2181 -
_JCC/ares ~dba~Dev> --describe --topic DETAILS_AUDIT
Topic:DETAILS_AUDIT     PartitionCount:1        ReplicationFactor:1     Configs:retention.ms=720000
        Topic: DETAILS_AUDIT    Partition: 0    Leader: 0       Replicas: 0     Isr: 0
JCC/ares ~dba~Dev>

The default retention is 168 hours (seven days) and can be changed in the Kafka server configuration file. By default, Kafka checks every 300 seconds whether log segments have exceeded the retention period and can be deleted.
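
As the deprecation warning above suggests, topic-level settings can also be managed with kafka-configs.sh. The following sketch sets a two-hour retention (7,200,000 ms) on the same topic and then verifies it:

/opt/kafka/bin/kafka-configs.sh --zookeeper kafka01:2181 --alter \
    --entity-type topics --entity-name DETAILS_AUDIT \
    --add-config retention.ms=7200000

/opt/kafka/bin/kafka-configs.sh --zookeeper kafka01:2181 --describe \
    --entity-type topics --entity-name DETAILS_AUDIT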

SQLStream Blaze Configuration

SQLStream Blaze is a streaming data platform designed to consume data from a variety of sources and act on that data. In this example, we have configured an SQLStream installation to:

  • Consume XML messages from the Kafka server Kafka01
  • Parse the XML into SQL records
  • Use the SQL records to update tables in an SQL Server database

SQLStream Installation

We installed SQLStream sServer and StreamLab on a Centos 7.4 virtual system using the installation kits:

  • SQLstream-sServer-5.2.3-x64.run        
  • SQLstream-StreamLab-2.2.3-linux-x64.run

The installation was straightforward – just follow the prompts.

The CentOS 7 system has a node-name of "sqlstream".

The commands documented here were initially developed using the StreamLab web tool, then refined in a text editor and executed using the SQLStream command line interface, SQLLine. SQLLine can be invoked in a terminal window on sqlstream.jcc.com using the commands:

SQLSTREAM_HOME="/opt/sqlstream/5.2.3.16995/s-Server"
echo $SQLSTREAM_HOME
$SQLSTREAM_HOME/bin/sqllineClient

Example:

[keith@sqlstream ~]$ $SQLSTREAM_HOME/bin/sqllineClient
OpenJDK 64-Bit Server VM warning: Using incremental CMS is deprecated and will likely be removed in a future release
Connecting to jdbc:sqlstream:sdp://sqlstream.jcc.com;sessionName='sqllineClient@/dev/pts/2:keith@sqlstream'
Connected to: SQLstream (version 5.2.3.16995)
Driver: SQLstreamJdbcDriver (version 5.0-distrib)
Autocommit status: true
Transaction isolation: TRANSACTION_REPEATABLE_READ
sqlline version 1.0.10-jh by Marc Prud'hommeaux
0: jdbc:sqlstream:sdp://sqlstream.jcc.com>
0: jdbc:sqlstream:sdp://sqlstream.jcc.com>
0: jdbc:sqlstream:sdp://sqlstream.jcc.com> !quit
Closing: com.sqlstream.aspen.vjdbc.AspenDbcConnection
[keith@sqlstream ~]$

SQLLine supports a number of commands that start with “!”, including “!quit” and the “!set force” command used below.

SQLStream -- Creating Streams

The following annotated example creates an input stream for EMPLOYEES XML documents and maps it to a target table DBO.EMPLOYEES in the target database. This set of commands can be copied and pasted into an SQLLine session.
--
-- SQLStream Config 2018-03-26.txt
-- Keith W. Hare
-- JCC Consulting, Inc.
--
-- Create (or Replace) a Schema for the definitions to read
-- data published to Kafka by the JCC LogMiner Loader
--
CREATE OR REPLACE SCHEMA "JCCKafkaPersonnel";
--
-- Stop any existing pumps and streams
--
ALTER PUMP "JCCKafkaPersonnel".* STOP;
ALTER STREAM "JCCKafkaPersonnel".* RESET;

--
-- remove some objects possibly left over from previous attempts.
-- The SQLLine preference setting "!set force true"
-- means that the following commands will continue even if the DROPs fail.
--
!set force true
DROP VIEW "JCCKafkaPersonnel"."employees_stream" CASCADE;
DROP FOREIGN STREAM "JCCKafkaPersonnel"."employees_stream" CASCADE;
!set force false

--
--  ECDA reading adapter/agent with Discovery support
--  This maps a library that can be used to consume and process messages. The
--  library is included with the SQLStream installation.
--
CREATE OR REPLACE FOREIGN DATA WRAPPER ECDAWRAPPER
    LIBRARY 'class com.sqlstream.aspen.namespace.common.CommonDataWrapper' LANGUAGE JAVA;

--
-- The Server is used by the foreign stream to associate it with the appropriate
-- Foreign Data Wrapper
--
CREATE OR REPLACE SERVER "ECDAReaderServer_JCCKafkaPersonnel"
TYPE 'KAFKA'
FOREIGN DATA WRAPPER ECDAWRAPPER;

--
-- Create a Foreign Stream that maps the XML documents on a Kafka topic into
-- columns in an object that can be queried like a table.
--
-- The foreign stream first defines the columns, then specifies how to extract
-- the column values from the XML document.
--
CREATE OR REPLACE FOREIGN STREAM "JCCKafkaPersonnel"."employees_stream_raw_fs"
(
    "EMPLOYEE_ID" CHAR(5),
    "LAST_NAME" CHAR(14),
    "FIRST_NAME" CHAR(10),
    "MIDDLE_INITIAL" CHAR(1),
    "ADDRESS_DATA_1" CHAR(25),
    "ADDRESS_DATA_2" CHAR(20),
    "CITY" CHAR(20),
    "STATE" CHAR(2),
    "POSTAL_CODE" CHAR(5),
    "SEX" CHAR(1),
    "BIRTHDAY" TIMESTAMP,
    "STATUS_CODE" CHAR(1),
    "JCCLML_ACTION" CHAR(1)
)
    SERVER "ECDAReaderServer_JCCKafkaPersonnel"
    OPTIONS (
        "PARSER" 'XML',
        "CHARACTER_ENCODING" 'UTF-8',
        "PARSER_XML_ROW_TAGS" '/pkt/row[@name="EMPLOYEES"]',
        "PARSER_XML_USE_ATTRIBUTES" 'false',
        
        "EMPLOYEE_ID_XPATH" 'col[@name="EMPLOYEE_ID"]/@val',
        "LAST_NAME_XPATH" 'col[@name="LAST_NAME"]/@val',
        "FIRST_NAME_XPATH" 'col[@name="FIRST_NAME"]/@val',
        "MIDDLE_INITIAL_XPATH" 'col[@name="MIDDLE_INITIAL"]/@val',
        "ADDRESS_DATA_1_XPATH" 'col[@name="ADDRESS_DATA_1"]/@val',
        "ADDRESS_DATA_2_XPATH" 'col[@name="ADDRESS_DATA_2"]/@val',
        "CITY_XPATH" 'col[@name="CITY"]/@val',
        "STATE_XPATH" 'col[@name="STATE"]/@val',
        "POSTAL_CODE_XPATH" 'col[@name="POSTAL_CODE"]/@val',
        "SEX_XPATH" 'col[@name="SEX"]/@val',
        "BIRTHDAY_XPATH" 'col[@name="BIRTHDAY"]/@val',
        "STATUS_CODE_XPATH" 'col[@name="STATUS_CODE"]/@val',
        "JCCLML_ACTION_XPATH" '/pkt/row/@actn',

        "SEED_BROKERS" 'kafka01',
        "PORT" '9092',
        "TOPIC" 'EMPLOYEES',
        "STARTING_TIME" 'earliest',
        "STARTING_OFFSET" '-1',
        "PARTITION" '',
        "BUFFER_SIZE" '1048576',
        "FETCH_SIZE" '1000000',
        "CLIENT_ID" '',
        "METRICS_PER_PARTITION" 'false'

    );
--
-- The directive PARSER_XML_ROW_TAGS specifies the XPath expression that identifies a row.
-- The Xpath expression:
--    '/pkt/row[@name="EMPLOYEES"]'
-- matches the packets with the row name of EMPLOYEES in the XML message:
--    <pkt>
--        <row name='EMPLOYEES' actn='M'>
--            <col name='EMPLOYEE_ID' type='str' len='5' val='14258'/>
--            <col name='LAST_NAME' type='str' len='14' val='Harrington'/>
--        ...
--        </row>
--    </pkt>  
-- An Xpath expression is needed for each column
--        "EMPLOYEE_ID_XPATH" 'col[@name="EMPLOYEE_ID"]/@val',
-- The column XPath expressions are interpreted in the context of the XML row tag, so
-- the '/pkt/row' portion is not needed in the column XPath expressions.
-- In XPath, '@' references an attribute. The square brackets
-- are used to select nodes whose attribute value matches a condition.
--
-- Note the column JCCLML_ACTION and its Xpath JCCLML_ACTION_XPATH. This will
-- include the 'M' or 'D' action that the JCC LogMiner Loader receives
-- from the RMU/Unload/After command.
--
-- The last section specifies how to connect to the Kafka server
--
--   "SEED_BROKERS" 'kafka01',      Kafka server is kafk01
--   "PORT" '9092',                 Default Kafka port
--   "TOPIC" 'EMPLOYEES',           Topic created by the JCC LogMiner Loader
--   "STARTING_TIME" 'earliest',    Start with the earliers message on the Kafka server.
--                                  can also use LATEST.
--
-- We have not yet dug into the details of the additional parameters
--

--
-- The following stream will be populated from the foreign stream. Note
-- the addition of the column SQLSTREAM_ROWTIME.
--
CREATE OR REPLACE STREAM "JCCKafkaPersonnel"."employees_stream"
(
    "EMPLOYEE_ID" CHAR(5),
    "LAST_NAME" CHAR(14),
    "FIRST_NAME" CHAR(10),
    "MIDDLE_INITIAL" CHAR(1),
    "ADDRESS_DATA_1" CHAR(25),
    "ADDRESS_DATA_2" CHAR(20),
    "CITY" CHAR(20),
    "STATE" CHAR(2),
    "POSTAL_CODE" CHAR(5),
    "SEX" CHAR(1),
    "BIRTHDAY" TIMESTAMP,
    "STATUS_CODE" CHAR(1),
    "JCCLML_ACTION" CHAR(1),
    "SQLSTREAM_ROWTIME" TIMESTAMP
);

--
-- The PUMP moves the incoming Kafka data into the stream employees_stream.
-- The column ROWTIME is added by the SQLStream server when it consumes
-- a message from Kafka. The STREAM qualifier says to operate continuously
-- from the point that the pump is started to the point when it is stopped.
-- The syntax in this example assumes that the columns in "employees_stream"
-- are in the same order as the columns in "employees_stream_raw_fs"
--
CREATE OR REPLACE PUMP "JCCKafkaPersonnel"."employees_stream-Pump" STOPPED AS
INSERT INTO "JCCKafkaPersonnel"."employees_stream"
  SELECT STREAM *, "ROWTIME"
    FROM "JCCKafkaPersonnel"."employees_stream_raw_fs";
--
-- Split Modify and Delete
-- I thought separate views of the M and D rows would be useful
-- but did not end up using them.
--
CREATE OR REPLACE VIEW "JCCKafkaPersonnel"."Employees_guide_M"
AS
SELECT  STREAM  *
  FROM "JCCKafkaPersonnel"."employees_stream"
 WHERE JCCLML_ACTION = 'M';

CREATE OR REPLACE VIEW "JCCKafkaPersonnel"."Employees_guide_D"
AS
SELECT  STREAM  *
  FROM "JCCKafkaPersonnel"."employees_stream"
 WHERE JCCLML_ACTION = 'D';


--
-- Output Sink is SQL Server
--
CREATE OR REPLACE SERVER "SQLserver_Thrud_Kafka-Personnel"
    FOREIGN DATA WRAPPER "SYS_JDBC"
    OPTIONS (
        "URL" 'jdbc:sqlserver://thrud.jcc.com;databaseName=Personnel;',
        "USER_NAME" 'test',
        "PASSWORD" 'gratefalsing',
        "DIALECT" 'Microsoft SQL Server',
        "JNDI_WRITEBACK" 'true',
        "pollingInterval" '1000',
        "txInterval" '1000',
        "DRIVER_CLASS" 'com.microsoft.sqlserver.jdbc.SQLServerDriver'
    );

--
-- External table Sink_Employees is mapped to the target table dbo.employees
-- SQLStream requires that this definition have the same columns as the
-- target table. There is an option to import the entire schema, but we have
-- not yet tested that.
--
CREATE OR REPLACE FOREIGN TABLE "JCCKafkaPersonnel"."Sink_Employees"
   ("EMPLOYEE_ID" CHAR(5),
    "LAST_NAME" CHAR(14),
    "FIRST_NAME" CHAR(10),
    "MIDDLE_INITIAL" CHAR(1),
    "ADDRESS_DATA_1" CHAR(25),
    "ADDRESS_DATA_2" CHAR(20),
    "CITY" CHAR(20),
    "STATE" CHAR(2),
    "POSTAL_CODE" CHAR(5),
    "SEX" CHAR(1),
    "BIRTHDAY" TIMESTAMP,
    "STATUS_CODE" CHAR(1),
    "JCCLML_ACTION" CHAR(1),
    "SQLSTREAM_ROWTIME" TIMESTAMP)
    
SERVER "SQLserver_Thrud_Kafka-Personnel"
OPTIONS (
    "SCHEMA_NAME" 'dbo',
    "TABLE_NAME" 'employees',
    "TRANSACTION_ROW_LIMIT" '0',
    "TRANSACTION_ROWTIME_LIMIT" '1000'
);
--
-- TRANSACTION_ROW_LIMIT and TRANSACTION_ROWTIME_LIMIT control
-- how frequently transactions are committed to the target database.
--


--
-- This pump merges data from employees_stream into the target table
-- Sink_Employees. SQLStream supports INSERT, UPDATE, and MERGE. It does
-- not support DELETE. Because of this, I've added the JCCLML_ACTION code.
--
CREATE OR REPLACE PUMP "JCCKafkaPersonnel"."external_employees-Pump" STOPPED
AS
MERGE INTO "JCCKafkaPersonnel"."Sink_Employees" AS "sink"
    USING (SELECT STREAM
                  "EMPLOYEE_ID",
                  "LAST_NAME",
                  "FIRST_NAME",
                  "MIDDLE_INITIAL",
                  "ADDRESS_DATA_1",
                  "ADDRESS_DATA_2",
                  "CITY",
                  "STATE",
                  "POSTAL_CODE",
                  "SEX",
                  "BIRTHDAY",
                  "STATUS_CODE",
                  "JCCLML_ACTION",
                  "SQLSTREAM_ROWTIME"
                  FROM "JCCKafkaPersonnel"."employees_stream" ) AS "input"
    ON ("sink"."EMPLOYEE_ID" = "input"."EMPLOYEE_ID")
WHEN MATCHED THEN
    UPDATE SET "LAST_NAME"      = "input"."LAST_NAME",
               "FIRST_NAME"     = "input"."FIRST_NAME",
               "MIDDLE_INITIAL" = "input"."MIDDLE_INITIAL",
               "ADDRESS_DATA_1" = "input"."ADDRESS_DATA_1",
               "ADDRESS_DATA_2" = "input"."ADDRESS_DATA_2",
               "CITY"           = "input"."CITY",
               "STATE"          = "input"."STATE",
               "POSTAL_CODE"    = "input"."POSTAL_CODE",
               "SEX"            = "input"."SEX",
               "BIRTHDAY"       = "input"."BIRTHDAY",
               "STATUS_CODE"    = "input"."STATUS_CODE",
               "JCCLML_ACTION"  = "input"."JCCLML_ACTION",
               "SQLSTREAM_ROWTIME"     = "input"."SQLSTREAM_ROWTIME"
WHEN NOT MATCHED THEN
    INSERT ("EMPLOYEE_ID",
            "LAST_NAME",
            "FIRST_NAME",
            "MIDDLE_INITIAL",
            "ADDRESS_DATA_1",
            "ADDRESS_DATA_2",
            "CITY",
            "STATE",
            "POSTAL_CODE",
            "SEX",
            "BIRTHDAY",
            "STATUS_CODE",
            "JCCLML_ACTION",
            "SQLSTREAM_ROWTIME")
    VALUES ("EMPLOYEE_ID" ,
            "LAST_NAME" ,
            "FIRST_NAME" ,
            "MIDDLE_INITIAL" ,
            "ADDRESS_DATA_1" ,
            "ADDRESS_DATA_2" ,
            "CITY" ,
            "STATE" ,
            "POSTAL_CODE" ,
            "SEX" ,
            "BIRTHDAY" ,
            "STATUS_CODE",
            "JCCLML_ACTION",
            "SQLSTREAM_ROWTIME" );

--
-- The pumps were created with the STOPPED qualifier. Altering them to START
-- will cause them to continuously move data from the input to the output tables
-- specified in each of the pump definitions.
--
ALTER PUMP "JCCKafkaPersonnel"."employees_stream-Pump" START;
ALTER PUMP "JCCKafkaPersonnel"."external_employees-Pump" START;

This example shows the steps for a single table. While it is verbose, adding additional tables is relatively straightforward.

What does this all look like?

The following examples show the data in three places:

  • Rdb SQL on OpenVMS
  • SQLStream SQLline
  • DB Visualizer accessing SQL Server

Example 1: Update

The SQLStream SQLLine query is a continuous query, so it returns rows as it receives them:

0: jdbc:sqlstream:sdp://sqlstream.jcc.com> select EMPLOYEE_ID, LAST_NAME, STATUS_CODE,JCCLML_ACTION,SQLSTREAM_ROWTIME
. . . . . . . . . . . . . . . . . . . . .>  from "JCCKafkaPersonnel"."employees_stream";
'EMPLOYEE_ID','LAST_NAME','STATUS_CODE','JCCLML_ACTION','SQLSTREAM_ROWTIME'

The Rdb SQL query returns rows for two employee_id's:

SQL> select EMPLOYEE_ID, LAST_NAME, STATUS_CODE
cont>  from employees
cont>  where employee_id in ('00164', '20000')
cont>  ;
 EMPLOYEE_ID   LAST_NAME        STATUS_CODE
 00164         Toliver          1
 20000         Mistretta        1
2 rows selected
SQL>
SQL>
SQL> update employees
cont>   set status_code = '2'
cont> where employee_id in ('00164', '20000')
cont> ;
2 rows updated
SQL> commit;

When the Rdb SQL transaction commits, the JCC LogMiner Loader publishes the changes to the Kafka server. SQLStream consumes the Kafka messages and returns the data to the continuous query:

0: jdbc:sqlstream:sdp://sqlstream.jcc.com> select EMPLOYEE_ID, LAST_NAME, STATUS_CODE,JCCLML_ACTION,SQLSTREAM_ROWTIME
. . . . . . . . . . . . . . . . . . . . .>  from "JCCKafkaPersonnel"."employees_stream";
'EMPLOYEE_ID','LAST_NAME','STATUS_CODE','JCCLML_ACTION','SQLSTREAM_ROWTIME'
'00164','Toliver       ','2','M','2018-03-30 17:47:14.842'
'20000','Mistretta     ','2','M','2018-03-30 17:47:14.842'

SQLStream writes the data to the SQL Server table:
 

(Figure: Kafka SQL Server Insert)

 Note the columns JCCLML_ACTION and SQLSTREAM_ROWTIME.

Example 2: Delete

The Rdb SQL Delete statement deletes the EMPLOYEES row with the employee_id of ‘20000’:

SQL> select EMPLOYEE_ID, LAST_NAME, STATUS_CODE
cont>  from employees
cont>  where employee_id in ('00164', '20000')
cont>  ;
 EMPLOYEE_ID   LAST_NAME        STATUS_CODE
 00164         Toliver          2
 20000         Mistretta        2
2 rows selected
SQL> delete from employees
cont>   where employee_id = '20000'
cont> ;
1 row deleted
SQL> commit;

As soon as the Rdb SQL transaction is committed, the JCC LogMiner Loader publishes the deleted row to the Kafka server, where it is processed by SQLStream and returned to the continuous query:

0: jdbc:sqlstream:sdp://sqlstream.jcc.com> select EMPLOYEE_ID, LAST_NAME, STATUS_CODE,JCCLML_ACTION,SQLSTREAM_ROWTIME
. . . . . . . . . . . . . . . . . . . . .>  from "JCCKafkaPersonnel"."employees_stream";
'EMPLOYEE_ID','LAST_NAME','STATUS_CODE','JCCLML_ACTION','SQLSTREAM_ROWTIME'
'00164','Toliver       ','2','M','2018-03-30 17:47:14.842'
'20000','Mistretta     ','2','M','2018-03-30 17:47:14.842'
'20000','Mistretta     ','2','D','2018-03-30 17:52:00.29'

Note the JCCLML_ACTION of ‘D’ for employee_id ‘20000’ and the SQLSTREAM_ROWTIME value.

SQLStream writes the data to the target table:


(Figure: Kafka SQL Server Delete)

SQLStream makes the timestamp ROWTIME available for every message it reads from a source. ROWTIME is the timestamp at which SQLStream consumed the message and is in UTC (Coordinated Universal Time).

Future Directions

We are looking at a number of additional complexities and enhancements for the JCC LogMiner Loader Kafka interface.

Performance

This example was focused on how to use Kafka as a target using a minimal workload. This testing did not specifically look at performance. The end-to-end performance could be reviewed by adding timestamps along the way:

  • Add Virtual columns in JCC LML:
    • JCCLML_READ_TIME
    • TRANSMISSION_DATE_TIME
  • Add SQLStream Rowtime (this time is UTC)
  • Add a timestamp column in the target with a default value
  • Write SQL Queries in the target to compare the times

A casual observation is that rows were replicated very quickly.

Transactions

This test replicated the committed data to the target database, but the target transactions were not the same sets of changes as the source transactions. SQLStream was using a timer to choose when to commit transactions against the target database.

We can configure the JCC LogMiner Loader to add the Loader Sequence Number to the data stream. Within a JCC LogMiner Loader session, the Loader Sequence Number is the ordinal position of a transaction commit. The Loader Sequence Number is preserved as a part of the Loader restart information, so it is a reliable indicator of the order in which source transactions are committed. The Loader Sequence Number can be used to coordinate transactions for data replicated through Kafka, but it may require custom code to ensure the data is applied to the target in a sequence that avoids buried transactions.

Kafka Topics

The JCC LogMiner Loader currently publishes data to topics based on the table name. The XML messages contain the table name, so it would be possible to have all messages for a particular Loader session use a single topic and let the consumer differentiate between the XML messages based on the table name within the XML. This seems like a potentially useful option, but we have not yet implemented it in the JCC LogMiner Loader.

Publishing Messages in JSON Format

Because the JCC LogMiner Loader already supported XML for the file and API targets, we implemented XML as the first format for publishing to Kafka. Now that we have the mechanisms in place for publishing to Kafka, we are working on adding support for JSON messages. The JSON format is likely to resemble the XML format:
{"row":
  {"name" :"SALARY_HISTORY"
  ,"columns" :
    {"actn" : "M"
    ,"EMPLOYEE_ID" : "00164"
    ,"SALARY_AMOUNT" : "51712.00"
    ,"SALARY_START" : "1983-01-14 00:00:00.00"
    ,"ORIGINATING_DBKEY" :"25614222880800770"
    }
  }
}

JSON messages will be simpler, and therefore smaller, than equivalent XML messages. This example assumes that NULLs will be indicated by omitting the NULL column from the JSON message; the SALARY_END column, which was NULL, does not appear above.
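
Once JSON messages are available, they should be straightforward to process with off-the-shelf tools. As an illustration only, since it assumes the tentative format shown above, the console consumer output could be filtered with jq:

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka01:9092 \
    --topic SALARY_HISTORY --from-beginning |
    jq -r '.row.columns.EMPLOYEE_ID'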

Summary

This example replicates committed changes from an Rdb database to a target database using the following sequence of tools:

  1. JCC LogMiner Loader
  2. Kafka
  3. SQLStream
  4. Microsoft SQL Server

Once data has been published to Kafka, it is available to any tool that can consume Kafka messages.
This example uses SQLStream to consume the messages. SQLStream was chosen because it allowed the quick creation of a working example that replicated data to a target that could be easily queried.