리스트로 이루어진 데이터가 입력으로 오면, 이 데이터를 mongoDB에서 조회해서 가공한 후에 지도에 출력하는 프로젝트 입니다.
문제는 성능이 나오지 않았습니다.
42개의 데이터를 조회 및 처리하는 데에 5초나 걸렸습니다.
2. 원인
mongoDB가 해외에 위치해 있습니다. 그러다보니 query를 날리고 응답을 받는데에 시간이 오래 걸릴 것이라고 생각했습니다.
특히 이런 IO 작업을 싱글 쓰레드로 처리하다 보니 속도가 느려졌다고 생각했었습니다.
3. 해결
그래서 싱글 쓰레드로 동작하던 것을 멀티 쓰레드로 변경했습니다.
python의 GIL이 있지만, mongoDB에서 데이터를 가져오는 IO 작업 부분을 멀티 쓰레드로 작업하기 때문에 큰 상관은 없습니다.
약 max_worker를 20개로 설정해서 실험을 해봤습니다.
기존에 5초가 걸리던 작업이 약 2.3초까지 줄어든 것을 확인했습니다.
42개의 데이터를 가져오는 데 2.3초나 걸리는 것은 성능이 많이 느리다고 생각했습니다.
그래서 다른 해결 방안을 찾고 있었습니다.
이번에는 데이터의 개수가 약 4500개를 조회하도록 하고 max_worker를 100까지 증가시켰습니다.
무려 24초나 걸렸습니다.
이번엔 mongoDB에 index를 생성해서 조회를 해봤습니다.
24초 -> 16초까지 성능은 개선이 되었습니다. 무려 8초나 줄었기 때문에 여기서 만족하고 다른 일을 처리하러 갔습니다.
그러던 중, kafka의 partitioner가 생각이 났습니다.
kafka의 경우 produce를 할 때, partitioner의 전략(sticky, round robin)에 따라 데이터를 어느 partition에 데이터를 넣을 지 결정합니다. 그리고 이를 바로 kafka에 전송하는 것이 아니라, 설정한 batch 개수 만큼 쌓거나 일정 시간 지난 후에 한 번에 보내게 됩니다.
이렇게 한 이유가 네트워크 통신 비용을 줄이려고 그랬다는 내용이 갑자기 떠올랐습니다.
그래서 데이터 개수만큼의 쿼리를 전송하는 것이 아니라, 하나의 쿼리로 여러 데이터를 조회하도록 수정했습니다.
무려 16초에서 3.7초까지 성능이 개선 되었습니다.
특히 해외에 mongoDB가 있다보니 여러 쿼리를 날리고 응답을 받는 것이 성능에 큰 이슈가 있었던 것으로 파악이 됩니다.
결론
개선 방법
성능
멀티 쓰레드로 조회
24 s
mongoDB index 생성
16 s
싱글 쓰레드로 쿼리 하나로 여러 데이터 조회하도록 수정(Bulk)
3.7s
실제 다른 기능도 멀티 쓰레드로 구현이 되어 있었습니다. 데이터를 많이 조회하지도 않는데, 7초나 소모됩니다.
이 부분을 맡은 팀원 분에게도 해당 내용 공유를 드렸습니다.
이 기능까지 성능이 개선이 된다면, 전체 프로세스가 기존 31초에서 약 4초까지 성능이 개선될 것으로 예상이 됩니다.
4. 느낀 점
당연히 batch로 데이터를 전송하는 것이 성능이 좋다는 것은 알고 있었습니다. 하지만 이 당시에 제 코드를 작성하고 있을 때는 여러 쿼리를 날리는 것이 문제가 될 것이라고 생각하지 못했습니다. 머리로는 알고 있었지만, 응용은 하지 못했었습니다.
머리로만 알던 지식을 완전히 제 것으로 만들기 가장 쉬운 방법은 프로젝트 경험이라는 것을 느낄 수 있었습니다.
(그래서 개발 동아리가 끝난 후에도, 팀원을 구해서 사이드 프로젝트를 계속 유지보수할 생각입니다.)
1. 트랜잭션 로그 파일에 상태 저장. 리소스가 변경될 때마다 파일 기반의 트랜잭션 로그를 이용하여 기록. 트랜잭션 로그는 서비스가 트랜잭션을 다시 수행하여 원래의 상태를 쉽게 만들 수 있도록 함.
2. 외부 데이터베이스에 상태 저장. 트랜잭션 로그 저장을 외부 db에 함.
1. 트랜잭션 로그
트랜잭션 로그는 변경사항의 기록을 유지하는 로그 파일.
서비스에 문제 생기거나, 재시작 될 경우 트랜잭션을 다시 수행해서 복원할 수 있게 해줍니다.
트랜잭션 로그는 이벤트를 오래된 것부터 최신 순으로 읽어서 재연할 것입니다. 그래서 트랜잭션 로그에 필요한 것들은
- 순번(Sequence number) : 쓰기 작업에 대한 고유 ID. 항상 증가.
- 이벤트 타입 : PUT, DELETE 등 이벤트 타입들
- 키 : 트랜잭션에 의해 영향 받는 키(앞의 rest api를 구현할 때 key - value 저장소를 이용했으므로)
- 값 : 이벤트가 PUT이라면, 어느 데이터를 넣었는 지 알아야 하기 때문에 value가 필요.
2. 파일 기반 트랜잭션 로그
트랜잭션 로그를 파일에 저장하면,
장점 :
- 다운스트림에 대한 의존성 없음. (외부 서비스에 대한 의존성이 없음)
- 기술적인 직관성(구현 간단함0
단점:
- 확장 어려움(노드 간 상태 정보를 분산하여 공유하기 위한 방법 필요)]
- 파일 증가(파일 크기 제한을 두어야 함)
이러한 장단점이 있습니다.
package main
import (
"bufio"
"fmt"
"net/url"
"os"
"sync"
)
type EventType byte
//java의 enum 처럼 사용 가능.
const (
_ = iota // iota == 0; ignore this value
EventDelete EventType = iota // iota == 1
EventPut // iota == 2; implicitly repeat last
)
type Event struct {
Sequence uint64
EventType EventType
Key string
Value string
}
// io.Writer를 이용하는 대신, 채널을 이용. io.Writer는 싱글 스레드 기반이라 io 시간에 많은 시간이 걸린다고 함.
type TransactionLogger struct {
events chan<- Event // Write-only channel for sending events
errors <-chan error
lastSequence uint64 // The last used event sequence number
file *os.File // The location of the transaction log
wg *sync.WaitGroup //포인터로 선언하지 않으면 복사본을 이용하게 됨. 동기화가 제대로 일어나지 않음.
}
func (l *TransactionLogger) WritePut(key, value string) {
l.wg.Add(1)
l.events <- Event{EventType: EventPut, Key: key, Value: url.QueryEscape(value)}
}
func (l *TransactionLogger) WriteDelete(key string) {
l.wg.Add(1)
l.events <- Event{EventType: EventDelete, Key: key}
}
func (l *TransactionLogger) Err() <-chan error {
return l.errors
}
//생성자
func NewTransactionLogger(filename string) (*TransactionLogger, error) {
var err error
var l TransactionLogger = TransactionLogger{wg: &sync.WaitGroup{}}
// Open the transaction log file for reading and writing.
l.file, err = os.OpenFile(filename, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0755)
if err != nil {
return nil, fmt.Errorf("cannot open transaction log file: %w", err)
}
return &l, nil
}
func (l *TransactionLogger) Run() {
//채널 생성. 16개까지 받을 수 있음. 16개 이상으로 넣으려 하면 blocking 됨.
events := make(chan Event, 16)
l.events = events
errors := make(chan error, 1)
l.errors = errors
// Start retrieving events from the events channel and writing them
// to the transaction log
go func() {
for e := range events {
l.lastSequence++
_, err := fmt.Fprintf(
l.file,
"%d\t%d\t%s\t%s\n",
l.lastSequence, e.EventType, e.Key, e.Value)
if err != nil {
errors <- fmt.Errorf("cannot write to log file: %w", err)
}
l.wg.Done()
}
}()
}
func (l *TransactionLogger) Wait() {
l.wg.Wait()
}
func (l *TransactionLogger) Close() error {
l.Wait()
if l.events != nil {
close(l.events) // Terminates Run loop and goroutine
}
return l.file.Close()
}
//로그 파일 읽기
func (l *TransactionLogger) ReadEvents() (<-chan Event, <-chan error) {
scanner := bufio.NewScanner(l.file)
//포인터가 아닌 Event 값 자체를 받는 채널.
outEvent := make(chan Event)
outError := make(chan error, 1)
go func() {
var e Event
//끝날 때, 채널을 닫음으로써 더 이상 데이터를 못 넣게 함.(읽을 수는 있음)
defer close(outEvent)
defer close(outError)
for scanner.Scan() {
line := scanner.Text()
fmt.Sscanf(
line, "%d\t%d\t%s\t%s",
&e.Sequence, &e.EventType, &e.Key, &e.Value)
//일련 번호 증가 확인.
if l.lastSequence >= e.Sequence {
outError <- fmt.Errorf("transaction numbers out of sequence")
return
}
uv, err := url.QueryUnescape(e.Value)
if err != nil {
outError <- fmt.Errorf("value decoding failure: %w", err)
return
}
e.Value = uv
l.lastSequence = e.Sequence
//매번 event 값을 새로 생성하지 않음.
//즉, 복사본을 넣어주는 것임.
outEvent <- e
}
if err := scanner.Err(); err != nil {
outError <- fmt.Errorf("transaction log read failure: %w", err)
}
}()
return outEvent, outError
}
wait group을 포인터로 선언하지 않으면, 복사본을 이용하기 때문에 동기화가 제대로 일어나지 않습니다.
아래가 예시 입니다.
func main() {
var wg sync.WaitGroup
// Correct usage with pointer
wg.Add(1)
go func(wg *sync.WaitGroup) {
defer wg.Done()
// Do some work
}(&wg)
wg.Wait()
// Incorrect usage with value (will cause panic)
wg.Add(1)
go func(wg sync.WaitGroup) {
defer wg.Done() // This modifies a copy, not the original
// Do some work
}(wg)
wg.Wait() // This will not wait for the goroutine because the Add was done on a copy
}
이런 식으로 데드락이 발생하게 됩니다.
이를 이제 이전 포스트에서 구현한 rest api 에 접목시킬 수 있습니다.
package main
import (
"errors"
"fmt"
"io/ioutil"
"log"
"net/http"
"github.com/gorilla/mux"
)
var transact *TransactionLogger
func loggingMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
log.Println(r.Method, r.RequestURI)
next.ServeHTTP(w, r)
})
}
func notAllowedHandler(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Not Allowed", http.StatusMethodNotAllowed)
}
func keyValuePutHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
key := vars["key"]
value, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer r.Body.Close()
err = Put(key, string(value))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
transact.WritePut(key, string(value))
w.WriteHeader(http.StatusCreated)
log.Printf("PUT key=%s value=%s\n", key, string(value))
}
func keyValueGetHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
key := vars["key"]
value, err := Get(key)
if errors.Is(err, ErrorNoSuchKey) {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Write([]byte(value))
log.Printf("GET key=%s\n", key)
}
func keyValueDeleteHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
key := vars["key"]
err := Delete(key)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
transact.WriteDelete(key)
log.Printf("DELETE key=%s\n", key)
}
//추가된 부분
func initializeTransactionLog() error {
var err error
//트랜잭션 로그 파일 경로
transact, err = NewTransactionLogger("/tmp/transactions.log")
if err != nil {
return fmt.Errorf("failed to create transaction logger: %w", err)
}
//파일 읽어오는 부분. 채널 반환함.
events, errors := transact.ReadEvents()
count, ok, e := 0, true, Event{}
for ok && err == nil {
select {
case err, ok = <-errors:
//파일에서 읽은 이벤트. e는 Event. ok 는 채널 끝 여부 알려줌.
case e, ok = <-events:
//기록된 이벤트들 읽고 실행.
switch e.EventType {
case EventDelete: // Got a DELETE event!
err = Delete(e.Key)
count++
case EventPut: // Got a PUT event!
err = Put(e.Key, e.Value)
count++
}
}
}
log.Printf("%d events replayed\n", count)
//이후 요청들(put, delete)에 대해서 트랜잭션 로그를 남김
transact.Run()
return err
}
func main() {
// Initializes the transaction log and loads existing data, if any.
// Blocks until all data is read.
err := initializeTransactionLog()
if err != nil {
panic(err)
}
// Create a new mux router
r := mux.NewRouter()
r.Use(loggingMiddleware)
r.HandleFunc("/v1/{key}", keyValueGetHandler).Methods("GET")
r.HandleFunc("/v1/{key}", keyValuePutHandler).Methods("PUT")
r.HandleFunc("/v1/{key}", keyValueDeleteHandler).Methods("DELETE")
r.HandleFunc("/v1", notAllowedHandler)
r.HandleFunc("/v1/{key}", notAllowedHandler)
log.Fatal(http.ListenAndServe(":8080", r))
}
실제로 put을 해봤습니다.
첫 번째는 sequence id, 두 번째는 event type(1: delete, 2: put), 세 번째는 key, 네 번째는 value 입니다.
a : hi, b: hello, c: hello,d : "" 를 put 한 것을 확인할 수 있습니다.
그리고 종료 후, 다시 실행해봤습니다.
key a에 대한 데이터를 조회하면,
hi를 가져오는 것을 볼 수 있습니다.
또한 put 작업을 하면, trasactions.log 파일에 뒤에 7부터 들어가는 것을 확인할 수 있습니다.
이 코드의 개선해야 할 점으로는
파일을 안전하게 닫기 위한 close 미존재
쓰기 버퍼에 이벤트 남아 있을 때 서비스 종료되면, 이벤트가 사라짐.
키와 값의 크기가 정해져 있지 않음.
로그가 평문으로 저장되기 때문에, 디스크 공간을 많이 차지하게 됨.
로그 영원히 보관.
이 있습니다
3. 데이터베이스에 상태 저장하기
데이터베이스에 저장할 경우,
장점
- 분산된 state에 대해 덜 걱정해도 됨. -> 클라우드 네이티브에 가깝다고 함.
- 용이한 확장성. 복제본들 사이에 공유할 데이터가 없으므로 확장이 더 쉬움.
단점
- 병목 현상 발생. 만약 여러 복제본이 db에서 값을 읽으면 문제될 수 있음.
- 업스트림 의존성 발생. 외부 시스템에 대한 의존성 생김.
package main
import (
"database/sql"
"fmt"
"net/url"
"sync"
_ "github.com/lib/pq" // init 함수를 자동으로 호출하지만, 실제 패키지는 사용하지 않음.
)
//디비 연결
type PostgresDbParams struct {
dbName string
host string
user string
password string
}
//파일이랑 다른 점은 Db 연결 추가, 마지막 seq number 관리 없앰.
type PostgresTransactionLogger struct {
events chan<- Event
errors <-chan error
db *sql.DB
wg *sync.WaitGroup
}
func (l *PostgresTransactionLogger) WritePut(key, value string) {
l.wg.Add(1)
l.events <- Event{EventType: EventPut, Key: key, Value: url.QueryEscape(value)}
}
func (l *PostgresTransactionLogger) WriteDelete(key string) {
l.wg.Add(1)
l.events <- Event{EventType: EventDelete, Key: key}
}
func (l *PostgresTransactionLogger) Err() <-chan error {
return l.errors
}
func (l *PostgresTransactionLogger) LastSequence() uint64 {
return 0
}
func (l *PostgresTransactionLogger) Run() {
events := make(chan Event, 16) // Make an events channel
l.events = events
errors := make(chan error, 1) // Make an errors channel
l.errors = errors
go func() { // The INSERT query
//db에서 sequence number 관리
query := `INSERT INTO transactions
(event_type, key, value)
VALUES ($1, $2, $3)`
for e := range events { // Retrieve the next Event
_, err := l.db.Exec( // Execute the INSERT query
query,
e.EventType, e.Key, e.Value)
if err != nil {
errors <- err
}
l.wg.Done()
}
}()
}
func (l *PostgresTransactionLogger) Wait() {
l.wg.Wait()
}
func (l *PostgresTransactionLogger) Close() error {
l.wg.Wait()
if l.events != nil {
close(l.events) // Terminates Run loop and goroutine
}
return l.db.Close()
}
func (l *PostgresTransactionLogger) ReadEvents() (<-chan Event, <-chan error) {
outEvent := make(chan Event) // An unbuffered events channel
outError := make(chan error, 1) // A buffered errors channel
//unbuffer 이용할 경우, consume 부분이 준비될 때까지 produce하지 않음.
query := "SELECT sequence, event_type, key, value FROM transactions"
go func() {
defer close(outEvent)
defer close(outError)
rows, err := l.db.Query(query)
if err != nil {
outError <- fmt.Errorf("sql query error: %w", err)
return
}
defer rows.Close() // 커넥션 종료.
var e Event // Create an empty Event
for rows.Next() { // Iterate over the rows
err = rows.Scan(
&e.Sequence, &e.EventType,
&e.Key, &e.Value)
if err != nil {
outError <- err
return
}
outEvent <- e // Send e to the channel
}
err = rows.Err()
if err != nil {
outError <- fmt.Errorf("transaction log read failure: %w", err)
}
}()
return outEvent, outError
}
func (l *PostgresTransactionLogger) verifyTableExists() (bool, error) {
const table = "transactions"
var result string
rows, err := l.db.Query(fmt.Sprintf("SELECT to_regclass('public.%s');", table))
if err != nil {
return false, err
}
defer rows.Close()
for rows.Next() && result != table {
rows.Scan(&result)
}
return result == table, rows.Err()
}
func (l *PostgresTransactionLogger) createTable() error {
var err error
createQuery := `CREATE TABLE transactions (
sequence BIGSERIAL PRIMARY KEY,
event_type SMALLINT,
key TEXT,
value TEXT
);`
_, err = l.db.Exec(createQuery)
if err != nil {
return err
}
return nil
}
func NewPostgresTransactionLogger(param PostgresDbParams) (TransactionLogger, error) {
connStr := fmt.Sprintf("host=%s dbname=%s user=%s password=%s sslmode=disable",
param.host, param.dbName, param.user, param.password)
db, err := sql.Open("postgres", connStr)
if err != nil {
return nil, fmt.Errorf("failed to create db value: %w", err)
}
//대부분의 드라이버가 핑을 날릴 때 커넥션을 맺음.
err = db.Ping() // Test the databases connection
if err != nil {
return nil, fmt.Errorf("failed to opendb connection: %w", err)
}
tl := &PostgresTransactionLogger{db: db, wg: &sync.WaitGroup{}}
exists, err := tl.verifyTableExists()
if err != nil {
return nil, fmt.Errorf("failed to verify table exists: %w", err)
}
//테이블 존재하지 않으면 생성
if !exists {
if err = tl.createTable(); err != nil {
return nil, fmt.Errorf("failed to create table: %w", err)
}
}
return tl, nil
}
package main
import (
"errors"
"fmt"
"io/ioutil"
"log"
"net/http"
"github.com/gorilla/mux"
)
var transact TransactionLogger
func loggingMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
log.Println(r.Method, r.RequestURI)
next.ServeHTTP(w, r)
})
}
func notAllowedHandler(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Not Allowed", http.StatusMethodNotAllowed)
}
func keyValuePutHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
key := vars["key"]
value, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer r.Body.Close()
err = Put(key, string(value))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
transact.WritePut(key, string(value))
w.WriteHeader(http.StatusCreated)
log.Printf("PUT key=%s value=%s\n", key, string(value))
}
func keyValueGetHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
key := vars["key"]
value, err := Get(key)
if errors.Is(err, ErrorNoSuchKey) {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Write([]byte(value))
log.Printf("GET key=%s\n", key)
}
func keyValueDeleteHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
key := vars["key"]
err := Delete(key)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
transact.WriteDelete(key)
log.Printf("DELETE key=%s\n", key)
}
func initializeTransactionLog() error {
var err error
transact, err = NewPostgresTransactionLogger(PostgresDbParams{
host: "localhost",
dbName: "test",
user: "test",
password: "test",
})
if err != nil {
return fmt.Errorf("failed to create transaction logger: %w", err)
}
events, errors := transact.ReadEvents()
count, ok, e := 0, true, Event{}
for ok && err == nil {
select {
case err, ok = <-errors:
case e, ok = <-events:
switch e.EventType {
case EventDelete: // Got a DELETE event!
err = Delete(e.Key)
count++
case EventPut: // Got a PUT event!
err = Put(e.Key, e.Value)
count++
}
}
}
log.Printf("%d events replayed\n", count)
transact.Run()
go func() {
for err := range transact.Err() {
log.Print(err)
}
}()
return err
}
func main() {
// Initializes the transaction log and loads existing data, if any.
// Blocks until all data is read.
err := initializeTransactionLog()
if err != nil {
//함수 멈추고, defer 함수 실행.
panic(err)
}
// Create a new mux router
r := mux.NewRouter()
r.Use(loggingMiddleware)
r.HandleFunc("/v1/{key}", keyValueGetHandler).Methods("GET")
r.HandleFunc("/v1/{key}", keyValuePutHandler).Methods("PUT")
r.HandleFunc("/v1/{key}", keyValueDeleteHandler).Methods("DELETE")
r.HandleFunc("/v1", notAllowedHandler)
r.HandleFunc("/v1/{key}", notAllowedHandler)
log.Fatal(http.ListenAndServe(":8080", r))
}
파일을 이용한 것과 큰 차이는 없습니다.
실행했을 경우, put(2), delete(1)에 대한 작업을 잘 저장하는 것을 확인할 수 있습니다.
package main
import (
"log"
"net/http"
)
func helloGoHandler(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("Hello net/http!\n"))
}
func main() {
http.HandleFunc("/", helloGoHandler)
// 2번째 인자는 multiplexor. nil인 경우 DefaultServeMux 이용
//ListenAndServe는 에러가 발생한 경우에만 반환.
//log.Fatal은 에러가 발생할 경우, 에러메시지 반환하고 종료
log.Fatal(http.ListenAndServe(":8080", nil))
}
multiplexor로 패턴(경로 등)이랑 함수를 매핑해줍니다.
이러한 코드가 있고,
이런 식으로 등록을 하게 됩니다.
하지만 다른 mux를 사용하고 싶을 수 있습니다.
책에서는 gorllia/mux를 사용하는 예제를 설명합니다.
2. gorilla mux를 이용한 rest api
package main
import (
"log"
"net/http"
//따로 go init {...} 해서 go.mod 만들어야 함.
"github.com/gorilla/mux"
)
func helloGoHandler(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("Hello net/http!\n"))
}
func main() {
r := mux.NewRouter()
r.HandleFunc("/", helloGoHandler)
log.Fatal(http.ListenAndServe(":8080", r))
}
앞에서 사용한 net/http에서 추가된 기능으로는
경로에 /{key} 같은 형태를 넣을 수 있고 정규표현식도 사용할 수 있습니다.
또한 이러한 파라미터를 추출할 수 있습니다.
이를 적용한 코드가 아래 코드입니다.
package main
import (
"log"
"net/http"
"github.com/gorilla/mux"
)
func helloGoHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
var name string = vars["key"]
w.Write([]byte("Hello " + name + "\n"))
}
func main() {
r := mux.NewRouter()
r.HandleFunc("/{key}", helloGoHandler)
log.Fatal(http.ListenAndServe(":8080", r))
}
보시면 me를 출력한 것을 볼 수 있습니다.
쉽게 파라미터를 추출할 수 있는 것을 확인할 수 있습니다.
이를 이용해서 put과 get 기능을 만들어보겠습니다.
package main
import "errors"
//멀티 쓰레드로 동작하면?
var store = make(map[string]string)
var ErrorNoSuchKey = errors.New("no such key")
func Delete(key string) error {
delete(store, key)
return nil
}
func Get(key string) (string, error) {
value, ok := store[key]
if !ok {
return "", ErrorNoSuchKey
}
return value, nil
}
func Put(key string, value string) error {
store[key] = value
return nil
}
other work start
start async(future job)
done
hi
2009-11-10 23:00:03 +0000 UTC m=+3.000000001
end async
bye
end
end
2009-11-10 23:00:05 +0000 UTC m=+5.000000001
2009-11-10 23:00:05 +0000 UTC m=+5.000000001
이 반환 됩니다.
1,2 째줄은 동시에 수행이 됩니다.
3번 째줄은 3초 뒤에 출력이 됩니다.(기존 작업이 3초 sleep이기 때뮨에)
4,5번째 줄은 아까 값을 기다리는 로직이 있었습니다.(once.Do(...)) 이 부분의 로직을 호출했다는 의미로 hi와 호출 시간을 출력했습니다. 앞에서 3초 기다렸기 때문에, +3초 찍히는 걸 볼 수 있습니다.
6번째 줄은 future 내부의 실제 작업이 끝난 것을 의미하고, 7번째 줄은 once.Do(...) 에서 해당 작업이 끝난 것을 감지했을 경우 출력이 됩니다.
7,8 번 째 end가 두 번 출력된 이유는 main 함수에서 future.Result()를 2번 호출하고 있기 때문입니다. 하지만 6번째의 end async는 2번 호출되지 않은 것을 확인할 수 있습니다. 이는 sysn.once 를 이용하기 때문에 이러한 결과가 나왔습니다.
실제로 .Result() 를 처음 호출했을 경우는 future의 비동기 작업이 끝날 때 까지 기다리지만, 그 이후 호출에서는 한 번 더 작업을 실행하지 않고 이전에 처리한 결과값을 바로 리턴합니다.
그래서 9,10 번째도 똑같은 시간이 5초가 나온 것을 확인할 수 있습니다. 5초가 나온 이유는 future의 비동기 작업에서 5초 sleep했기 때문입니다.
예를 들면, db lock 이나 리소스 부족 등으로 인해 timeout이 발생할 수 있습니다. time out이 계속해서 발생하게 될 경우에는 뒤의 요청에 대해서도 처리를 하지 못합니다. 만약 MSA 라면, 모든 서비스들이 영향을 받게 되고 이는 전체 서비스 장애와 이어 집니다.
또한 클라이언트가 retry까지 하게 되면 네트워크 단에 부하가 엄청나게 발생합니다.
서킷 브레이커는 이러한 장애를 막을 수 있습니다.
에러가 몇 번 이상 발생했을 경우 서킷을 open 합니다. 그러면 기본 로직은 동작하지 않고 바로 error를 클라이언트에 반환합니다.
시간이 지난 후에는 half open으로 요청 중 일부만 받아들입니다.
만약 이 과정에서 에러가 발생하지 않았다면 서킷이 close되고 기존 로직이 동작하게 됩니다.
이를 go로 구현해보겠습니다.
package circuit
// context는 thread safe.
// 즉, 다수의 go routine이 접근해도 괜찮음.
import (
"context"
"errors"
"fmt"
"sync"
"time"
)
type Circuit func(context.Context) (string, error)
func Breaker(circuit Circuit, threshold int) Circuit {
var last = time.Now()
var m sync.RWMutex
failures := 0
return func(ctx context.Context) (string, error) {
m.Lock() //mutex 를 이용한 read lock(기존 코드) -> Read lock 을 하면 서킷에 오픈이 안됨..?
count := failures - threshold
// 실패 개수가 더 많을 경우
if count >= 0 {
retryAt := last.Add(40 * time.MilliSecond)
if !time.Now().After(retryAt) {
m.Unlock()
fmt.Println("open")
return "open", errors.New("open")
}
}
m.Unlock()
res, err := circuit(ctx)
m.Lock()
defer m.Unlock() //끝날 때 lock
last = time.Now() //이를 위해 lock이 필요함.
if err != nil {
fmt.Println(err)
failures++ // need lock
return res, err
}
if failures > 0 {
failures -= 1
}
fmt.Println("200")
return res, nil
}
}