SpringEvent와 MessageQueue
기존에 MessageQueue로서 RabbitMQ, Kafka를 고려할 때 이와 비슷한 역할을 하는 SpringEvent도 후보에 있었다.
SpringEvent도 이벤트 기반의 비동기 프로그래밍을 지원하는 아이인데 추가적인 외부 도구 설정은 필요없다. 내부에서 비동기 작업이나 트리거 처리를 간소화할 수 있고, Application 내에서 처리가 되니 디버깅이나 로깅도 편하다.
간편한데 안 쓴 이유
분산환경에는 얘가 알맞지 않다. 그야 SpringEvent는 이벤트의 발생과 Listener 호출이 동일한 JVM내에서 처리하다보니, A애플리케이션에서 발생한 Publish 호출을 B 애플리케이션의 Listener가 알 리가 없다. 클러스터 환경에선 다른 서버 노드로이벤트를 전파할 방법을 찾아야한다면 외부 메시지 큐를 찾는 게 일반적일테다.
혹은 SpringEvent를 사용하면서 다른 서버 노드로 이벤트를 전파할 방법을 찾았다고 치자. 하지만 내장된 SpringEvent는 이벤트의 성공여부, 특히 실패에 대한 재시도 로직이나 이벤트 저장 로직을 갖추지 않았다. 이런 것들을 자체적으로 보완하려다 보면 이미 잘 구성된 MessageQueue 도구를 사용함이 제한된 시간과 안전성에 좋은 선택지가 될테다.
결론적으로 MSA와 같은 분산 시스템에서는 중앙 메시지 큐인 Kafka, RabbitMQ 등을 도입하여 신뢰성있는 이벤트 전달, 대규모 데이터 처리, 실패에 대한 처리 등을 활용해 보는 게 좋고 단일 애플리케이션에서 간단한 이벤트 처리를 원한다면 (비동기 동작까지도) SpringEvent를 쓰는 것이 좋다.
*RabbitMA, Kafka 중에 일반적으로 kafka의 비용이 더 비싸다. RabbitMQ가 구조적으로 더 가볍고 단순하기 때문인데, 보다 많은 대규모처리를 노린다거나 데이터 장기 보관을 원한다면 Kafka가 유리하다.
SpringEvent 예제
단일 애플리케이션 내에서 회원가입 발생 시 알림전송과 로그를 남기는 기능을 구현하고 싶다고 해보자.
그러면 Event, EventPublisher, EventListener로 크게 세 가지 구성을 갖추면 된다.
- Event: 이벤트 정보를 담는 클래스
- Publisher: 이벤트를 발생시키는 클래스
- Listeners: 이벤트를 받아 처리하는 클래스들
명칭에서 느꼈듯이 하나의 Publisher에 대해 여러 개의 Listener가 파생되어 trigger 발생 시 정의해놓은 알림 전송, 로그 기록 등이 동작하게 될 에정이다.
✔ Event 클래스 정의
package org.kgyury.kg.event;
import lombok.Getter;
import org.springframework.context.ApplicationEvent;
@Getter
public class RegisterEvent extends ApplicationEvent {
private final String username;
private final String email;
RegisterEvent(Object source, String username, String email){
super(source);
this.username = username;
this.email = email;
}
}
회원가입 행위에 대한 트리거를 만들기 위해 이용할 가입 시점의 정보를 담은 클래스를 만들어주었다.
이때, ApplicationEvent 추상메소드를 상속받았다. 이 메소드의 내부는 아래와 같다.
public abstract class ApplicationEvent extends EventObject {
private static final long serialVersionUID = 7099057708183571937L;
private final long timestamp;
public ApplicationEvent(Object source) {
super(source);
this.timestamp = System.currentTimeMillis();
}
public ApplicationEvent(Object source, Clock clock) {
super(source);
this.timestamp = clock.millis();
}
public final long getTimestamp() {
return this.timestamp;
}
}
// EventObject
public class EventObject implements Serializable {
@java.io.Serial
private static final long serialVersionUID = 5516075349620653480L;
/**
* The object on which the Event initially occurred.
*/
protected transient Object source;
public EventObject(Object source) {
if (source == null)
throw new IllegalArgumentException("null source");
this.source = source;
}
public Object getSource() {
return source;
}
public String toString() {
return getClass().getName() + "[source=" + source + "]";
}
}
내부를 보니 최상위의 EventObjet가 이벤트가 발생한 Source 객체를 추적하는 역할을 한다. source는 이벤트가 발생한 객체로서, 이를 통해 SpringEvent Listener가 이벤트 발생 맥락을 이해할 수 있게 된다.
이 source에 들어가는 것은 HTTP 요청 객체일 수도 있고, 데이터베이스 엔티티일 수도 있는 거고, 이벤트 생성 시점의 클래스일 수도 있는 거고... 이벤트를 추적하고 필요한 정보를 전달하는 데 도움이 되는 요소를 넣으면 된다.
여기서 transient 키워드를 통해 source를 직렬화 대상에서 제외시키는데, 이는 직렬화 시 발생할 수 있는 오류를 방지한다. 가령. Swing과 함께 사용하면서 GUI이벤트 반생 시 백그라운드 작업을 트리거하겠다고 치자. 해당 source가 GUI이벤트로 JButton과 같은 컴포넌트가 되었다면 직렬화가 불가능하다. NotSerializableExcpeiton을 피하고자 함이다.
더불어서 ApplicationEvent에 생성자가 있음에 따라 구현체에서 부모 생성자를 불러올 수 있도록 만들었다.
*왜 JButton은 직렬화가 안 돼요?
JButton이 Serializable을 구현하긴 하는데, 내부적으론 직렬화가 불가능한 필드(UI정보, Look-and-Feel)를 포함한다.
이 UI정보가 네이티브 리소스, 즉 운영체제에 직접적인 정보로서 JVM 외부에 있는 정보기에 직렬화가 불가능하다.
✔ Event Publisher 정의
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
@RequiredArgsConstructor
@Slf4j
public class EventPublisherService {
private final ApplicationEventPublisher publisher;
public void register(String username, String email){
log.debug("[EVENT] USER REGISTERED : {}", username);
CustomEvent event = new CustomEvent(this, username, email);
publisher.publishEvent(event);
}
}
보이는 바와 같이 EventPublisher는 register() 함수에서 아까 정의한 Event 객체를 생성하고 이를 알리는 과정을 수행한다.
그럼 내부 객체로 사용중인 ApplicationEventPublisher를 조금 더 파헤쳐보게 되는데, 호출한 publishEvent()만 중점으로 보면 코드가 이처럼 구현되어있다.
...
@Nullable
private Set<ApplicationEvent> earlyApplicationEvents;
...
public void publishEvent(Object event) {
this.publishEvent(event, (ResolvableType)null);
}
protected void publishEvent(Object event, @Nullable ResolvableType typeHint) {
Assert.notNull(event, "Event must not be null");
ResolvableType eventType = null;
Object applicationEvent;
if (event instanceof ApplicationEvent applEvent) {
applicationEvent = applEvent;
eventType = typeHint;
} else {
...
}
if (eventType == null) {
eventType = ResolvableType.forInstance(applicationEvent);
if (typeHint == null) {
typeHint = eventType;
}
}
if (this.earlyApplicationEvents != null) {
this.earlyApplicationEvents.add(applicationEvent);
} else if (this.applicationEventMulticaster != null) {
this.applicationEventMulticaster.multicastEvent((ApplicationEvent)applicationEvent, eventType);
}
if (this.parent != null) {
ApplicationContext var8 = this.parent;
if (var8 instanceof AbstractApplicationContext) {
AbstractApplicationContext abstractApplicationContext = (AbstractApplicationContext)var8;
abstractApplicationContext.publishEvent(event, typeHint);
} else {
this.parent.publishEvent(event);
}
}
}
Event를 관리 객체로서 Set에 담는 로직을 볼 수 있다.
깨알로 살펴보면 event 객체를 패턴매칭(instanceof Pattern matching)을 사용한 조건문을 볼 수 있다.
if (event instanceof ApplicationEvent applEvent) {
applicationEvent = applEvent;
eventType = typeHint;
}
이는 java16에서부터 가능한 방식인데, event instanceOf ApplicationEvent로 타입을 검사하고 동시에 applEvent라는 변수에 안전하게 캐스팅된 객체를 할당한다.
if (this.earlyApplicationEvents != null) {
this.earlyApplicationEvents.add(applicationEvent);
} else if (this.applicationEventMulticaster != null) {
this.applicationEventMulticaster.multicastEvent((ApplicationEvent)applicationEvent, eventType);
}
이 부분을 보면, earlyApplicationEvents가 null이 아니라면(대기열이 준비된 상태) 이벤트가 아직 초기화되지 않은 리스너들에게 처리되어야 한다는 얘기다. 주로 application 초기화 단계에서 발생한 이벤트들이 이런 쪽인데, 나중에 처리하라고 대기열에 추가하는 모양이다.
earlyApplicationEvent가 null이 아닌 게 왜 Application 미초기화 상태일까?
public class AbstractApplicationContext {
private List<ApplicationEvent> earlyApplicationEvents;
// 애플리케이션 초기화 중 이벤트 처리
public void publishEvent(ApplicationEvent event) {
if (this.earlyApplicationEvents != null) {
// 초기화 중: 대기열에 이벤트 추가
this.earlyApplicationEvents.add(event);
} else if (this.applicationEventMulticaster != null) {
// 초기화 완료: 이벤트를 바로 멀티캐스터로 처리
this.applicationEventMulticaster.multicastEvent(event);
}
}
// 초기화 완료 시 처리
public void finishRefresh() {
if (this.earlyApplicationEvents != null) {
// 대기열에 저장된 이벤트를 모두 처리
for (ApplicationEvent event : this.earlyApplicationEvents) {
this.applicationEventMulticaster.multicastEvent(event);
}
// 대기열 비활성화
this.earlyApplicationEvents = null;
}
}
}
간소화한 위의 코드를 보자.
내부 동작으로서 우선 earlyApplicationEvents는 초기화 중 발생한 이벤트를 저장하기 위한 대기열이다. 즉 아래와 같은 시나리오로 이해할 수 있다.
- Application이 초기화 진행 중이라면 earlyApplicationEvent 자료구조를 통해 초기화 완료 시 진행해야할 애들을 위한 대기열로 만들어둔다
- Application이 초기화가 완료되었다면 이제 처리가능한 상태이므로 대기열에서 꺼내어 처리를 진행한다
- 처리가 완료되었다면 earlyApplicationEvent는 비활성화(null)된다.
if (this.earlyApplicationEvents != null) {
this.earlyApplicationEvents.add(applicationEvent);
} else if (this.applicationEventMulticaster != null) {
this.applicationEventMulticaster.multicastEvent((ApplicationEvent)applicationEvent, eventType);
}
다시 돌아오자. 앞선 과정으로 대기열에서 완료되었다면 그 다음부터는 applicationEventMulticaster의 멀티캐스트 함수를 통해서 모든 리스너들에게 이벤트를 전파한다. 즉, 여기가 리스너들을 호출하는 핵심 역할을 한다.
✔ Event Listener 정의
package org.kgyury.kg.event;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class RegisterLogListener {
@EventListener
public void handleRegisterEvent(RegisterEvent event){
log.info("[EVENT LISTENER1] Registered Someone : {}", event.getUsername());
}
}
package org.kgyury.kg.event;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class RegisterSendMailListener {
@EventListener
public void handleRegisterEvent(RegisterEvent event){
// 메일 보내는 척
log.info("[EVENT LISTENER2] SEND EMAIL: {}", event.getEmail());
}
}
간단히 두 개를 만들었다. 하나는 누군가의 가입로그를, 하나는 이에 대한 메일을 보내는척 하는 로그다.
실행
package org.kgyury.kg;
import lombok.RequiredArgsConstructor;
import org.kgyury.kg.event.RegisterEventPublisher;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@RequiredArgsConstructor
public class KgApplication implements CommandLineRunner {
private final RegisterEventPublisher publisher;
public static void main(String[] args) {
System.out.println("------main-----");
SpringApplication.run(KgApplication.class, args);
}
@Override
public void run(String ...args){
System.out.println("-------run------");
publisher.register("Kgyury", "mandarining@g.com");
}
}
간단한 테스트를 위해 CommandLineRunner를 이용한다.
사전에 정의해놓은 RegisterEventPublisher에 이벤트를 발생시켰고, 그러면 내부적으로 해당 이벤트에 대해 정의한 Listener들이 동작할 것이다.
Spring Event의 Listener는 어떻게 수행되는가
생각해보자, 이제 이 Listener는 어떻게 동작할까?
1. 여러 개의 Listener가 하나의 스레드에서 순차적으로 실행될 것인가?
2. 각 Listenr마다 개별 스레드에서 병렬수행을 할 것인가?
3. 순차적으로 실행된다면 순서는 어떻게 정해지는가?
✔ Spring Event의 기본동작
우선 결론적으로 기본동작은 동기적으로, 한 스레드에서 순차적인 진행을 이룬다.
아래와 같이 로그를 찍어 실행시켜보자.
@Component
@Slf4j
public class RegisterLogListener {
@EventListener
public void handleRegisterEvent(RegisterEvent event){
log.info("[EVENT LISTENER1] Registered Someone : {}", event.getUsername());
log.info("[EVENT THREAD] Listener executed on thread: {}" , Thread.currentThread().getName());
}
}
@Component
@Slf4j
public class RegisterSendMailListener {
@EventListener
public void handleRegisterEvent(RegisterEvent event){
log.info("[EVENT LISTENER2] SEND EMAIL: {}", event.getEmail());
log.info("[EVENT THREAD] Listener executed on thread: {}" , Thread.currentThread().getName());
}
}
✔ 비동기 실행 수행시키기
그러나 개발자가 명시적으로 @Async를 이용해 비동기 실행을 활성화시킬 수 있다. 아래처럼 설정을 하고 @Async를 사용해보자
@Configuration
@EnableAsync
public class AsyncConfig {
}
@Component
@Slf4j
public class RegisterLogListener {
@Async // 추가
@EventListener
public void handleRegisterEvent(RegisterEvent event){
log.info("[EVENT LISTENER1] Registered Someone : {}", event.getUsername());
log.info("[EVENT THREAD] Listener executed on thread: {}" , Thread.currentThread().getName());
}
}
@Component
@Slf4j
public class RegisterSendMailListener {
@Async // 추가
@EventListener
public void handleRegisterEvent(RegisterEvent event){
log.info("[EVENT LISTENER2] SEND EMAIL: {}", event.getEmail());
log.info("[EVENT THREAD] Listener executed on thread: {}" , Thread.currentThread().getName());
}
}
별도의 Thread에서 각자 수행중임을 알 수 있다.
✔ Listener 순서 주기
마지막으로, 명시적으로 Order값을 줌으로써 수행 순서를 지정해줄 수 있다.
@Component
@Slf4j
public class RegisterLogListener {
@Order(2)
@EventListener
public void handleRegisterEvent(RegisterEvent event){
log.info("[EVENT LISTENER1] ORDER2- Registered Someone : {}", event.getUsername());
log.info("[EVENT THREAD] Listener executed on thread: {}" , Thread.currentThread().getName());
}
}
@Component
@Slf4j
public class RegisterSendMailListener {
@Order(1)
@EventListener
public void handleRegisterEvent(RegisterEvent event){
log.info("[EVENT LISTENER2] ORDER1- SEND EMAIL: {}", event.getEmail());
log.info("[EVENT THREAD] Listener executed on thread: {}" , Thread.currentThread().getName());
}
}
@Component
@Slf4j
public class RegisterThridMailListener {
@Order(3)
@EventListener
public void handleRegisterEvent(RegisterEvent event){
log.info("[EVENT LISTENER3] ORDER3- Orange: {} {}", "🍊", event.getEmail());
log.info("[EVENT THREAD] Listener executed on thread: {}" , Thread.currentThread().getName());
}
}
Order을 Method 수준에서 적용시켜주면 된다.
SpringListener가 메서드 단위로 실행 순서를 제어하고 있으니 같은 수준에 명시해주는 것이다.
위의 실행은 기본적으로 동기 수행일 때인데, 비동기로 수행시킨다면 아래와 같다 (@Async 적용 시)
로그를 보면 각각이 다른 thread로 병렬처리하고 있음을 확인할 수 있다.
보이다시피, 비동기이다보니 호출 자체는 순서대로 될지라도 처리의 완료시점은 제각각으로 끝날 예정이다.
# Reference
https://www.baeldung.com/spring-events