Edit this page

A client SDK for Java in order to interact with digital twins provided by an Eclipse Ditto backend.

Features

  • Digital twin management: CRUD (create, read, update, delete) of Ditto things
  • Change notifications: consume notifications whenever a “watched” digital twin is modified
  • Send/receive messages to/from devices connected via a digital twin
  • Use the live channel in order to react on commands directed to devices targeting their “live” state

Communication channel

The Ditto Java client interacts with an Eclipse Ditto backend via Ditto’s WebSocket sending and receiving messages in Ditto Protocol.

Usage

Maven coordinates:

<dependency>
   <groupId>org.eclipse.ditto</groupId>
   <artifactId>ditto-client</artifactId>
   <version>${ditto-client.version}</version>
</dependency>

Instantiate & configure a new Ditto client

To configure your Ditto client instance, use the org.eclipse.ditto.client.configuration package in order to

  • create instances of AuthenticationProvider and MessagingProvider
  • create a DisconnectedDittoClient instance
  • obtain a DittoClient instance asynchronously by calling .connect()

For example:

ProxyConfiguration proxyConfiguration =
    ProxyConfiguration.newBuilder()
        .proxyHost("localhost")
        .proxyPort(3128)
        .build();

AuthenticationProvider authenticationProvider =
    AuthenticationProviders.clientCredentials(ClientCredentialsAuthenticationConfiguration.newBuilder()
        .clientId("my-oauth-client-id")
        .clientSecret("my-oauth-client-secret")
        .scopes("offline_access email")
        .tokenEndpoint("https://my-oauth-provider/oauth/token")
        // optionally configure a proxy server
        .proxyConfiguration(proxyConfiguration)
        .build());

MessagingProvider messagingProvider =
    MessagingProviders.webSocket(WebSocketMessagingConfiguration.newBuilder()
        .endpoint("wss://ditto.eclipseprojects.io")
        // optionally configure a proxy server or a truststore containing the trusted CAs for SSL connection establishment
        .proxyConfiguration(proxyConfiguration)
        .trustStoreConfiguration(TrustStoreConfiguration.newBuilder()
            .location(TRUSTSTORE_LOCATION)
            .password(TRUSTSTORE_PASSWORD)
            .build())
        .build(), authenticationProvider);

DisconnectedDittoClient disconnectedDittoClient = DittoClients.newInstance(messagingProvider);

disconnectedDittoClient.connect()
    .thenAccept(this::startUsingDittoClient)
    .exceptionally(error -> disconnectedDittoClient.destroy());

Use the Ditto client

Manage twins

client.twin().create("org.eclipse.ditto:new-thing").handle((createdThing, throwable) -> {
    if (createdThing != null) {
        System.out.println("Created new thing: " + createdThing);
    } else {
        System.out.println("Thing could not be created due to: " + throwable.getMessage());
    }
    return client.twin().forId(thingId).putAttribute("first-updated-at", OffsetDateTime.now().toString());
}).toCompletableFuture().get(); // this will block the thread! work asynchronously whenever possible!

Subscribe for change notifications

In order to subscribe for events emitted by Ditto after a twin was modified, start the consumption on the twin channel:

client.twin().startConsumption().toCompletableFuture().get(); // this will block the thread! work asynchronously whenever possible!
System.out.println("Subscribed for Twin events");
client.twin().registerForThingChanges("my-changes", change -> {
   if (change.getAction() == ChangeAction.CREATED) {
       System.out.println("An existing Thing was modified: " + change.getThing());
       // perform custom actions ..
   }
});

There is also the possibility here to apply server side filtering of which events will get delivered to the client:

client.twin().startConsumption(
   Options.Consumption.filter("gt(features/temperature/properties/value,23.0)")
).toCompletableFuture().get(); // this will block the thread! work asynchronously whenever possible!
System.out.println("Subscribed for Twin events");
client.twin().registerForFeaturePropertyChanges("my-feature-changes", "temperature", "value", change -> {
   // perform custom actions ..
});
Subscribe to enriched change notifications

In order to use enrichment in the Ditto Java client, the startConsumption() call can be enhanced with the additional extra fields:

client.twin().startConsumption(
   Options.Consumption.extraFields(JsonFieldSelector.newInstance("attributes/location"))
).toCompletableFuture().get(); // this will block the thread! work asynchronously whenever possible!
client.twin().registerForThingChanges("my-enriched-changes", change -> {
   Optional<JsonObject> extra = change.getExtra();
   // perform custom actions, making use of the 'extra' data ..
});

In combination with a filter, the extra fields may also be used as part of such a filter:

client.twin().startConsumption(
   Options.Consumption.extraFields(JsonFieldSelector.newInstance("attributes/location")),
   Options.Consumption.filter("eq(attributes/location,\"kitchen\")")
).toCompletableFuture().get(); // this will block the thread! work asynchronously whenever possible!
// register the callbacks...

Send/receive messages

Register for receiving messages with the subject hello.world on any thing:

client.live().startConsumption().toCompletableFuture().get(); // this will block the thread! work asynchronously whenever possible!
System.out.println("Subscribed for live messages/commands/events");
client.live().registerForMessage("globalMessageHandler", "hello.world", message -> {
   System.out.println("Received Message with subject " +  message.getSubject());
   message.reply()
      .statusCode(HttpStatusCode.IM_A_TEAPOT)
      .payload("Hello, I'm just a Teapot!")
      .send();
});

Send a message with the subject hello.world to the thing with ID org.eclipse.ditto:new-thing:

client.live().forId("org.eclipse.ditto:new-thing")
   .message()
   .from()
   .subject("hello.world")
   .payload("I am a Teapot")
   .send(String.class, (response, throwable) ->
      System.out.println("Got response: " + response.getPayload().orElse(null))
   );

Manage policies

Read a policy:

Policy retrievedPolicy = client.policies().retrieve(PolicyId.of("org.eclipse.ditto:new-policy"))
   .toCompletableFuture().get(); // this will block the thread! work asynchronously whenever possible!

Create a policy:

Policy newPolicy = Policy.newBuilder(PolicyId.of("org.eclipse.ditto:new-policy"))
   .forLabel("DEFAULT")
   .setSubject(Subject.newInstance(SubjectIssuer.newInstance("nginx"), "ditto"))
   .setGrantedPermissions(PoliciesResourceType.policyResource("/"), "READ", "WRITE")
   .setGrantedPermissions(PoliciesResourceType.thingResource("/"), "READ", "WRITE")
   .build();

client.policies().create(newPolicy)
   .toCompletableFuture().get(); // this will block the thread! work asynchronously whenever possible!

Updating and deleting policies is also possible via the Java client API, please follow the API and the JavaDoc.

Search for things

Search for things using the Java 8 java.util.Stream API:

client.twin().search()
   .stream(queryBuilder -> queryBuilder.namespace("org.eclipse.ditto")
      .filter("eq(attributes/location,'kitchen')") // apply RQL expression here
      .options(builder -> builder.sort(s -> s.desc("thingId")).size(1))
   )
   .forEach(foundThing -> System.out.println("Found thing: " + foundThing));

Use an RQL query in order to filter for the searched things.

Search for things using the reactive streams org.reactivestreams.Publisher API:

Publisher<List<Thing>> publisher = client.twin().search()
   .publisher(queryBuilder -> queryBuilder.namespace("org.eclipse.ditto")
      .filter("eq(attributes/location,'kitchen')") // apply RQL expression here
      .options(builder -> builder.sort(s -> s.desc("thingId")).size(1))
   );
// integrate the publisher in the reactive streams library of your choice, e.g. Pekko streams:
org.apache.pekko.stream.javadsl.Source<Thing, NotUsed> things = org.apache.pekko.stream.javadsl.Source.fromPublisher(publisher)
   .flatMapConcat(Source::from);
// .. proceed working with the Pekko Source ..

Request and issue acknowledgements

Requesting acknowledgements is possible in the Ditto Java client in the following way:

DittoHeaders dittoHeaders = DittoHeaders.newBuilder()
   .acknowledgementRequest(
      AcknowledgementRequest.of(DittoAcknowledgementLabel.PERSISTED),
      AcknowledgementRequest.of(AcknowledgementLabel.of("my-custom-ack"))
   )
   .timeout("5s")
   .build();

client.twin().forId(ThingId.of("org.eclipse.ditto:my-thing"))
   .putAttribute("counter", 42, Options.dittoHeaders(dittoHeaders))
   .whenComplete((aVoid, throwable) -> {
      if (throwable instanceof AcknowledgementsFailedException) {
         Acknowledgements acknowledgements = ((AcknowledgementsFailedException) throwable).getAcknowledgements();
         System.out.println("Acknowledgements could not be fulfilled: " + acknowledgements);
      }   
   });

Issuing requested acknowledgements can be done like this whenever a Change callback is invoked with a change notification:

client.twin().registerForThingChanges("REG1", change -> {
   change.handleAcknowledgementRequest(AcknowledgementLabel.of("my-custom-ack"), ackHandle ->
      ackHandle.acknowledge(HttpStatusCode.NOT_FOUND, JsonObject.newBuilder()
         .set("error-detail", "Could not be found")
         .build()
      )
   );
});

Further examples

For further examples on how to use the Ditto client, please have a look at the class DittoClientUsageExamples which is configured to connect to the Ditto sandbox.

Tags: