Oracle CDC by mimicking an UPSERT using NiFi

Murtaza Kheriwala
6 min read · Jul 7, 2021
Photo by Tom Parsons on Unsplash

Motivation

We recently started exploring Apache NiFi for our ETL (really just E-L) journey. Our main requirement was to migrate a couple of hundred tables daily, as-is, from different ERPs and other application databases to our reporting Oracle Data Warehouse (Oracle 12c). We decided to look for open-source options, given their proven track record of solving problems and, of course, being free, and Apache NiFi was one of the top contenders for the job.

From here on, this article assumes a basic understanding of NiFi and its processors.

As mentioned earlier, our initial use case was to refresh a few hundred tables into the data warehouse, but this got a little resource intensive (especially in our single-node sandbox NiFi environment) when refreshing tables containing more than a few million records.

This made us look for ways to mimic an upsert in NiFi, since Oracle 12c does not natively support a single-statement UPSERT. That led me to the following flow, which worked well for fetching changes (new or updated records) from the source Oracle database and putting them into the Oracle Data Warehouse.

Local Setup

My local setup, put together for the purpose of this article, was just Docker containers running Oracle Express and Apache NiFi, with SQL Developer as the Oracle client tool. I have put all of the resources for the setup (on macOS Big Sur) in this GitHub repo, and documented it in more detail in the README. The repo contains the DDL to spin up the source and target tables, in different schemas, for NiFi to move data between, as well as some DML to populate the source database with update test data.

And finally…

So let's start with the NiFi data flow image (flow XML checked in here), and we'll then drill down into the details.

Data update/insert flow

Before going further into how this flow works, there are two conditions the source database table must satisfy for this flow to work:

  1. A primary key or a column with a unique constraint.
  2. A column which always increments in value for any new inserts or updates, like an update timestamp.
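
For illustration, a hypothetical source table satisfying both conditions might look like this (the schema, table and column names are made up and not taken from the repo):

    CREATE TABLE src_schema.source_table (
      id             NUMBER PRIMARY KEY,   -- condition 1: primary key / unique column
      name           VARCHAR2(100),
      last_update_ts TIMESTAMP NOT NULL    -- condition 2: set on every insert and update
    );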

The first column of processors in the flow fetches the max value of the update timestamp from the target database table, to determine the baseline for changes from the source, i.e. fetch changes from the source table after this time, since the target is already up to date until then. Once this value is fetched from the target, it is stored as a flow file attribute.
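
A minimal sketch of the kind of query this first column runs against the target (schema, table and column names are illustrative):

    SELECT MAX(last_update_ts) AS max_update_ts
    FROM   dwh_schema.target_table;

The single value returned here is what ends up in the flow file attribute used downstream.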

The next column of the flow then fetches the source data added or updated after the timestamp extracted in the previous step. This is done using a combination of the GenerateTableFetch and ExecuteSQLRecord processors. GenerateTableFetch generates the SQL to fetch records from the source table and passes it to ExecuteSQLRecord for execution. One property worth configuring on GenerateTableFetch is Partition Size: when set, it generates multiple SQL statements with offsets, each fetching at most that many records from the source database. This limits the number of rows extracted from the database at a time, which keeps the amount of data flowing between these processors manageable. Here are the properties of the GenerateTableFetch and ExecuteSQLRecord processors:

GenerateTableFetch processor
ExecuteSQLRecord processor

The database connection is fairly straightforward using the Database Connection Pooling Service. The Additional WHERE clause is also configured, to extract records from the source whose update times are greater than the max update time extracted from the target by the previous processors.
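
Putting the two together, each statement that GenerateTableFetch hands to ExecuteSQLRecord looks roughly like the sketch below; the table, column and attribute names are illustrative, and the exact paging syntax depends on the database adapter configured (Oracle 12+ style shown here):

    SELECT *
    FROM   src_schema.source_table
    WHERE  last_update_ts > TO_TIMESTAMP('${max.update.ts}', 'YYYY-MM-DD HH24:MI:SS.FF')  -- Additional WHERE clause
    ORDER  BY last_update_ts
    OFFSET 0 ROWS FETCH NEXT 10000 ROWS ONLY;  -- one Partition Size worth of rows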

The ExecuteSQLRecord processor executes the SQL generated by GenerateTableFetch and writes the result as JSON, as configured in the Record Writer property. As the Max Rows Per Flow File property is set to 1, this processor outputs each record as JSON content in its own flow file and feeds it into the next processor, EvaluateJsonPath.
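
For reference, and assuming the record writer emits one JSON object per flow file (the column names here are made up), the content of one such flow file might look like:

    { "ID" : 101, "NAME" : "WIDGET", "LAST_UPDATE_TS" : "2021-07-01 10:15:30.0" }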

EvaluateJsonPath processor

The EvaluateJsonPath processor extracts the value of the primary key/unique index column from the flow file's JSON content, stores it as a flow file attribute called lookup_key, and passes the flow file on to the LookupAttribute processor.
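
Assuming the key column is named ID, as in the sample content above, the relevant EvaluateJsonPath configuration would be along these lines (a dynamic property per value to extract, written to an attribute):

    Destination : flowfile-attribute
    lookup_key  : $.ID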

Finally, the last column in the flow performs the conditional routing of flow files and determines whether to insert or update the records (flow files) in the target database.

LookupAttribute processor

The LookupAttribute processor takes the lookup_key attribute set by the previous EvaluateJsonPath processor, passes it to the SimpleDatabaseLookupService, and stores the service's output, described below, in a new attribute called key on the existing flow file.

SimpleDatabaseLookupService

This lookup service checks the lookup_key value against the Lookup Key Column in the target database table and, if a match is found, returns the value of the Lookup Value Column from the target table; otherwise it returns null.
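
In effect, the service runs something like the following for each flow file (here the key column doubles as the value column, so any non-null result simply means "the row exists"; names are illustrative):

    SELECT id               -- Lookup Value Column
    FROM   dwh_schema.target_table
    WHERE  id = ?;          -- Lookup Key Column, compared against the lookup_key attribute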

RouteOnAttribute processor

The RouteOnAttribute processor simply checks whether the key attribute is null, which implies that this primary key was not found in the target table, and sends that flow file to the insert connection. If, on the other hand, key is not null, the lookup returned an existing value from the target, and the flow file is sent to the unmatched (or update) connection.
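
One way to configure this on RouteOnAttribute is a single dynamic property using the Expression Language (the property name "insert" is my choice, not necessarily what the flow image uses):

    Routing Strategy : Route to Property name
    insert           : ${key:isNull()}

Flow files that do not match the insert expression fall through to the unmatched relationship, which is wired to the update path.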

The last processors, ConvertJSONToSQL and PutSQL, simply function as per their names: they convert the flow file's JSON into an INSERT or UPDATE SQL statement, and then apply that statement to the target database table.
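
The statements applied to the target end up being ordinary parameterised INSERTs and UPDATEs, roughly like the following (table and column names are illustrative; PutSQL binds the values carried as flow file attributes into the ? placeholders):

    INSERT INTO dwh_schema.target_table (id, name, last_update_ts) VALUES (?, ?, ?);

    UPDATE dwh_schema.target_table
    SET    name = ?, last_update_ts = ?
    WHERE  id = ?;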

Final Thoughts

There are a few things one needs to be aware of in this flow, from a functional and performance perspective.

  1. Firstly, as we are fetching changes from large tables (with millions of records), one could argue that looking up the key in the target table for each fetched/changed record from the source is costly, since the target table ends up similarly sized to the source (the flow only ever fetches a delta from the source). For this reason, it is mandatory for the target table to have the same primary key/unique index as the source (a sketch of this follows the list).
  2. This flow can be run frequently, every 5 mins for example, which will mean that it would fetch a small changeset on each run.
  3. I hope it is clear from the explanation above why an update timestamp is absolutely critical for this flow to work: each run needs to fetch the delta since the previous run, and that delta is tracked using the update timestamp column.
  4. DELETES at source cannot be tracked or fetched using this flow.
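
As a sketch of the point in (1), ensuring the target carries the same key as the source could be as simple as the following (names are illustrative):

    ALTER TABLE dwh_schema.target_table
      ADD CONSTRAINT target_table_pk PRIMARY KEY (id);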

Finally, I hope this flow actually helps others with a similar use case.
