책 클라우드 네이티브 go를 참고했습니다.
https://github.com/cloud-native-go/examples (코드)
분산 클라우드 네이티브의 도전 과제 중 하나는 '상태를 어떻게 유지할 것인가' 입니다.
1. 트랜잭션 로그 파일에 상태 저장. 리소스가 변경될 때마다 파일 기반의 트랜잭션 로그를 이용하여 기록. 트랜잭션 로그는 서비스가 트랜잭션을 다시 수행하여 원래의 상태를 쉽게 만들 수 있도록 함.
2. 외부 데이터베이스에 상태 저장. 트랜잭션 로그 저장을 외부 db에 함.
1. 트랜잭션 로그
트랜잭션 로그는 변경사항의 기록을 유지하는 로그 파일.
서비스에 문제 생기거나, 재시작 될 경우 트랜잭션을 다시 수행해서 복원할 수 있게 해줍니다.
트랜잭션 로그는 이벤트를 오래된 것부터 최신 순으로 읽어서 재연할 것입니다. 그래서 트랜잭션 로그에 필요한 것들은
- 순번(Sequence number) : 쓰기 작업에 대한 고유 ID. 항상 증가.
- 이벤트 타입 : PUT, DELETE 등 이벤트 타입들
- 키 : 트랜잭션에 의해 영향 받는 키(앞의 rest api를 구현할 때 key - value 저장소를 이용했으므로)
- 값 : 이벤트가 PUT이라면, 어느 데이터를 넣었는 지 알아야 하기 때문에 value가 필요.
2. 파일 기반 트랜잭션 로그
트랜잭션 로그를 파일에 저장하면,
장점 :
- 다운스트림에 대한 의존성 없음. (외부 서비스에 대한 의존성이 없음)
- 기술적인 직관성(구현 간단함0
단점:
- 확장 어려움(노드 간 상태 정보를 분산하여 공유하기 위한 방법 필요)]
- 파일 증가(파일 크기 제한을 두어야 함)
이러한 장단점이 있습니다.
package main
import (
"bufio"
"fmt"
"net/url"
"os"
"sync"
)
type EventType byte
//java의 enum 처럼 사용 가능.
const (
_ = iota // iota == 0; ignore this value
EventDelete EventType = iota // iota == 1
EventPut // iota == 2; implicitly repeat last
)
type Event struct {
Sequence uint64
EventType EventType
Key string
Value string
}
// io.Writer를 이용하는 대신, 채널을 이용. io.Writer는 싱글 스레드 기반이라 io 시간에 많은 시간이 걸린다고 함.
type TransactionLogger struct {
events chan<- Event // Write-only channel for sending events
errors <-chan error
lastSequence uint64 // The last used event sequence number
file *os.File // The location of the transaction log
wg *sync.WaitGroup //포인터로 선언하지 않으면 복사본을 이용하게 됨. 동기화가 제대로 일어나지 않음.
}
func (l *TransactionLogger) WritePut(key, value string) {
l.wg.Add(1)
l.events <- Event{EventType: EventPut, Key: key, Value: url.QueryEscape(value)}
}
func (l *TransactionLogger) WriteDelete(key string) {
l.wg.Add(1)
l.events <- Event{EventType: EventDelete, Key: key}
}
func (l *TransactionLogger) Err() <-chan error {
return l.errors
}
//생성자
func NewTransactionLogger(filename string) (*TransactionLogger, error) {
var err error
var l TransactionLogger = TransactionLogger{wg: &sync.WaitGroup{}}
// Open the transaction log file for reading and writing.
l.file, err = os.OpenFile(filename, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0755)
if err != nil {
return nil, fmt.Errorf("cannot open transaction log file: %w", err)
}
return &l, nil
}
func (l *TransactionLogger) Run() {
//채널 생성. 16개까지 받을 수 있음. 16개 이상으로 넣으려 하면 blocking 됨.
events := make(chan Event, 16)
l.events = events
errors := make(chan error, 1)
l.errors = errors
// Start retrieving events from the events channel and writing them
// to the transaction log
go func() {
for e := range events {
l.lastSequence++
_, err := fmt.Fprintf(
l.file,
"%d\t%d\t%s\t%s\n",
l.lastSequence, e.EventType, e.Key, e.Value)
if err != nil {
errors <- fmt.Errorf("cannot write to log file: %w", err)
}
l.wg.Done()
}
}()
}
func (l *TransactionLogger) Wait() {
l.wg.Wait()
}
func (l *TransactionLogger) Close() error {
l.Wait()
if l.events != nil {
close(l.events) // Terminates Run loop and goroutine
}
return l.file.Close()
}
//로그 파일 읽기
func (l *TransactionLogger) ReadEvents() (<-chan Event, <-chan error) {
scanner := bufio.NewScanner(l.file)
//포인터가 아닌 Event 값 자체를 받는 채널.
outEvent := make(chan Event)
outError := make(chan error, 1)
go func() {
var e Event
//끝날 때, 채널을 닫음으로써 더 이상 데이터를 못 넣게 함.(읽을 수는 있음)
defer close(outEvent)
defer close(outError)
for scanner.Scan() {
line := scanner.Text()
fmt.Sscanf(
line, "%d\t%d\t%s\t%s",
&e.Sequence, &e.EventType, &e.Key, &e.Value)
//일련 번호 증가 확인.
if l.lastSequence >= e.Sequence {
outError <- fmt.Errorf("transaction numbers out of sequence")
return
}
uv, err := url.QueryUnescape(e.Value)
if err != nil {
outError <- fmt.Errorf("value decoding failure: %w", err)
return
}
e.Value = uv
l.lastSequence = e.Sequence
//매번 event 값을 새로 생성하지 않음.
//즉, 복사본을 넣어주는 것임.
outEvent <- e
}
if err := scanner.Err(); err != nil {
outError <- fmt.Errorf("transaction log read failure: %w", err)
}
}()
return outEvent, outError
}
wait group을 포인터로 선언하지 않으면, 복사본을 이용하기 때문에 동기화가 제대로 일어나지 않습니다.
아래가 예시 입니다.
func main() {
var wg sync.WaitGroup
// Correct usage with pointer
wg.Add(1)
go func(wg *sync.WaitGroup) {
defer wg.Done()
// Do some work
}(&wg)
wg.Wait()
// Incorrect usage with value (will cause panic)
wg.Add(1)
go func(wg sync.WaitGroup) {
defer wg.Done() // This modifies a copy, not the original
// Do some work
}(wg)
wg.Wait() // This will not wait for the goroutine because the Add was done on a copy
}
이런 식으로 데드락이 발생하게 됩니다.
이를 이제 이전 포스트에서 구현한 rest api 에 접목시킬 수 있습니다.
package main
import (
"errors"
"fmt"
"io/ioutil"
"log"
"net/http"
"github.com/gorilla/mux"
)
var transact *TransactionLogger
func loggingMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
log.Println(r.Method, r.RequestURI)
next.ServeHTTP(w, r)
})
}
func notAllowedHandler(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Not Allowed", http.StatusMethodNotAllowed)
}
func keyValuePutHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
key := vars["key"]
value, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer r.Body.Close()
err = Put(key, string(value))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
transact.WritePut(key, string(value))
w.WriteHeader(http.StatusCreated)
log.Printf("PUT key=%s value=%s\n", key, string(value))
}
func keyValueGetHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
key := vars["key"]
value, err := Get(key)
if errors.Is(err, ErrorNoSuchKey) {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Write([]byte(value))
log.Printf("GET key=%s\n", key)
}
func keyValueDeleteHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
key := vars["key"]
err := Delete(key)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
transact.WriteDelete(key)
log.Printf("DELETE key=%s\n", key)
}
//추가된 부분
func initializeTransactionLog() error {
var err error
//트랜잭션 로그 파일 경로
transact, err = NewTransactionLogger("/tmp/transactions.log")
if err != nil {
return fmt.Errorf("failed to create transaction logger: %w", err)
}
//파일 읽어오는 부분. 채널 반환함.
events, errors := transact.ReadEvents()
count, ok, e := 0, true, Event{}
for ok && err == nil {
select {
case err, ok = <-errors:
//파일에서 읽은 이벤트. e는 Event. ok 는 채널 끝 여부 알려줌.
case e, ok = <-events:
//기록된 이벤트들 읽고 실행.
switch e.EventType {
case EventDelete: // Got a DELETE event!
err = Delete(e.Key)
count++
case EventPut: // Got a PUT event!
err = Put(e.Key, e.Value)
count++
}
}
}
log.Printf("%d events replayed\n", count)
//이후 요청들(put, delete)에 대해서 트랜잭션 로그를 남김
transact.Run()
return err
}
func main() {
// Initializes the transaction log and loads existing data, if any.
// Blocks until all data is read.
err := initializeTransactionLog()
if err != nil {
panic(err)
}
// Create a new mux router
r := mux.NewRouter()
r.Use(loggingMiddleware)
r.HandleFunc("/v1/{key}", keyValueGetHandler).Methods("GET")
r.HandleFunc("/v1/{key}", keyValuePutHandler).Methods("PUT")
r.HandleFunc("/v1/{key}", keyValueDeleteHandler).Methods("DELETE")
r.HandleFunc("/v1", notAllowedHandler)
r.HandleFunc("/v1/{key}", notAllowedHandler)
log.Fatal(http.ListenAndServe(":8080", r))
}
실제로 put을 해봤습니다.
첫 번째는 sequence id, 두 번째는 event type(1: delete, 2: put), 세 번째는 key, 네 번째는 value 입니다.
a : hi, b: hello, c: hello,d : "" 를 put 한 것을 확인할 수 있습니다.
그리고 종료 후, 다시 실행해봤습니다.
key a에 대한 데이터를 조회하면,
hi를 가져오는 것을 볼 수 있습니다.
또한 put 작업을 하면, trasactions.log 파일에 뒤에 7부터 들어가는 것을 확인할 수 있습니다.
이 코드의 개선해야 할 점으로는
- 파일을 안전하게 닫기 위한 close 미존재
- 쓰기 버퍼에 이벤트 남아 있을 때 서비스 종료되면, 이벤트가 사라짐.
- 키와 값의 크기가 정해져 있지 않음.
- 로그가 평문으로 저장되기 때문에, 디스크 공간을 많이 차지하게 됨.
- 로그 영원히 보관.
이 있습니다
3. 데이터베이스에 상태 저장하기
데이터베이스에 저장할 경우,
장점
- 분산된 state에 대해 덜 걱정해도 됨. -> 클라우드 네이티브에 가깝다고 함.
- 용이한 확장성. 복제본들 사이에 공유할 데이터가 없으므로 확장이 더 쉬움.
단점
- 병목 현상 발생. 만약 여러 복제본이 db에서 값을 읽으면 문제될 수 있음.
- 업스트림 의존성 발생. 외부 시스템에 대한 의존성 생김.
package main
import (
"database/sql"
"fmt"
"net/url"
"sync"
_ "github.com/lib/pq" // init 함수를 자동으로 호출하지만, 실제 패키지는 사용하지 않음.
)
//디비 연결
type PostgresDbParams struct {
dbName string
host string
user string
password string
}
//파일이랑 다른 점은 Db 연결 추가, 마지막 seq number 관리 없앰.
type PostgresTransactionLogger struct {
events chan<- Event
errors <-chan error
db *sql.DB
wg *sync.WaitGroup
}
func (l *PostgresTransactionLogger) WritePut(key, value string) {
l.wg.Add(1)
l.events <- Event{EventType: EventPut, Key: key, Value: url.QueryEscape(value)}
}
func (l *PostgresTransactionLogger) WriteDelete(key string) {
l.wg.Add(1)
l.events <- Event{EventType: EventDelete, Key: key}
}
func (l *PostgresTransactionLogger) Err() <-chan error {
return l.errors
}
func (l *PostgresTransactionLogger) LastSequence() uint64 {
return 0
}
func (l *PostgresTransactionLogger) Run() {
events := make(chan Event, 16) // Make an events channel
l.events = events
errors := make(chan error, 1) // Make an errors channel
l.errors = errors
go func() { // The INSERT query
//db에서 sequence number 관리
query := `INSERT INTO transactions
(event_type, key, value)
VALUES ($1, $2, $3)`
for e := range events { // Retrieve the next Event
_, err := l.db.Exec( // Execute the INSERT query
query,
e.EventType, e.Key, e.Value)
if err != nil {
errors <- err
}
l.wg.Done()
}
}()
}
func (l *PostgresTransactionLogger) Wait() {
l.wg.Wait()
}
func (l *PostgresTransactionLogger) Close() error {
l.wg.Wait()
if l.events != nil {
close(l.events) // Terminates Run loop and goroutine
}
return l.db.Close()
}
func (l *PostgresTransactionLogger) ReadEvents() (<-chan Event, <-chan error) {
outEvent := make(chan Event) // An unbuffered events channel
outError := make(chan error, 1) // A buffered errors channel
//unbuffer 이용할 경우, consume 부분이 준비될 때까지 produce하지 않음.
query := "SELECT sequence, event_type, key, value FROM transactions"
go func() {
defer close(outEvent)
defer close(outError)
rows, err := l.db.Query(query)
if err != nil {
outError <- fmt.Errorf("sql query error: %w", err)
return
}
defer rows.Close() // 커넥션 종료.
var e Event // Create an empty Event
for rows.Next() { // Iterate over the rows
err = rows.Scan(
&e.Sequence, &e.EventType,
&e.Key, &e.Value)
if err != nil {
outError <- err
return
}
outEvent <- e // Send e to the channel
}
err = rows.Err()
if err != nil {
outError <- fmt.Errorf("transaction log read failure: %w", err)
}
}()
return outEvent, outError
}
func (l *PostgresTransactionLogger) verifyTableExists() (bool, error) {
const table = "transactions"
var result string
rows, err := l.db.Query(fmt.Sprintf("SELECT to_regclass('public.%s');", table))
if err != nil {
return false, err
}
defer rows.Close()
for rows.Next() && result != table {
rows.Scan(&result)
}
return result == table, rows.Err()
}
func (l *PostgresTransactionLogger) createTable() error {
var err error
createQuery := `CREATE TABLE transactions (
sequence BIGSERIAL PRIMARY KEY,
event_type SMALLINT,
key TEXT,
value TEXT
);`
_, err = l.db.Exec(createQuery)
if err != nil {
return err
}
return nil
}
func NewPostgresTransactionLogger(param PostgresDbParams) (TransactionLogger, error) {
connStr := fmt.Sprintf("host=%s dbname=%s user=%s password=%s sslmode=disable",
param.host, param.dbName, param.user, param.password)
db, err := sql.Open("postgres", connStr)
if err != nil {
return nil, fmt.Errorf("failed to create db value: %w", err)
}
//대부분의 드라이버가 핑을 날릴 때 커넥션을 맺음.
err = db.Ping() // Test the databases connection
if err != nil {
return nil, fmt.Errorf("failed to opendb connection: %w", err)
}
tl := &PostgresTransactionLogger{db: db, wg: &sync.WaitGroup{}}
exists, err := tl.verifyTableExists()
if err != nil {
return nil, fmt.Errorf("failed to verify table exists: %w", err)
}
//테이블 존재하지 않으면 생성
if !exists {
if err = tl.createTable(); err != nil {
return nil, fmt.Errorf("failed to create table: %w", err)
}
}
return tl, nil
}
package main
import (
"errors"
"fmt"
"io/ioutil"
"log"
"net/http"
"github.com/gorilla/mux"
)
var transact TransactionLogger
func loggingMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
log.Println(r.Method, r.RequestURI)
next.ServeHTTP(w, r)
})
}
func notAllowedHandler(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Not Allowed", http.StatusMethodNotAllowed)
}
func keyValuePutHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
key := vars["key"]
value, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer r.Body.Close()
err = Put(key, string(value))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
transact.WritePut(key, string(value))
w.WriteHeader(http.StatusCreated)
log.Printf("PUT key=%s value=%s\n", key, string(value))
}
func keyValueGetHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
key := vars["key"]
value, err := Get(key)
if errors.Is(err, ErrorNoSuchKey) {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Write([]byte(value))
log.Printf("GET key=%s\n", key)
}
func keyValueDeleteHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
key := vars["key"]
err := Delete(key)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
transact.WriteDelete(key)
log.Printf("DELETE key=%s\n", key)
}
func initializeTransactionLog() error {
var err error
transact, err = NewPostgresTransactionLogger(PostgresDbParams{
host: "localhost",
dbName: "test",
user: "test",
password: "test",
})
if err != nil {
return fmt.Errorf("failed to create transaction logger: %w", err)
}
events, errors := transact.ReadEvents()
count, ok, e := 0, true, Event{}
for ok && err == nil {
select {
case err, ok = <-errors:
case e, ok = <-events:
switch e.EventType {
case EventDelete: // Got a DELETE event!
err = Delete(e.Key)
count++
case EventPut: // Got a PUT event!
err = Put(e.Key, e.Value)
count++
}
}
}
log.Printf("%d events replayed\n", count)
transact.Run()
go func() {
for err := range transact.Err() {
log.Print(err)
}
}()
return err
}
func main() {
// Initializes the transaction log and loads existing data, if any.
// Blocks until all data is read.
err := initializeTransactionLog()
if err != nil {
//함수 멈추고, defer 함수 실행.
panic(err)
}
// Create a new mux router
r := mux.NewRouter()
r.Use(loggingMiddleware)
r.HandleFunc("/v1/{key}", keyValueGetHandler).Methods("GET")
r.HandleFunc("/v1/{key}", keyValuePutHandler).Methods("PUT")
r.HandleFunc("/v1/{key}", keyValueDeleteHandler).Methods("DELETE")
r.HandleFunc("/v1", notAllowedHandler)
r.HandleFunc("/v1/{key}", notAllowedHandler)
log.Fatal(http.ListenAndServe(":8080", r))
}
파일을 이용한 것과 큰 차이는 없습니다.
실행했을 경우, put(2), delete(1)에 대한 작업을 잘 저장하는 것을 확인할 수 있습니다.
또한 재실행하고 key a를 조회하면,
맨 마지막에 작업했던 값인 bye를 읽는 것을 확인할 수 있습니다.
'BackEnd > go' 카테고리의 다른 글
[GO] Rest API 구현 (0) | 2024.07.06 |
---|---|
[Go] 동시성 패턴 future (0) | 2024.06.29 |
[Go] struct 와 포인터(자바 클래스와 비교) (0) | 2024.06.25 |
[GO] go를 이용한 안정성 패턴 구현(서킷) (0) | 2024.06.12 |