Real-time case
Overview
This example shows how to use chili to build a real-time data pipeline.
graph TD
PUB[**Publisher** pub]
SUB[**Subscriber** sub]
TICK[**Message Broker** tick]
PUB --> TICK
TICK --> SUB
style PUB fill:#7E57C2
style SUB fill:#7E57C2
style TICK fill:#D32F2F
Chili Source Code
Start a message broker instance that listens on port 18000.
Source code:
.tick.msgLog: "/tmp/msg.log";
// .broker.validateSeq returns the number of valid messages in the log
tick(.broker.validateSeq(.tick.msgLog, 0b));
// str + str is allowed in chili, so we can concatenate strings using `+` operator
.tick.msgHandle: .handle.open("file://" + .tick.msgLog);
.tick.schema: {
trade: ([]sym:`sym$[], price:`f64$[], size:`i64$[], side:`sym$[], time:`timestamp$[]),
quote: ([]sym:`sym$[], bid:`f64$[], ask:`f64$[], time:`timestamp$[]),
};
.tick.upd:function(table, data){
.log.info(["received", count(data), "for", table]);
.tick.msgHandle([`upd, table, data]);
.broker.publish(`upd, table, data);
// tick is a built-in function for updating the internal tick count; tick(1) adds 1 to it
// use `tick(0)` to get the current tick count, `tick(neg(tick(0)))` to reset it.
tick(1);
};
.tick.subscribe:function(tables){
tables: if{count(tables); tables; key(.tick.schema)};
// `this` is a reserved name that refers to the current stack
// `this.h` is the handle of the IPC connection for the current stack
.broker.subscribe(this.h, ) each tables;
[.tick.msgLog, tick(0), .tick.schema]
};
.tick.unsubscribe:function(tables){
tables: if{count(tables); tables; key(.tick.schema)};
// `this` is a reserved name that refers to the current stack
// `this.h` is the handle of the IPC connection for the current stack
.broker.unsubscribe(this.h, ) each tables;
};
.tick.eod:function(){
.broker.eod([`eod, today(`)]);
};
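The broker code above follows a log-then-fan-out pattern: each update is appended to the message log, forwarded to the subscribers of that table, and then counted. The following is a minimal Python sketch of that pattern only, not of how chili implements it. The class `MiniBroker` and its method names are hypothetical, and an in-memory list stands in for the log file.

```python
from collections import defaultdict


class MiniBroker:
    """Hypothetical in-memory stand-in for the broker logic shown above."""

    def __init__(self, schema):
        self.schema = schema                   # table name -> column names
        self.log = []                          # stands in for the on-disk message log
        self.subscribers = defaultdict(list)   # table name -> list of callbacks
        self.tick_count = 0                    # number of messages logged so far

    def upd(self, table, data):
        # 1. persist the message, 2. fan it out, 3. bump the counter
        self.log.append(("upd", table, data))
        for callback in self.subscribers[table]:
            callback(table, data)
        self.tick_count += 1

    def subscribe(self, callback, tables=()):
        # An empty selection means "all tables in the schema"
        selected = list(tables) or list(self.schema)
        for table in selected:
            self.subscribers[table].append(callback)
        # Mirrors the (log, count, schema) triple returned by .tick.subscribe
        return self.log, self.tick_count, self.schema


broker = MiniBroker({"trade": ["sym", "price"], "quote": ["sym", "bid", "ask"]})
received = []
info = broker.subscribe(lambda table, data: received.append((table, data)))
broker.upd("trade", [("a", 1.0)])
broker.upd("quote", [("a", 0.9, 1.1)])
```

Returning the message count at subscription time lets a subscriber know how much of the log to replay before it starts consuming live updates.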
Start a publisher instance that publishes 10 records to the message broker every second. Start it with -i 1000, which is required for the job to trigger every second.
Source code:
h: .handle.open(`chili://:18000);
pub: function() {
n: 10;
.log.info(["publish", n , "record"]);
h([`.tick.upd, `trade, ([]sym: n?`a`b`c, price: n?1.0, size: n?10, side: n?`B`S, time: n#now(`))]);
h([`.tick.upd, `quote, ([]sym: n?`a`b`c, bid: n?1.0, ask: n?1.0, time: n#now(`))]);
};
.job.add("pub", now(`), now(`)+1D00:00:00, 0D00:00:01, "publish");
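To make the shape of each published batch concrete, here is a rough Python sketch of the trade table that `pub` builds. It assumes that `n?` followed by a list picks n random items from it, that `n?1.0` draws n floats below 1.0, and that `n?10` draws n integers below 10, as in q-style languages; the source does not spell this out. The helper `make_trades` is a hypothetical name.

```python
import random
from datetime import datetime, timezone


def make_trades(n, rng):
    """Build n random trade rows as a dict of equal-length columns (hypothetical helper)."""
    now = datetime.now(timezone.utc)
    return {
        "sym": [rng.choice(["a", "b", "c"]) for _ in range(n)],
        "price": [rng.random() for _ in range(n)],      # floats in [0, 1)
        "size": [rng.randrange(10) for _ in range(n)],  # integers in 0..9
        "side": [rng.choice(["B", "S"]) for _ in range(n)],
        "time": [now] * n,                              # one timestamp repeated n times
    }


batch = make_trades(10, random.Random(0))
```

Every row in a batch shares one timestamp, which matches `n#now(...)` in the publisher code: the clock is read once and the value is repeated n times.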
Start a subscriber instance that subscribes to data from the message broker. The trade and quote tables can then be queried on the subscriber instance.
Source code:
upd: function(table, data) {
table upsert data;
tick(1);
};
.sub.init: function() {
h: .handle.open(`chili://:18000:sub1:token789);
.handle.onDisconnected(h, `.sub.recover);
info: h([`.tick.subscribe, []]);
(set) each info(2);
// .log.info(["broker info", info]);
replay(info(0), 0, info(1), [], 1b);
.handle.subscribing(h);
};
// called when the connection is lost; retried every minute until it completes without error
.sub.recover: function(handle){
.handle.connect(handle);
info: handle([`.tick.subscribe, []]);
replay(info(0), tick(0), info(1), [], 1b);
.handle.subscribing(handle);
};
.sub.init();
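The subscriber catches up by replaying the broker's message log before it switches to live updates, and after a reconnect it resumes from its own tick count instead of from zero. The Python sketch below illustrates that idea. It assumes that the second and third arguments of `replay` are start and end message positions, which the source implies but does not state; the Python `replay` function here is a hypothetical stand-in, not chili's built-in.

```python
def replay(log, start, end, handler):
    """Apply logged messages at positions start..end-1 to handler (hypothetical stand-in)."""
    for _, table, data in log[start:end]:
        handler(table, data)


tables = {"trade": [], "quote": []}
processed = 0  # plays the role of the subscriber's own tick count


def upd(table, data):
    global processed
    tables[table].extend(data)
    processed += 1


log = [
    ("upd", "trade", [("a", 1.0)]),
    ("upd", "quote", [("a", 0.9, 1.1)]),
    ("upd", "trade", [("b", 2.0)]),
]

# Initial subscribe: the broker reported 2 messages, so replay positions 0 and 1.
replay(log, 0, 2, upd)
# After a reconnect: the broker reports 3, so resume from our own count.
replay(log, processed, 3, upd)
```

Because the handler increments the local count for every message it applies, the second replay picks up exactly where the first one stopped and no message is applied twice.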
Pepper Source Code
Start a message broker instance that listens on port 18000.
Source code:
.tick.msgLog: "/tmp/msg.log";
// .broker.validateSeq returns the number of valid messages in the log
tick .broker.validateSeq[.tick.msgLog; 0b];
// str + str is allowed in chili, so we can concatenate strings using `+` operator
.tick.msgHandle: .handle.open "file://" + .tick.msgLog;
.tick.schema: {
trade: ([]sym: `sym$(); price: `f64$(); size: `i64$(); side: `sym$(); time: `timestamp$());
quote: ([]sym: `sym$(); bid: `f64$(); ask: `f64$(); time: `timestamp$());
};
.tick.upd:{[table; data]
.log.info ("received"; count data; "for"; table);
.tick.msgHandle (`upd; table; data);
.broker.publish[`upd; table; data];
// tick[1] is a built-in function for updating internal tick count
// use `tick 0` to get current tick count, `tick neg tick 0` to reset tick count.
tick[1];
};
.tick.subscribe:{[tables]
tables: $[count tables; tables; key .tick.schema];
// `this` is a reserved name that refers to the current stack
// `this.h` is the handle of the IPC connection for the current stack
.broker.subscribe[this.h; ] each tables;
(.tick.msgLog; tick[0]; .tick.schema)
};
.tick.unsubscribe:{[tables]
tables: $[count tables; tables; key .tick.schema];
// `this` is a reserved name that refers to the current stack
// `this.h` is the handle of the IPC connection for the current stack
.broker.unsubscribe[this.h; ] each tables;
};
.tick.eod:{[]
.broker.eod[(`eod; today[`])];
};
Start a publisher instance that publishes 10 records to the message broker every second. Start it with -i 1000, which is required for the job to trigger every second.
Source code:
h: .handle.open `chili://:18000;
pub: {[]
n: 10;
.log.info ("publish"; n ; "record");
h (`.tick.upd; `trade; ([]sym: n?`a`b`c; price: n?1.0; size: n?10; side: n?`B`S; time: n#now[`]));
h (`.tick.upd; `quote; ([]sym: n?`a`b`c; bid: n?1.0; ask: n?1.0; time: n#now[`]));
};
.job.add["pub"; now[`]; now[`]+1D00:00:00; 0D00:00:01; "publish"];
Start a subscriber instance that subscribes to data from the message broker. The trade and quote tables can then be queried on the subscriber instance.
Source code:
upd: {[table; data]
table upsert data;
tick[1];
};
.sub.init: {[]
h: .handle.open `chili://:18000:sub1:token789;
.handle.onDisconnected[h; `.sub.recover];
info: h (`.tick.subscribe; ());
(set) each info[2];
// .log.info ("broker info"; info);
replay[info[0]; 0; info[1]; (); 1b];
.handle.subscribing[h];
};
// called when the connection is lost; retried every minute until it completes without error
.sub.recover: {[handle]
.handle.connect[handle];
info: handle (`.tick.subscribe; ());
replay[info[0]; tick[0]; info[1]; (); 1b];
.handle.subscribing[handle];
};
.sub.init[];