Skip to content

Real-time case

Overview

This example shows how to use chili to build a real-time data pipeline.

graph TD
    PUB[**Publisher** pub]
    SUB[**Subscriber** sub]
    TICK[**Message Broker** tick]

    PUB --> TICK
    TICK --> SUB

    style PUB fill:#7E57C2
    style SUB fill:#7E57C2
    style TICK fill:#D32F2F

Chili Source Code

Start a message broker instance, listen at port 18000.

chili ./src/chili/tick.chi -p 18000

Source code:

.tick.msgLog: "/tmp/msg.log";

// .broker.validateSeq returns valid message number
tick(0, .broker.validateSeq(.tick.msgLog, 0b));
// str + str is allowed in chili, so we can concatenate strings using `+` operator
.tick.msgHandle: .handle.open("file://" + .tick.msgLog);

.tick.schema: {
  trade: ([]
    sym: `sym $ [],
    price: `f64 $ [],
    size: `i64 $ [],
    side: `sym $ [],
    time: `timestamp $ [],
  ),
  quote: ([]sym: `sym $ [], bid: `f64 $ [], ask: `f64 $ [], time: `timestamp $ []),
};

.tick.upd: function(table, data) {
  .log.info(["received", count(data), "for", table]);
  .tick.msgHandle([`upd, table, data]);
  .broker.publish(`upd, table, data);
  // tick(0, 1) is a built-in function for updating internal tick count
  // use `tick(0, 0)` to get current tick count, `tick neg tick 0` to reset tick count.
  tick(0, 1);
};

.tick.subscribe: function(tables) {
  tables: if{count(tables); tables; key(.tick.schema)};
  // this is reserved for current stack
  // this.h is the handle for the IPC connection of current stack
  .broker.subscribe(this.h, ) each tables;
  [.tick.msgLog, tick(0, 0), .tick.schema]
};

.tick.unsubscribe: function(tables) {
  tables: if{count(tables); tables; key(.tick.schema)};
  // this is reserved for current stack
  // this.h is the handle for the IPC connection of current stack
  .broker.unsubscribe(this.h, ) each tables;
};

.tick.eod: function() {
  .broker.eod([`eod, today(`)]);
};

Start a publisher instance to publish data to the message broker, publish 10 records every second. -i 1000 is required to trigger job every 1 second.

chili ./src/chili/pub.chi -i 1000

Source code:

h: .handle.open(`chili://:18000);

pub: function() {
  n: 10;
  .log.info(["publish", n, "record"]);
  h(
    [
      `.tick.upd,
      `trade,
      ([]sym: n ? `a`b`c, price: n ? 1.0, size: n ? 10, side: n ? `B`S, time: n # now(`)),
    ]
  );
  h([`.tick.upd, `quote, ([]sym: n ? `a`b`c, bid: n ? 1.0, ask: n ? 1.0, time: n # now(`))]);
};

.job.add("pub", now(`), now(`) + 1D00:00:00, 0D00:00:01, "publish");

Start a subscriber instance to subscribe data from the message broker. Trade and quote tables shall be able to query on subscriber instance.

chili ./src/pepper/sub.chi

Source code:

upd: function(table, data) {
  table upsert data;
  tick(this.h, 1);
};

.sub.init: function() {
  h: .handle.open(`chili://:18000:sub1:token789);
  .handle.onDisconnected(h, `.sub.recover);
  info: h([`.tick.subscribe, []]);
  (set) each info(2);
  // .log.info(["broker info", info]);
  replay(info(0), 0, info(1), [], 1b, h);
  .handle.subscribing(h);
};

// this function will be called when the connection is lost, retry every minute until no error
.sub.recover: function(handle) {
  .handle.connect(handle);
  info: handle([`.tick.subscribe, []]);
  replay(info(0), tick(0), info(1), [], 1b, handle);
  .handle.subscribing(handle);
};

.sub.init();

Pepper Source Code

Start a message broker instance, listen at port 18000.

chili -P ./src/pepper/tick.pep -p 18000

Source code:

.tick.msgLog: "/tmp/msg.log";

// .broker.validateSeq returns valid message number
tick[0; .broker.validateSeq[.tick.msgLog; 0b]];
// str + str is allowed in chili, so we can concatenate strings using `+` operator
.tick.msgHandle: .handle.open "file://" + .tick.msgLog;

.tick.schema: {
  trade: ([]
    sym: `sym $ ();
    price: `f64 $ ();
    size: `i64 $ ();
    side: `sym $ ();
    time: `timestamp $ ();
  );
  quote: ([]sym: `sym $ (); bid: `f64 $ (); ask: `f64 $ (); time: `timestamp $ ());
};

.tick.upd: {[table; data]
  .log.info ("received"; count data; "for"; table);
  .tick.msgHandle (`upd; table; data);
  .broker.publish[`upd; table; data];
  // tick[0; 1] is a built-in function for updating internal tick count
  // use `tick 0` to get current tick count, `tick neg tick 0` to reset tick count.
  tick[0; 1];
};

.tick.subscribe: {[tables]
  tables: $[count tables; tables; key .tick.schema];
  // this is reserved for current stack
  // this.h is the handle for the IPC connection of current stack
  .broker.subscribe[this.h; ] each tables;
  (.tick.msgLog; tick[0; 0]; .tick.schema)
};

.tick.unsubscribe: {[tables]
  tables: $[count tables; tables; key .tick.schema];
  // this is reserved for current stack
  // this.h is the handle for the IPC connection of current stack
  .broker.unsubscribe[this.h; ] each tables;
};

.tick.eod: {[] .broker.eod[(`eod; today[`])]; };

Start a publisher instance to publish data to the message broker, publish 10 records every second. -i 1000 is required to trigger job every 1 second.

chili -P ./src/pepper/pub.pep -i 1000

Source code:

h: .handle.open `chili://:18000;

pub: {[]
  n: 10;
  .log.info ("publish"; n; "record");
  h (
    `.tick.upd;
    `trade;
    ([]sym: n ? `a`b`c; price: n ? 1.0; size: n ? 10; side: n ? `B`S; time: n # now[`]);
  );
  h (`.tick.upd; `quote; ([]sym: n ? `a`b`c; bid: n ? 1.0; ask: n ? 1.0; time: n # now[`]));
};

.job.add["pub"; now[`]; now[`] + 1D00:00:00; 0D00:00:01; "publish"];

Start a subscriber instance to subscribe data from the message broker. Trade and quote tables shall be able to query on subscriber instance.

chili -P ./src/pepper/sub.pep

Source code:

upd: {[table; data] table upsert data; tick[this.h; 1]; };

.sub.init: {[]
  h: .handle.open `chili://:18000:sub1:token789;
  .handle.onDisconnected[h; `.sub.recover];
  info: h (`.tick.subscribe; ());
  (set) each info[2];
  // .log.info ("broker info"; info);
  replay[info[0]; 0; info[1]; (); 1b; h];
  .handle.subscribing[h];
};

// this function will be called when the connection is lost, retry every minute until no error
.sub.recover: {[handle]
  .handle.connect[handle];
  info: handle (`.tick.subscribe; ());
  replay[info[0]; tick[0]; info[1]; (); 1b; handle];
  .handle.subscribing[handle];
};

.sub.init[];