Hello Aravind,
I think the problem is in your subscription. By default Kura enforces a topic structure of <account>/<client_id>/<app_id>/rest_of_the_topic . From what I see you are subscribing to:
// <app_id>/rest_of_the_topic
EXAMPLE_PUBLISHER/data/metrics
Where you should be subscribing to something like:
// <account_id>/<client_id><app_id>/rest_of_the_topic
kapua-sys/#/EXAMPLE_PUBLISHER/data/metrics
I hope this helps.
--Dave
From: <kapua-dev-bounces@xxxxxxxxxxx> on behalf of Aravind Boppana <aravind.boppana@xxxxxxxxxxxxxxxxxxx>
Reply-To: kapua developer discussions <kapua-dev@xxxxxxxxxxx>
Date: Tuesday, April 17, 2018 at 8:27 AM
To: "kapua-dev@xxxxxxxxxxx" <kapua-dev@xxxxxxxxxxx>
Cc: Robert Sanders <robert.sanders@xxxxxxxxxxxxxxxxxxx>
Subject: Re: [kapua-dev] Kapua-Camel Mqtt Connection
On Fri, Apr 13, 2018 at 11:18 AM, Aravind Boppana <aravind.boppana@xxxxxxxxxxxxxxxxxxx>
wrote:
Hi, I have connected Kura and kapua and the data is getting published from kura to kapua. This I can see it in Web UI of Kapua. I have used example publisher from
ecllipse community by installing it as a package using kura UI. Now I want to get the data from kapua into kafka. For that I am using Apache camel. I used camel mqtt component to connect to kapua. I am able to connect well(I can see the connection in the kapua
web UI.) but the data is not getting transferred. Can you help me with this? This is my code.
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.properties.PropertiesComponent;
import org.apache.camel.main.Main;
import org.apache.camel.main.MainListenerSupport;
import org.apache.camel.main.MainSupport;
import java.io.IOException;
import java.io.InputStream;
import java.util.Date;
import java.util.Properties;
public class MainExample {
public static void main(String[] args) throws Exception {
MainExample example = new MainExample();
example.boot();
}
private void boot() throws Exception {
// create a Main instance
Main main = new Main();
main.addRouteBuilder(new MyRouteBuilder());
// add event listener
main.addMainListener(new Events());
// run until you terminate the JVM
System.out.println("Starting Camel. Use ctrl + c to terminate the JVM.\n");
main.run();
}
private static class MyRouteBuilder extends RouteBuilder {
@Override
public void configure() throws Exception {
from("mqtt://kapua?host=tcp://kapua_mqtt_brokeruri:1883&subscribeTopicName=EXAMPLE_PUBLISHER/data/metrics&userName=kapua-sys&password=kapua-password&clientId=kafka")
.process(exchange -> System.out.println("Invoked timer at " + new Date()))
.process(new MyLogProcessor())
.to("kafka:kafka-kapua?brokers=kafka_brokeruri:9092");
}
}
public static class Events extends MainListenerSupport {
@Override
public void afterStart(MainSupport main) {
System.out.println("MainExample with Camel is now started!");
}
@Override
public void beforeStop(MainSupport main) {
System.out.println("MainExample with Camel is now being stopped!");
}
}
public class MyLogProcessor implements Processor {
public void process(Exchange exchange) throws Exception {
System.out.println(new Date()+":Now processing the String : " + exchange.getIn().getBody(String.class)); }
}
}
--
Aravind Boppana |
Big Data Developer
CLAIRVOYANT | Chandler,
AZ