ConcurrentModificationException in deliverMessage [message #1804443] Tue, 26 March 2019 08:18
Angela Stempfel
Hi all,

I have a scheduled job that sends multiple messages and subscribes to a reply topic for each of these messages:

    @Scheduled(cron = "${scheduled.jobs.cron}")
    public void sendMessages() {
        // Load all pending (ANSTEHEND) jobs whose execution time has been reached
        List<ScheduledJob> foundJobsToExecute = scheduledJobRepository.findByAusfuehrungsZeitpunktLessThanEqualAndStatus(LocalDateTime.now(), JobStatus.ANSTEHEND);
        log.info("started cron job, found {} jobs to execute", foundJobsToExecute.size());
        // Hand each job to asyncService, which sends and subscribes per job
        foundJobsToExecute.forEach(job -> asyncService.executeSendAndSubscribePerJob(job));
    }



    public void executeSendAndSubscribePerJob(ScheduledJob job) {
        try {
            mqttPahoClient.connect();
            setupSecurityContextForBatchJob(job);
            setJobInAusfuehrung(job.getId());

            String sendTopic = String.format(VERSION_TOPIC, job.getRessourcenUniqueIdentifier());
            String responseTopic = "reply_" + job.getCorrelationId();
            log.info("sending message {} to topic {}", job.getMessagePayload(), sendTopic);

            // Subscribe to the per-job reply topic before publishing the request
            mqttPahoClient.subscribe(responseTopic, message -> {
                tempService.doStuff(message, job.getId(), responseTopic);
            });

            // Publish with QoS 2 and the retained flag set
            MqttMessage messageToSend = new MqttMessage(job.getMessagePayload().getBytes(), 2, true, createMqttProperties(responseTopic));
            mqttPahoClient.send(sendTopic, messageToSend);
        } catch (MqttException e) {
            log.error("failed to send or subscribe for job {}", job.getId(), e);
        }
    }



But it now seems that the Paho client is not able to subscribe and deliver messages in parallel, because I get a ConcurrentModificationException when doing both at the same time:
Caused by: Untranslated MqttException - RC: 0 (0) - java.util.ConcurrentModificationException
	at org.eclipse.paho.mqttv5.client.internal.CommsCallback.run(CommsCallback.java:238)
	... 1 more
Caused by: java.util.ConcurrentModificationException
	at java.base/java.util.HashMap$HashIterator.nextNode(HashMap.java:1493)
	at java.base/java.util.HashMap$EntryIterator.next(HashMap.java:1526)
	at java.base/java.util.HashMap$EntryIterator.next(HashMap.java:1524)
	at org.eclipse.paho.mqttv5.client.internal.CommsCallback.deliverMessage(CommsCallback.java:610)
	at org.eclipse.paho.mqttv5.client.internal.CommsCallback.handleMessage(CommsCallback.java:481)
	at org.eclipse.paho.mqttv5.client.internal.CommsCallback.run(CommsCallback.java:224)
	... 1 more
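
The HashMap iterator frames in the trace suggest that a map was structurally modified while it was being iterated. The following is a minimal sketch of that failure mode only, independent of Paho: the class name CmeDemo, the method name, and the map contents are made up for illustration and say nothing about how CommsCallback is actually implemented.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CmeDemo {

    /**
     * Fills the given map with two entries, then adds a third one while
     * iterating over it. Returns true if the iterator detected the
     * modification and threw ConcurrentModificationException.
     */
    static boolean failsWhenModifiedDuringIteration(Map<Integer, String> listeners) {
        listeners.put(1, "reply_a");
        listeners.put(2, "reply_b");
        try {
            for (Map.Entry<Integer, String> entry : listeners.entrySet()) {
                // Stands in for a new subscription being registered
                // while a message is being delivered.
                listeners.put(3, "reply_c");
            }
            return false;
        } catch (ConcurrentModificationException ex) {
            return true;
        }
    }

    public static void main(String[] args) {
        // HashMap iterators are fail-fast
        System.out.println(failsWhenModifiedDuringIteration(new HashMap<>()));
        // ConcurrentHashMap iterators are weakly consistent and do not throw
        System.out.println(failsWhenModifiedDuringIteration(new ConcurrentHashMap<>()));
    }
}
```

The modification here happens on the same thread to keep the sketch deterministic; with two threads the same exception can occur, but only when the timing happens to line up.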


I think it's the same problem as reported here: https://github.com/aws-amplify/aws-sdk-android/issues/329
Is there a way around this issue, or a fix for it? Any help is appreciated.
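
One workaround that might be worth trying, and which is only an untested assumption here, is to stop registering a listener per reply topic on the client and instead route incoming replies through a thread-safe map owned by the application. The sketch below shows just that routing part; ReplyDispatcher and its methods are hypothetical names, and wiring dispatch into the client's single message-arrived callback is left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

public class ReplyDispatcher {

    // Topic -> handler. ConcurrentHashMap allows handlers to be registered
    // from job threads while another thread is dispatching messages.
    private final Map<String, Consumer<byte[]>> handlers = new ConcurrentHashMap<>();

    /** Registers the handler for one reply topic, e.g. "reply_" + correlationId. */
    public void register(String topic, Consumer<byte[]> handler) {
        handlers.put(topic, handler);
    }

    /** Removes the handler once the reply has been processed. */
    public void unregister(String topic) {
        handlers.remove(topic);
    }

    /** Passes the payload to the handler for the topic; returns false if none is registered. */
    public boolean dispatch(String topic, byte[] payload) {
        Consumer<byte[]> handler = handlers.get(topic);
        if (handler == null) {
            return false;
        }
        handler.accept(payload);
        return true;
    }

    public static void main(String[] args) {
        ReplyDispatcher dispatcher = new ReplyDispatcher();
        dispatcher.register("reply_42",
                payload -> System.out.println(new String(payload, StandardCharsets.UTF_8)));
        dispatcher.dispatch("reply_42", "done".getBytes(StandardCharsets.UTF_8));
    }
}
```

Whether this avoids the exception depends on the client not touching its internal listener map when subscribing without a per-topic listener, which I have not verified.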

Regards,
Angela
