Data Distribution Service

Da Wikipedia, l'enciclopedia libera.

Il Data Distribution Service for Real Time Systems (DDS) è uno standard emanato dall'Object Management Group (OMG) che definisce un middleware per la distribuzione di dati in tempo reale secondo il paradigma publish/subscribe.

Introduzione[modifica | modifica sorgente]

Il DDS è un middleware per sistemi incentrati sui dati (dall'anglosassone data-centric systems), ovvero sistemi distribuiti il cui funzionamento è basato sullo scambio di dati in tempo reale da più sorgenti a più destinazioni. Esempi tipici sono i sistemi di controllo, i sistemi di difesa, i sistemi di trading finanziario, ecc. I vantaggi dell'uso di DDS in questo dominio sono i seguenti:

Il paradigma publish/subscribe alla base del DDS.
  • Accoppiamento lasco tra entità e semplicità d'uso grazie all'uso del paradigma publish/subscribe nella variante topic-based;
  • Architettura flessibile ed adattabile grazie al discovery automatico;
  • Efficienza grazie alla comunicazione diretta tra publisher e subscriber;
  • Determinismo nella consegna dei dati;
  • Scalabilità elevata come conseguenza dell'accoppiamento lasco tra entità;
  • Qualità di Servizio altamente parametrizzabile;
  • Indipendenza dalla piattaforma poiché l'interfaccia del middleware è definita tramite IDL;

Grazie a queste caratteristiche, DDS sta rapidamente guadagnando terreno nel mercato dei middleware per sistemi distribuiti in tempo reale, soprattutto nel dominio delle applicazioni militari e del controllo del traffico aereo.

Storia dello standard[modifica | modifica sorgente]

Lo standard DDS è stato emanato da OMG sotto la spinta di due produttori, la californiana Real Time Innovations ed il gruppo francese Thales, che hanno sviluppato rispettivamente NDDS e SPLICE per lo stesso settore di mercato. La specifica DDS 1.0 ha così uniformato l'interfaccia di programmazione di questi due prodotti ed ha contribuito alla nascita di ulteriori implementazioni, come OpenDDS di Object Computing, Inc. (OCI), basata su TAO e quindi open-source.

Con la specifica DDS 1.1 (datata dicembre 2005) vengono aggiunte allo standard una serie di specifiche da rispettare per garantire l'interoperabilità tra le diverse implementazioni.

La versione corrente dello standard è DDS 1.2 ed è stata approvata nel gennaio 2007.

Il DDS SIG di OMG sta ora lavorando alla versione DDS 1.3.

Architettura[modifica | modifica sorgente]

Architettura del DDS.

Il Data Distribution Service è concepito come una soluzione infrastrutturale alla programmazione di applicazioni incentrate sui dati. Il suo scopo è di nascondere interamente le problematiche della gestione della comunicazione nella programmazione di applicazioni data-centric.

Lo standard è rappresentato da una serie di API suddivise in due livelli:

  • DCPS, Data Centric Publish/Subscribe, è il livello inferiore di DDS che definisce le entità, i ruoli, le interfacce e le policy di QoS per la piattaforma publish/subscribe, nonché le tecniche di discovery dei partecipanti alla comunicazione. DCPS rappresenta in sostanza la parte dello standard relativa alla comunicazione di rete.
  • DLRL, Data Local Reconstruction Layer, è il livello superiore di DDS che definisce il modello di interazione tra il mondo ad oggetti dell'applicazione ed i dati provenienti da DCPS. Tramite DLRL è possibile mappare i dati scambiati all'interno di un topic con un oggetto del livello applicativo, in modo tale da propagare in modo automatico e trasparente gli aggiornamenti dell'oggetto dall'applicazione verso la rete e viceversa. DLRL è dichiarato opzionale all'interno di DDS.

Entità DCPS[modifica | modifica sorgente]

Entità definite in DCPS.

IN DCPS sono definite le seguenti entità:

DomainParticipantFactory
Un Singleton che funge da Factory per l'accesso allo spazio di comunicazione in DDS, chiamato Global Data Space (GDS). Il GDS è diviso in domini, ognuno dei quali è identificato da un nome. All'interno di un dominio si possono avere diversi topic. Due topic con lo stesso nome possono coesistere in DDS solo all'interno di diversi domini.
DomainParticipant
Punto di accesso per la comunicazione in uno specifico dominio. Un DomainParticipant può essere creato solo attraverso il DomainParticipantFactory. Il DomainParticipant a sua volta funge da Factory per la creazione di Topic, Publisher e Subsbriber.
Topic
Partizione tipizzata del dominio di comunicazione, all'interno del quale i Publisher ed i Subscriber pubblicano e ricevono i dati. Ogni Topic è caratterizzato da un nome e da un tipo. Il nome identifica il topic all'interno del dominio ed il tipo caratterizza i dati scambiati all'interno del topic stesso. Il tipo associato da un topic è specificato attraverso il linguaggio IDL e può essere visto come una struttura che può comprendere sia tipi primitivi che tipi complessi. Per distinguere tra i diversi oggetti di un topic si utilizza il concetto di chiave, preso in prestito dal modello Entità-Relazione tipico dei database relazionali. Una chiave è quindi composta da un membro o da una combinazione di membri del tipo.
Publisher
Entità responsabile della disseminazione dei dati provenienti dai diversi DataWriter ad esso associati. Un Publisher funge da Factory per la creazione dei DataWriter.
DataWriter
Punto di accesso per la pubblicazione di dati all'interno di un topic. Il DataWriter viene creato a partire da un Publisher, il quale a sua volta è associato univocamente ad un Topic. Per questo motivo il DataWriter è una entità astratta che viene resa concreta tipizzando la sua interfaccia con il tipo di dato corrispondente al topic a cui il DataWriter si riferisce.
Subscriber
Entità responsabile della ricezione dei dati pubblicati sul topic a cui esso è associato. Funge da Factory per la creazione dei DataReader ed a run-time si occupa di smistare i dati ricevuti ai diversi DataReader ad esso associati.
DataReader
Punto di accesso per la ricezione di dati all'interno di un topic. Come il DataWriter, anche il DataReader è una entità astratta che viene resa concreta tipizzando la sua interfaccia con il tipo di dato corrispondente al topic a cui il DataReader si riferisce.

Qualità del Servizio[modifica | modifica sorgente]

Data Distribution Service offre una vasta gamma di parametri relativi alla qualità di servizio, impostabili sulla base della singola entità coinvolta nella comunicazione. Questa scelta di progetto fa di DDS l'unico standard attualmente disponibile per la comunicazione publish/subscribe in tempo reale.

I parametri relativi alla qualità del servizio in DDS possono essere suddivisi in tre grandi categorie:

  • Affidabilità della comunicazione
  • Persistenza dei dati
  • Tempistica e priorità

In base a questa classificazione vengono di seguito elencati i parametri di maggiore rilevanza concettuale, indicando le entità alle quali il parametro si riferisce.

Affidabilità della comunicazione[modifica | modifica sorgente]

RELIABILITY (si applica a Topic, DataReader, DataWriter)
Specifica se la connessione tra writers e readers è di tipo affidabile (senza perdita di messaggi) o meno (best-effort). Se questa politica è impostata su best-effort (impostazione di default), non è prevista la ritrasmissione dei messaggi. Se invece è impostata su reliable, è possibile specificare il tempo massimo di blocco della write() nel caso in cui le code di ritrasmissione siano piene. Si noti che tramite questo parametro di QoS è possibile avere una consegna affidabile dei dati anche utilizzando un protocollo di trasporto inaffidabile (tipo UDP).
DESTINATION ORDER (si applica a Topic, DataReader)
Determina come devono essere ordinati nel subscriber i dati in arrivo, dunque in quale ordine essi devono essere presentati all’applicazione. L’ordinamento pu` avvenire in base al tempo di pubblicazione (timestamp del writer) o al tempo di notifica (timestamp del reader, impostazione di default).
OWNERSHIP & STRENGTH (si applica a Topic (Ownership), DataWriter (Strength))
Ownership specifica se un topic può essere aggiornato da uno (Exclusive) o più DataWriter (Shared). Nel caso Exclusive, il topic viene aggiornato dal writer col parametro Strength più alto. In questo modo pu` essere implementato un meccanismo di tolleranza ai guasti: nel momento in cui il writer primario (con strength più alta) viene meno, il topic continua ad essere aggiornato dal writer secondario (con strength più bassa).
LIVELINESS (si applica a Topic, DataReader, DataWriter)
Specifica come l’infrastruttura DDS (in particolare il servizio di Discovery) determini la presenza dei partecipanti alla comunicazione. In termini implementativi rappresenta l'intervallo di tempo in cui una entità deve manifestare la sua presenza attraverso un segnale (HEARTBEAT) altrimenti viene considerata non più presente. Sono possibili tre valori: automatico, manuale impostato sul participant e manuale impostato sul topic. Nel caso automatico (default), viene definito un sotto-parametro indicante ogni quanto viene inviato il segnale di presenza dell’entità. Nel caso manuale da parte del participant o del topic una entità viene considerata attiva fin quando è attivo il partecipant od il topic a cui essa afferisce.

Esempi d'uso[modifica | modifica sorgente]

Nei seguenti esempi il publisher pubblica dati relativi al topic "Example HelloWorld" ad intervallo di un secondo. Dall'altro lato il subscriber preleva i dati non appena questi vengono ricevuti e li stampa a video. Il tipo di dato associato al topic "Example HelloWorld" è definito attraverso il linguaggio IDL nel seguente modo:

struct HelloWorld {
    string name<64>;
    string msg<256>;
}

La specifica IDL viene poi tradotta nel linguaggio utilizzato per lo sviluppo dell'applicazione. In C e C++ darà luogo ad una struttura del tipo:

struct HelloWorld {
    char* name;
    char* msg;
}

mentre in Java si avrà la seguente classe:

public class HelloWorld {
    public String name;
    public String msg;
}

Dal tipo IDL vengono anche generate le interfacce tipizzate per la lettura e la scrittura di dati sul topic, ovvero HelloWorldDataReader ed HelloWorldDataWriter.

Per semplicità di trattazione, nei seguenti esempi si utilizzano per ogni entità le politiche di QoS di default. Per approfondimenti riguardo alla parametrizzazione della QoS si faccia riferimento allo standard.

C++[modifica | modifica sorgente]

Publisher Side[modifica | modifica sorgente]

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h> // needed for sleep()
 
#include <dds_cpp.h>
using DDS;
 
int main(int argc, char *argv[])
{
    int domainId = 0;
    int sample_count = 100;
    DomainParticipant *participant = NULL;
    Publisher *publisher = NULL;
    Topic *topic = NULL;
    DataWriter *writer = NULL;
    HelloWorldDataWriter *helloWriter = NULL;
    HelloWorld instance;
    InstanceHandle_t instance_handle = HANDLE_NIL;
    const char *type_name = NULL;
    int count = 0;
 
    participant = TheParticipantFactory->create_participant(
        domainId, PARTICIPANT_QOS_DEFAULT, NULL /* listener */, 
        STATUS_MASK_NONE);
 
    publisher = participant->create_publisher(
        PUBLISHER_QOS_DEFAULT, NULL /* listener */, 
        STATUS_MASK_NONE);
 
    type_name = HelloWorldTypeSupport::get_type_name();
    HelloWorldTypeSupport::register_type(
        participant, type_name);
 
    topic = participant->create_topic(
        "Example HelloWorld",
        type_name, TOPIC_QOS_DEFAULT, NULL /* listener */,
        STATUS_MASK_NONE);
 
    writer = publisher->create_datawriter(
        topic, DATAWRITER_QOS_DEFAULT, NULL /* listener */,
        STATUS_MASK_NONE);
    helloWriter = HelloWorldDataWriter::narrow(writer);
 
    strcpy(instance.name, "MyName");
    instance_handle = HelloWorld_writer->register_instance(instance);
 
    /* Main loop */
    for (count=0; (sample_count == 0) || (count < sample_count); ++count) {
        sprintf(instance.msg,  "Hello World! (count %d)", count);
        printf("Writing: %s", instance.msg);
        helloWriter->write(*instance, instance_handle);
        sleep(1);
    }
 
    participant->delete_contained_entities();
    TheParticipantFactory->delete_participant(participant);
}

Subscriber side[modifica | modifica sorgente]

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h> // needed for sleep()
 
#include <dds_cpp.h>
 
using DDS;
 
/* Listener used to receive notifications on data updates */
class HelloWorldListener : public DataReaderListener {
  public:
    virtual void on_data_available(DataReader* reader);
};
 
void HelloWorldListener::on_data_available(DataReader* reader)
{
    HelloWorldDataReader *HelloWorld_reader = NULL;
    HelloWorldSeq data_seq;
    SampleInfoSeq info_seq;
 
    HelloWorld_reader = HelloWorldDataReader::narrow(reader);
 
    retcode = HelloWorld_reader->take(
        data_seq, info_seq, LENGTH_UNLIMITED,
        ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE);
 
    for (i = 0; i < data_seq.length(); ++i) {
        if (info_seq[i].valid_data) {
            printf("From %s: %s\n", data_seq[i].name, data_seq[i].msg);
        }
    }
 
    retcode = HelloWorld_reader->return_loan(data_seq, info_seq);
}
 
int main(int argc, char *argv[])
{
    int domainId = 0;
 
    DomainParticipant *participant = NULL;
    Subscriber *subscriber = NULL;
    Topic *topic = NULL;
    HelloWorldListener *reader_listener = NULL; 
    DataReader *reader = NULL;
    ReturnCode_t retcode;
    const char *type_name = NULL;
 
    participant = TheParticipantFactory->create_participant(
        domainId, participant_qos, NULL /* listener */, STATUS_MASK_NONE);
 
    subscriber = participant->create_subscriber(
        SUBSCRIBER_QOS_DEFAULT, NULL /* listener */, STATUS_MASK_NONE);
 
    type_name = HelloWorldTypeSupport::get_type_name();
    retcode = HelloWorldTypeSupport::register_type(
        participant, type_name);
 
    topic = participant->create_topic(
        "Example HelloWorld",
        type_name, TOPIC_QOS_DEFAULT, NULL /* listener */,
        STATUS_MASK_NONE);
 
    /* Create data reader listener */
    reader_listener = new HelloWorldListener();
 
    reader = subscriber->create_datareader(
        topic, DATAREADER_QOS_DEFAULT, reader_listener,
        STATUS_MASK_ALL);
 
    /* Main loop. Does nothing. Action taken in listener */
    for (count=0; (sample_count == 0) || (count < sample_count); ++count) {
        printf("HelloWorld subscriber sleeping for %d sec...\n",
               receive_period.sec);
        sleep(10);
    }
 
    participant->delete_contained_entities();
    TheParticipantFactory->delete_participant(participant);
}

Collegamenti esterni[modifica | modifica sorgente]