An introduction to tools, Open Source and not, for the analysis of real-time events and data.
At Oracle Cloud Day in Milan I gave a brief presentation, with an accompanying demo, entitled “Event Driven Integration”.
In the presentation I explored some technologies commonly used today to analyze large volumes of “streaming” data in real time (Fast Data), and I showed examples of how this processing can be implemented using two technologies that I consider complementary: Apache Kafka and Oracle Stream Analytics, both hosted in Oracle Cloud Infrastructure (OCI), the second-generation Oracle Cloud.
In this story I will walk you through the presentation and the demo, in more depth and (I hope) with more clarity than the limited time available at Cloud Day allowed.
How to go into deeper detail?
In the end, I decided to put this information at the beginning rather than at the end, because it seemed increasingly important to me.
If you are building a system in which Kafka and streaming data analysis are central, a very good text to read and consult, full of well-presented concepts, is Designing Event-Driven Systems by B. Stopford. Confluent offers the e-book for free at: https://www.confluent.io/designing-event-driven-systems.
Open Source technologies used
In this article and in the demo described, the following Open Source technologies are considered and used:
- Apache Kafka
- Apache Spark
- Oracle MySQL
An Enterprise Message Broker
I am deeply convinced that if you want to create an Event Driven Architecture, a Message Broker based on the Publish and Subscribe paradigm must be placed at the center of the Cloud infrastructure, and it must have the following characteristics:
- It must be scalable, because every new type of event (and message) that gradually becomes available in the company must be made available to all present and future subscribers. Each type of message is published on a dedicated “topic”, and the broker must anticipate (and support) growth over time in the number of applications subscribing to each topic.
- It must be reliable. Reliability is a MUST, since such a Message Broker sits at the center of the overall architecture: at the heart of the system.
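To make the scalability requirement more concrete, here is a toy, in-memory Python model of a topic kept as an append-only log, in which each subscriber tracks its own read position (offset). It is only a sketch of the publish/subscribe idea as Kafka realizes it, not Kafka itself: there is no persistence, partitioning or replication, and the `Topic` class, topic name and subscriber names are hypothetical.

```python
from collections import defaultdict


class Topic:
    """Toy model of a topic: an append-only log plus one offset per subscriber.

    Reading does not remove messages, so a subscriber added later can still
    read everything published before it joined. Illustrative only.
    """

    def __init__(self, name):
        self.name = name
        self.log = []                    # append-only list of messages
        self.offsets = defaultdict(int)  # subscriber name -> next offset to read

    def publish(self, message):
        self.log.append(message)
        return len(self.log) - 1         # offset assigned to the message

    def poll(self, subscriber):
        """Return the messages this subscriber has not read yet and advance its offset."""
        start = self.offsets[subscriber]
        batch = self.log[start:]
        self.offsets[subscriber] = len(self.log)
        return batch


positions = Topic("car-positions")       # hypothetical topic name
positions.publish({"CARID": "AB123CD", "SPEED": 48})
print(positions.poll("dashboard"))       # the first message
positions.publish({"CARID": "AB123CD", "SPEED": 72})
print(positions.poll("dashboard"))       # only the new message
print(positions.poll("speed-monitor"))   # a later subscriber still sees both
```

Because reading leaves the log untouched, adding a subscriber costs the publisher nothing, which is what allows the number of subscribers per topic to grow over time.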
Kafka has proven itself capable of being placed at the center of the overall architecture, fully supporting the paradigm described above in a robust, reliable and scalable way. It is enough to cite LinkedIn (where Kafka was born), as well as other companies, such as Netflix, that built their digital platforms around Kafka.
A list of the articles on Kafka published by LinkedIn Engineering can be found at: https://engineering.linkedin.com/blog/topic/kafka
It is worth taking a look at them, for example, to understand how big their Kafka clusters are (thousands of brokers) and what data volumes they handle (petabytes per week).
In brief, Kafka meets the requirements listed above because it implements the Message Broker as a cluster of nodes that replicate messages reliably.
By exploiting one of the features of Oracle Cloud Infrastructure, namely the division of each Cloud Region into three independent Availability Domains connected by a high-speed network (25 Gbps), it is possible to create a Kafka cluster able to “survive” even an event that makes one of the domains unavailable.
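The following Python sketch illustrates why spreading replicas across the three Availability Domains is enough to survive the loss of one of them. It assumes that each broker's `broker.rack` is set to the name of its AD (so that Kafka's rack-aware assignment places the replicas of a partition in different ADs), a replication factor of 3, producers using `acks=all` and `min.insync.replicas=2`. The placement function only mimics the outcome of rack-aware assignment, not Kafka's actual algorithm, and the AD and broker names are hypothetical.

```python
def place_replicas(brokers_by_ad, replication_factor, partition):
    """Pick one broker per AD for a partition, rotating the starting AD so that
    leaders (the first replica) are spread evenly across the domains."""
    ads = sorted(brokers_by_ad)
    if replication_factor > len(ads):
        raise ValueError("replication factor exceeds the number of ADs")
    replicas = []
    for i in range(replication_factor):
        ad = ads[(partition + i) % len(ads)]
        brokers = brokers_by_ad[ad]
        replicas.append((ad, brokers[partition % len(brokers)]))
    return replicas


def writable_after_ad_loss(replicas, lost_ad, min_insync_replicas):
    """With acks=all, a partition keeps accepting writes only while at least
    min_insync_replicas replicas remain in sync. Here we assume every replica
    outside the lost AD was in sync when the AD failed."""
    surviving = [replica for replica in replicas if replica[0] != lost_ad]
    return len(surviving) >= min_insync_replicas


# Hypothetical cluster: two brokers in each of the three ADs of a region.
cluster = {"AD-1": ["broker-1", "broker-4"],
           "AD-2": ["broker-2", "broker-5"],
           "AD-3": ["broker-3", "broker-6"]}

for partition in range(6):
    replicas = place_replicas(cluster, replication_factor=3, partition=partition)
    # Losing any single AD leaves two in-sync replicas, so writes continue.
    assert all(writable_after_ad_loss(replicas, ad, min_insync_replicas=2) for ad in cluster)
```

The same reasoning bounds the configuration: with `min.insync.replicas=3`, the loss of any AD would block writes, which is why a value of 2 pairs naturally with three replicas.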
For the demo, I chose to install the Confluent distribution on an Ubuntu Linux VM.
The Confluent distribution not only installs quickly but also lets me try KSQL. It also provides a Schema Registry, in which to publish the schemas of the messages published on Kafka topics.
Processing of streaming data and an introduction to Oracle Stream Analytics
Apache Kafka has a very complete API, with clients for many languages. Over time, a whole series of features has been added that help connect Kafka to the most diverse systems (see, for example, the connectors provided by Kafka Connect) and, not least, the ability to query data arriving on a topic using a SQL-derived language called KSQL.
That said, it is clearly not difficult to build a set of Microservices that meet the most varied needs of applications that produce and consume messages exchanged through Kafka topics.
But writing code directly against the API is not necessarily the fastest and most reliable way to build powerful and extensible solutions. This is especially true if we want to build complex processing that involves correlations on both “traditional” and “spatial” data, or if we need complex analysis algorithms that must nonetheless offer “sub-second” response times.
To explore this idea, I want to take you on a short journey through the features and functionality of Oracle Stream Analytics (OSA).
What I think is important to emphasize first, in the context we have set ourselves, is that:
- OSA can very easily connect to a Kafka cluster and consume messages from one or more topics
- OSA can “produce” the results of its processing as messages published on Kafka topics
- To process these messages in a scalable, high-performance way, OSA uses the Apache Spark parallel engine
Elements of Oracle Stream Analytics
The OSA components we will explore are, in order:
- Connections
- Streams
- Pipelines
- Patterns
- Visualizations
- Dashboards
To put these components in context: a pipeline processes and filters the data of a stream, which in turn comes from a connection. In a pipeline we can apply ready-to-use analysis patterns, and we can create visualizations and compose charts and maps into one or more dashboards.
The starting point is the definition of a connection, in which we specify its type (Kafka is among the supported types) and its connection parameters.
To create a connection to a Kafka cluster, simply specify the hostname and port of ZooKeeper. The definition form also lets you test the connection.
It is interesting to note that the supported types also include a connection to an instance of Druid.
A stream is a source of events, bound to a connection, whose events have a well-defined structure (shape).
Supported message formats include JSON, AVRO and CSV.
The message structure (shape) defines the names and types of the data fields. It can be defined by hand or inferred from a stream of messages (a very useful feature, though it requires a bit of patience).
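Shape inference can be pictured with the following Python sketch, which derives field names and types from a sample of flat JSON messages. The type names (`text`, `integer`, `double`, `boolean`) are hypothetical and not necessarily the ones OSA uses, and the rule of widening a field seen with both integer and decimal values to `double` is my own assumption.

```python
import json

# Hypothetical type names; OSA's own type names may differ.
_SHAPE_TYPES = {bool: "boolean", int: "integer", float: "double", str: "text"}


def infer_shape(raw_messages):
    """Infer a {field name: type} shape from a sample of flat JSON messages.

    Null values are skipped; a field seen with both integers and decimals is
    widened to "double"; any other type conflict falls back to "text".
    """
    shape = {}
    for raw in raw_messages:
        for field, value in json.loads(raw).items():
            if value is None:
                continue
            new_type = _SHAPE_TYPES.get(type(value), "text")
            old_type = shape.get(field)
            if old_type is None or old_type == new_type:
                shape[field] = new_type
            elif {old_type, new_type} == {"integer", "double"}:
                shape[field] = "double"
            else:
                shape[field] = "text"
    return shape


samples = ['{"CARID": "AB123CD", "LAT": 45.46, "LON": 9.19, "SPEED": 48}',
           '{"CARID": "XY987ZW", "LAT": 45.47, "LON": 9.18, "SPEED": 51.5}']
print(infer_shape(samples))
# {'CARID': 'text', 'LAT': 'double', 'LON': 'double', 'SPEED': 'double'}
```

The sketch also suggests one plausible reason for the patience: a field is typed only from the values seen so far, so if the first samples happen to carry only integer speeds, SPEED is inferred as `integer` until a decimal value shows up. The more representative the sample, the more reliable the inferred shape.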
A pipeline starts from a stream and defines one or more concurrent processing branches.
The pipeline is built by combining queries, filters, summaries and patterns.
The output of each node of a pipeline can be displayed in the output table at the bottom of the pipeline editor screen (see Fig. 3).
For example, a pipeline can implement a simple filter and send only the messages that meet a certain condition (for example, SPEED > 50) to a “Kafka topic” target.
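What such a filter stage does to each message can be expressed in a few lines of Python. This is only a sketch, assuming messages are flat JSON objects with a numeric SPEED field; in OSA the stage is configured visually, and its output would go to a Kafka topic rather than to a Python list.

```python
import json
from typing import Iterable, Iterator


def speed_filter(raw_messages: Iterable[str], limit: float = 50) -> Iterator[dict]:
    """Decode each JSON message and yield only those with SPEED > limit.

    Messages without a SPEED field are dropped (an assumption).
    """
    for raw in raw_messages:
        event = json.loads(raw)
        speed = event.get("SPEED")
        if speed is not None and speed > limit:
            yield event


stream = ['{"CARID": "AB123CD", "SPEED": 48}',
          '{"CARID": "XY987ZW", "SPEED": 63}']
to_target_topic = list(speed_filter(stream))  # stands in for the Kafka target
print(to_target_topic)  # [{'CARID': 'XY987ZW', 'SPEED': 63}]
```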
The available patterns are one of the elements that I find most interesting in OSA.
Patterns can be used as a stage in a pipeline and provide ready-to-use solutions for processing and filtering that would otherwise be complex to implement.
A short list of patterns, among those available:
- Spatial: Speed
- Spatial: geofence
- Spatial: interaction single stream or interaction two stream
For example, if we have a series of messages representing the positions over time of two moving vehicles, with the “interaction single stream” pattern we can evaluate the distance between them in real time and raise an alert when they get too close.
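To give an idea of what this kind of pattern computes, here is a Python sketch that reads a single stream of positions, remembers the latest position of each vehicle and raises an alert when two different vehicles are closer than a threshold. Distances use the haversine formula on a spherical Earth. This is my own illustration of the idea, not OSA's implementation; the field names CARID, LAT and LON follow the message format described later in this article, and the threshold and sample positions are hypothetical.

```python
import math

EARTH_RADIUS_M = 6_371_000  # mean Earth radius, in meters


def haversine_m(lat1, lon1, lat2, lon2):
    """Great-circle distance in meters between two points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def proximity_alerts(events, threshold_m):
    """For each incoming position, compare it with the latest known position of
    every other vehicle and yield (car, other_car, distance_m) when too close."""
    last_position = {}
    for event in events:
        car = event["CARID"]
        last_position[car] = (event["LAT"], event["LON"])
        for other, (lat, lon) in last_position.items():
            if other == car:
                continue
            distance = haversine_m(event["LAT"], event["LON"], lat, lon)
            if distance < threshold_m:
                yield (car, other, round(distance))


events = [{"CARID": "A", "LAT": 45.4640, "LON": 9.1900},
          {"CARID": "B", "LAT": 45.4640, "LON": 9.1910},  # roughly 78 m from A
          {"CARID": "B", "LAT": 45.5000, "LON": 9.1900}]  # now about 4 km away
print(list(proximity_alerts(events, threshold_m=100)))
```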
As part of a pipeline, we can build one or more visualizations, which we then include in a dashboard. Fig. 6 shows a visualization.
We can create visualizations of several types: charts, histograms, maps, etc.
A dashboard groups a set of visualizations and is built visually (drag & drop).
Dashboard and rules
With OSA it is easy to implement a rule that, in a message stream, selects only the events satisfying a condition that identifies a policy violation (for example, SPEED > 50).
When the condition holds, a message field defined in the shape for this purpose (VIOLATION) is updated.
We can then send the messages that satisfy the “VIOLATION = 1” condition to a target topic and, on that topic, build a second stream that feeds a visualization, which we then include in a dashboard.
This is the result we can achieve: the color of the symbol associated with each vehicle changes depending on whether the violation is slight (green, SPEED < 70) or serious (orange, SPEED > 70).
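The rule and the color coding can be summarized in a short Python sketch. It assumes the limit of 50 and the threshold of 70 from the example. The COLOR field is hypothetical (in OSA the color is a property of the visualization, not necessarily a field of the message), and since the text does not say what happens at exactly 70, the sketch treats that case as serious.

```python
def tag_violation(event, limit=50, serious_from=70):
    """Return a copy of the event with VIOLATION = 1 when SPEED > limit,
    plus a hypothetical COLOR field for the map symbol."""
    tagged = dict(event)
    speed = tagged.get("SPEED", 0)
    tagged["VIOLATION"] = 1 if speed > limit else 0
    if tagged["VIOLATION"]:
        # Green below 70, orange above; exactly 70 counts as serious here (assumption).
        tagged["COLOR"] = "orange" if speed >= serious_from else "green"
    return tagged


def violations_topic(events):
    """Keep only the events matching VIOLATION = 1, as the target topic would receive them."""
    return [e for e in map(tag_violation, events) if e["VIOLATION"] == 1]


sample = [{"CARID": "A", "SPEED": 45},
          {"CARID": "B", "SPEED": 62},
          {"CARID": "C", "SPEED": 88}]
for e in violations_topic(sample):
    print(e["CARID"], e["COLOR"])
```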
In this way, we see that with OSA we can not only define processing on streaming data, but also build a Web User Interface.
This example also shows off the geospatial features of OSA: in this case, the ability to display the symbols representing the moving vehicles on a map based on OpenStreetMap.
Pipelines executed in Apache Spark
Each pipeline is published and executed as an application in the Apache Spark engine.
When publishing a pipeline in OSA, you can specify its execution parameters (dedicated cores, memory) for fine-tuning, and you can follow its execution and read its logs through a Web UI.
In OSA we can define geographic areas on maps (circles, polygons, …) and insert geospatial patterns into a pipeline. For example, we can analyze a stream of geolocated events (the positions of a vehicle) and generate events that mark the entry or exit of a vehicle from a geo-fence.
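The entry and exit events can be sketched in Python with a classic ray-casting point-in-polygon test plus a per-vehicle memory of the last known state. The sketch treats latitude and longitude as plane coordinates, which is acceptable only for small fences; it emits nothing on the first sighting of a vehicle (an assumption), and the ENTER/EXIT event format and the sample fence are hypothetical. It is not OSA's implementation.

```python
def point_in_polygon(lat, lon, polygon):
    """Ray-casting test: polygon is a list of (lat, lon) vertices.

    Casts a ray eastward from the point and counts edge crossings;
    an odd count means the point is inside (planar approximation).
    """
    inside = False
    n = len(polygon)
    for i in range(n):
        y1, x1 = polygon[i]
        y2, x2 = polygon[(i + 1) % n]
        if (y1 > lat) != (y2 > lat):  # edge straddles the point's latitude
            x_cross = x1 + (lat - y1) * (x2 - x1) / (y2 - y1)
            if lon < x_cross:
                inside = not inside
    return inside


def geofence_events(events, polygon):
    """Yield an ENTER or EXIT event whenever a vehicle crosses the fence boundary."""
    was_inside = {}
    for event in events:
        car = event["CARID"]
        now_inside = point_in_polygon(event["LAT"], event["LON"], polygon)
        before = was_inside.get(car)
        if before is not None and before != now_inside:
            yield {"CARID": car, "EVENT": "ENTER" if now_inside else "EXIT"}
        was_inside[car] = now_inside


fence = [(45.0, 9.0), (45.0, 9.2), (45.2, 9.2), (45.2, 9.0)]  # hypothetical square
track = [{"CARID": "A", "LAT": 45.3, "LON": 9.1},   # outside
         {"CARID": "A", "LAT": 45.1, "LON": 9.1},   # inside: ENTER
         {"CARID": "A", "LAT": 45.1, "LON": 9.3}]   # outside again: EXIT
print(list(geofence_events(track, fence)))
```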
As a stage of a pipeline, we can invoke a Machine Learning model that has already been trained (if supervised). The model must be described in PMML (Predictive Model Markup Language).
As a use case, we can imagine a pipeline analyzing a stream of credit card payment events, with an ML model that determines whether each event is an attempted fraud.
Data for the demo
I imagined, once again, having to process data from moving cars: data that allow us, for example, to place vehicles on a map and to evaluate their mutual distances and their position relative to a reference area, but also to identify anomalies, such as exceeding limits (speed, for example).
The data format
Today, the most popular format for exchanging messages is the JSON format.
Throughout the article and the demo, we will assume that the format of data sent by vehicles and transported through all processing pipelines is always JSON.
For example, the format of a message coming from a vehicle, containing the data read by the OBD2 port, is as follows:
Some of these fields are particularly important:
- CARID uniquely identifies the vehicle; it could be, for example, the license plate
- LAT and LON are, respectively, latitude and longitude, provided by the GPS; they will be used to position, in real time, the icons representing the vehicles on the map
- SPEED is the speed, which can be used, as we will see, to check that the driver does not commit traffic violations
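A minimal sketch of how a consumer might decode such a message in Python, assuming a flat JSON object in which CARID, LAT, LON and SPEED are top-level keys (any other OBD2 field is simply ignored) and assuming SPEED is expressed in km/h. The `VehicleReading` class and the sample values are hypothetical.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class VehicleReading:
    """The message fields discussed in the text; any other OBD2 field is ignored."""
    carid: str    # CARID: unique vehicle identifier (e.g. the license plate)
    lat: float    # LAT: latitude from the GPS, in degrees
    lon: float    # LON: longitude from the GPS, in degrees
    speed: float  # SPEED: assumed to be in km/h

    def __post_init__(self):
        # Basic sanity checks on the GPS coordinates.
        if not -90.0 <= self.lat <= 90.0:
            raise ValueError(f"latitude out of range: {self.lat}")
        if not -180.0 <= self.lon <= 180.0:
            raise ValueError(f"longitude out of range: {self.lon}")

    @classmethod
    def from_json(cls, raw: str) -> "VehicleReading":
        data = json.loads(raw)
        return cls(carid=str(data["CARID"]), lat=float(data["LAT"]),
                   lon=float(data["LON"]), speed=float(data["SPEED"]))


reading = VehicleReading.from_json(
    '{"CARID": "AB123CD", "LAT": 45.46, "LON": 9.19, "SPEED": 72, "RPM": 2100}')
print(reading)
```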
Other data could be used to identify, for example through a Machine Learning model, any anomalies in the operation of the vehicle.
Moreover, the speed and position data can be used to evaluate the driver's behavior, possibly adding acceleration data as well (not shown in the message above).
Using any Kafka client capable of reading JSON messages from a file, we can easily simulate, as I did for the demo, the flow of events from which everything starts.
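A minimal Python sketch of such a simulator, assuming the events are stored one JSON object per line (JSON Lines). The actual publishing is left to a `send` callable, so that it can wrap the produce call of whatever Kafka client you prefer; the `replay` function and the fixed pause between messages are my own choices.

```python
import json
import time
from typing import Callable, Iterable


def replay(lines: Iterable[str], send: Callable[[dict], None], interval_s: float = 1.0) -> int:
    """Decode one JSON message per line, pass it to `send`, pause, and
    return the number of messages sent. Blank lines are skipped."""
    sent = 0
    for line in lines:
        line = line.strip()
        if not line:
            continue
        send(json.loads(line))
        sent += 1
        if interval_s > 0:
            time.sleep(interval_s)  # crude pacing to mimic a live stream
    return sent
```

For example, with a file named `car_events.jsonl` (a hypothetical name), `with open("car_events.jsonl") as f: replay(f, send)` would publish one vehicle reading per second, where `send` publishes each message to the input topic.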
To view the position of the vehicles in real time, I decided to use a very well-made Open Source Tracking Server: Traccar.
Traccar is implemented in Java and its installation on a Linux VM, like our VM in the Oracle Cloud, is really simple.
But the most interesting aspect is the variety of integration options it offers. Among them is support for the OwnTracks protocol: Traccar provides a REST endpoint to which you can send JSON messages formatted according to this protocol.
To integrate Traccar, all I had to do was implement a simple Microservice in NodeJS that reads incoming messages from the topic, transforms them into the required format and sends them to the REST endpoint.
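My microservice was written in NodeJS; to keep the examples in one language, here is the transformation step sketched in Python. It maps a vehicle message to an OwnTracks `location` payload (`_type`, `tid`, `lat`, `lon`, `vel`, `tst` are OwnTracks field names) and POSTs it using only the standard library. Several assumptions apply: SPEED is in km/h (OwnTracks `vel` is an integer in km/h); the current time is used as the timestamp, because none of the fields described above carries one; and Traccar has a device whose identifier matches the value sent as `tid`. How Traccar maps OwnTracks fields to devices, and the endpoint URL, should be checked against the Traccar documentation for your installation.

```python
import json
import time
import urllib.request


def to_owntracks(event, now=None):
    """Map a vehicle message (CARID, LAT, LON, SPEED) to an OwnTracks location payload."""
    return {
        "_type": "location",
        # Assumption: Traccar has a device whose identifier equals CARID.
        # OwnTracks clients normally send a short (two-character) tid.
        "tid": str(event["CARID"]),
        "lat": float(event["LAT"]),
        "lon": float(event["LON"]),
        "vel": int(round(event["SPEED"])),  # OwnTracks vel: integer, km/h
        # No timestamp among the fields described, so use the current UNIX time.
        "tst": int(time.time() if now is None else now),
    }


def post_json(url, payload, timeout_s=5.0):
    """POST payload as JSON to url and return the HTTP status code."""
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout_s) as response:
        return response.status
```

In the service, each message read from the topic would go through `post_json(TRACCAR_URL, to_owntracks(event))`, where `TRACCAR_URL` is a placeholder for the OwnTracks endpoint of your Traccar server.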
Here is the result:
Fairly detailed documentation on OSA can be found at: https://docs.oracle.com/en/middleware/osa/18.1/using-stream-analytics/using-oracle-stream-analytics.pdf
In particular, chapter 5 explains how, using the administration web pages of the underlying Spark engine, you can analyze and diagnose error conditions (which may occur, for example, during the development of a quick demo).