I am trying to start a basic project structure where multiple spring boot application will share resources using apache curator.
I am following the guides specified in documentation but changing the nodes doesnt trigger any events
Please, any help would be appreciated
pom.xml
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
docker-compose.yaml
version: '3.1'
services:
zoo1:
image: zookeeper
restart: always
hostname: zoo1
ports:
- 2181:2181
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
zoo2:
image: zookeeper
restart: always
hostname: zoo2
ports:
- 2182:2181
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zoo3:2888:3888;2181
zoo3:
image: zookeeper
restart: always
hostname: zoo3
ports:
- 2183:2181
environment:
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181
Creator
package com.training.zoo.sss;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static java.lang.System.out;
@Service
public class Client {
String connectionInfo = "127.0.0.1:2181";
String ZK_PATH = "/someapp/somemodule/someroute";
public Client() throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectionInfo)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.namespace("base")
.build();
client.start();
Stat stat1 = client.checkExists().creatingParentContainersIfNeeded().forPath(ZK_PATH);
if (stat1 == null) {
client.create().forPath(ZK_PATH, "sometdata".getBytes());
}
byte[] bytes = client.getData().forPath(ZK_PATH);
out.println(new String(bytes, StandardCharsets.UTF_8));
// Update value every half second
final AtomicInteger i = new AtomicInteger(0);
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1);
exec.scheduleAtFixedRate(new Runnable(){
@Override
public void run(){
i.set(i.get()+1);
System.out.println(i);
try {
client.setData().forPath(ZK_PATH, ("init_" + i ).getBytes());
} catch (Exception e) {
e.printStackTrace();
}
}
}, 0, 500, TimeUnit.MILLISECONDS);
}
}
Listener
package com.training.bookstore.request;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.stereotype.Service;
@Service
public class Watcher2 {
String connectionInfo = "127.0.0.1:2181";
String ZK_PATH = "/someapp/somemodule/someroute";
public Watcher2() throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectionInfo)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.namespace("base")
.build();
client.start();
PathChildrenCache watcher = new PathChildrenCache(
client, ZK_PATH, true // if cache data
);
watcher.getListenable().addListener((client1, event) -> {
ChildData data = event.getData();
if (data == null) {
System.out.println("No data in event[" + event + "]");
} else {
System.out.println("Receive event: "
+ "type=[" + event.getType() + "]"
+ ", path=[" + data.getPath() + "]"
+ ", data=[" + new String(data.getData()) + "]"
+ ", stat=[" + data.getStat() + "]");
}
});
watcher.start(PathChildrenCache.StartMode.NORMAL);
System.out.println("Register zk watcher successfully!");
}
}
Thank you
So yeah that class name PathChildrenCache sounded a bit off to me.
Solution is If producer produces on specified path
In Watcher class set path to "parent" of that node
And in case you need to listen to subnodes/subfolders of your producer path, instead of using
PathChildrenCacheuseTreeCache