开发ZeroMQ的SpringBoot Starter。
在 resources 目录下的 META-INF 中增加文件 spring.factories,内容如下:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=net.ytoframework.plugin.zeromq.ZeroMQAutoConfiguration
配置文件类:ZeroMQProperties
/**
 * Settings bound from the {@code spring.zeromq} property prefix.
 *
 * <p>There is one nested group for the publishing side and one for the
 * subscribing side. Neither field is initialised here, so a group is
 * {@code null} unless something sets it.
 */
@Getter
@Setter
@ConfigurationProperties(prefix = "spring.zeromq")
public class ZeroMQProperties {

    /** Publisher-side settings ({@code spring.zeromq.publisher.*}). */
    private Publisher publisher;

    /** Subscriber-side settings ({@code spring.zeromq.subscriber.*}). */
    private Subscriber subscriber;

    /** Settings used when creating the publishing socket. */
    @Data
    public static class Publisher {

        /** Value handed to the ZeroMQ context as its I/O thread count; defaults to 1. */
        private Integer ioNum = 1;

        /** Endpoint the publisher binds to. */
        private String address;
    }

    /** Settings used when creating subscribing sockets. */
    @Data
    public static class Subscriber {

        /** Value handed to the ZeroMQ context as its I/O thread count; defaults to 1. */
        private Integer ioNum = 1;

        /** Endpoint the subscribers connect to. */
        private String address;
    }
}
自动配置类:ZeroMQAutoConfiguration
/**
 * Auto-configuration of the ZeroMQ starter.
 *
 * <p>Contributes a {@link Publisher} bean when {@code spring.zeromq.publisher.address}
 * is set, and, when {@code spring.zeromq.subscriber.address} is set, one
 * {@link SubscribContainer} per bean annotated with {@link Subscriber}.
 *
 * <p>NOTE(review): {@code log} is used below but not declared in this class as shown;
 * presumably it is supplied by Lombok's {@code @Slf4j} or an equivalent — confirm.
 */
@Configuration
@EnableConfigurationProperties(ZeroMQProperties.class)
public class ZeroMQAutoConfiguration {

    /**
     * Creates the publisher bean from the {@code spring.zeromq.publisher.*} settings.
     *
     * <p>The return type stays raw so that injection points declaring any
     * {@code Publisher<X>} can receive this single bean.
     *
     * @param zeroMQProperties bound starter properties
     * @return a publisher bound to the configured address
     * @throws IllegalArgumentException if no publisher address is available
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Bean(destroyMethod = "destroy")
    @ConditionalOnMissingBean(Publisher.class)
    @ConditionalOnProperty(prefix = "spring.zeromq", value = "publisher.address")
    public Publisher mqPublisher(final ZeroMQProperties zeroMQProperties) {
        ZeroMQProperties.Publisher publisher = zeroMQProperties.getPublisher();
        // Guard the group itself as well: dereferencing a missing group would
        // otherwise surface as an uninformative NullPointerException.
        if (publisher == null || publisher.getAddress() == null) {
            throw new IllegalArgumentException("mq address can not be null!");
        }
        if (publisher.getIoNum() == null) {
            publisher.setIoNum(1);
        }
        return new Publisher(publisher.getIoNum(), publisher.getAddress());
    }

    /**
     * Finds every bean annotated with {@link Subscriber} and registers and starts a
     * dedicated {@link SubscribContainer} for it.
     */
    @Configuration
    @ConditionalOnProperty(prefix = "spring.zeromq", value = "subscriber.address")
    public static class ListenerContainerConfiguration
            implements ApplicationContextAware, InitializingBean, EnvironmentAware {

        /** Topics that already have a container; used to reject duplicates. */
        private Set<String> topicSet = new HashSet<>();

        private ConfigurableApplicationContext applicationContext;

        /** Stored by {@link #setEnvironment(Environment)}; not read anywhere in this class. */
        private StandardEnvironment environment;

        /** Source of the numeric suffix that keeps container bean names unique. */
        private AtomicLong counter = new AtomicLong(0);

        @Autowired
        private ZeroMQProperties subscrbConfig;

        /**
         * Registers a {@link SubscribContainer} bean definition for one annotated
         * callback bean, instantiates it and starts it.
         *
         * @param beanName name of the annotated bean
         * @param bean     the annotated bean instance (possibly an AOP proxy)
         * @throws IllegalStateException    if the bean is not a {@link MessageCallback},
         *                                  or its {@link Subscriber} annotation or message
         *                                  type cannot be resolved
         * @throws IllegalArgumentException if subscriber settings are missing or the topic
         *                                  is already registered
         */
        private void registerContainer(final String beanName, final Object bean) {
            Class<?> clazz = AopUtils.getTargetClass(bean);
            if (!(bean instanceof MessageCallback)) {
                throw new IllegalStateException(clazz + " must be instance of " + MessageCallback.class.getName());
            }
            if (subscrbConfig.getSubscriber() == null) {
                throw new IllegalArgumentException("Argument Illegal, no subscrbConfig!");
            }
            MessageCallback callback = (MessageCallback) bean;

            // Resolve the annotation through the bean factory: the annotation lookup that
            // selected this bean also matches annotations declared on a superclass, which
            // Class#getAnnotation would miss (returning null).
            Subscriber annotation = applicationContext.findAnnotationOnBean(beanName, Subscriber.class);
            if (annotation == null) {
                throw new IllegalStateException(
                        "No @" + Subscriber.class.getName() + " found on bean '" + beanName + "' (" + clazz + ")");
            }
            String topic = annotation.topic();
            if (topicSet.contains(topic)) {
                throw new IllegalArgumentException("Topic重复!");
            }

            BeanDefinitionBuilder beanBuilder = BeanDefinitionBuilder.rootBeanDefinition(SubscribContainer.class);
            beanBuilder.addPropertyValue("topic", topic);
            beanBuilder.addPropertyValue("address", subscrbConfig.getSubscriber().getAddress());
            beanBuilder.addPropertyValue("ioNum", subscrbConfig.getSubscriber().getIoNum());
            beanBuilder.addPropertyValue("threadSize", annotation.threadSize());
            beanBuilder.addPropertyValue("queueSize", annotation.queueSize());
            beanBuilder.addPropertyValue("callback", callback);
            beanBuilder.addPropertyValue("clazz", findType(callback));
            beanBuilder.setDestroyMethodName("close");

            String containerBeanName = String.format("%s_%s", SubscribContainer.class.getName(), counter.incrementAndGet());
            DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getBeanFactory();
            beanFactory.registerBeanDefinition(containerBeanName, beanBuilder.getBeanDefinition());
            SubscribContainer container = beanFactory.getBean(containerBeanName, SubscribContainer.class);
            container.startRun();
            topicSet.add(topic);
            log.info("register messageCallback to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
        }

        /** Registers a container for every bean carrying the {@link Subscriber} annotation. */
        @Override
        public void afterPropertiesSet() throws Exception {
            Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(Subscriber.class);
            beans.forEach(this::registerContainer);
        }

        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = (ConfigurableApplicationContext) applicationContext;
        }

        /** Keeps the environment only when it is a {@link StandardEnvironment}. */
        @Override
        public void setEnvironment(Environment environment) {
            if (environment instanceof StandardEnvironment) {
                this.environment = (StandardEnvironment) environment;
            }
        }

        /**
         * Resolves the type argument {@code T} of the {@code MessageCallback<T>} that the
         * callback's class implements.
         *
         * <p>The lookup starts from the AOP target class rather than the runtime class,
         * because a proxy class does not declare the parameterized interface itself, and
         * it walks up the superclass chain so an inherited implementation is found too.
         *
         * @param callback the callback bean (possibly an AOP proxy)
         * @return the actual type argument of {@code MessageCallback}
         * @throws IllegalStateException if no parameterized {@code MessageCallback} is found
         */
        private Type findType(MessageCallback callback) {
            Class<?> targetClass = AopUtils.getTargetClass(callback);
            for (Class<?> type = targetClass; type != null && type != Object.class; type = type.getSuperclass()) {
                for (Type candidate : type.getGenericInterfaces()) {
                    if (candidate instanceof ParameterizedType
                            && ((ParameterizedType) candidate).getRawType() == MessageCallback.class) {
                        return ((ParameterizedType) candidate).getActualTypeArguments()[0];
                    }
                }
            }
            throw new IllegalStateException(targetClass + " must implement "
                    + MessageCallback.class.getName() + " with an explicit type argument");
        }
    }
}
发布者:Publisher
/**
 * Publishes messages on a ZeroMQ {@code PUB} socket bound to a fixed address.
 *
 * <p>Each message goes out as a single frame of the form
 * {@code "<topic> <payload>"}, where the payload is the string form of
 * {@code JSON.toJSON(msg)}.
 *
 * <p>The one underlying socket is shared by every caller of this object. ZeroMQ
 * sockets must not be used from several threads at once, so {@link #send} and
 * {@link #destroy} are synchronized on this instance.
 *
 * @param <T> type of the message payload
 */
public class Publisher<T extends Serializable> {

    /** Socket type used for publishing. */
    private static final int MODE = ZMQ.PUB;

    private final ZMQ.Context context;
    private final ZMQ.Socket socket;

    /**
     * Creates the context and binds the publishing socket.
     *
     * @param ioNum   value passed to {@code ZMQ.context(...)}
     * @param address endpoint to bind, e.g. {@code tcp://*:5557}
     */
    public Publisher(int ioNum, String address) {
        context = ZMQ.context(ioNum);
        socket = context.socket(MODE);
        socket.bind(address);
    }

    /**
     * Sends one message under the given topic.
     *
     * @param topic topic prefix placed before the payload, separated by one space
     * @param msg   payload to convert with {@code JSON.toJSON}
     * @return the result reported by the socket's {@code send} call
     */
    public synchronized boolean send(String topic, T msg) {
        return socket.send(topic + " " + JSON.toJSON(msg), 0);
    }

    /** Closes the socket and then terminates the context. */
    public synchronized void destroy() {
        socket.close();
        context.term();
    }
}
接收者注解:Subscriber
/**
 * Marks a class as a subscriber for one topic.
 *
 * <p>The annotation is itself meta-annotated with {@code @Component}.
 */
@Documented
@Component
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface Subscriber {

    /** Topic this subscriber listens to. */
    String topic();

    /** Number of threads that process messages concurrently. */
    int threadSize() default 20;

    /** Queue size setting for this subscriber. */
    int queueSize() default 20000;
}
接收者回调接口:MessageCallback
/**
 * Callback invoked with each received message.
 *
 * @param <T> type of the message handed to the callback
 */
public interface MessageCallback<T extends Serializable> {

    /**
     * Handles one message.
     *
     * @param msg the received message
     */
    void onMessage(T msg);
}
接收者容器:SubscribContainer
/**
 * Receiver thread for one topic: connects a ZeroMQ {@code SUB} socket, reads frames
 * of the form {@code "<topic> <payload>"} and hands each payload to the configured
 * {@link MessageCallback} on a worker pool.
 *
 * <p>Properties are populated through the Lombok-generated setters before
 * {@link #startRun()} is called.
 */
@Data
public class SubscribContainer extends Thread {

    /** Separator between topic and payload inside a frame. */
    private static final String TOPIC_DELIMITER = " ";

    private String topic;
    private String address;
    private int threadSize;
    private int queueSize;
    private int keepAliveTime = 0;
    private ZMQ.Socket socket;
    private ZMQ.Context context;
    private MessageCallback callback;
    private Executor execute;
    private int ioNum = 1;
    /** Set by {@link #close()} on another thread, read by the receive loop; hence volatile. */
    private volatile boolean exit = false;
    /** Target type of the payload; {@code String} payloads are passed through unparsed. */
    private Type clazz;
    private int mode = ZMQ.SUB;

    public SubscribContainer() {
    }

    /**
     * Connects the socket, subscribes to the topic, creates the worker pool and starts
     * the receiver thread. Does nothing if the thread has already been started.
     *
     * <p>Failures are printed and not rethrown, so a caller cannot tell from this
     * method alone whether the container is running.
     */
    public void startRun() {
        if (this.getState() != State.NEW) {
            return;
        }
        try {
            context = ZMQ.context(ioNum);
            socket = context.socket(mode);
            socket.connect(address);
            // Subscriptions match by prefix. Including the delimiter stops a subscriber
            // for "topic1" from also receiving frames published under "topic10".
            socket.subscribe((this.topic + TOPIC_DELIMITER).getBytes());
            execute = new ThreadPoolExecutor(threadSize, threadSize, keepAliveTime, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(queueSize), new NamedThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
            this.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Receive loop: blocks on the socket and dispatches each frame until
     * {@link #close()} has been called.
     */
    @Override
    public void run() {
        while (!exit) {
            byte[] frame;
            try {
                frame = socket.recv(0);
            } catch (RuntimeException e) {
                if (exit) {
                    // close() tore the socket down while we were blocked; expected.
                    break;
                }
                throw e;
            }
            if (frame == null) {
                // Nothing was received; re-check the exit flag and try again.
                continue;
            }
            try {
                // NOTE(review): decodes with the platform default charset, as before;
                // confirm this matches the encoding used on the publishing side.
                dispatch(new String(frame));
            } catch (RuntimeException e) {
                // A malformed payload, or a callback run on this thread by the
                // caller-runs policy, must not stop the receiver for later messages.
                e.printStackTrace();
            }
        }
    }

    /** Strips the {@code "<topic> "} prefix from a frame and submits the payload to the callback. */
    @SuppressWarnings("unchecked")
    private void dispatch(String msg) {
        int payloadStart = this.topic.length() + TOPIC_DELIMITER.length();
        if (msg.length() < payloadStart) {
            // Too short to contain "<topic> "; nothing to deliver.
            return;
        }
        String finalMsg = msg.substring(payloadStart);
        if (String.class.equals(clazz)) {
            execute.execute(() -> callback.onMessage(finalMsg));
        } else {
            Serializable o = JSON.parseObject(finalMsg, clazz);
            execute.execute(() -> callback.onMessage(o));
        }
    }

    /**
     * Signals the receive loop to stop, closes the socket, terminates the context and
     * shuts the worker pool down so its threads can end once queued work is finished.
     * Safe to call on a container that never started.
     *
     * <p>NOTE(review): the socket is closed here from a different thread than the one
     * blocked in {@code recv}; confirm the ZeroMQ binding in use tolerates that.
     */
    public void close() {
        exit = true;
        try {
            if (socket != null) {
                socket.close();
            }
            if (context != null) {
                context.term();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (execute instanceof ThreadPoolExecutor) {
                ((ThreadPoolExecutor) execute).shutdown();
            }
        }
    }

    /** Names worker threads {@code ZeroMQ-Thread-<n>}, counting from 1 per factory. */
    private static class NamedThreadFactory implements ThreadFactory {
        private final AtomicInteger i = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("ZeroMQ-Thread-" + i.getAndIncrement());
            return t;
        }
    }
}
使用:引入该 Starter 的 jar 包,并在 application.properties 中添加如下配置:
server.port=8080
spring.zeromq.publisher.ioNum=4
spring.zeromq.publisher.address=tcp://*:5557
spring.zeromq.publisher.topic=test
spring.zeromq.subscriber.address=tcp://localhost:5557
启动类:
/**
 * Demo application: after startup it publishes 100 string messages to
 * {@code topic1} and 100 {@link Person} messages to {@code topic2}.
 */
@SpringBootApplication
public class ZmqApplication implements CommandLineRunner {

    /** Publisher used for plain string messages; typed so that {@code send} is checked. */
    @Autowired
    Publisher<String> publisher;

    /** Publisher used for {@link Person} messages. */
    @Autowired
    Publisher<Person> publisher2;

    /**
     * Program entry point.
     *
     * @param args command-line arguments
     */
    public static void main(final String[] args) {
        SpringApplication.run(ZmqApplication.class, args);
    }

    /**
     * Publishes the demo messages.
     *
     * <p>NOTE(review): with PUB/SUB, messages sent before a subscriber has finished
     * connecting are typically not delivered, so the first few may be lost — confirm
     * whether that matters for this demo.
     */
    @Override
    public void run(String... strings) throws Exception {
        for (int i = 1; i <= 100; i++) {
            publisher.send("topic1", "msg" + i);
            publisher2.send("topic2", new Person(i, "name" + i, new Date()));
        }
    }
}
Person对象:
/** Sample message payload with an id, a name and a birthday. */
@Data
public class Person implements Serializable {

    private int id;
    private String name;
    private Date birthday;

    /**
     * Creates a person with all three attributes.
     *
     * @param id       numeric identifier
     * @param name     display name
     * @param birthday date of birth
     */
    public Person(int id, String name, Date birthday) {
        this.id = id;
        this.name = name;
        this.birthday = birthday;
    }
}
一个Controller:
/** Demo endpoint that publishes one string message to {@code topic1} per request. */
@RestController
@RequestMapping("/zero")
public class TestController {

    @Autowired
    Publisher<String> publisher;

    /**
     * Publishes {@code "msg" + n} under {@code topic1}.
     *
     * @param n number taken from the request path
     * @return {@code "success"} if the publisher reported the send as successful,
     *         otherwise {@code "error"}
     */
    @RequestMapping("/send/{n}")
    public String send(@PathVariable("n") int n) {
        String payload = "msg" + n;
        if (publisher.send("topic1", payload)) {
            return "success";
        }
        return "error";
    }
}
订阅者1:
/** Demo subscriber for {@code topic1}: prints each string message, then pauses for one second. */
@Subscriber(topic = "topic1", threadSize = 5)
public class SubscriberTest implements MessageCallback<String>, Serializable {

    /**
     * Prints the current thread name and the message, then sleeps for one second.
     *
     * @param str the received message
     */
    @Override
    public void onMessage(String str) {
        System.out.println(Thread.currentThread().getName() + " msg: " + str);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the thread that runs this callback can
            // still observe the interruption after we return.
            Thread.currentThread().interrupt();
        }
    }
}
订阅者2:
/** Demo subscriber for {@code topic2}: prints each {@link Person} message, then pauses for one second. */
@Subscriber(topic = "topic2", threadSize = 10)
public class SubscriberTest2 implements MessageCallback<Person> {

    /**
     * Prints the current thread name and the person, then sleeps for one second.
     *
     * @param p the received person
     */
    @Override
    public void onMessage(Person p) {
        System.out.println(Thread.currentThread().getName() + " msg: " + p.toString());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the thread that runs this callback can
            // still observe the interruption after we return.
            Thread.currentThread().interrupt();
        }
    }
}
- 作者:luangeng
- 主页:https://wawazhua.cn
- 本文出处:https://wawazhua.cn/post/java/spring/zeromq-springboot/
- 版权声明:禁止转载-非商用-非衍生