Real-time case
Overview
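This example shows how to use chili to build a real-time data pipeline: a publisher sends trade and quote updates to a message broker (tick), which logs each message and forwards it to subscribers.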
graph TD
PUB[**Publisher** pub]
SUB[**Subscriber** sub]
TICK[**Message Broker** tick]
PUB --> TICK
TICK --> SUB
style PUB fill:#7E57C2
style SUB fill:#7E57C2
style TICK fill:#D32F2F
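The flow in the diagram can be illustrated with a small in-process sketch. This is Python rather than chili, and it does not use any chili API; `Broker`, `Recorder`, `subscribe`, and `upd` are hypothetical names chosen only to mirror the publisher, broker, and subscriber roles.

```python
from collections import defaultdict


class Broker:
    """In-process stand-in for the message broker: fans out updates per table."""

    def __init__(self):
        self._subscribers = defaultdict(list)  # table name -> callbacks

    def subscribe(self, table, callback):
        self._subscribers[table].append(callback)

    def upd(self, table, rows):
        # Forward the batch to every subscriber of this table.
        for callback in self._subscribers[table]:
            callback(table, rows)


class Recorder:
    """Stand-in for a subscriber: remembers which batches it received."""

    def __init__(self):
        self.batches = []

    def __call__(self, table, rows):
        self.batches.append((table, len(rows)))


broker = Broker()
subscriber = Recorder()
broker.subscribe("trade", subscriber)

# Publisher side: one batch of trades and one batch of quotes.
broker.upd("trade", [{"sym": "a", "price": 1.0, "size": 10, "side": "B"}])
broker.upd("quote", [{"sym": "a", "bid": 0.9, "ask": 1.1}])  # nobody subscribed
```

Only the trade batch reaches the subscriber, because it did not subscribe to `quote`. The real example replaces the in-process calls with IPC connections to the broker on port 18000.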
Chili Source Code
Start a message broker instance listening on port 18000.
Source code:
.tick.msgLog: "/tmp/msg.log";
// .broker.validateSeq returns the number of valid messages in the log
tick(0, .broker.validateSeq(.tick.msgLog, 0b));
// chili allows str + str, so strings can be concatenated with the `+` operator
.tick.msgHandle: .handle.open("file://" + .tick.msgLog);
.tick.schema: {
trade: ([]
sym: `sym $ [],
price: `f64 $ [],
size: `i64 $ [],
side: `sym $ [],
time: `timestamp $ [],
),
quote: ([]sym: `sym $ [], bid: `f64 $ [], ask: `f64 $ [], time: `timestamp $ []),
};
.tick.upd: function(table, data) {
.log.info(["received", count(data), "for", table]);
.tick.msgHandle([`upd, table, data]);
.broker.publish(`upd, table, data);
// tick is a built-in function that updates the internal tick count; tick(0, 1) adds 1 to it
// use `tick(0, 0)` to get the current tick count, `tick neg tick 0` to reset the tick count.
tick(0, 1);
};
.tick.subscribe: function(tables) {
tables: if{count(tables); tables; key(.tick.schema)};
// `this` is reserved for the current stack
// this.h is the handle of the IPC connection for the current stack
.broker.subscribe(this.h, ) each tables;
[.tick.msgLog, tick(0, 0), .tick.schema]
};
.tick.unsubscribe: function(tables) {
tables: if{count(tables); tables; key(.tick.schema)};
// `this` is reserved for the current stack
// this.h is the handle of the IPC connection for the current stack
.broker.unsubscribe(this.h, ) each tables;
};
.tick.eod: function() {
.broker.eod([`eod, today(`)]);
};
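The broker's `.tick.upd` above follows a log-then-publish order: write the message to the log, forward it to subscribers, then advance the tick count. The Python sketch below illustrates that ordering only. It is not chili code; the `LoggingBroker` class and the JSON-lines log format are assumptions made for the illustration.

```python
import json
import os
import tempfile


class LoggingBroker:
    def __init__(self, log_path):
        self.log_path = log_path
        self.tick = 0          # number of messages written to the log
        self.subscribers = {}  # table name -> list of callbacks

    def subscribe(self, table, callback):
        self.subscribers.setdefault(table, []).append(callback)
        # Hand back what a subscriber needs to catch up later.
        return self.log_path, self.tick

    def upd(self, table, rows):
        # 1. Persist first, so the message can be replayed later.
        with open(self.log_path, "a", encoding="utf-8") as log:
            log.write(json.dumps(["upd", table, rows]) + "\n")
        # 2. Fan out to live subscribers of this table.
        for callback in self.subscribers.get(table, []):
            callback(table, rows)
        # 3. Advance the message counter.
        self.tick += 1


log_path = os.path.join(tempfile.mkdtemp(), "msg.log")
broker = LoggingBroker(log_path)
seen = []
path, start = broker.subscribe("trade", lambda table, rows: seen.append(rows))
broker.upd("trade", [{"sym": "a", "price": 1.0}])
broker.upd("quote", [{"sym": "a", "bid": 0.9}])
```

Because every message is logged before the counter moves, the counter always equals the number of lines in the log, which is what lets a subscriber replay the log up to a known position.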
Start a publisher instance that publishes 10 records to the message broker every second. The -i 1000 option is required so that the job is triggered every second.
Source code:
h: .handle.open(`chili://:18000);
pub: function() {
n: 10;
.log.info(["publish", n, "record"]);
h(
[
`.tick.upd,
`trade,
([]sym: n ? `a`b`c, price: n ? 1.0, size: n ? 10, side: n ? `B`S, time: n # now(`)),
]
);
h([`.tick.upd, `quote, ([]sym: n ? `a`b`c, bid: n ? 1.0, ask: n ? 1.0, time: n # now(`))]);
};
.job.add("pub", now(`), now(`) + 1D00:00:00, 0D00:00:01, "publish");
Start a subscriber instance that subscribes to data from the message broker. The trade and quote tables can then be queried on the subscriber instance.
Source code:
upd: function(table, data) {
table upsert data;
tick(this.h, 1);
};
.sub.init: function() {
h: .handle.open(`chili://:18000:sub1:token789);
.handle.onDisconnected(h, `.sub.recover);
info: h([`.tick.subscribe, []]);
(set) each info(2);
// .log.info(["broker info", info]);
replay(info(0), 0, info(1), [], 1b, h);
.handle.subscribing(h);
};
// this function is called when the connection is lost; it is retried every minute until it succeeds
.sub.recover: function(handle) {
.handle.connect(handle);
info: handle([`.tick.subscribe, []]);
replay(info(0), tick(0), info(1), [], 1b, handle);
.handle.subscribing(handle);
};
.sub.init();
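The subscriber above catches up by replaying the broker's message log: from message 0 on the first subscribe, and from its own tick count after a reconnect. The following Python sketch shows that idea under stated assumptions. The `Subscriber` class, its `replay(log_path, start, end)` method, and the JSON-lines log are hypothetical and do not reproduce the signature of chili's built-in `replay`.

```python
import json
import os
import tempfile
from collections import defaultdict


class Subscriber:
    def __init__(self):
        self.tables = defaultdict(list)  # local copies of subscribed tables
        self.applied = 0                 # number of log messages applied so far

    def upd(self, table, rows):
        self.tables[table].extend(rows)
        self.applied += 1

    def replay(self, log_path, start, end):
        """Apply log messages whose index is in [start, end)."""
        with open(log_path, encoding="utf-8") as log:
            for index, line in enumerate(log):
                if index >= end:
                    break
                if index >= start:
                    _, table, rows = json.loads(line)
                    self.upd(table, rows)


# Build a three-message log standing in for the broker's message log.
log_path = os.path.join(tempfile.mkdtemp(), "msg.log")
with open(log_path, "w", encoding="utf-8") as log:
    for price in (1.0, 2.0, 3.0):
        message = ["upd", "trade", [{"sym": "a", "price": price}]]
        log.write(json.dumps(message) + "\n")

sub = Subscriber()
sub.replay(log_path, 0, 2)            # first subscribe: broker reported 2 messages
sub.replay(log_path, sub.applied, 3)  # after a reconnect: resume from own count
```

Resuming from the subscriber's own count avoids applying the first two messages twice, which is the role `tick` plays in `.sub.recover`.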
Pepper Source Code
Start a message broker instance listening on port 18000.
Source code:
.tick.msgLog: "/tmp/msg.log";
// .broker.validateSeq returns the number of valid messages in the log
tick[0; .broker.validateSeq[.tick.msgLog; 0b]];
// chili allows str + str, so strings can be concatenated with the `+` operator
.tick.msgHandle: .handle.open "file://" + .tick.msgLog;
.tick.schema: {
trade: ([]
sym: `sym $ ();
price: `f64 $ ();
size: `i64 $ ();
side: `sym $ ();
time: `timestamp $ ();
);
quote: ([]sym: `sym $ (); bid: `f64 $ (); ask: `f64 $ (); time: `timestamp $ ());
};
.tick.upd: {[table; data]
.log.info ("received"; count data; "for"; table);
.tick.msgHandle (`upd; table; data);
.broker.publish[`upd; table; data];
// tick is a built-in function that updates the internal tick count; tick[0; 1] adds 1 to it
// use `tick 0` to get the current tick count, `tick neg tick 0` to reset the tick count.
tick[0; 1];
};
.tick.subscribe: {[tables]
tables: $[count tables; tables; key .tick.schema];
// `this` is reserved for the current stack
// this.h is the handle of the IPC connection for the current stack
.broker.subscribe[this.h; ] each tables;
(.tick.msgLog; tick[0; 0]; .tick.schema)
};
.tick.unsubscribe: {[tables]
tables: $[count tables; tables; key .tick.schema];
// `this` is reserved for the current stack
// this.h is the handle of the IPC connection for the current stack
.broker.unsubscribe[this.h; ] each tables;
};
.tick.eod: {[] .broker.eod[(`eod; today[`])]; };
Start a publisher instance that publishes 10 records to the message broker every second. The -i 1000 option is required so that the job is triggered every second.
Source code:
h: .handle.open `chili://:18000;
pub: {[]
n: 10;
.log.info ("publish"; n; "record");
h (
`.tick.upd;
`trade;
([]sym: n ? `a`b`c; price: n ? 1.0; size: n ? 10; side: n ? `B`S; time: n # now[`]);
);
h (`.tick.upd; `quote; ([]sym: n ? `a`b`c; bid: n ? 1.0; ask: n ? 1.0; time: n # now[`]));
};
.job.add["pub"; now[`]; now[`] + 1D00:00:00; 0D00:00:01; "publish"];
Start a subscriber instance that subscribes to data from the message broker. The trade and quote tables can then be queried on the subscriber instance.
Source code:
upd: {[table; data] table upsert data; tick[this.h; 1]; };
.sub.init: {[]
h: .handle.open `chili://:18000:sub1:token789;
.handle.onDisconnected[h; `.sub.recover];
info: h (`.tick.subscribe; ());
(set) each info[2];
// .log.info ("broker info"; info);
replay[info[0]; 0; info[1]; (); 1b; h];
.handle.subscribing[h];
};
// this function is called when the connection is lost; it is retried every minute until it succeeds
.sub.recover: {[handle]
.handle.connect[handle];
info: handle (`.tick.subscribe; ());
replay[info[0]; tick[0]; info[1]; (); 1b; handle];
.handle.subscribing[handle];
};
.sub.init[];