책 클라우드 네이티브 go를 참고했습니다.

https://github.com/cloud-native-go/examples (코드)

 

 

분산 클라우드 네이티브의 도전 과제 중 하나는 '상태를 어떻게 유지할 것인가' 입니다.

 

1. 트랜잭션 로그 파일에 상태 저장. 리소스가 변경될 때마다 파일 기반의 트랜잭션 로그를 이용하여 기록. 트랜잭션 로그는 서비스가 트랜잭션을 다시 수행하여 원래의 상태를 쉽게 만들 수 있도록 함.

2. 외부 데이터베이스에 상태 저장. 트랜잭션 로그 저장을 외부 db에 함.

 

1. 트랜잭션 로그

트랜잭션 로그는 변경사항의 기록을 유지하는 로그 파일.

서비스에 문제 생기거나, 재시작 될 경우 트랜잭션을 다시 수행해서 복원할 수 있게 해줍니다.

 

트랜잭션 로그는 이벤트를 오래된 것부터 최신 순으로 읽어서 재연할 것입니다. 그래서 트랜잭션 로그에 필요한 것들은

- 순번(Sequence number) : 쓰기 작업에 대한 고유 ID. 항상 증가.

- 이벤트 타입 : PUT, DELETE 등 이벤트 타입들

- 키 : 트랜잭션에 의해 영향 받는 키(앞의 rest api를 구현할 때 key - value 저장소를 이용했으므로)

- 값 : 이벤트가 PUT이라면, 어느 데이터를 넣었는 지 알아야 하기 때문에 value가 필요.

 

 

2. 파일 기반 트랜잭션 로그

 

트랜잭션 로그를 파일에 저장하면,

장점 : 

  - 다운스트림에 대한 의존성 없음. (외부 서비스에 대한 의존성이 없음)

  - 기술적인 직관성(구현 간단함0

 

단점:

  - 확장 어려움(노드 간 상태 정보를 분산하여 공유하기 위한 방법 필요)]

  - 파일 증가(파일 크기 제한을 두어야 함)

 

이러한 장단점이 있습니다.

 

package main

import (
	"bufio"
	"fmt"
	"net/url"
	"os"
	"sync"
)

type EventType byte

//java의 enum 처럼 사용 가능.
const (
	_                     = iota // iota == 0; ignore this value
	EventDelete EventType = iota // iota == 1
	EventPut                     // iota == 2; implicitly repeat last
)

type Event struct {
	Sequence  uint64
	EventType EventType
	Key       string
	Value     string
}


// io.Writer를 이용하는 대신, 채널을 이용. io.Writer는 싱글 스레드 기반이라 io 시간에 많은 시간이 걸린다고 함.
type TransactionLogger struct {
	events       chan<- Event // Write-only channel for sending events
	errors       <-chan error
	lastSequence uint64   // The last used event sequence number
	file         *os.File // The location of the transaction log
	wg           *sync.WaitGroup //포인터로 선언하지 않으면 복사본을 이용하게 됨. 동기화가 제대로 일어나지 않음.
}

func (l *TransactionLogger) WritePut(key, value string) {
	l.wg.Add(1)
	l.events <- Event{EventType: EventPut, Key: key, Value: url.QueryEscape(value)}
}

func (l *TransactionLogger) WriteDelete(key string) {
	l.wg.Add(1)
	l.events <- Event{EventType: EventDelete, Key: key}
}

func (l *TransactionLogger) Err() <-chan error {
	return l.errors
}

//생성자
func NewTransactionLogger(filename string) (*TransactionLogger, error) {
	var err error
	var l TransactionLogger = TransactionLogger{wg: &sync.WaitGroup{}}

	// Open the transaction log file for reading and writing.
	l.file, err = os.OpenFile(filename, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0755)
	if err != nil {
		return nil, fmt.Errorf("cannot open transaction log file: %w", err)
	}

	return &l, nil
}

func (l *TransactionLogger) Run() {
	//채널 생성. 16개까지 받을 수 있음. 16개 이상으로 넣으려 하면 blocking 됨.
	events := make(chan Event, 16)
	l.events = events

	errors := make(chan error, 1)
	l.errors = errors

	// Start retrieving events from the events channel and writing them
	// to the transaction log
	go func() {
		for e := range events {
			l.lastSequence++

			_, err := fmt.Fprintf(
				l.file,
				"%d\t%d\t%s\t%s\n",
				l.lastSequence, e.EventType, e.Key, e.Value)

			if err != nil {
				errors <- fmt.Errorf("cannot write to log file: %w", err)
			}

			l.wg.Done()
		}
	}()
}

func (l *TransactionLogger) Wait() {
	l.wg.Wait()
}

func (l *TransactionLogger) Close() error {
	l.Wait()

	if l.events != nil {
		close(l.events) // Terminates Run loop and goroutine
	}

	return l.file.Close()
}


//로그 파일 읽기
func (l *TransactionLogger) ReadEvents() (<-chan Event, <-chan error) {
	scanner := bufio.NewScanner(l.file)
    
    //포인터가 아닌 Event 값 자체를 받는 채널.
	outEvent := make(chan Event)
	outError := make(chan error, 1)

	go func() {
		var e Event

		//끝날 때, 채널을 닫음으로써 더 이상 데이터를 못 넣게 함.(읽을 수는 있음)
		defer close(outEvent)
		defer close(outError)

		for scanner.Scan() {
			line := scanner.Text()
			
			fmt.Sscanf(
				line, "%d\t%d\t%s\t%s",
				&e.Sequence, &e.EventType, &e.Key, &e.Value)
			
            //일련 번호 증가 확인.
			if l.lastSequence >= e.Sequence {
				outError <- fmt.Errorf("transaction numbers out of sequence")
				return
			}

			uv, err := url.QueryUnescape(e.Value)
			if err != nil {
				outError <- fmt.Errorf("value decoding failure: %w", err)
				return
			}

			e.Value = uv
			l.lastSequence = e.Sequence

			//매번 event 값을 새로 생성하지 않음.
            //즉, 복사본을 넣어주는 것임.
			outEvent <- e
		}

		if err := scanner.Err(); err != nil {
			outError <- fmt.Errorf("transaction log read failure: %w", err)
		}
	}()

	return outEvent, outError
}

 

 

 

wait group을 포인터로 선언하지 않으면, 복사본을 이용하기 때문에 동기화가 제대로 일어나지 않습니다.

아래가 예시 입니다.

func main() {
	var wg sync.WaitGroup

	// Correct usage with pointer
	wg.Add(1)
	go func(wg *sync.WaitGroup) {
		defer wg.Done()
		// Do some work
	}(&wg)
	wg.Wait()

	// Incorrect usage with value (will cause panic)
	wg.Add(1)
	go func(wg sync.WaitGroup) {
		defer wg.Done() // This modifies a copy, not the original
		// Do some work
	}(wg)
	wg.Wait() // This will not wait for the goroutine because the Add was done on a copy
}

이런 식으로 데드락이 발생하게 됩니다.

 

 

이를 이제 이전 포스트에서 구현한 rest api 에 접목시킬 수 있습니다.

package main

import (
	"errors"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"

	"github.com/gorilla/mux"
)

var transact *TransactionLogger

func loggingMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		log.Println(r.Method, r.RequestURI)
		next.ServeHTTP(w, r)
	})
}

func notAllowedHandler(w http.ResponseWriter, r *http.Request) {
	http.Error(w, "Not Allowed", http.StatusMethodNotAllowed)
}

func keyValuePutHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	key := vars["key"]

	value, err := ioutil.ReadAll(r.Body)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	defer r.Body.Close()

	err = Put(key, string(value))
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	transact.WritePut(key, string(value))

	w.WriteHeader(http.StatusCreated)

	log.Printf("PUT key=%s value=%s\n", key, string(value))
}

func keyValueGetHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	key := vars["key"]

	value, err := Get(key)
	if errors.Is(err, ErrorNoSuchKey) {
		http.Error(w, err.Error(), http.StatusNotFound)
		return
	}
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	w.Write([]byte(value))

	log.Printf("GET key=%s\n", key)
}

func keyValueDeleteHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	key := vars["key"]

	err := Delete(key)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	transact.WriteDelete(key)

	log.Printf("DELETE key=%s\n", key)
}

//추가된 부분
func initializeTransactionLog() error {
	var err error
	
    //트랜잭션 로그 파일 경로
	transact, err = NewTransactionLogger("/tmp/transactions.log")
	if err != nil {
		return fmt.Errorf("failed to create transaction logger: %w", err)
	}

	//파일 읽어오는 부분. 채널 반환함.
	events, errors := transact.ReadEvents()
	count, ok, e := 0, true, Event{}

	for ok && err == nil {
		select {
		case err, ok = <-errors:

		//파일에서 읽은 이벤트. e는 Event. ok 는 채널 끝 여부 알려줌.
		case e, ok = <-events:
        
        	//기록된 이벤트들 읽고 실행.
			switch e.EventType {
			case EventDelete: // Got a DELETE event!
				err = Delete(e.Key)
				count++
			case EventPut: // Got a PUT event!
				err = Put(e.Key, e.Value)
				count++
			}
		}
	}

	log.Printf("%d events replayed\n", count)

	//이후 요청들(put, delete)에 대해서 트랜잭션 로그를 남김
	transact.Run()

	return err
}

func main() {
	// Initializes the transaction log and loads existing data, if any.
	// Blocks until all data is read.
	err := initializeTransactionLog()
	if err != nil {
		panic(err)
	}

	// Create a new mux router
	r := mux.NewRouter()

	r.Use(loggingMiddleware)

	r.HandleFunc("/v1/{key}", keyValueGetHandler).Methods("GET")
	r.HandleFunc("/v1/{key}", keyValuePutHandler).Methods("PUT")
	r.HandleFunc("/v1/{key}", keyValueDeleteHandler).Methods("DELETE")

	r.HandleFunc("/v1", notAllowedHandler)
	r.HandleFunc("/v1/{key}", notAllowedHandler)

	log.Fatal(http.ListenAndServe(":8080", r))
}

 

실제로 put을 해봤습니다.

첫 번째는 sequence id, 두 번째는 event type(1: delete, 2: put), 세 번째는 key, 네 번째는 value 입니다.

a : hi, b: hello, c: hello,d : "" 를 put 한 것을 확인할 수 있습니다.

 

그리고 종료 후, 다시 실행해봤습니다.

총 6번 작업 읽고 처리한 것 확인 가능

key a에 대한 데이터를 조회하면,

hi를 가져오는 것을 볼 수 있습니다.

 

또한 put 작업을 하면, trasactions.log 파일에 뒤에 7부터 들어가는 것을 확인할 수 있습니다.

 

 

이 코드의 개선해야 할 점으로는

  • 파일을 안전하게 닫기 위한 close 미존재
  • 쓰기 버퍼에 이벤트 남아 있을 때 서비스 종료되면, 이벤트가 사라짐.
  • 키와 값의 크기가 정해져 있지 않음.
  • 로그가 평문으로 저장되기 때문에, 디스크 공간을 많이 차지하게 됨.
  • 로그 영원히 보관.

이 있습니다

 

 

3. 데이터베이스에 상태 저장하기

데이터베이스에 저장할 경우,

 

장점

  - 분산된 state에 대해 덜 걱정해도 됨. -> 클라우드 네이티브에 가깝다고 함.

  - 용이한 확장성. 복제본들 사이에 공유할 데이터가 없으므로 확장이 더 쉬움.

 

단점

  - 병목 현상 발생. 만약 여러 복제본이 db에서 값을 읽으면 문제될 수 있음.

  - 업스트림 의존성 발생. 외부 시스템에 대한 의존성 생김.

package main

import (
	"database/sql"
	"fmt"
	"net/url"
	"sync"

	_ "github.com/lib/pq" // init 함수를 자동으로 호출하지만, 실제 패키지는 사용하지 않음.
)


//디비 연결
type PostgresDbParams struct {
	dbName   string
	host     string
	user     string
	password string
}


//파일이랑 다른 점은 Db 연결 추가, 마지막 seq number 관리 없앰.
type PostgresTransactionLogger struct {
	events chan<- Event
	errors <-chan error 
	db     *sql.DB
	wg     *sync.WaitGroup
}

func (l *PostgresTransactionLogger) WritePut(key, value string) {
	l.wg.Add(1)
	l.events <- Event{EventType: EventPut, Key: key, Value: url.QueryEscape(value)}
}

func (l *PostgresTransactionLogger) WriteDelete(key string) {
	l.wg.Add(1)
	l.events <- Event{EventType: EventDelete, Key: key}
}

func (l *PostgresTransactionLogger) Err() <-chan error {
	return l.errors
}

func (l *PostgresTransactionLogger) LastSequence() uint64 {
	return 0
}


func (l *PostgresTransactionLogger) Run() {
	events := make(chan Event, 16) // Make an events channel
	l.events = events

	errors := make(chan error, 1) // Make an errors channel
	l.errors = errors

	go func() { // The INSERT query
    	//db에서 sequence number 관리
		query := `INSERT INTO transactions
			 (event_type, key, value)
			 VALUES ($1, $2, $3)`

		for e := range events { // Retrieve the next Event
			_, err := l.db.Exec( // Execute the INSERT query
				query,
				e.EventType, e.Key, e.Value)

			if err != nil {
				errors <- err
			}

			l.wg.Done()
		}
	}()
}

func (l *PostgresTransactionLogger) Wait() {
	l.wg.Wait()
}

func (l *PostgresTransactionLogger) Close() error {
	l.wg.Wait()

	if l.events != nil {
		close(l.events) // Terminates Run loop and goroutine
	}

	return l.db.Close()
}

func (l *PostgresTransactionLogger) ReadEvents() (<-chan Event, <-chan error) {
	outEvent := make(chan Event)    // An unbuffered events channel
	outError := make(chan error, 1) // A buffered errors channel
    //unbuffer 이용할 경우, consume 부분이 준비될 때까지 produce하지 않음.

	query := "SELECT sequence, event_type, key, value FROM transactions"

	go func() {
		defer close(outEvent)
		defer close(outError)

		rows, err := l.db.Query(query)
		if err != nil {
			outError <- fmt.Errorf("sql query error: %w", err)
			return
		}

		defer rows.Close() // 커넥션 종료.

		var e Event // Create an empty Event

		for rows.Next() { // Iterate over the rows

			err = rows.Scan(
				&e.Sequence, &e.EventType,
				&e.Key, &e.Value)

			if err != nil {
				outError <- err
				return
			}

			outEvent <- e // Send e to the channel
		}

		err = rows.Err()
		if err != nil {
			outError <- fmt.Errorf("transaction log read failure: %w", err)
		}
	}()

	return outEvent, outError
}

func (l *PostgresTransactionLogger) verifyTableExists() (bool, error) {
	const table = "transactions"

	var result string

	rows, err := l.db.Query(fmt.Sprintf("SELECT to_regclass('public.%s');", table))
	if err != nil {
		return false, err
	}
	defer rows.Close()

	for rows.Next() && result != table {
		rows.Scan(&result)
	}

	return result == table, rows.Err()
}

func (l *PostgresTransactionLogger) createTable() error {
	var err error

	createQuery := `CREATE TABLE transactions (
		 sequence      BIGSERIAL PRIMARY KEY,
		 event_type    SMALLINT,
		 key 		  TEXT,
		 value         TEXT
	   );`

	_, err = l.db.Exec(createQuery)
	if err != nil {
		return err
	}

	return nil
}

func NewPostgresTransactionLogger(param PostgresDbParams) (TransactionLogger, error) {
	connStr := fmt.Sprintf("host=%s dbname=%s user=%s password=%s sslmode=disable",
		param.host, param.dbName, param.user, param.password)

	db, err := sql.Open("postgres", connStr)
	if err != nil {
		return nil, fmt.Errorf("failed to create db value: %w", err)
	}

	//대부분의 드라이버가 핑을 날릴 때 커넥션을 맺음.
	err = db.Ping() // Test the databases connection
	if err != nil {
		return nil, fmt.Errorf("failed to opendb connection: %w", err)
	}

	tl := &PostgresTransactionLogger{db: db, wg: &sync.WaitGroup{}}

	exists, err := tl.verifyTableExists()
	if err != nil {
		return nil, fmt.Errorf("failed to verify table exists: %w", err)
	}
    //테이블 존재하지 않으면 생성
	if !exists {
		if err = tl.createTable(); err != nil {
			return nil, fmt.Errorf("failed to create table: %w", err)
		}
	}

	return tl, nil
}

 

package main

import (
	"errors"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"

	"github.com/gorilla/mux"
)

var transact TransactionLogger

func loggingMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		log.Println(r.Method, r.RequestURI)
		next.ServeHTTP(w, r)
	})
}

func notAllowedHandler(w http.ResponseWriter, r *http.Request) {
	http.Error(w, "Not Allowed", http.StatusMethodNotAllowed)
}

func keyValuePutHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	key := vars["key"]

	value, err := ioutil.ReadAll(r.Body)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	defer r.Body.Close()

	err = Put(key, string(value))
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	transact.WritePut(key, string(value))

	w.WriteHeader(http.StatusCreated)

	log.Printf("PUT key=%s value=%s\n", key, string(value))
}

func keyValueGetHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	key := vars["key"]

	value, err := Get(key)
	if errors.Is(err, ErrorNoSuchKey) {
		http.Error(w, err.Error(), http.StatusNotFound)
		return
	}
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	w.Write([]byte(value))

	log.Printf("GET key=%s\n", key)
}

func keyValueDeleteHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	key := vars["key"]

	err := Delete(key)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	transact.WriteDelete(key)

	log.Printf("DELETE key=%s\n", key)
}

func initializeTransactionLog() error {
	var err error

	transact, err = NewPostgresTransactionLogger(PostgresDbParams{
		host:     "localhost",
		dbName:   "test",
		user:     "test",
		password: "test",
	})
	if err != nil {
		return fmt.Errorf("failed to create transaction logger: %w", err)
	}

	events, errors := transact.ReadEvents()
	count, ok, e := 0, true, Event{}

	for ok && err == nil {
		select {
		case err, ok = <-errors:

		case e, ok = <-events:
			switch e.EventType {
			case EventDelete: // Got a DELETE event!
				err = Delete(e.Key)
				count++
			case EventPut: // Got a PUT event!
				err = Put(e.Key, e.Value)
				count++
			}
		}
	}

	log.Printf("%d events replayed\n", count)

	transact.Run()

	go func() {
		for err := range transact.Err() {
			log.Print(err)
		}
	}()

	return err
}

func main() {
	// Initializes the transaction log and loads existing data, if any.
	// Blocks until all data is read.
	err := initializeTransactionLog()
	if err != nil {
    	//함수 멈추고, defer 함수 실행.
		panic(err)
	}

	// Create a new mux router
	r := mux.NewRouter()

	r.Use(loggingMiddleware)

	r.HandleFunc("/v1/{key}", keyValueGetHandler).Methods("GET")
	r.HandleFunc("/v1/{key}", keyValuePutHandler).Methods("PUT")
	r.HandleFunc("/v1/{key}", keyValueDeleteHandler).Methods("DELETE")

	r.HandleFunc("/v1", notAllowedHandler)
	r.HandleFunc("/v1/{key}", notAllowedHandler)

	log.Fatal(http.ListenAndServe(":8080", r))
}

파일을 이용한 것과 큰 차이는 없습니다.

 

실행했을 경우, put(2), delete(1)에 대한 작업을 잘 저장하는 것을 확인할 수 있습니다.

 

또한 재실행하고 key a를 조회하면,

이전 작업 읽음.

 

맨 마지막에 작업했던 값인 bye를 읽는 것을 확인할 수 있습니다.

반응형

'BackEnd > go' 카테고리의 다른 글

[GO] Rest API 구현  (0) 2024.07.06
[Go] 동시성 패턴 future  (0) 2024.06.29
[Go] struct 와 포인터(자바 클래스와 비교)  (0) 2024.06.25
[GO] go를 이용한 안정성 패턴 구현(서킷)  (0) 2024.06.12

https://github.com/cloud-native-go/examples 로 공부한 내용입니다.

 

1. net/http를 이용한 Rest API 

package main

import (
	"log"
	"net/http"
)

func helloGoHandler(w http.ResponseWriter, r *http.Request) {
	w.Write([]byte("Hello net/http!\n"))
}

func main() {
	http.HandleFunc("/", helloGoHandler)


	// 2번째 인자는 multiplexor. nil인 경우 DefaultServeMux 이용
	//ListenAndServe는 에러가 발생한 경우에만 반환.
    //log.Fatal은 에러가 발생할 경우, 에러메시지 반환하고 종료
	log.Fatal(http.ListenAndServe(":8080", nil))

}

 

multiplexor로 패턴(경로 등)이랑 함수를 매핑해줍니다. 

 

pattern 매칭

이러한 코드가 있고,

 

이런 식으로 등록을 하게 됩니다.

 

 

하지만 다른 mux를 사용하고 싶을 수 있습니다.

책에서는 gorllia/mux를 사용하는 예제를 설명합니다.

 

2. gorilla mux를 이용한 rest api

package main

import (
	"log"
	"net/http"
	//따로 go init {...} 해서 go.mod 만들어야 함.
	"github.com/gorilla/mux"
)

func helloGoHandler(w http.ResponseWriter, r *http.Request) {
	w.Write([]byte("Hello net/http!\n"))
}

func main() {
	r := mux.NewRouter()
	r.HandleFunc("/", helloGoHandler)

	log.Fatal(http.ListenAndServe(":8080", r))

}

 

 

앞에서 사용한 net/http에서 추가된 기능으로는

경로에 /{key} 같은 형태를 넣을 수 있고 정규표현식도 사용할 수 있습니다.

 

또한 이러한 파라미터를 추출할 수 있습니다.

 

이를 적용한 코드가 아래 코드입니다.

package main

import (
	"log"
	"net/http"

	"github.com/gorilla/mux"
)

func helloGoHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	var name string = vars["key"]

	w.Write([]byte("Hello " + name + "\n"))
}

func main() {
	r := mux.NewRouter()
	r.HandleFunc("/{key}", helloGoHandler)

	log.Fatal(http.ListenAndServe(":8080", r))

}

 

 

실제 출력

보시면 me를 출력한 것을 볼 수 있습니다.

쉽게 파라미터를 추출할 수 있는 것을 확인할 수 있습니다.

 

 

이를 이용해서 put과 get 기능을 만들어보겠습니다.

package main

import "errors"

//멀티 쓰레드로 동작하면?
var store = make(map[string]string)

var ErrorNoSuchKey = errors.New("no such key")

func Delete(key string) error {
	delete(store, key)

	return nil
}

func Get(key string) (string, error) {
	value, ok := store[key]

	if !ok {
		return "", ErrorNoSuchKey
	}

	return value, nil
}

func Put(key string, value string) error {
	store[key] = value

	return nil
}

 

package main

import (
	"errors"
	"io"
	"log"
	"net/http"

	"github.com/gorilla/mux"
)

func keyValuePutHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	key := vars["key"]

	value, err := io.ReadAll(r.Body)
	defer r.Body.Close()

	if err != nil {
		http.Error(w,
			err.Error(),
			http.StatusInternalServerError)
		return
	}

	err = Put(key, string(value))

	if err != nil {
		http.Error(w,
			err.Error(),
			http.StatusInternalServerError)
		return
	}

	w.WriteHeader(http.StatusCreated)

	log.Printf("PUT key=%s value=%s\n", key, string(value))
}

func keyValueGetHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	key := vars["key"]

	value, err := Get(key)
	if errors.Is(err, ErrorNoSuchKey) {
		http.Error(w, err.Error(), http.StatusNotFound)
		return
	}
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	w.Write([]byte(value))

	log.Printf("GET key=%s\n", key)
}

func keyValueDeleteHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	key := vars["key"]

	err := Delete(key)

	if err != nil {
		http.Error(w,
			err.Error(),
			http.StatusInternalServerError)
		return
	}

	w.WriteHeader(http.StatusOK)

	log.Printf("Delete key=%s\n", key)

}

func main() {
	r := mux.NewRouter()
	r.HandleFunc("/v1/{key}", keyValuePutHandler).Methods("PUT")
	r.HandleFunc("/v1/{key}", keyValueGetHandler).Methods("GET")
	r.HandleFunc("/v1/{key}", keyValueDeleteHandler).Methods("DELETE")

	log.Fatal(http.ListenAndServe(":8080", r))
}

 

테스트를 해보면,

1. PUT

key : a, value : hello

 

2. GET

value인 hello 출력되는 것 확인

3.DELETE

key가 a인 것 삭제

 

4. 삭제 후, GET

no such key 출력 확인

잘 동작하는 것을 확인할 수 있습니다.

 

 

3. 동시성 문제

하지만 앞에서 봤던 코드에서 멀티 쓰레드로 동작할 경우 동시성 문제가 생길 수 있습니다.

package main

import "errors"

//멀티 쓰레드로 동작하면?
var store = make(map[string]string)

var ErrorNoSuchKey = errors.New("no such key")

func Delete(key string) error {
	delete(store, key)

	return nil
}

func Get(key string) (string, error) {
	value, ok := store[key]

	if !ok {
		return "", ErrorNoSuchKey
	}

	return value, nil
}

func Put(key string, value string) error {
	store[key] = value

	return nil
}

key-value 저장소를 그냥 map으로 이용했기 때문입니다.

 

 

이를 해결하기 위해서 mutex를 이용합니다.

 

 

package main

import (
	"errors"
	"sync"
)

var store = struct {
	sync.RWMutex
	m map[string]string
}{m: make(map[string]string)}

var ErrorNoSuchKey = errors.New("no such key")

func Delete(key string) error {
	store.Lock()
	delete(store.m, key)
	store.Unlock()

	return nil
}

func Get(key string) (string, error) {
	store.RLock()
	value, ok := store.m[key]
	store.RUnlock()

	if !ok {
		return "", ErrorNoSuchKey
	}

	return value, nil
}

func Put(key string, value string) error {
	store.Lock()
	store.m[key] = value
	store.Unlock()

	return nil
}

 

코드를 보면, get은 read lock을 사용하는 것을 볼 수 있습니다.

 

반응형

k8s 및 go 관련해서 사내 스터디를 진행 중입니다.

future 에 관해 스터디를 했었습니다.

 

책 저자는 future는 비동기 프로세스에 의해 생성되는 값에 플레이스홀더를 제공하는 연산자라고 합니다.(문맥에 따라 다른 의미가 될 수 있다고 항의하지 말라고 합니다.)

 

자바에서는 future를 통해 다른 작업을 하다가, 값이 필요해지면 get()을 해서 기다릴 수 있습니다.

아래 코드도 비슷하게 동작합니다.

slow future를 호출해서 본 작업을 실행하고, Result()를 호출하면, 결과가 올 때 까지 기다립니다.

 

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type Future interface {
	Result() (string, error)
}

type InnerFuture struct {
	once sync.Once
	wg   sync.WaitGroup

	res string
	err error

	resCh <-chan string
	errCh <-chan error
}

func (f *InnerFuture) Result() (string, error) {
	
    //SlowFunction이 끝날 때 까지 기다림. 그리고 한 번만 실행되도록 보장.
    f.once.Do(func() {
		f.wg.Add(1)
		defer f.wg.Done()
		fmt.Println("hi")
		fmt.Println(time.Now())
		f.res = <-f.resCh
		f.err = <-f.errCh
		fmt.Println("bye")
	})

	// 결과 값 가져올 때까지 기다리기.
	f.wg.Wait()
	fmt.Println("end")

	return f.res, f.err
}

func SlowFunction(ctx context.Context) Future {
	resCh := make(chan string)
	errCh := make(chan error)

	// 비동기 작업 실행.
	go func() {
		fmt.Println("start async(future job)")
		select {
		case <-time.After(time.Second * 5):
			fmt.Println("end async")
			resCh <- time.Now().String()
			errCh <- nil
		case <-ctx.Done():
			resCh <- ""
			errCh <- ctx.Err()
		}
	}()

	return &InnerFuture{resCh: resCh, errCh: errCh}
}

func main() {
	ctx := context.Background()

	//비동기 작업 실행
	future := SlowFunction(ctx)

	//다른 작업 수행
	fmt.Println("other work start")
	select {
	case <-time.After(time.Second * 3):
		fmt.Println("done")
	}

	//결과 가져오기.
	res, err := future.Result()
	res2, _ := future.Result()

	if err != nil {
		fmt.Println("error : ", err)
		return
	}
	fmt.Println(res)
	fmt.Println(res2)
}

출처 : https://github.com/cloud-native-go/examples (살짝 수정한 코드)

 

 

이를 실행하면, 

 

other work start
start async(future job)
done
hi
2009-11-10 23:00:03 +0000 UTC m=+3.000000001
end async
bye
end
end
2009-11-10 23:00:05 +0000 UTC m=+5.000000001
2009-11-10 23:00:05 +0000 UTC m=+5.000000001

 

이 반환 됩니다.

 

1,2 째줄은 동시에 수행이 됩니다.

3번 째줄은 3초 뒤에 출력이 됩니다.(기존 작업이 3초 sleep이기 때뮨에)

4,5번째 줄은 아까 값을 기다리는 로직이 있었습니다.(once.Do(...)) 이 부분의 로직을 호출했다는 의미로 hi와 호출 시간을 출력했습니다. 앞에서 3초 기다렸기 때문에, +3초 찍히는 걸 볼 수 있습니다.

6번째 줄은 future 내부의 실제 작업이 끝난 것을 의미하고, 7번째 줄은  once.Do(...) 에서 해당 작업이 끝난 것을 감지했을 경우 출력이 됩니다.

7,8 번 째 end가 두 번 출력된 이유는 main 함수에서 future.Result()를 2번 호출하고 있기 때문입니다. 하지만 6번째의 end async는 2번 호출되지 않은 것을 확인할 수 있습니다. 이는 sysn.once 를 이용하기 때문에 이러한 결과가 나왔습니다.

실제로 .Result() 를 처음 호출했을 경우는 future의 비동기 작업이 끝날 때 까지 기다리지만, 그 이후 호출에서는 한 번 더 작업을 실행하지 않고 이전에 처리한 결과값을 바로 리턴합니다.

 

그래서 9,10 번째도 똑같은 시간이 5초가 나온 것을 확인할 수 있습니다. 5초가 나온 이유는 future의 비동기 작업에서 5초 sleep했기 때문입니다.

 

 

비동기 프로그래밍이 진짜 어려운 것 같습니다. webflux 느낌도 나고 재미있네요.

반응형

'BackEnd > go' 카테고리의 다른 글

[Go] 리소스 상태 유지  (0) 2024.07.09
[GO] Rest API 구현  (0) 2024.07.06
[Go] struct 와 포인터(자바 클래스와 비교)  (0) 2024.06.25
[GO] go를 이용한 안정성 패턴 구현(서킷)  (0) 2024.06.12

k8s 및 go 관련해서 사내 스터디를 진행 중입니다.

 

근데 struct가 어떻게 동작하는지 헷갈려서 자바와 비교를 하게 되었습니다.

 

 

우선 자바 같은 경우에는 class가 있습니다.

이런 식으로 하게 되면, test1 이랑 test2가 같은 객체(메모리 주소 값)를 가지게 됩니다.

test1의 필드를 수정해도 test2의 필드도 같이 수정이 되게 됩니다.

그래서 복사본을 따로 만들어주거나 dto를 사용하는 등 예상치 못한 수정을 막아야 합니다.

 

 

그럼 go의 struct도 똑같이 동작을 하는지 궁금했습니다.

 

 

s라는 student struct와, tt라는 stude struct가 있습니다.

tt := s 를 이용하여 주면, tt와 s가 같은 메모리 주소의 변수를 이용하고 있는지 궁금했습니다.

 

이를 실행하면,

이처럼 tt 다음에 ttt가 아닌 ss가 오게 됩니다.

즉, 자바와는 다르게 tt의 이름을 수정을 해도 s의 이름은 수정이 되지 않았습니다. 

 

또한 메모리 주소도 서로 다른 것을 확인할 수 있습니다.

 

 

반면에 getStudent() 메소드가 포인터를 반환하게 하면, 서로 같은 메모리를 참조하고 있습니다.

(printf에서 &이 사라진 이유는 포인터 변수가 가리키고 있는 메모리 주소(실제 이용하는 객체(?)의 위치)를 알기 위해서입니다.

 

이런 식으로 해야 자바처럼 똑같은 메모리의 변수를 바라보게 됩니다.

 

 

결론은 go의 struct는 referrence가 아닌 value로 할당을 해주게 되면, 그때마다 메모리에 새로운 값을 할당하고 이를 반환하는 방식인 것을 확인할 수 있었습니다,

반응형

'BackEnd > go' 카테고리의 다른 글

[Go] 리소스 상태 유지  (0) 2024.07.09
[GO] Rest API 구현  (0) 2024.07.06
[Go] 동시성 패턴 future  (0) 2024.06.29
[GO] go를 이용한 안정성 패턴 구현(서킷)  (0) 2024.06.12

클라우드 네이티브 Go 책을 참고했습니다.

 

 

 

go를 이용하여 안정성 패턴을 구현해봅니다.

(분산 애플리케이션에서의)

 

 

서킷 브레이커

서비스가 실패할 경우, 장애가 퍼지는 걸 막을 수 있습니다.

 

예를 들면, db lock 이나 리소스 부족 등으로 인해 timeout이 발생할 수 있습니다. time out이 계속해서 발생하게 될 경우에는 뒤의 요청에 대해서도 처리를 하지 못합니다. 만약 MSA 라면, 모든 서비스들이 영향을 받게 되고 이는 전체 서비스 장애와 이어 집니다.

 

또한 클라이언트가 retry까지 하게 되면 네트워크 단에 부하가 엄청나게 발생합니다.

서킷 브레이커는 이러한 장애를 막을 수 있습니다.

 

에러가 몇 번 이상 발생했을 경우 서킷을 open 합니다. 그러면 기본 로직은 동작하지 않고 바로 error를 클라이언트에 반환합니다.

시간이 지난 후에는 half open으로 요청 중 일부만 받아들입니다.

만약 이 과정에서 에러가 발생하지 않았다면 서킷이 close되고 기존 로직이 동작하게 됩니다.

 

이를 go로 구현해보겠습니다.

package circuit

// context는 thread safe.
// 즉, 다수의 go routine이 접근해도 괜찮음.

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

type Circuit func(context.Context) (string, error)

func Breaker(circuit Circuit, threshold int) Circuit {

	var last = time.Now()
	var m sync.RWMutex
	failures := 0

	return func(ctx context.Context) (string, error) {
		m.Lock() //mutex 를 이용한 read lock(기존 코드) -> Read lock 을 하면 서킷에 오픈이 안됨..?

		count := failures - threshold

		// 실패 개수가 더 많을 경우
		if count >= 0 {
			retryAt := last.Add(40 * time.MilliSecond)
			if !time.Now().After(retryAt) {
				m.Unlock()
				fmt.Println("open")
				return "open", errors.New("open")
			}
		}

		m.Unlock()

		res, err := circuit(ctx)

		m.Lock()
		defer m.Unlock() //끝날 때 lock

		last = time.Now() //이를 위해 lock이 필요함.

		if err != nil {
			fmt.Println(err)
			failures++ // need lock
			return res, err
		}

		if failures > 0 {
			failures -= 1
		}

		fmt.Println("200")
		return res, nil

	}

}

 

 

테스트 코드 입니다.

package circuit

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"testing"
	"time"
)

func failAfter(threshold int) Circuit {
	count := 0

	// Service function. Fails after 5 tries.
	return func(ctx context.Context) (string, error) {
		count++

		if count > threshold {
			return "", errors.New("INTENTIONAL FAIL!")
		}

		return "Success", nil
	}
}

func waitAndContinue() Circuit {
	return func(ctx context.Context) (string, error) {
		time.Sleep(time.Second)

		if rand.Int()%2 == 0 {
			return "success", nil
		}

		return "Failed", fmt.Errorf("forced failure")
	}
}

func TestCircuitBreakerFailAfter5(t *testing.T) {
	circuit := failAfter(5)
	ctx := context.Background()

	for count := 1; count <= 10; count++ {
		message, err := circuit(ctx)

		t.Logf("attempt %d: %v, %s", count, err, message)

		switch {
		case count <= 5 && err != nil:
			t.Error("expected no error; got", err)
		case count > 5 && err == nil && message != "open":
			t.Error("expected err and open")
		}
	}
}

func TestCircuitBreakerDataRace(t *testing.T) { //테스트 코드!
	ctx := context.Background()

	circuit := failAfter(5)
	breaker := Breaker(circuit, 1)

	wg := sync.WaitGroup{}

	for count := 1; count <= 20; count++ {
		wg.Add(1)
        
        

		go func(count int) {
			defer wg.Done()
			
			message, err := breaker(ctx)
			time.Sleep(10 * time.Millisecond)
			t.Logf("attempt %d: err=%v, message=%s", count, err, message)
		}(count)
	}

	wg.Wait()
}

 

5번 이상이 넘어가면 error가 발생하는 코드 입니다.

그리고 1번 에러가 발생하면, 그 다음에 circuit이 open 됩니다.

여기서 breaker에서는 40 밀리세컨드 동안 open이 됩니다. 그리고 다시 close 되고 error 발생한 후에 open이 되는 것을 볼 수 있습니다.

 

 

* half open 등이 세부 사항은 구현되지 않았습니다.

 

* read lock을 이용해도 잘 됩니다.

 

 

반응형

'BackEnd > go' 카테고리의 다른 글

[Go] 리소스 상태 유지  (0) 2024.07.09
[GO] Rest API 구현  (0) 2024.07.06
[Go] 동시성 패턴 future  (0) 2024.06.29
[Go] struct 와 포인터(자바 클래스와 비교)  (0) 2024.06.25

+ Recent posts