• Java

SpringBoot集成ZeroMQ

开发ZeroMQ的SpringBoot Starter。

在 resources/META-INF 目录下新增文件 spring.factories:

# Registers ZeroMQAutoConfiguration under Spring Boot's EnableAutoConfiguration key.
org.springframework.boot.autoconfigure.EnableAutoConfiguration=net.ytoframework.plugin.zeromq.ZeroMQAutoConfiguration

配置文件类:ZeroMQProperties

/**
 * Settings bound from configuration keys that start with {@code spring.zeromq}.
 *
 * <p>The two nested sections mirror each other: one describes the publishing
 * side, the other the subscribing side.
 */
@Getter
@Setter
@ConfigurationProperties(prefix = "spring.zeromq")
public class ZeroMQProperties {

    /** Publishing-side settings ({@code spring.zeromq.publisher.*}). */
    @Data
    public static class Publisher {
        /** Number handed to the ZeroMQ context when it is created; defaults to 1. */
        private Integer ioNum = 1;
        /** Endpoint address of the publisher socket. */
        private String address;
    }

    /** Subscribing-side settings ({@code spring.zeromq.subscriber.*}). */
    @Data
    public static class Subscriber {
        /** Number handed to the ZeroMQ context when it is created; defaults to 1. */
        private Integer ioNum = 1;
        /** Endpoint address the subscriber sockets use. */
        private String address;
    }

    /** Publisher section; has no initial value in this class. */
    private Publisher publisher;

    /** Subscriber section; has no initial value in this class. */
    private Subscriber subscriber;
}

自动配置类:ZeroMQAutoConfiguration

@Configuration
@EnableConfigurationProperties(ZeroMQProperties.class)
public class ZeroMQAutoConfiguration {

    @Bean(destroyMethod = "destroy")
    @ConditionalOnMissingBean(Publisher.class)
    @ConditionalOnProperty(prefix = "spring.zeromq", value = "publisher.address")
    public Publisher mqPublisher(final ZeroMQProperties zeroMQProperties) {
        ZeroMQProperties.Publisher publisher = zeroMQProperties.getPublisher();
        if (publisher.getAddress() == null) {
            throw new IllegalArgumentException("mq address can not be null!");
        }
        if (publisher.getIoNum() == null) {
            publisher.setIoNum(1);
        }
        return new Publisher(publisher.getIoNum(), publisher.getAddress());
    }

    @Configuration
    @ConditionalOnProperty(prefix = "spring.zeromq", value = "subscriber.address")
    public static class ListenerContainerConfiguration
            implements ApplicationContextAware, InitializingBean, EnvironmentAware {

        private Set<String> topicSet = new HashSet<>();

        private ConfigurableApplicationContext applicationContext;

        private StandardEnvironment environment;

        private AtomicLong counter = new AtomicLong(0);

        @Autowired
        private ZeroMQProperties subscrbConfig;

        private void registerContainer(final String beanName, final Object bean) {
            Class<?> clazz = AopUtils.getTargetClass(bean);
            if (!MessageCallback.class.isAssignableFrom(bean.getClass())) {
                throw new IllegalStateException(clazz + " must be instance of " + MessageCallback.class.getName());
            }
            if (subscrbConfig.getSubscriber() == null) {
                throw new IllegalArgumentException("Argument Illegal, no subscrbConfig!");
            }
            MessageCallback callback = (MessageCallback) bean;
            Subscriber annotation = clazz.getAnnotation(Subscriber.class);
            if (topicSet.contains(annotation.topic())) {
                throw new IllegalArgumentException("Topic重复!");
            }
            BeanDefinitionBuilder beanBuilder = BeanDefinitionBuilder.rootBeanDefinition(SubscribContainer.class);
            beanBuilder.addPropertyValue("topic", annotation.topic());
            beanBuilder.addPropertyValue("address", subscrbConfig.getSubscriber().getAddress());
            beanBuilder.addPropertyValue("ioNum", subscrbConfig.getSubscriber().getIoNum());
            beanBuilder.addPropertyValue("threadSize", annotation.threadSize());
            beanBuilder.addPropertyValue("queueSize", annotation.queueSize());
            beanBuilder.addPropertyValue("callback", callback);
            beanBuilder.addPropertyValue("clazz", findType(callback));
            beanBuilder.setDestroyMethodName("close");
            String containerBeanName = String.format("%s_%s", SubscribContainer.class.getName(), counter.incrementAndGet());
            DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getBeanFactory();
            beanFactory.registerBeanDefinition(containerBeanName, beanBuilder.getBeanDefinition());
            SubscribContainer container = beanFactory.getBean(containerBeanName, SubscribContainer.class);
            container.startRun();
            topicSet.add(annotation.topic());
            log.info("register messageCallback to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
        }

        @Override
        public void afterPropertiesSet() throws Exception {
            Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(Subscriber.class);
            Optional.ofNullable(beans).ifPresent(b -> b.forEach(this::registerContainer));
        }

        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = (ConfigurableApplicationContext) applicationContext;
        }

        @Override
        public void setEnvironment(Environment environment) {
            if (environment instanceof StandardEnvironment) {
                this.environment = (StandardEnvironment) environment;
            }
        }

        private Type findType(MessageCallback callback) {
            Type[] interfaces = callback.getClass().getGenericInterfaces();
            for(Type t: interfaces){
                if(t.getTypeName().startsWith(MessageCallback.class.getName()) && ParameterizedType.class.isAssignableFrom(t.getClass())){
                    return ((ParameterizedType)t).getActualTypeArguments()[0];
                }
            }
            throw new IllegalStateException("");
        }
    }

}

发布者:Publisher

/**
 * Publishes messages through a single bound ZeroMQ {@code PUB} socket.
 *
 * <p>Each message is sent as one frame of the form {@code "<topic> <payload>"},
 * where the payload is the string form of {@code JSON.toJSON(msg)}.
 *
 * <p>{@link #send} and {@link #destroy} are synchronized because one instance is
 * shared by several calling threads while a ZeroMQ socket must only be used by
 * one thread at a time.
 *
 * @param <T> message type
 */
public class Publisher<T extends Serializable> {

    private final ZMQ.Context context;

    private final ZMQ.Socket socket;

    private final int mode = ZMQ.PUB;

    /**
     * Creates the context and socket and binds the socket to {@code address}.
     *
     * @param ioNum   value passed to {@code ZMQ.context}
     * @param address endpoint to bind, e.g. {@code tcp://*:5557}
     */
    public Publisher(int ioNum, String address) {
        ZMQ.Context createdContext = ZMQ.context(ioNum);
        ZMQ.Socket createdSocket = null;
        try {
            createdSocket = createdContext.socket(mode);
            createdSocket.bind(address);
        } catch (RuntimeException e) {
            // Do not leak the native resources when socket creation or bind fails.
            if (createdSocket != null) {
                createdSocket.close();
            }
            createdContext.term();
            throw e;
        }
        this.context = createdContext;
        this.socket = createdSocket;
    }

    /**
     * Sends one message under the given topic.
     *
     * @param topic topic prefix placed in front of the payload
     * @param msg   message to convert with {@code JSON.toJSON} and send
     * @return the result reported by the socket's {@code send} call
     */
    public synchronized boolean send(String topic, T msg) {
        return socket.send(topic + " " + JSON.toJSON(msg), 0);
    }

    /** Closes the socket and terminates the context. */
    public synchronized void destroy() {
        socket.close();
        context.term();
    }

}

接收者注解:Subscriber

/**
 * Marks a type as a listener for one topic. The annotation is itself
 * meta-annotated with {@code @Component}.
 */
@Documented
@Component
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface Subscriber {

    /** Topic the annotated listener receives messages for. */
    String topic();

    /** Number of threads that process messages concurrently. */
    int threadSize() default 20;

    /** Queue size setting for the listener; defaults to 20000. */
    int queueSize() default 20000;

}

接收者回调接口:MessageCallback

/**
 * Callback invoked for each received message.
 *
 * @param <T> type of the message handed to {@link #onMessage}
 */
@FunctionalInterface
public interface MessageCallback<T extends Serializable> {

    /**
     * Handles one message.
     *
     * @param msg the received message
     */
    void onMessage(T msg);
}

接收者容器:SubscribContainer

/**
 * Receiver thread for one topic.
 *
 * <p>{@link #startRun()} connects a ZeroMQ {@code SUB} socket, subscribes to the
 * topic and starts this thread. The thread then reads frames of the form
 * {@code "<topic> <payload>"} and hands each payload to the {@link MessageCallback}
 * through a bounded thread pool. {@link #close()} stops the loop and releases the
 * socket, the context and the pool.
 */
@Data
public class SubscribContainer extends Thread {

    private String topic;

    private String address;

    private int threadSize;

    private int queueSize;

    private int keepAliveTime = 0;

    private ZMQ.Socket socket;

    private ZMQ.Context context;

    private MessageCallback callback;

    private Executor execute;

    private int ioNum = 1;

    // Written by close() on another thread and read by the receive loop.
    private volatile boolean exit = false;

    private Type clazz;

    private int mode = ZMQ.SUB;

    public SubscribContainer() {
    }

    /**
     * Connects, subscribes, creates the worker pool and starts the receiver
     * thread. Does nothing if the thread has already been started. Failures are
     * printed and otherwise ignored.
     */
    public void startRun() {
        if (this.getState() != State.NEW) {
            return;
        }
        try {
            context = ZMQ.context(ioNum);
            socket = context.socket(mode);
            socket.connect(address);
            // NOTE(review): getBytes()/new String(byte[]) use the platform charset;
            // confirm it matches the encoding used on the publishing side.
            socket.subscribe(this.topic.getBytes());
            execute = new ThreadPoolExecutor(threadSize, threadSize, keepAliveTime, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(queueSize), new NamedThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
            this.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Receive loop: blocks on the socket until {@link #close()} is requested or
     * receiving fails.
     */
    @Override
    public void run() {
        final String prefix = this.topic + " ";
        while (!exit) {
            final byte[] frame;
            try {
                frame = socket.recv(0);
            } catch (RuntimeException e) {
                // A failure after close() was requested is the expected way out of
                // the blocking call; anything else is reported before stopping.
                if (!exit) {
                    e.printStackTrace();
                }
                break;
            }
            if (frame == null) {
                continue;
            }
            try {
                dispatch(new String(frame), prefix);
            } catch (RuntimeException e) {
                // One bad payload (or a callback run inline by CallerRunsPolicy)
                // must not terminate the subscription.
                e.printStackTrace();
            }
        }
    }

    /**
     * Strips the topic prefix from a frame and submits the payload to the callback.
     *
     * @param frameText decoded frame
     * @param prefix    the topic followed by a single space
     */
    private void dispatch(String frameText, String prefix) {
        // Subscriptions match by prefix, so a "topic1" subscription also receives
        // "topic10 ..." frames; frames without the exact "<topic> " prefix are skipped.
        if (!frameText.startsWith(prefix)) {
            return;
        }
        final String payload = frameText.substring(prefix.length());
        if (String.class.equals(clazz)) {
            execute.execute(() -> callback.onMessage(payload));
        } else {
            final Serializable parsed = JSON.parseObject(payload, clazz);
            execute.execute(() -> callback.onMessage(parsed));
        }
    }

    /**
     * Requests the receive loop to stop, closes the socket and context, and shuts
     * down the worker pool. Safe to call when {@link #startRun()} failed part-way.
     */
    public void close() {
        exit = true;
        try {
            if (socket != null) {
                socket.close();
            }
            if (context != null) {
                context.term();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Let queued work finish but release the non-daemon worker threads.
            if (execute instanceof ThreadPoolExecutor) {
                ((ThreadPoolExecutor) execute).shutdown();
            }
        }
    }

    /** Names worker threads {@code ZeroMQ-Thread-<n>}, counting from 1 per factory. */
    private static class NamedThreadFactory implements ThreadFactory {
        private final AtomicInteger sequence = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) {
            Thread worker = new Thread(r);
            worker.setName("ZeroMQ-Thread-" + sequence.getAndIncrement());
            return worker;
        }
    }

}

使用:引入jar包,并在 application.properties 中添加如下配置:

# HTTP port of the demo application.
server.port=8080
# Publisher side: I/O number and the endpoint the PUB socket binds to.
spring.zeromq.publisher.ioNum=4
spring.zeromq.publisher.address=tcp://*:5557
# NOTE(review): ZeroMQProperties.Publisher declares only ioNum and address, so this key appears to be unused — confirm.
spring.zeromq.publisher.topic=test
# Subscriber side: endpoint the SUB sockets connect to.
spring.zeromq.subscriber.address=tcp://localhost:5557

启动类:

/**
 * Demo application: after startup it publishes 100 string messages to
 * {@code topic1} and 100 {@link Person} messages to {@code topic2}.
 */
@SpringBootApplication
public class ZmqApplication implements CommandLineRunner {

    // Parameterized (was a raw type) so that send(...) is type-checked.
    @Autowired
    Publisher<String> publisher;

    @Autowired
    Publisher<Person> publisher2;

    /**
     * Application entry point.
     *
     * @param args command-line arguments
     */
    public static void main(final String[] args) {
        SpringApplication.run(ZmqApplication.class, args);
    }

    /**
     * Publishes the demo messages.
     *
     * @param strings command-line arguments (unused)
     */
    @Override
    public void run(String... strings) throws Exception {
        for (int i = 1; i <= 100; i++) {
            publisher.send("topic1", "msg" + i);
            publisher2.send("topic2", new Person(i, "name" + i, new Date()));
        }
    }
}

Person对象:

/**
 * Sample message payload published on {@code topic2} in the demo.
 */
@Data
public class Person implements Serializable {

    private int id;

    private String name;

    private Date birthday;

    /**
     * No-argument constructor so the object can also be created bean-style
     * (instantiate, then populate through setters), e.g. by a JSON deserializer.
     */
    public Person() {
    }

    /**
     * Creates a fully populated person.
     *
     * @param id       identifier
     * @param name     display name
     * @param birthday date of birth
     */
    public Person(int id, String name, Date birthday) {
        this.id = id;
        this.name = name;
        this.birthday = birthday;
    }
}

一个Controller:

/**
 * Demo endpoint that publishes a single message to {@code topic1} per request.
 */
@RestController
@RequestMapping("/zero")
public class TestController {

    @Autowired
    Publisher<String> publisher;

    /**
     * Publishes {@code "msg" + n} on {@code topic1}.
     *
     * @param n number taken from the request path
     * @return {@code "success"} when the publisher reports the send as done,
     *         otherwise {@code "error"}
     */
    @RequestMapping("/send/{n}")
    public String send(@PathVariable("n") int n) {
        final String payload = "msg" + n;
        if (publisher.send("topic1", payload)) {
            return "success";
        }
        return "error";
    }
}

订阅者1:

/**
 * Demo listener for {@code topic1}: prints each string message together with
 * the handling thread's name, then sleeps for one second.
 */
@Subscriber(topic = "topic1", threadSize = 5)
public class SubscriberTest implements MessageCallback<String>, Serializable {

    /**
     * Prints the message and pauses for one second.
     *
     * @param str the received message
     */
    @Override
    public void onMessage(String str) {
        System.out.println(Thread.currentThread().getName() + " msg: " + str);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag so the calling thread can still observe it.
            Thread.currentThread().interrupt();
        }
    }
}

订阅者2:

/**
 * Demo listener for {@code topic2}: prints each {@link Person} message together
 * with the handling thread's name, then sleeps for one second.
 */
@Subscriber(topic = "topic2", threadSize = 10)
public class SubscriberTest2 implements MessageCallback<Person> {

    /**
     * Prints the message and pauses for one second.
     *
     * @param p the received person
     */
    @Override
    public void onMessage(Person p) {
        System.out.println(Thread.currentThread().getName() + " msg: " + p.toString());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag so the calling thread can still observe it.
            Thread.currentThread().interrupt();
        }
    }
}

相关

最新