JDBC Sink

The JDBC sink persists incoming message payloads into a relational database (RDBMS).

The jdbc.consumer.columns property takes a comma-separated list of COLUMN_NAME[:EXPRESSION_FOR_VALUE] pairs, where EXPRESSION_FOR_VALUE (together with the colon) is optional. When the expression is omitted, the value is evaluated via a generated expression of the form payload.COLUMN_NAME, which gives a direct mapping from object properties to table columns. For example, given a JSON payload like:

{
  "name": "My Name",
  "address": {
     "city": "Big City",
     "street": "Narrow Alley"
  }
}

we can insert it into a table with name, city, and street columns using the configuration:

--jdbc.consumer.columns=name,city:address.city,street:address.street
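To make the mapping concrete, here is a minimal Python sketch of how such a columns specification could be turned into a row. It is only an illustration, not the sink's implementation: the real sink evaluates SpEL expressions, whereas this sketch supports plain dotted paths only and resolves every path relative to the payload. The helper names parse_columns and resolve are hypothetical.

```python
import json

def parse_columns(spec):
    """Split 'name,city:address.city' into a {column: path} mapping (hypothetical helper)."""
    mapping = {}
    for entry in spec.split(","):
        column, sep, expression = entry.strip().partition(":")
        # When the expression (and its colon) is omitted, fall back to the
        # column name itself, mirroring the generated payload.COLUMN_NAME form.
        mapping[column] = expression if sep else column
    return mapping

def resolve(payload, path):
    """Follow a dotted path such as 'address.city' through nested dicts.

    Assumption: plain dotted paths only; real SpEL is far more expressive.
    """
    value = payload
    for key in path.split("."):
        value = value[key]
    return value

payload = json.loads("""
{
  "name": "My Name",
  "address": {
     "city": "Big City",
     "street": "Narrow Alley"
  }
}
""")

columns = parse_columns("name,city:address.city,street:address.street")
row = {column: resolve(payload, path) for column, path in columns.items()}
print(row)  # {'name': 'My Name', 'city': 'Big City', 'street': 'Narrow Alley'}
```

Each key of row corresponds to a table column and each value to what would be bound to that column in the insert statement.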

This sink supports batch inserts to the extent that the underlying JDBC driver supports them. Batch inserts are configured via the batch-size and idle-timeout properties: incoming messages are aggregated until batch-size messages are present, then inserted as a batch. If idle-timeout milliseconds pass with no new messages, the aggregated batch is inserted even if it is smaller than batch-size, which caps the maximum latency.
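The interplay of the two properties can be sketched in Python as follows. This is a simplified model of the behaviour described above, not the sink's actual code: the BatchBuffer class, its tick method, and the injected clock are hypothetical, and treating a negative idle-timeout as "timeout disabled" is an assumption based on the default of -1.

```python
import time

class BatchBuffer:
    """Hypothetical model of batch-size / idle-timeout aggregation."""

    def __init__(self, batch_size, idle_timeout_ms, flush, clock=time.monotonic):
        self.batch_size = batch_size
        self.idle_timeout_ms = idle_timeout_ms
        self.flush = flush          # callback that would perform the batch insert
        self.clock = clock          # injectable so the demo below is deterministic
        self.pending = []
        self.last_arrival = clock()

    def add(self, message):
        self.pending.append(message)
        self.last_arrival = self.clock()
        # A full batch is released immediately.
        if len(self.pending) >= self.batch_size:
            self._release()

    def tick(self):
        """Call periodically; releases a partial batch once the sink has been idle long enough."""
        # Assumption: a negative timeout means the idle flush is disabled.
        if self.idle_timeout_ms < 0 or not self.pending:
            return
        idle_ms = (self.clock() - self.last_arrival) * 1000
        if idle_ms >= self.idle_timeout_ms:
            self._release()

    def _release(self):
        batch, self.pending = self.pending, []
        self.flush(batch)

# Demo with a fake clock: batch-size=3, idle-timeout=500 ms.
batches = []
now = [0.0]
buffer = BatchBuffer(3, 500, batches.append, clock=lambda: now[0])
for message in ["a", "b", "c", "d"]:
    buffer.add(message)
now[0] += 0.6   # 600 ms pass with no new messages
buffer.tick()
print(batches)  # [['a', 'b', 'c'], ['d']]
```

The first three messages fill a batch and are flushed together; the fourth is flushed on its own once the idle timeout elapses, so no message waits indefinitely for a full batch.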

Note
The module also uses Spring Boot’s DataSource support for configuring the database connection, so properties like spring.datasource.url etc. apply.

Examples

java -jar jdbc-sink.jar --jdbc.consumer.tableName=names \
            --jdbc.consumer.columns=name \
            --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
            --spring.datasource.url='jdbc:mysql://localhost:3306/test'

Payload

Options

The jdbc sink has the following options:

Properties grouped by prefix:

jdbc.consumer

batch-size

Number of messages to accumulate before the data is flushed to the database table. (Integer, default: 1)

columns

Comma-separated list of colon-delimited pairs of column names and SpEL expressions for the values to insert/update. Names are used at initialization time to issue the DDL. (String, default: payload:payload.toString())

idle-timeout

Idle timeout in milliseconds after which data is automatically flushed to the database table. (Long, default: -1)

initialize

'true', 'false' or the location of a custom initialization script for the table. (String, default: false)

table-name

The name of the table to write into. (String, default: messages)

spring.datasource

driver-class-name

Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default: <none>)

password

Login password of the database. (String, default: <none>)

url

JDBC URL of the database. (String, default: <none>)

username

Login username of the database. (String, default: <none>)