In DolphinDB, we can import historical data into a stream table in chronological order as "real-time data" so that the same script can be used both for backtesting and for real-time trading. For an introduction to streaming in DolphinDB, please refer to the DolphinDB Streaming Tutorial.
This article introduces the functions replay and replayDS and then demonstrates the process of replaying data.
1. Functions
replay
replay(inputTables, outputTables, [dateColumn], [timeColumn], [replayRate], [absoluteRate=true], [parallelLevel=1])
Function replay injects data from specified tables or data sources into stream tables.
'inputTables' is a table or a tuple. Each element of the tuple is an unpartitioned table or a data source generated by function replayDS.
'outputTables' is a table, a tuple of tables, a string, or a string vector. The number of elements of outputTables must be the same as the number of elements of inputTables. If it is a vector, it is a list of the names of the shared stream tables where the replayed data of the corresponding tables of inputTables are saved. If it is a tuple, each element is a shared stream table where the replayed data of the corresponding table in inputTables are saved. The schema of each table in outputTables must be identical to the schema of the corresponding table in inputTables.
'dateColumn' and 'timeColumn' are strings indicating the date column and time column in inputTables. If neither is specified, the first column of the table is chosen as 'dateColumn'. If there is a 'dateColumn', it must be one of the partitioning columns. If only 'timeColumn' is specified, it must be one of the partitioning columns. If information about date and time comes from the same column (e.g., DATETIME or TIMESTAMP), use the same column for both 'dateColumn' and 'timeColumn'. Data are replayed in batches determined by the smallest unit of time in 'timeColumn', or in 'dateColumn' if 'timeColumn' is not specified. For example, if the smallest unit of time in 'timeColumn' is second, then all data within the same second are replayed in the same batch; if 'timeColumn' is not specified, all data within the same day are replayed in the same batch.
'replayRate' is a nonnegative integer indicating the number of rows to be replayed per second. If it is not specified, data are replayed at the maximum speed.
'absoluteRate' is a Boolean value. The default value is true.
Regarding 'replayRate' and 'absoluteRate':
- If 'replayRate' is a positive integer and absoluteRate=true, replay at the speed of 'replayRate' rows per second.
- If 'replayRate' is a positive integer and absoluteRate=false, replay at 'replayRate' times the original speed of the data. For example, if the difference between the maximum and the minimum values of 'dateColumn' or 'timeColumn' is n seconds, then it takes n/replayRate seconds to finish the replay.
- If 'replayRate' is unspecified or negative, replay at the maximum speed.
'parallelLevel' is a positive integer. When the size of individual partitions in the data sources is too large relative to the memory size, we need to use function replayDS to further divide individual partitions into smaller data sources. 'parallelLevel' indicates the number of threads loading data into memory from these smaller data sources simultaneously. The default value is 1. If 'inputTables' is a table or a tuple of tables, the effective 'parallelLevel' is always 1.
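To illustrate the two rate parameters, the following sketch replays a small table at twice the original speed of the data. The in-memory table t and the stream table out1 are hypothetical names introduced only for this example.
// hypothetical one-day table with a TIME column at 100 ms intervals
t = table(take(2007.08.17, 100) as date, 09:30:00.000 + (0..99) * 100 as time, rand(100.0, 100) as price)
share streamTable(100:0, `date`time`price, [DATE, TIME, DOUBLE]) as out1
// with absoluteRate=false, replayRate=2 means 2x the original speed, not 2 rows per second
replay(t, out1, `date, `time, 2, false)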
replayDS
replayDS(sqlObj, [dateColumn], [timeColumn], [timeRepartitionSchema])
Function replayDS generates a group of data sources to be used as the inputs of function replay. It splits a SQL query into multiple subqueries based on 'timeRepartitionSchema' with 'timeColumn' within each 'dateColumn' partition.
'sqlObj' is a table or metacode with a SQL statement (such as <select * from sourceTable>) indicating the data to be replayed. The table object of "select from" must use a DATE type column as one of the partitioning columns.
'dateColumn' and 'timeColumn' are strings indicating the date column and time column. If neither is specified, the first column of the table is chosen as 'dateColumn'. If there is a 'dateColumn', it must be one of the partitioning columns. If only 'timeColumn' is specified, it must be one of the partitioning columns. If information about date and time comes from the same column (e.g., DATETIME or TIMESTAMP), use the same column for both 'dateColumn' and 'timeColumn'. Function replayDS and the corresponding function replay must use the same set of 'dateColumn' and 'timeColumn'.
'timeRepartitionSchema' is a TIME or NANOTIME type vector. 'timeRepartitionSchema' delimits multiple data sources on the dimension of 'timeColumn' within each 'dateColumn' partition. For example, if timeRepartitionSchema=[t1, t2, t3], then there are 4 data sources within a day: [00:00:00.000, t1), [t1, t2), [t2, t3), and [t3, 23:59:59.999).
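Such a schema can be generated with function cutPoints. The following sketch, mirroring the example in section 3 (where quotes is a partitioned table), splits each trading day into 60 data sources:
// 61 cut points between 09:30 and 16:00 delimit 60 data sources per date partition
trs = cutPoints(09:30:00.000..16:00:00.000, 60)
rds = replayDS(<select * from quotes where date=2007.08.17>, `date, `time, trs)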
Replay a Single In-Memory Table
replay(inputTable, outputTable, `date, `time, 10)
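The call above assumes that 'inputTable' and 'outputTable' already exist. A minimal hypothetical setup, whose column names and types are assumptions, could be:
// hypothetical one-day input table with DATE and TIME columns
inputTable = table(take(2007.08.17, 1000) as date, 09:30:00.000 + (0..999) * 1000 as time, rand(10.0, 1000) as price)
// shared output stream table with the same schema as the input
share streamTable(100:0, `date`time`price, [DATE, TIME, DOUBLE]) as outputTable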
Replay a Single Table Using Data Sources
To replay a single table with a large number of rows, we can use function replayDS together with function replay. Function replayDS delimits multiple data sources on the dimension of 'timeColumn' within each 'dateColumn' partition. Parameter 'parallelLevel' of function replay specifies the number of threads loading data into memory from these smaller data sources simultaneously. In this example, 'parallelLevel' is set to 2.
inputDS = replayDS(<select * from inputTable>, `date, `time, 08:00:00.000 + (1..10) * 3600000)
replay(inputDS, outputTable, `date, `time, 1000, true, 2)
Replay Multiple Tables Simultaneously Using Data Sources
To replay multiple tables simultaneously, assign a tuple of tables or data sources to the parameter 'inputTables' of the function replay and specify the output tables. Each of the output tables corresponds to an input table and should have the same schema as the corresponding input table. All input tables should have identical 'dateColumn' and 'timeColumn'. The output stream tables out1, out2, and out3 must exist before the replay starts; see the sketch after the script.
ds1 = replayDS(<select * from input1>, `date, `time, 08:00:00.000 + (1..10) * 3600000)
ds2 = replayDS(<select * from input2>, `date, `time, 08:00:00.000 + (1..10) * 3600000)
ds3 = replayDS(<select * from input3>, `date, `time, 08:00:00.000 + (1..10) * 3600000)
replay([ds1, ds2, ds3], [out1, out2, out3], `date, `time, 1000, true, 2)
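The output stream tables are not created by the script above. A sketch of their setup, reusing the schema-copying pattern from section 3 (input1, input2, and input3 are assumed to be existing tables):
// create one shared output stream table per input, copying each input's schema
sch1 = select name, typeString as type from input1.schema().colDefs
share streamTable(100:0, sch1.name, sch1.type) as out1
sch2 = select name, typeString as type from input2.schema().colDefs
share streamTable(100:0, sch2.name, sch2.type) as out2
sch3 = select name, typeString as type from input3.schema().colDefs
share streamTable(100:0, sch3.name, sch3.type) as out3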
Cancel Replay
If function replay was called with submitJob, we can use getRecentJobs to get the jobId, then cancel the replay with the command cancelJob.
getRecentJobs()
cancelJob(jobid)
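Alternatively, submitJob itself returns the job ID, so it can be captured at submission time. A sketch, assuming a data source rds and a shared stream table named outQuotes:
// submitJob returns the job ID, which can be saved for a later cancelJob
jobid = submitJob("replay_demo", "demo replay job", replay, [rds], [`outQuotes], `date, `time)
// ... later, stop the replay before it finishes
cancelJob(jobid)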
If function replay was called directly, we can use getConsoleJobs in another GUI session to get the jobId, then cancel the replay with the command cancelConsoleJob.
getConsoleJobs()
cancelConsoleJob(jobId)
2. How to Use Replayed Data
Replayed data are streaming data. We can subscribe to and process the replayed data in the following 3 ways:
- Subscribe in DolphinDB and write user-defined functions in DolphinDB to process the streaming data (see the sketch after this list).
- Subscribe in DolphinDB and conduct real-time calculations with DolphinDB's built-in streaming aggregators, such as the time-series aggregator, cross-sectional aggregator, and anomaly detection engine. They are easy to use and have excellent performance. In section 3, we use a cross-sectional aggregator to calculate the intrinsic value of an ETF.
- Subscribe with third-party clients through DolphinDB's streaming API.
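A minimal sketch of the first approach, assuming a shared stream table named outQuotes; the user-defined handler simply prints the size of each incoming batch:
// user-defined handler: print the number of rows in each incoming message
def printBatchSize(msg) {
	print("received " + string(rows(msg)) + " rows")
}
subscribeTable(tableName="outQuotes", actionName="printBatchSize", offset=-1, handler=printBatchSize, msgAsTable=true)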
3. Examples
Replay level 1 stock quotes to calculate ETF intrinsic value.
In this example, we replay the level 1 stock quotes of the US stock markets on 2007/08/17 and calculate the intrinsic value of an ETF with the built-in cross-sectional aggregator in DolphinDB. The following are the schema of the input table 'quotes' and a preview of the data.
quotes = database("dfs://TAQ").loadTable("quotes");
quotes.schema().colDefs;
select top 10 * from quotes where date=2007.08.17
1. To replay a large amount of data, loading it all into memory first may cause an out-of-memory problem. We can first use function replayDS and specify parameter 'timeRepartitionSchema' to divide the data into 60 parts based on the column 'time'.
trs = cutPoints(09:30:00.000..16:00:00.000, 60)
rds = replayDS(<select * from quotes where date=2007.08.17>, `date, `time, trs);
2. Define the output stream table 'outQuotes'.
sch = select name, typeString as type from quotes.schema().colDefs
share streamTable(100:0, sch.name, sch.type) as outQuotes
3. Define a dictionary for the weights of the ETF constituents and a function etfVal to calculate the ETF intrinsic value. For simplicity, we use an ETF with only 6 constituent stocks.
defg etfVal(weights, sym, price) {
	return wsum(price, weights[sym])
}
weights = dict(STRING, DOUBLE)
weights[`AAPL] = 0.1
weights[`IBM] = 0.1
weights[`MSFT] = 0.1
weights[`NTES] = 0.1
weights[`AMZN] = 0.1
weights[`GOOG] = 0.5
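A quick sanity check of etfVal with hypothetical prices:
// wsum([150.0, 25.0], [0.1, 0.1]) = 0.1 * 150.0 + 0.1 * 25.0 = 17.5
etfVal(weights, `AAPL`IBM, [150.0, 25.0])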
4. Define a streaming aggregator to subscribe to the output stream table 'outQuotes'. We specify a filtering condition for the subscription so that only data with stock symbols AAPL, IBM, MSFT, NTES, AMZN, or GOOG are published to the aggregator. This significantly reduces unnecessary network overhead and data transfer.
setStreamTableFilterColumn(outQuotes, `symbol)
outputTable = table(1:0, `time`etf, [TIMESTAMP,DOUBLE])
tradesCrossAggregator=createCrossSectionalAggregator("etfvalue", <[etfVal{weights}(symbol, ofr)]>, quotes, outputTable, `symbol, `perBatch)
subscribeTable(tableName="outQuotes", actionName="tradesCrossAggregator", offset=-1, handler=append!{tradesCrossAggregator}, msgAsTable=true, filter=`AAPL`IBM`MSFT`NTES`AMZN`GOOG)
5. Start replaying data at the specified speed of 100,000 rows per second. The streaming aggregator performs real-time calculations on the replayed data.
submitJob("replay_quotes", "replay_quotes_stream", replay, [rds], [`outQuotes], `date, `time, 100000, true, 4)
6. Check the ETF intrinsic values.
select top 15 * from outputTable
![ETF intrinsic values calculated by the cross-sectional aggregator](https://dz2cdn1.dzone.com/storage/temp/17533940-1709090828637.png)
4. Performance Testing
We tested data replaying in DolphinDB on a server with the following configuration:
- Server: DELL PowerEdge R730xd
- CPU: Intel Xeon(R) CPU E5-2650 v4 (24 cores, 48 threads, 2.20 GHz)
- RAM: 512 GB (32 GB × 16, 2,666 MHz)
- Hard disk: 17 TB HDD (1.7 TB × 10, read speed 222 MB/s, write speed 210 MB/s)
- Community: 10 Gigabit Ethernet
DolphinDB script:
sch = select name, typeString as type from quotes.schema().colDefs
trs = cutPoints(09:30:00.000..16:00:00.001, 60)
rds = replayDS(<select * from quotes where date=2007.08.17>, `date, `time, trs);
share streamTable(100:0, sch.name, sch.type) as outQuotes1
jobid = submitJob("replay_quotes","replay_quotes_stream", replay, [rds], [`outQuotes1], `date, `time, , ,4)
When replaying at the maximum speed (parameter 'replayRate' is not specified) with no subscription on the output table, it takes only about 100 seconds to replay 336,305,414 rows of data, a throughput of roughly 3.4 million rows per second.