Realtime Stock Trade Analysis With Apache NiFi, Kafka, and Flink (and Python)
In this article we will dive into a way to monitor and trade equities in real time using several Apache applications and Python. All resources are available on the GitHub repo. When I say realtime, I mean live streaming equity trades, since the concept of “realtime” is tricky to define for a time series. This pipeline can handle all ~9000 equities (and all other trade feeds) at once, since each stage is horizontally scalable (you get what you pay for). If you aren’t familiar with the technologies involved:
- NiFi: data processing application that uses a directed graph to organize a “flow” of data
- Kafka: distributed log that acts as a streaming database using Producers/Consumers of messages
- Flink: distributed processing engine with stateful computations
- Python: Python
For the actual trading strategy, I will be using some stochastic variation functions from my own academic research that require maintaining a state of past log returns.
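To make “maintaining a state of past log returns” concrete, here is a minimal sketch of the kind of rolling state involved (illustrative only — this is not the actual research code, and the class name is my own):

```python
import math
from collections import deque

class LogReturnState:
    """Keeps a rolling window of log returns for one symbol."""

    def __init__(self, window: int = 100):
        self.returns = deque(maxlen=window)  # silently drops the oldest return
        self.last_price = None

    def update(self, price: float) -> None:
        # log return = ln(p_t / p_{t-1}); needs one prior price before it can emit
        if self.last_price is not None:
            self.returns.append(math.log(price / self.last_price))
        self.last_price = price

state = LogReturnState(window=3)
for p in [100.0, 101.0, 100.5]:
    state.update(p)
# three prices yield two log returns
```

The stochastic variation functions themselves would then be computed over `state.returns` on each update.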
To receive trade data, I use Polygon as a provider since they have full trade streaming available over WebSockets (because who would ever want a message-based credit system). The stock trade event schema we’ll be working with is:
- Event Type: String
- Symbol Ticker: String
- Exchange ID: Integer
- Trade ID: String
- Tape: Integer (1=”A”, 2=”B”, 3=”C”)
- Price: Double
- Trade Size: Integer
- Trade Conditions: List[String]
- Trade Timestamp: Unix NS (you know, for nanoseconds)
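The schema above maps naturally onto a small Python dataclass. Note that the field names below are my own, and the abbreviated message keys (“ev”, “sym”, “p”, etc.) are assumptions about Polygon’s wire format — verify them against their docs:

```python
from dataclasses import dataclass
from typing import List

@dataclass
class Trade:
    event_type: str        # Event Type
    symbol: str            # Symbol Ticker
    exchange_id: int       # Exchange ID
    trade_id: str          # Trade ID
    tape: int              # 1="A", 2="B", 3="C"
    price: float
    size: int
    conditions: List[str]  # Trade Conditions
    timestamp_ns: int      # Unix epoch, nanoseconds

    @classmethod
    def from_message(cls, m: dict) -> "Trade":
        # key names assume Polygon's abbreviated trade keys
        return cls(m["ev"], m["sym"], m["x"], m["i"], m["z"],
                   m["p"], m["s"], m.get("c", []), m["t"])

msg = {"ev": "T", "sym": "AAPL", "x": 4, "i": "1", "z": 3,
       "p": 190.12, "s": 100, "c": ["@"], "t": 1_700_000_000_000_000_000}
trade = Trade.from_message(msg)
```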
To actually ingest the streaming trade data, I use NiFi’s ConnectWebSocket processor and an input list of equity symbols. Then I push all received trades to a Kafka instance.
Here is an overview of the flow:
The flow centers around the ConnectWebSocket processor that handles the connection, authentication, subscription, and trade message reception which then pushes messages to Kafka with topics keyed by equity symbol.
To use this flow, simply build the custom NiFi Docker image that adds the flow to the base Apache image. Also add your Polygon token to the docker-compose config so NiFi can pull data:
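The NiFi service entry might look something like this (the build path, ports, and environment variable name are assumptions — match whatever your repo defines):

```yaml
services:
  nifi:
    build: ./nifi          # custom image layered on apache/nifi with the flow baked in
    ports:
      - "8080:8080"        # NiFi UI
    environment:
      POLYGON_TOKEN: "<your-polygon-api-token>"  # read by the WebSocket connection
```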
I use Kafka so that I can persist and receive the messages ingested by NiFi. In the event that a NiFi producer or Flink consumer goes down, it doesn’t interrupt the rest of the system, and the producer/consumer can be restarted and pull messages from where they left off using consumer offsets. Kafka relies on ZooKeeper for broker metadata, so we need to add it to our docker-compose config as well:
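A typical Kafka + ZooKeeper pairing in docker-compose looks roughly like this (image choices and listener settings are assumptions; any standard Kafka image will do):

```yaml
services:
  zookeeper:
    image: zookeeper:3.6
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka     # a common choice for compose setups
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092  # hostname other services use
    depends_on:
      - zookeeper
```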
I use Flink to subscribe to the trade messages and perform windowed aggregations. Flink is good because you can implement stateful computations with a checkpointing mechanism. Thus, if we required a substantial amount of stateful information (incremental aggregations or indicators) for our trade calculations, our pipeline would be fault tolerant enough to quickly recover from any downtime. Sure, I could use Spark Streaming, but Flink has more sophisticated windowing classes that could prove useful in the future for more complex, time-sensitive strategies (click-session windowing for equity trades??).
The Scala code for our Flink job (forgive any bad code, I’m still learning the art of Scala):
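The full job isn’t reproduced here, but its shape is roughly the following — a sketch against Flink’s Scala DataStream API, where the topic names, the comma-separated parsing stub, and the 30-second tumbling window are all my own assumptions rather than the repo’s actual code:

```scala
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}

object TradeAggregator {

  // assumes NiFi publishes "SYMBOL,price,size"; swap in real JSON parsing
  def parseTrade(s: String): (String, Double, Long) = {
    val parts = s.split(",")
    (parts(0), parts(1).toDouble, parts(2).toLong)
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(60000)  // checkpoint state every minute for fault tolerance

    val props = new Properties()
    props.setProperty("bootstrap.servers", "kafka:9092")
    props.setProperty("group.id", "flink-trades")

    val trades = env
      .addSource(new FlinkKafkaConsumer[String]("trades", new SimpleStringSchema(), props))
      .map(parseTrade _)
      .keyBy(_._1)                   // partition state by equity symbol
      .timeWindow(Time.seconds(30))  // tumbling 30s windows
      .reduce((a, b) => (a._1, b._2, a._3 + b._3))  // keep last price, sum volume

    trades
      .map(_.toString)
      .addSink(new FlinkKafkaProducer[String]("aggregates", new SimpleStringSchema(), props))

    env.execute("trade-aggregator")
  }
}
```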
The trade metrics we calculate are:
From a combination of these papers:
And adding Flink’s containers to the docker-compose config:
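Flink’s standard JobManager/TaskManager split in docker-compose looks roughly like this (image tag and environment variable are assumptions based on the official Flink images):

```yaml
services:
  jobmanager:
    image: flink:1.13-scala_2.12
    command: jobmanager
    ports:
      - "8081:8081"   # Flink dashboard; jobs are also submitted here
  taskmanager:
    image: flink:1.13-scala_2.12
    command: taskmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    depends_on:
      - jobmanager
```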
I use a basic Python class to implement an agent that subscribes to aggregate signals and, on message polls, updates its state and places orders. The agent also uses several helper classes to hold portfolio state: a portfolio class holds handles to several asset instances that track current positions and calculate stats. For the broker, I’m using Alpaca, since they have $0 commission equity trades and access to a sandbox API with a free account on sign-up.
On each message poll to Kafka (given a polling frequency that can be altered to fit how often the agent should trade), the agent updates its internal state, ranks its portfolio, generates new equity weights, and submits orders to rebalance. Here is the same thing I just said but in box form:
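A stripped-down sketch of that poll → update → rank → rebalance loop, with tiny stand-in portfolio and broker classes so it runs (all names here are illustrative, not the repo’s actual code):

```python
import json

class Agent:
    """Consumes aggregate signals, updates state, and emits rebalancing orders."""

    def __init__(self, portfolio, broker):
        self.portfolio = portfolio   # holds positions + per-asset signal state
        self.broker = broker         # an Alpaca client in the real pipeline

    def step(self, messages):
        # 1. update internal state from the latest aggregate signals
        for m in messages:
            self.portfolio.update(json.loads(m))
        # 2./3. rank the portfolio and generate new target weights
        weights = self.portfolio.target_weights()
        # 4. submit orders that move current positions toward the targets
        for order in self.portfolio.rebalance_orders(weights):
            self.broker.submit_order(order)

# Tiny stand-ins so the sketch runs; the real classes track far more state.
class EqualWeightPortfolio:
    def __init__(self):
        self.signals = {}
    def update(self, msg):
        self.signals[msg["symbol"]] = msg["metric"]
    def target_weights(self):
        n = len(self.signals)
        return {s: 1.0 / n for s in self.signals}
    def rebalance_orders(self, weights):
        return [(s, w) for s, w in weights.items()]

class RecordingBroker:
    def __init__(self):
        self.orders = []
    def submit_order(self, order):
        self.orders.append(order)

broker = RecordingBroker()
agent = Agent(EqualWeightPortfolio(), broker)
agent.step(['{"symbol": "AAPL", "metric": 0.4}',
            '{"symbol": "MSFT", "metric": 0.1}'])
```

In the real agent, `step` would run on a timer matching the desired polling frequency.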
An interesting feature of this design is that one could run N agents, each implementing a different portfolio, ranking strategy, and/or update frequency. As a result of fully decoupling each layer in the pipeline, each component can be specialized and tuned.
Once you have an account, add the API key and secret to the docker-compose config under the agent.environment key so the agent has access:
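The agent service entry might look something like this (the variable names are illustrative — match whatever the agent code actually reads; the paper-trading URL is Alpaca’s sandbox endpoint):

```yaml
services:
  agent:
    build: ./agent
    environment:
      ALPACA_API_KEY: "<your-key>"
      ALPACA_SECRET_KEY: "<your-secret>"
      ALPACA_BASE_URL: "https://paper-api.alpaca.markets"  # sandbox, not live trading
```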
And set the Dockerfile config with these steps:
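A minimal Dockerfile for the agent could look like this (the Python version and entry-point script name are assumptions):

```dockerfile
FROM python:3.8-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "agent.py"]   # entry-point script name is an assumption
```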
Build and Run
We need to build the submittable Flink jar application and the custom NiFi image before running the pipeline. Using a Makefile, I define two targets:
And then add the entry points for our pipeline:
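Together, the build targets and entry points might look roughly like this (paths, jar name, and service names are assumptions; recall that make recipes must be tab-indented):

```make
build-flink:
	cd flink-job && sbt assembly   # produces the fat jar for submission

build-nifi:
	docker build -t custom-nifi ./nifi

run: build-flink build-nifi
	docker-compose up -d

submit-flink:
	# submit the assembled jar to the running JobManager
	docker-compose exec jobmanager flink run /opt/jobs/trade-aggregator.jar
```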
Since we didn’t bundle our Flink jar into the cluster image, we must submit the job manually using the submit-flink target, but only after we first start all of our other services with make run. make run first builds our Flink job and our custom NiFi image, then starts everything. Once the Kafka topics have been created by NiFi, run make submit-flink to submit the Flink job. Be sure to do this during market hours, or there won’t be any trade data to pull and operate on.
NiFi will start pulling trades and pushing them to their respective Kafka topics, Flink will calculate our metrics over those topics, and finally the aggregates will be pushed back to Kafka for the Python trading agent to receive and trade on.
That’s the way the cookie crumbles
Thanks for reading. I’m not that interested in over-engineering this pipeline anymore, so hopefully I passed at least a minuscule crumb of knowledge on to you. I pray to the code gods that your copy-paste of this project works perfectly the first time 🎲 (honestly, there have been random encoding errors, but I swear this pipeline has worked most of the time).