Architecture
RIOT processes data in batches: a fixed number of records (a batch, also called a chunk) is read, processed, and written at a time. The cycle repeats until the source has no more data.
Keys
Import commands (file-import, faker-import, db-import) construct keys from input records by concatenating a keyspace prefix with the values of the key fields.
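As a rough illustration of key construction, the sketch below builds a key from a record in Python. It is not RIOT's code: the function name build_key and the ":" separator are assumptions made for this example, and the field names mirror the --keyspace order --keys order_id options used in the multi-threading example.

```python
def build_key(record, keyspace, key_fields, separator=":"):
    """Build a Redis key such as 'order:1234' from an input record."""
    # Assumption: the key is the keyspace prefix followed by the value of
    # each key field, joined with the separator.
    parts = [keyspace] + [str(record[field]) for field in key_fields]
    return separator.join(parts)


record = {"order_id": 1234, "customer": "acme"}
print(build_key(record, "order", ["order_id"]))  # order:1234
```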
Batching
The default batch size is 50, which means that an execution step reads 50 items at a time from the source, processes them, and writes them to the target. If the target is Redis, the whole batch is written as a single command pipeline (see Redis Pipelining) to minimize the number of round trips to the server.
You can change the batch size (and hence the pipeline size) using the --batch option.
The optimal batch size in terms of throughput depends on many factors like record size and command types. See Redis Pipeline Tuning for details.
Batching example
riot faker-import value="random.nextDouble" --count 10 --batch 1 --sleep 1 \
  --skip-policy never ts.add --keyspace ts:gen --value value
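The read/process/write cycle described in this section can be sketched in a few lines of Python. This is only a conceptual model under stated assumptions, not RIOT's implementation: read_batches and run_step are hypothetical names, and write stands in for whatever sends one batch to the target (for Redis, one pipeline per batch).

```python
from itertools import islice


def read_batches(source, batch_size=50):
    """Yield lists of up to batch_size items until the source is exhausted."""
    iterator = iter(source)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def run_step(source, process, write, batch_size=50):
    """Read, process, and write one batch at a time; return the item count."""
    written = 0
    for batch in read_batches(source, batch_size):
        # One write call per batch, mirroring one round trip per pipeline.
        write([process(item) for item in batch])
        written += len(batch)
    return written


batches = []
total = run_step(range(120), lambda x: x * 2, batches.append, batch_size=50)
print(total, [len(b) for b in batches])  # 120 [50, 50, 20]
```

With a batch size of 1, as in the example above, every item would be written in its own round trip.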
Multi-threading
It is possible to parallelize processing by using multiple threads. In this configuration, each chunk of items is read, processed, and written in a separate thread of execution. This is different from partitioning where items would be read by multiple readers. Here, only one reader is being accessed from multiple threads.
To set the number of threads, use the --threads option.
Multi-threading example
riot db-import "SELECT * FROM orders" --url "jdbc:postgresql://host:port/database" \
  --username appuser --password passwd --threads 3 hset --keyspace order \
  --keys order_id
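To illustrate the difference from partitioning, here is a minimal Python sketch in which several threads pull chunks from one shared reader. It is a conceptual model only: run_multithreaded is a hypothetical name, and the lock around the reader is an assumption about how a single reader can be shared safely, not a description of RIOT internals.

```python
import threading
from itertools import islice


def run_multithreaded(source, process, write, batch_size=50, threads=3):
    """Process chunks in several threads that all share a single reader."""
    iterator = iter(source)
    read_lock = threading.Lock()  # serializes access to the one reader

    def worker():
        while True:
            with read_lock:
                batch = list(islice(iterator, batch_size))
            if not batch:
                return  # source exhausted
            # Processing and writing happen outside the lock, in parallel.
            write([process(item) for item in batch])

    workers = [threading.Thread(target=worker) for _ in range(threads)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()


results = []
run_multithreaded(range(10), lambda x: x * 2, results.append, batch_size=3)
print(sorted(value for batch in results for value in batch))
```

Because chunks are handled concurrently, the order in which they reach the target is not guaranteed.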
Processing
Processors are applied to records in the following order:
- Transforms
- Regular expressions
- Filters
Transforms
Transforms allow you to create, update, and delete fields using the Spring Expression Language (SpEL):
- field1='foo' -> generate a field named field1 containing the string foo
- temp=(temp-32)*5/9 -> convert temperature from Fahrenheit to Celsius
- name=remove(first).concat(remove(last)) -> concatenate first and last fields and delete them
- field2=null -> delete field2
Input fields are accessed by name (e.g., field3=field1+field2).
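For readers less familiar with SpEL, the effect of the four transforms listed above can be mirrored on a plain Python dictionary. This is only an approximation of the semantics for illustration: apply_transforms is a hypothetical helper, and the sample record is made up.

```python
def apply_transforms(record):
    """Apply Python equivalents of the four example transforms to a copy."""
    out = dict(record)
    out["field1"] = "foo"                     # field1='foo'
    out["temp"] = (out["temp"] - 32) * 5 / 9  # temp=(temp-32)*5/9
    # name=remove(first).concat(remove(last)): read both fields and drop them
    out["name"] = out.pop("first") + out.pop("last")
    out.pop("field2", None)                   # field2=null
    return out


sample = {"temp": 212, "first": "John", "last": "Doe", "field2": "x"}
print(apply_transforms(sample))
```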
The transform processor also exposes functions and variables that can be accessed using the # prefix:
- date: Date parser/formatter (Java date format).
- geo: Convenience method that takes a longitude and a latitude to produce a RediSearch geo-location string in the form longitude,latitude.
- index: Sequence number of the item being generated.
- redis: Handle to invoke Redis commands (Lettuce API).
Transform processor example
riot file-import --process epoch="#date.parse(mydate).getTime()" location="#geo(lon,lat)" id="#index" name="#redis.hget('person1','lastName')" ...
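The values that the epoch and location expressions produce can be approximated in Python as follows. This is a sketch under stated assumptions: geo and epoch_millis are stand-ins written for this example, the date pattern "%Y-%m-%d" and the UTC time zone are assumed, and the actual pattern depends on how the date parser is configured.

```python
from datetime import datetime, timezone


def geo(longitude, latitude):
    """Return a geo-location string in the form longitude,latitude."""
    return f"{longitude},{latitude}"


def epoch_millis(text, fmt="%Y-%m-%d"):
    """Parse a date string and return milliseconds since the Unix epoch."""
    # Assumption: the input has no zone information and is treated as UTC.
    parsed = datetime.strptime(text, fmt).replace(tzinfo=timezone.utc)
    return int(parsed.timestamp() * 1000)


print(geo(-122.4, 37.8))           # -122.4,37.8
print(epoch_millis("1970-01-02"))  # 86400000
```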
Regular expressions
Extract patterns from source fields using regular expressions:
riot file-import --regex name="(?<first>\w+)\/(?<last>\w+)" ...
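The same extraction can be tried out in Python to see which fields a pattern would produce. Note that Python spells named groups (?P<name>...) where Java uses (?<name>...). The helper extract_fields is hypothetical, and leaving a non-matching record unchanged is an assumption of this sketch.

```python
import re

# Python equivalent of (?<first>\w+)\/(?<last>\w+)
NAME_PATTERN = re.compile(r"(?P<first>\w+)/(?P<last>\w+)")


def extract_fields(record, field, pattern):
    """Return a copy of record with the pattern's named groups added."""
    match = pattern.search(str(record.get(field, "")))
    if match is None:
        return dict(record)  # assumption: no match leaves the record as is
    return {**record, **match.groupdict()}


print(extract_fields({"name": "John/Doe"}, "name", NAME_PATTERN))
# {'name': 'John/Doe', 'first': 'John', 'last': 'Doe'}
```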
Filters
Filters allow you to exclude records that don’t match a SpEL boolean expression.
For example, this filter keeps only records where the value field is a series of digits:
riot file-import --filter "value matches '\\d+'" ...
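The filter above can be mimicked in Python to check which records would be kept. The SpEL matches operator tests the whole string against the pattern, so the sketch uses re.fullmatch. The function name keep and the sample records are made up for this example.

```python
import re


def keep(record):
    """Return True when the value field consists only of digits."""
    return re.fullmatch(r"\d+", str(record.get("value", ""))) is not None


records = [{"value": "123"}, {"value": "12a"}, {"value": ""}, {"other": "1"}]
kept = [r for r in records if keep(r)]
print(kept)  # [{'value': '123'}]
```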