Real-time case

Overview

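This example shows how to use chili to build a real-time data pipeline: a publisher (pub) sends records to a message broker (tick), which forwards them to a subscriber (sub).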

graph TD
    PUB[**Publisher** pub]
    SUB[**Subscriber** sub]
    TICK[**Message Broker** tick]

    PUB --> TICK
    TICK --> SUB

    style PUB fill:#7E57C2
    style SUB fill:#7E57C2
    style TICK fill:#D32F2F

Chili Source Code

Start a message broker instance listening on port 18000.

chili ./src/chili/tick.chi -p 18000

Source code:

.tick.msgLog: "/tmp/msg.log";

// .broker.validateSeq returns the number of valid messages in the log
tick(.broker.validateSeq(.tick.msgLog, 0b));
// str + str is allowed in chili, so strings can be concatenated with the `+` operator
.tick.msgHandle: .handle.open("file://" + .tick.msgLog);

.tick.schema: {
  trade: ([]sym:`sym$[], price:`f64$[], size:`i64$[], side:`sym$[], time:`timestamp$[]),
  quote: ([]sym:`sym$[], bid:`f64$[], ask:`f64$[], time:`timestamp$[]),
};

.tick.upd:function(table, data){
  .log.info(["received", count(data), "for", table]);
  .tick.msgHandle([`upd, table, data]);
  .broker.publish(`upd, table, data);
  // tick is a built-in function for updating the internal tick count; tick(1) adds 1 to it
  // use `tick 0` to get current tick count, `tick neg tick 0` to reset tick count.
  tick(1);
};

.tick.subscribe:function(tables){
  tables: if{count(tables); tables; key(.tick.schema)};
  // `this` is reserved for the current stack
  // this.h is the handle of the IPC connection for the current stack
  .broker.subscribe(this.h, ) each tables;
  [.tick.msgLog, tick(0), .tick.schema]
};

.tick.unsubscribe:function(tables){
  tables: if{count(tables); tables; key(.tick.schema)};
  // `this` is reserved for the current stack
  // this.h is the handle of the IPC connection for the current stack
  .broker.unsubscribe(this.h, ) each tables;
};

.tick.eod:function(){
  .broker.eod([`eod, today(`)]);
};
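
The broker logic above (persist each message, fan it out to subscribers, then bump the tick count) can be modelled outside chili. The following is a minimal Python sketch of that flow, not chili's implementation: the `Broker` class, the JSON-lines log format, and the callback-based subscribers are all assumptions made for illustration.

```python
import json
import os
import tempfile


class Broker:
    """In-memory stand-in for the tick process (hypothetical, for illustration)."""

    def __init__(self, log_path, schema):
        self.log_path = log_path
        self.schema = schema
        self.subscribers = {table: [] for table in schema}
        # Resume the tick count from messages already in the log.
        self.tick = self._count_logged()

    def _count_logged(self):
        if not os.path.exists(self.log_path):
            return 0
        with open(self.log_path) as f:
            return sum(1 for _ in f)

    def subscribe(self, callback, tables=()):
        # An empty table list means "every table in the schema".
        tables = list(tables) or list(self.schema)
        for table in tables:
            self.subscribers[table].append(callback)
        # Mirrors the (log path, tick count, schema) triple returned above.
        return self.log_path, self.tick, self.schema

    def upd(self, table, data):
        # 1. persist the message, 2. publish it, 3. advance the tick count
        with open(self.log_path, "a") as f:
            f.write(json.dumps(["upd", table, data]) + "\n")
        for callback in self.subscribers[table]:
            callback(table, data)
        self.tick += 1


schema = {"trade": ["sym", "price"], "quote": ["sym", "bid"]}
log_path = os.path.join(tempfile.mkdtemp(), "msg.log")
broker = Broker(log_path, schema)

received = []
info = broker.subscribe(lambda table, data: received.append((table, data)))
broker.upd("trade", {"sym": "a", "price": 1.0})
```

Writing to the log before publishing means a subscriber that reconnects later can rebuild its state from the log alone, which is what the tick count returned by the subscribe call is used for.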

Start a publisher instance that publishes 10 records to the message broker every second. -i 1000 is required for the job to trigger every second.

chili ./src/chili/pub.chi -i 1000

Source code:

h: .handle.open(`chili://:18000);

pub: function() {
  n: 10;
  .log.info(["publish", n , "record"]);
  h([`.tick.upd, `trade, ([]sym: n?`a`b`c, price: n?1.0, size: n?10, side: n?`B`S, time: n#now(`))]);
  h([`.tick.upd, `quote, ([]sym: n?`a`b`c, bid: n?1.0, ask: n?1.0, time: n#now(`))]);
};

.job.add("pub", now(`), now(`)+1D00:00:00, 0D00:00:01, "publish");
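
For readers unfamiliar with the `n?` and `n#` forms, here is a rough Python equivalent of the trade batch built in `pub`. It assumes q-style semantics (`n?list` draws n random items, `n?1.0` draws n floats in [0, 1), `n?10` draws n integers in [0, 10), and `n#x` repeats x n times); this page does not confirm those semantics for chili, and `make_trades` is a hypothetical helper.

```python
import random
from datetime import datetime, timezone


def make_trades(n, rng):
    """Build one column-oriented batch of n random trade records."""
    now = datetime.now(timezone.utc)
    return {
        "sym": [rng.choice(["a", "b", "c"]) for _ in range(n)],
        "price": [rng.random() for _ in range(n)],     # floats in [0, 1)
        "size": [rng.randrange(10) for _ in range(n)],  # ints in [0, 10)
        "side": [rng.choice(["B", "S"]) for _ in range(n)],
        "time": [now] * n,                              # one timestamp per batch
    }


trades = make_trades(10, random.Random(0))
```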

Start a subscriber instance that subscribes to data from the message broker. The trade and quote tables can then be queried on the subscriber instance.

chili ./src/chili/sub.chi

Source code:

upd: function(table, data) {
  table upsert data;
  tick(1);
};

.sub.init: function() {
  h: .handle.open(`chili://:18000:sub1:token789);
  .handle.onDisconnected(h, `.sub.recover);
  info: h([`.tick.subscribe, []]);
  (set) each info(2);
  // .log.info(["broker info", info]);
  replay(info(0), 0, info(1), [], 1b);
  .handle.subscribing(h);
};

// this function is called when the connection is lost; it is retried every minute until it completes without error
.sub.recover: function(handle){
  .handle.connect(handle);
  info: handle([`.tick.subscribe, []]);
  replay(info(0), tick(0), info(1), [], 1b);
  .handle.subscribing(handle);
};

.sub.init();
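
The difference between `.sub.init` and `.sub.recover` is the replay starting point: the first replays the log from 0, the second from the subscriber's own tick count. The Python sketch below illustrates that idea under some assumptions: the second and third `replay` arguments are read as start and end message indices, the log is modelled as JSON lines, and the remaining two arguments of chili's `replay` are not modelled.

```python
import json
import os
import tempfile


def replay(log_path, start, end, handler):
    """Apply logged messages with index in [start, end) to handler."""
    applied = 0
    with open(log_path) as f:
        for index, line in enumerate(f):
            if index < start:
                continue
            if index >= end:
                break
            _, table, data = json.loads(line)
            handler(table, data)
            applied += 1
    return applied


tables = {"trade": []}
state = {"tick": 0}


def upd(table, data):
    # Same shape as the subscriber's upd: store the data, advance the tick count.
    tables[table].append(data)
    state["tick"] += 1


# Hypothetical log holding five trade messages.
log_path = os.path.join(tempfile.mkdtemp(), "msg.log")
with open(log_path, "w") as f:
    for i in range(5):
        f.write(json.dumps(["upd", "trade", {"sym": "a", "size": i}]) + "\n")

replay(log_path, 0, 3, upd)              # init: broker reported 3 messages
replay(log_path, state["tick"], 5, upd)  # recover: resume from local tick count
```

Because recovery starts at the local tick count, messages already applied before the disconnect are not applied a second time.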

Pepper Source Code

Start a message broker instance listening on port 18000.

chili -P ./src/pepper/tick.pep -p 18000

Source code:

.tick.msgLog: "/tmp/msg.log";

// .broker.validateSeq returns the number of valid messages in the log
tick .broker.validateSeq[.tick.msgLog; 0b];
// str + str is allowed in chili, so strings can be concatenated with the `+` operator
.tick.msgHandle: .handle.open "file://" + .tick.msgLog;

.tick.schema: {
  trade: ([]sym: `sym$(); price: `f64$(); size: `i64$(); side: `sym$(); time: `timestamp$());
  quote: ([]sym: `sym$(); bid: `f64$(); ask: `f64$(); time: `timestamp$());
};

.tick.upd:{[table; data]
  .log.info ("received"; count data; "for"; table);
  .tick.msgHandle (`upd; table; data);
  .broker.publish[`upd; table; data];
  // tick[1] is a built-in function for updating internal tick count
  // use `tick 0` to get current tick count, `tick neg tick 0` to reset tick count.
  tick[1];
};

.tick.subscribe:{[tables]
  tables: $[count tables; tables; key .tick.schema];
  // `this` is reserved for the current stack
  // this.h is the handle of the IPC connection for the current stack
  .broker.subscribe[this.h; ] each tables;
  (.tick.msgLog; tick[0]; .tick.schema)
};

.tick.unsubscribe:{[tables]
  tables: $[count tables; tables; key .tick.schema];
  // `this` is reserved for the current stack
  // this.h is the handle of the IPC connection for the current stack
  .broker.unsubscribe[this.h; ] each tables;
};

.tick.eod:{[]
  .broker.eod[(`eod; today[`])];
};

Start a publisher instance that publishes 10 records to the message broker every second. -i 1000 is required for the job to trigger every second.

chili -P ./src/pepper/pub.pep -i 1000

Source code:

h: .handle.open `chili://:18000;

pub: {[]
  n: 10;
  .log.info ("publish"; n ; "record");
  h (`.tick.upd; `trade; ([]sym: n?`a`b`c; price: n?1.0; size: n?10; side: n?`B`S; time: n#now[`]));
  h (`.tick.upd; `quote; ([]sym: n?`a`b`c; bid: n?1.0; ask: n?1.0; time: n#now[`]));
};

.job.add["pub"; now[`]; now[`]+1D00:00:00; 0D00:00:01; "publish"];

Start a subscriber instance that subscribes to data from the message broker. The trade and quote tables can then be queried on the subscriber instance.

chili -P ./src/pepper/sub.pep

Source code:

upd: {[table; data]
  table upsert data;
  tick[1];
};

.sub.init: {[]
  h: .handle.open `chili://:18000:sub1:token789;
  .handle.onDisconnected[h; `.sub.recover];
  info: h (`.tick.subscribe; ());
  (set) each info[2];
  // .log.info ("broker info"; info);
  replay[info[0]; 0; info[1]; (); 1b];
  .handle.subscribing[h];
};

// this function is called when the connection is lost; it is retried every minute until it completes without error
.sub.recover: {[handle]
  .handle.connect[handle];
  info: handle (`.tick.subscribe; ());
  replay[info[0]; tick[0]; info[1]; (); 1b];
  .handle.subscribing[handle];
};

.sub.init[];