hi all
i have a scheduled job which sends multiple messages and subscribes to a topic for each of these message:
@Scheduled(cron = "${scheduled.jobs.cron}")
public void sendMessages() {
List<ScheduledJob> foundJobsToExecute = scheduledJobRepository.findByAusfuehrungsZeitpunktLessThanEqualAndStatus(LocalDateTime.now(), JobStatus.ANSTEHEND);
log.info("started cron job, found {} jobs to execute", foundJobsToExecute.size());
foundJobsToExecute.forEach(job -> asyncService.executeSendAndSubscribePerJob(job));
}
public void executeSendAndSubscribePerJob(ScheduledJob job) {
try {
mqttPahoClient.connect();
setupSecurityContextForBatchJob(job);
setJobInAusfuehrung(job.getId());
String sendTopic = String.format(VERSION_TOPIC, job.getRessourcenUniqueIdentifier());
String responseTopic = "reply_" + job.getCorrelationId();
log.info("sending message {} to topic {} ", job.getMessagePayload(), sendTopic);
mqttPahoClient.subscribe(responseTopic, message -> {
tempService.doStuff(message, job.getId(), responseTopic);
});
MqttMessage messageToSend = new MqttMessage(job.getMessagePayload().getBytes(), 2, true, createMqttProperties(responseTopic));
mqttPahoClient.send(sendTopic, messageToSend);
} catch (MqttException e) {
e.printStackTrace();
}
}
but now it seems that the paho client isnt able to subscribe and deliver messages in parallel, because i get a concurrent modification exception while doing it:
Caused by: Untranslated MqttException - RC: 0 (0) - java.util.ConcurrentModificationException
at org.eclipse.paho.mqttv5.client.internal.CommsCallback.run(CommsCallback.java:238)
... 1 more
Caused by: java.util.ConcurrentModificationException
at java.base/java.util.HashMap$HashIterator.nextNode(HashMap.java:1493)
at java.base/java.util.HashMap$EntryIterator.next(HashMap.java:1526)
at java.base/java.util.HashMap$EntryIterator.next(HashMap.java:1524)
at org.eclipse.paho.mqttv5.client.internal.CommsCallback.deliverMessage(CommsCallback.java:610)
at org.eclipse.paho.mqttv5.client.internal.CommsCallback.handleMessage(CommsCallback.java:481)
at org.eclipse.paho.mqttv5.client.internal.CommsCallback.run(CommsCallback.java:224)
... 1 more
I think it's the same problem as reported here: https://github.com/aws-amplify/aws-sdk-android/issues/329
Is there a way around this issue or to fix it? Any help appreciated
regards angela