문제
1
2
3
4
5
6
7
8
9
10
11
12
13
public class Subscriber {
void send(String message) {
//....
}
}
public class NotificationService {
void publish(String topic, String message) {
}
void subscribe(String topic, Subscriber subscriber) {
}
}
구독 시스템의 알람 서비스를 구현하는 문제이다. 알람 서비스의 메서드로는 topic
을 구독한 구독자에게 메시지를 보내는 publish(…)
, 그리고 topic
을 구독하는 subscribe(…)
를 구현하는 문제이다.
내 풀이
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class NotificationService {
HashMap<String, List<Subscriber>> subscribersMap = new HashMap<>();
void publish(String topic, String message) {
if(subscribersMap.containsKey(topic)) {
List<Subscriber> subscribers = subscribersMap.get(topic);
for(Subscriber subscriber : subscribers) {
subscriber.send(message);
}
}else{
System.out.println("해당 topic을 구독한 사람이 없습니다.");
}
}
void subscribe(String topic, Subscriber subscriber) {
if(!subscribersMap.containsKey(topic)){
ArrayList<Subscriber> subscriberList = new ArrayList<>();
subscriberList.add(subscriber);
subscribersMap.put(topic,subscriberList);
}else{
subscribersMap.get(topic).add(subscriber);
}
}
}
HashMap<String, List<Subscriber>> subscribersMap = new HashMap<>();
String 타입인 topic을 key로 해당 topic을 구독한 Subscriber를 담을 수 있는 List를 value로 HashMap을 만들어 db를 대체했다.
publish(String topic, String message)
메시지 전송 기능은HashMap
에topic
이 등록되어있는지 확인하고for()
문을 사용해topic
의 구독자들에게 메시지를 전송하도록 구현했다.
subscribe(String topic, Subscriber subscriber)
구독 기능은 해당topic
을 구독한 사람이 한명이라도 있다면HashMap
에서topic
을 찾아 구독자 리스트에 추가해주고 없다면 새로HashMap
에 등록하도록 구현했다.
개선사항 : 예외처리
topic
이나 subscriber
가 null
인 경우를 대비해 예외처리를 해주었다.
1
2
3
if (topic == null || message == null) {
throw new IllegalArgumentException("Topic이나 메시지가 null일 수 없습니다.");
}
send()
메서드는 Subscriber
라는 외부 클래스에서 제공해주는 메서드이기 때문에 예외가 전파될 수 있다. 예를 들어 10명의 구독자에게 메시지를 보내는데 6번째에 예외가 발생한다면 남은 5명은 메시지를 못받을 수 있다. try-catch
로 예외를 잡아 처리해주었다.
1
2
3
4
5
6
7
for(Subscriber subscriber : subscribers) {
try{
subscriber.send(message); // 예외 발생할 가능성이 있음
}catch(Exception e){
e.printStackTrace();
}
}
개선사항 : 병렬처리
만약 A라는 topic을 구독한 구독자가 백만명이라면? 백만명에게 for()문을 돌려 메시지를 전송한다면 백만번째의 구독자는 메시지를 받으려면 엄청 오래기다려야 할것이다.
Java에서는 병렬처리를 수행하는 방법으로는 두 가지가 있다.
- 스레드를 사용하는 방법
- Fork/Join Framework를 사용하는방법
그 중 Fork/Join Framework를 편리하게 사용할 수 있는 stream
의 parallel
을 사용해 병렬로 메시지를 보낼 수 있게 처리했다.
1
2
3
4
5
6
7
subscribers.stream().parallel().forEach(subscriber -> {
try{
subscriber.send(message);
}catch(Exception e){
e.printStackTrace();
}
});
개선사항 : 동시성 처리
A,B라는 사용자가 구독을 요청했다면 A는 해당 topic에 대한 첫번째 구독자여서 구독자 리스트를 생성하고 있다. 아직 map에 구독자 리스트를 추가하지 않은 상태에 새로운 B가 같은 topic에 구독을 요청한다면 아직 A가 만든 구독자 리스트가 map에 저장되지 않았기 때문에 B 또한 구독자 리스트를 생성하게 될것이다.
1
2
3
4
5
6
7
8
9
void subscribe(String topic, Subscriber subscriber) {
if (!subscribersMap.containsKey(topic)) { <---- B
ArrayList<Subscriber> subscriberList = new ArrayList<>();
subscriberList.add(subscriber); <----- A
subscribersMap.put(topic, subscriberList);
} else {
subscribersMap.get(topic).add(subscriber);
}
}
이후 A가 구독자 리스트를 만들고 map에 저장하고 B가 다시 map에 저장할때는 A가 만든 구독자 리스트 위에 덮어 씌어져 최종적으로 A,B가 구독한 topic의 구독자 리스트에는 A는 없을 것이다.
이런 동시성 문제를 해결하기 위해 구독 메서드에 synchronized
를 붙여 하나의 쓰레드만 작업을 할 수 있도록 수정해주었다.
1
2
3
4
5
6
7
8
9
synchronized void subscribe(String topic, Subscriber subscriber) {
if (!subscribersMap.containsKey(topic)) {
ArrayList<Subscriber> subscriberList = new ArrayList<>();
subscriberList.add(subscriber);
subscribersMap.put(topic, subscriberList);
} else {
subscribersMap.get(topic).add(subscriber);
}
}
하지만 이렇게 메서드 전체에 동기화를 걸어버린다면 엄청 오랜 시간이 걸릴것이다.
그래서 특정 topic에 대한 구독 기능에만 동기화를 걸어두도록 구현했다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ConcurrentHashMap<String, Object> topicLocks = new ConcurrentHashMap<>();
private Object getLockTopic(String topic){
if(!topicLocks.containsKey(topic)) {
topicLocks.put(topic, new Object());
}
return topicLocks.get(topic);
}
void subscribe(String topic, Subscriber subscriber) {
synchronized (getLockTopic(topic)) {
if (!subscribersMap.containsKey(topic)) {
ArrayList<Subscriber> subscriberList = new ArrayList<>();
subscriberList.add(subscriber);
subscribersMap.put(topic, subscriberList);
} else {
subscribersMap.get(topic).add(subscriber);
}
}
}
topicLocks
:ConcurrentHashMap
을 사용하여 각 주제에 대해 독립적인 동기화 객체를 관리한다.getLockTopic(String topic)
:topicLocks
에 topic에 대한 동기화 객체를 반환하는 메서드이다.synchronized (getLockTopic(topic))
:synchronized
블럭을 사용해 해당 topic에 대한 동기화 객체를 가져오고 그 topic에 대한 동기화를 수행한다.
전체 코드
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class NotificationService {
HashMap<String, List<Subscriber>> subscribersMap = new HashMap<>();
ConcurrentHashMap<String, Object> topicLocks = new ConcurrentHashMap<>(); // 동기화 객체
void publish(String topic, String message) {
// 입력값 예외 처리
if (topic == null || message == null) {
throw new IllegalArgumentException("Topic이나 메시지가 null일 수 없습니다.");
}
if(subscribersMap.containsKey(topic)) {
List<Subscriber> subscribers = subscribersMap.get(topic);
subscribers.stream().parallel().forEach(subscriber -> { // 병렬 처리
try{ // 외부 메서드 예외 처리
subscriber.send(message);
}catch(Exception e){
e.printStackTrace();
}
});
}else{
System.out.println("해당 topic을 구독한 사람이 없습니다.");
}
}
void subscribe(String topic, Subscriber subscriber) {
// 입력값 예외 처리
if (topic == null || Subscriber == null) {
throw new IllegalArgumentException("Topic이나 구독자가 null일 수 없습니다.");
}
synchronized (getLockTopic(topic)) { // topic별 동기화 처리
if (!subscribersMap.containsKey(topic)) {
ArrayList<Subscriber> subscriberList = new ArrayList<>();
subscriberList.add(subscriber);
subscribersMap.put(topic, subscriberList);
} else {
subscribersMap.get(topic).add(subscriber);
}
}
}
private Object getLockTopic(String topic){
if(!topicLocks.containsKey(topic)) {
topicLocks.put(topic, new Object());
}
return topicLocks.get(topic);
}
}