Analisi di dati real-time (Cloud Day 2018)

Luigi Saetta
10 min readNov 21, 2018

--

Un’introduzione a strumenti, Open Source e non, per l’analisi di eventi e dati real-time.

Introduzione

Nella giornata del Cloud Day a Milano ho tenuto una breve presentazione, con demo annessa, intitolata “Event Driven Integration”.

Nella presentazione ho esplorato alcune tecnologie oggi comunemente utilizzate per analizzare real-time grandi volumi di dati “streaming” (Fast Data) ed ho mostrato esempi di come queste elaborazioni possano essere realizzate utilizzando due tecnologie che io considero come complementari: Apache Kafka ed Oracle Stream Analytics, il tutto ospitato in Oracle Cloud Infrastructure (OCI), il Cloud Oracle di seconda generazione.

In questa storia vi racconto la presentazione e la demo, con un’ampiezza e (spero) una chiarezza maggiore di quella concessa dal tempo (limitato) a mia disposizione nella giornata del Cloud Day.

Come approfondire?

Alla fine, ho deciso di mettere queste informazioni all’inizio e non alla fine, in quanto mi sono sembrate via via più importanti.

Se vi trovate nello scenario di realizzare un sistema in cui la tecnologia di Kafka e le tematiche di analisi di dati streaming siano al centro, un testo molto valido, pieno di concetti ben esposti, da leggere e consultare è il seguente: Designing Event-Driven Systems, di B. Stopford. Il libro è offerto in formato elettronico gratuitamente dalla Confluent ed è reperibile qui.

Le tecnologie Open Source utilizzate

In questo articolo e nella demo descritta sono considerate ed impiegate le seguenti tecnologie Open Source:

  • Apache Kafka
  • Apache Spark
  • Oracle MySQL
  • NodeRED
  • Traccar
  • Java

Un Enterprise Message Broker

Sono profondamente convinto del fatto che se si vuole realizzare un Event Driven Architecture, al centro dell’infrastruttura Cloud deve essere messo un Message Broker, basato sul paradigma Publish e Subscribe

Fig. 1: Il paradigma Publish&Subscribe

e che abbia le seguenti caratteristiche:

  • scalabile
  • robusto
  • affidabile
  • veloce

Deve essere scalabile perché tutte le nuove tipologie di eventi (e messaggi), che via via si rendono disponibili in azienda, devono essere messi a disposizione di tutti i presenti e futuri subscriber. Ogni tipologia di messaggio è pubblicata su una “topic”, dedicata, e si deve poter prevedere (e supportare) una crescita nel tempo delle applicazioni che sottoscrivono a ciascuna topic.

L’affidabilità consegue dal porre un tale Message Broker al centro di tutta l’architettura complessiva. Il cuore del sistema.

La tecnologia di Kafka si è dimostrata del tutto in grado di essere posta al centro dell’architettura complessiva, supportando in pieno, in modo robusto, affidabile e scalabile, il paradigma su enunciato. Basterebbe fare gli esempi di Linkedin (dove Kafka è nato), ma anche di altre aziende, quali Netflix, etc, che hanno costruito la loro piattaforma digitale intorno a Kafka.

Una lista degli articoli prodotti su Kafka, dall’engineering di Linkedin, si può trovare alla URL: https://engineering.linkedin.com/blog/topic/kafka. Può essere utile dargli uno sguardo per capire, ad esempio, quanto sono grandi i loro cluster Kafka (migliaia di broker) e quali volumi di dati trattano (Petabyte per settimana).

Kafka permette di soddisfare i requisiti su elencati, in sintesi, perché implementa il message broker con un cluster di nodi, che replicano i messaggi in modo affidabile.

E’ evidente che utilizzando una delle caratteristiche di Oracle Cloud Infrastructure, ovvero la strutturazione di ogni Cloud Region in tre Availability Domain indipendenti e collegati da una rete ad alta velocità (25 Gbs), è possibile realizzare un cluster Kafka in grado di “sopravvivere” anche ad un evento che renda indisponibile uno degli availability domain.

Fig. 2.

Per la demo, ho scelto di installare la distribuzione Confluent in una VM basata su Ubuntu Linux.

La distribuzione Confluent mi consente non soltanto una veloce installazione ma anche di poter testare il KSQL. Inoltre provvede anche ad un Registry ove pubblicare gli schemi dei messaggi pubblicati sulle topic Kafka.

Elaborazione di dati streaming ed introduzione ad Oracle Stream Analytics

Apache Kafka ha un API molto completa, che supporta numerosi linguaggi. Inoltre, nel tempo, sono state aggiunte tutta una serie di funzionalità che aiutano nel connettere Kafka ai sistemi più vari (si vedano, ad esempio, i connettori previsti da Kafka Connector) e non ultima la possibilità di interrogare i dati che arrivano su una topic utilizzando un linguaggio di chiara derivazione SQL, chiamato KSQL.

Detto questo, è ovvio che non è difficilissimo costruire un insieme di Microservizi che provvedano alle più varie necessità delle applicazioni che producono e consumano i messaggi che poi saranno gestiti tramite le Kafka topics.

Ma la scrittura di codice a partire dalle API non è necessariamente la strada più veloce ed affidabile, per costruire soluzioni potenti ed estensibili. Sopratutto se vogliamo costruire elaborazioni complesse, che prevedono correlazioni sia basate su dati “tradizionali” che “spaziali”. Sopratutto se dobbiamo utilizzare algoritmi di analisi complessi da scrivere, ma che devono offrire tempi di risposta «sub-second».

E’ per esplorare questo concetto, che voglio proporvi un breve viaggio tra le caratteristiche e funzionalità di Oracle Stream Analytics (OSA).

Quello che ritengo sia importante enfatizzare in prima istanza, nel contesto che ci siamo dati, è che:

  • OSA può in maniera molto semplice collegarsi ad un cluster Kafka e consumare messaggi da una o più Topics
  • OSA può “produrre” i risultati delle sue elaborazioni sotto forma di messaggi, pubblicati in topic Kafka
  • Per elaborare in maniera scalabile e performante questi messaggi OSA utilizza il motore parallelo di Apache Spark

Elementi di Oracle Streaming Analytics

I componenti di OSA che esploreremo sono, nell’ordine:

  • Connections
  • Streams
  • Pipelines
  • Patterns
  • Visualizations
  • Dashboards
  • Geofence

Per collocare i componenti in un contesto di riferimento, possiamo sintetizzare dicendo che con una pipeline possiamo elaborare, filtrare, i dati di uno stream, provenienti da una connessione. Possiamo applicare pattern di analisi già pronti per l’uso. Possiamo creare visualizzazioni e comporre diagrammi e mappe in una o più Dashboard.

Fig. 3: Costruzione di una pipeline in OSA

Connessioni.

Il punto di partenza è la definizione di una connessione. Qui definiamo il tipo, e tra i previsti vi è Kafka, ed i parametri per la connessione.

Per creare una connessione ad un cluster Kafka è sufficiente specificare hostname e port del zookeeper. La form di definizione consente di testare la connessione.

E’ interessante far notare che tra le tipologie è prevista la connessione ad un’istanza di Druid.

Fig. 4: Tipologie di connessioni

Streams

Uno stream è una sorgente di eventi, collegata ad una connessione, che hanno una struttura ben definita (shape).

La tipologia di messaggi supportati prevede: JSON, AVRO, CSV

La struttura (shape) dei messaggi definisce i nomi e la tipologia dei campi dati. E’ possibile definirla a mano oppure inferirla da uno stream di messaggi (una funzionalità molto utile ma che richiede un pizzico di pazienza).

Pipeline

Una pipeline parte da uno stream e definisce una o più linee concorrenti di elaborazione.

La pipeline si costruisce combinando query, filtri, summary, applicando pattern.

L’output ad ogni nodo di una pipeline può essere visualizzato nella output table, nella parte bassa della schermata del pipeline editor (vedi fig . 3).

Una pipeline può, ad esempio, implementare un semplice filtro e produrre in uscita, su un target di tipo “Kafka topic”, soltanto i messaggi che soddisfano una certa condizione (ad esempio, SPEED > 50).

Fig. 5: pipeline editor

Pattern

I pattern a disposizione sono uno degli elementi che trovo più interessanti in OSA.

I pattern sono utilizzabili come stage di una pipeline e forniscono delle soluzioni pronte all’uso per elaborazioni e filtri altrimenti complessi da realizzare.

Una breve lista di pattern, tra quelli disponibili:

  • Outlier
  • Spatial: Speed
  • Spatial: geofence
  • Spatial: interaction single stream o interaction two stream
  • Trend

Ad esempio se abbiamo una serie di messaggi che rappresentano le posizioni nel tempo di due “mobili”, con il pattern “interaction single stream” possiamo valutare in real-time la distanza tra i mobili e segnalare un eccessiva vicinanza.

Visualization

Come parte di una pipeline possiamo costruire una o più visualization, che poi andiamo ad includere in una dashboard. La fig. 6 è una visualization.

Posiamo creare visualization di tipo: grafico, istogramma, mappa, etc.

Dashboard

Nella dashboard possiamo raggruppare, costruendola in modalità visuale (drag & drop) un insieme di visualization.

Dashboard e regole

Con OSA è semplice implementare una regola che, in uno stream di messaggi, filtra soltanto quelli che soddisfano una condizione che identifica la violazione di una policy (come esempio: SPEED > 50).

Laddove la condizione si verifica, è aggiornato un campo del messaggio, definito nella SHAPE allo scopo (VIOLATIONS).

Poi possiamo mandare i messaggi che soddisfano la condizione VIOLATION = 1 ad una topic target e costruire, su questa topic, un secondo stream che è inviato ad una Visualization, che poi includeremo in una dashbord.

Questo è il risultato ottenibile, in cui il colore del simbolo associato al veicolo cambia, se la violazione è lieve (verde, SPEED < 70) o grave (arancio, SPEED > 70)

Fig. 6: Veicoli che violano una policy, visualizzati su mappa. Il colore indica la gravità della violazione

In questo modo vediamo che con OSA possiamo non soltanto definire le elaborazioni sui dati streaming, ma anche costruire una User Interface Web.

L’esempio fatto consente di apprezzare anche le funzionalità geo-spaziali di OSA. Nel caso la possibilità di visualizzare i simboli che rappresentano i veicolo, in movimento, su una mappa basata su OpenStreetMap.

Pipeline eseguite in Apache Spark

Ogni pipeline è pubblicata ed eseguita come un’applicazione nel motore di Apache Spark.

Quando la pipeline è pubblicata in OSA è possibile specificare i parametri di esecuzione (core dedicati, memoria), per un fine tuning ed è possibile tramite Web UI seguire l’esecuzione e consultare i log.

Fig. 7

Geofence.

In OSA possiamo definire delle aree geografiche delimitate su mappe (cerchi, poligoni, etc) e possiamo inserire in una pipeline alcuni pattern di tipo geospaziale. Ad esempio possiamo analizzare uno stream di eventi geolocalizzati (posizioni di un veicolo) e generare eventi che demarcano l’ingresso o uscita di un veicolo da una geo-fence.

Machine Learning.

Come stage di una pipeline, possiamo invocare un modello di ML, di cui sia stato già effettuato il training (se di tipo supervised). Il modello deve essere descritto utilizzando il PMML.

Come caso d’uso, possiamo immaginare una pipeline di analisi di uno stream di eventi di pagamento mediante carta di credito, ed un modello ML che stabilisce se l’evento è o meno un tentativo di frode.

I dati per la demo

Ho immaginato, ancora una volta, di dover elaborare dati provenienti da autoveicoli in movimento. Quindi dati che consentono, ad esempio, di collocare i veicoli su una mappa, valutarne le distanze reciproche e da un’area di riferimento. Ma anche di identificare anomalie, quali il superamento di limiti (ad esempio di velocità).

Il formato dei dati

Oggi il formato più adottato per lo scambio di messaggi è sicuramente il formato JSON.

In tutto l’articolo e la demo assumeremo che il formato dei dati inviati dai veicoli e trasportato attraverso tutte le pipeline di elaborazione sia sempre JSON.

Ad esempio, il formato di un messaggio che proviene da un veicolo, contenente i dati letti dalla porta OBD2, è il seguente:

{“COOLANT_TEMP”:65,”ALT”:47,”SPEED”:77,”DISTANCE”:1754,”LON”:12.521799579274395,”MAF”:17.94,”LAT”:41.8970927456784,”WARMUPS”:126,”CARID”:”RM_009_AB”,”RPM”:1424,”DTIME”:”27–01–2018 09:03:20",”AMBIANT_AIR_TEMP”:9,”THROTTLE_POS”:89.41,”INTAKE_TEMP”:14,”ENGINE_LOAD”:24.71,”RUN_TIME”:530}

Dei dati contenuti, alcuni sono molto importanti:

  • CARID identifica univocamente il veicolo; Potrebbe essere, ad esempio, la targa
  • LON, LAT sono rispettivamente latitudine e longitudine, forniti dal GPS; Saranno utilizzati per posizionare, in tempo reale, le icone che rappresentano i veicoli su una mappa
  • SPEED è la velocità: un dato che può essere utilizzato, come vedremo, per controllare che il guidatore non commetta violazioni del codice stradale

Altri dati potrebbero essere utilizzati per identificare, ad esempio tramite un modello di Machine Learning, eventuali anomalie nel funzionamento del veicolo.

Inoltre i dati di velocità e posizione possono essere utilizzati per valutare il comportamento del driver, eventualmente aggiungendo anche i dati relativi alle accelerazioni (non mostrati nel msg sopra riportato).

Se utilizziamo un qualsiasi client Kafka in grado di leggere i messaggi JSON da un file, possiamo facilmente simulare, come nella demo, il flusso di eventi da cui parte la nostra demo.

Traccar

Per visualizzare in real-time la posizione dei veicoli ho deciso di utilizzare un Tracking Server Open Source molto ben fatto: Traccar.

Traccar è implementato in Java e la sua installazione su una VM Linux, come la nostra VM nel Cloud Oracle, è veramente semplice.

Ma l’aspetto più interessante è la molteplicità di strumenti di integrazione che offre. Tra i tanti, il supporto per l’Owntracks protocol. Traccar mette a disposizione un endpoint REST a cui inviare messaggi JSON formattati secondo tale protocollo. Per integrare Traccar non ho dovuto far altro che implementare un semplice Microservizio in NodeJS che legge dalla topic, trasforma nel formato necessario ed invia all’endpoint REST. Ecco il risultato:

Fig. 8: Posizione dei veicoli su mappa (Traccar). Si, siamo a Roma!

Come side effect, Traccar è stato configurato in modo da salvare tutte le posizioni, con relativi timestamp, in un database MySQL. Lo schema del database è semplice da interpretare e ciò apre ad altre interessanti realizzazioni.

Documentazione

Una documentazione abbastanza dettagliata di OSA può essere reperita alla URL: https://docs.oracle.com/en/middleware/osa/18.1/using-stream-analytics/using-oracle-stream-analytics.pdf

In particolare, nel capitolo 5 si spiega come, utilizzando le schermate del sottostante Spark Engine, è possibile analizzare e diagnosticare condizioni di errore (che possono capitare, ad esempio nello sviluppo un po’ rapido di una demo).

--

--

Luigi Saetta
Luigi Saetta

Written by Luigi Saetta

Born in the wonderful city of Naples, but living in Rome. Always curious about new technologies and new things., especially in the AI field.

No responses yet