1. 문제 상황

회사에서 겪은 문제입니다.

 

레디스 hash 구조로 데이터가 저장되어 있습니다.

그리고 이 중 몇 개를 가져와서 가공 후 실제 데이터를 제공합니다.

 

부하테스트를 했을 때 기대치보다 낮은 결과가 나왔었습니다.

그래서 원인을 파악하기 위해서 시간을 로그로 찍어본 결과, redis에서 데이터를 조회하는 데에 대부분의 시간을 소모하고 있었습니다.

 

레디스에서 데이터를 읽는 데만 약 140~200ms 정도 걸렸었습니다.

2. 원인

첫 번째로 생각했던 이유는 redis 설정 문제였습니다. 하지만 os 파라미터 등을 조회해봤을 때 큰 문제는 없었습니다.

두 번째네트워크 부하입니다. 데이터가 꽤 크기 때문에 네트워크 단에서 부하가 일어나지 않았을까라고 생각했었습니다.

마지막으로는 redis의 hkey 같은 O(N) 연산을 호출하는 연산 때문이라고 생각했습니다. 레디스는 싱글 쓰레드 기반이기 때문에 영향을 줄 수 있다고 합니다. 하지만 해당 명령어를 2분마다 한 번 호출하고 데이터 개수도 200개가 넘지 않아서 아니라고 생각했습니다.

 

 

그래서 3 가지 중 네트워크 부하가 가장 큰 문제라고 생각해서 이 문제를 해결하려고 했습니다.

 

3. 해결(?)

 

우선 redis에 데이터를 넣을 때, 크기를 줄일 필요가 있다고 생각했습니다.

그러기 위해서 저는 압축을 이용했습니다.

가상면접 사례로 배우는 대규모 시스템 설계 기초에서도 초반에 압축을 해서 응답할 경우 속도 개선이 크다고 한 걸 본 기억이 있습니다.

 

그래서 redis에 넣을 때 snappy로 압축을 해서 넣었습니다.

snappy 같은 경우에는 mongoDB에서도 쓰고 있고, 압축율은 조금 낮더라도 cpu 소모량이 낮습니다.

 

redis에 데이터를 넣는 서버는 압축을 해서 넣어야 하고, 실제 데이터를 꺼내서 처리하는 서버는 압축을 해제해야 하기 때문에 cpu 소모량이 적은 snappy를 선택했습니다.

 

146 ~ 225ms 걸리던 것이 70 ~ 87 ms 까지 개선 되었습니다. 약 2배 이상 빨라졌습니다.

또한 redis에 적재되는 용량이 3.5배 정도 줄어들었습니다.

 

하지만 압축 알고리즘이 있기 때문에 cpu 소모량을 봐야 하고, 부하 테스트도 다시 해봐야 합니다.

이 부분도 해보고, 실제 tps가 올라가는 지 실험 후 결과 공유하겠습니다.

반응형

1. 문제 상황

회사에서 신규 프로젝트 운영을 위해서 python으로 백오피스를 구현하고 있습니다.

리스트로 이루어진 데이터가 입력으로 오면, 이 데이터를 mongoDB에서 조회해서 가공한 후에 지도에 출력하는 프로젝트 입니다.

 

문제는 성능이 나오지 않았습니다.

42개의 데이터를 조회 및 처리하는 데에 5초나 걸렸습니다.

 

2. 원인

mongoDB가 해외에 위치해 있습니다. 그러다보니 query를 날리고 응답을 받는데에 시간이 오래 걸릴 것이라고 생각했습니다.

특히 이런 IO 작업을 싱글 쓰레드로 처리하다 보니 속도가 느려졌다고 생각했었습니다.

 

3.  해결

그래서 싱글 쓰레드로 동작하던 것을 멀티 쓰레드로 변경했습니다.

python의 GIL이 있지만, mongoDB에서 데이터를 가져오는 IO 작업 부분을 멀티 쓰레드로 작업하기 때문에 큰 상관은 없습니다.

 

약 max_worker를 20개로 설정해서 실험을 해봤습니다.

 

기존에 5초가 걸리던 작업이 약 2.3초까지 줄어든 것을 확인했습니다.

42개의 데이터를 가져오는 데 2.3초나 걸리는 것은 성능이 많이 느리다고 생각했습니다.

그래서 다른 해결 방안을 찾고 있었습니다.

 

 

이번에는 데이터의 개수가 약 4500개를 조회하도록 하고 max_worker를 100까지 증가시켰습니다.

무려 24초나 걸렸습니다.

 

이번엔 mongoDB에 index를 생성해서 조회를 해봤습니다.

24초 -> 16초까지 성능은 개선이 되었습니다. 무려 8초나 줄었기 때문에 여기서 만족하고 다른 일을 처리하러 갔습니다.

 

그러던 중, kafka의 partitioner가 생각이 났습니다.

kafka의 경우 produce를 할 때, partitioner의 전략(sticky, round robin)에 따라 데이터를 어느 partition에 데이터를 넣을 지 결정합니다. 그리고 이를 바로 kafka에 전송하는 것이 아니라, 설정한 batch 개수 만큼 쌓거나 일정 시간 지난 후에 한 번에 보내게 됩니다.

이렇게 한 이유가 네트워크 통신 비용을 줄이려고 그랬다는 내용이 갑자기 떠올랐습니다.

 

그래서 데이터 개수만큼의 쿼리를 전송하는 것이 아니라, 하나의 쿼리로 여러 데이터를 조회하도록 수정했습니다.

무려 16초에서 3.7초까지 성능이 개선 되었습니다.

특히 해외에 mongoDB가 있다보니 여러 쿼리를 날리고 응답을 받는 것이 성능에 큰 이슈가 있었던 것으로 파악이 됩니다.

 

결론

개선 방법 성능
멀티 쓰레드로 조회 24 s
mongoDB index 생성 16 s
싱글 쓰레드로 쿼리 하나로 여러 데이터 조회하도록 수정(Bulk) 3.7s

 

 

실제 다른 기능도 멀티 쓰레드로 구현이 되어 있었습니다. 데이터를 많이 조회하지도 않는데, 7초나  소모됩니다. 

이 부분을 맡은 팀원 분에게도 해당 내용 공유를 드렸습니다.

 

이 기능까지 성능이 개선이 된다면, 전체 프로세스가 기존 31초에서 약 4초까지 성능이 개선될 것으로 예상이 됩니다.

 

 

4. 느낀 점

당연히 batch로 데이터를 전송하는 것이 성능이 좋다는 것은 알고 있었습니다. 하지만 이 당시에 제 코드를 작성하고 있을 때는 여러 쿼리를 날리는 것이 문제가 될 것이라고 생각하지 못했습니다. 머리로는 알고 있었지만, 응용은 하지 못했었습니다.

 

머리로만 알던 지식을 완전히 제 것으로 만들기 가장 쉬운 방법은 프로젝트 경험이라는 것을 느낄 수 있었습니다.

(그래서 개발 동아리가 끝난 후에도, 팀원을 구해서 사이드 프로젝트를 계속 유지보수할 생각입니다.)

 

 

성능 개선하는 것이 보람차고 즐겁네요.

성능이 중요한 일을 해보고 싶네요.

 

반응형

회사에서 일을 하다가 마주친 이슈입니다.

 

 

1. 문제 상황

데이터를 외부에서 수집해서 서비스에 맞게 파싱 후, 저희 db와 redis에 넣는 작업을 했었습니다.

데이터 자체는 크지 않았지만, 서비스에 맞게 사용하기 위해 가공한 후의 데이터가 많았습니다.

 

여기서 문제는 가공하고 mongoDB와 redis에 넣을 때 힙을 약 5기가 정도로 많이 사용하고 있었습니다.

 

그래서 왜 이렇게 많이 힙을 사용하는 지 알아봤습니다.

 

 

2. 원인 파악

flatMap을 이용해서 데이터를 처리하고 있었습니다.

 

void test() {
        
    List<String> data = new ArrayList<>();

    Flux.just(data)
            .flatMap(fetchData -> {
                //db에 저장
                return Mono.just(fetchData);
            })
            .flatMap(fetchData -> {
                //redis 에 저장
                return Mono.just(fetchData);
            });
        
}

(실제로 회사에서 이러한 코드를 사용하는 건 아닙니다.

 

실제 데이터를 가져오고, 이를 가공한 후에 넣는데 문제는 flatMap을 사용하다보니 이 과정이 비동기로 동작했습니다.

예를 들면, 저희 api1, api2, api3에 사용하기 위한 데이터를 가공을 하게 되는데 이 데이터를 전부 비동기로 db에 넣고 있었습니다.

그러다보니 힙을 예상보다 많이 사용하게 되었습니다.

 

또한 가공을 할 때, 모든 데이터를 한 번에 가공했었습니다.

데이터가 200만 개라고 가정하면, 200만 개를 전부 한 번에 가공하고 데이터를 db에 넣는 등 작업을 했었습니다.

가공을 하게 되면서 데이터의 사이즈가 늘어나게 되고 이 과정도 문제가 되었습니다.

 

 

3. 해결 방안

일단 모든 데이터를 한 번에 처리하던 것을 일정 개수로 나눠서 가공하고 db 등에 넣도록 수정했습니다.

하지만 이것만 했을 때는 큰 효과가 없었습니다.

왜냐하면 비동기로 동작하기 때문에 나누더라도 모든 데이터가 한번에 가공이 되기 때문입니다.

 

그래서 비동기로 동작하던 것을 동기로 변경을 시켜주고 싶었습니다.

그러기 위해서 flatMap의 concurrency를 제어해주었습니다.

 

flatMap(.... , {concurrency}) 를 통해 설정할 수 있습니다.

이를 1로하니 동기처럼 동작하는 것을 확인했습니다.

(로그 통해서)

 

 

실제로 힙이 5기가 사용하던 것이, 3기가로 극적으로 줄었습니다.

 

하지만 성능은 30초 걸리던 것이 40초로 늘어났습니다.

이러한 트레이드 오프를 찾는 것이 중요한 것 같습니다.

 

 

4. 느낀 점

기존에 스케줄러로 데이터를 가져와서 처리하고 있어서 그대로 썼었습니다.

spring batch가 대용량 데이터 처리에 좋다고 하는데, 이를 이용한다면 이러한 작업을 수월하게 할 수 있지 않을까 라는 생각이 들었습니다.

 

spring batch는 아직 한 번도 안 써봐서 이것도 공부를 해봐야 할 것 같습니다.

(그래서 현재 조그만한 회사 프로젝트를 코틀린 + batch로 작업하고 있습니다.)

반응형

최근에 CMC라는 동아리를 하면서 새로운 프로젝트를 진행하고 있습니다.

 

이 과정에서 삽질 때문에 시간을 너무 많이 써서.... 해당 내용을 기록해두려고 합니다.

 

원래는 GKE를 이용해서 배포를 하려 했었는데, 계속 서비스가 정상적으로 동작하지 않아서 vm instance를 이용해서 배포는 해둔 상태입니다.

 

 

 

1. 문제

우선 서버 세팅은 vpc 를 통해서 priavte subnet(10.0.1.0/24)이랑 public subnet(10.0.2.0/24)을 만들었습니다.

그리고 cloud sql을 private subnet에 등록하려고 했지만, 

이처럼 비공개 서비스 액세스라는 것을 만들라고 해서 이를 이용해서 만들었습니다.

이는 10.0.3.0/24 범위를 가지고 있습니다.

 

여기서 문제가 10.0.3.0/24의 db와 10.0.2.0/24의 vm 인스턴스가 서로 통신이 안되는 상황이었습니다.

 

 

2. 해결

저는 당연히 subnet끼리는 통신이 원활히 될 줄 알았습니다. 하지만 그렇지 않았습니다.

서로 방화벽을 뚫어줘야지 통신이 가능해집니다....

 

그래서 vpc의 방화벽에서 

vpc 방화벽 추가

 

이러한 조건을 추가해줬습니다. 그러니 db랑 연결이 잘 됩니다.

 

 

 

이 간단한 것을 못 찾아서 거의 2시간 쓴 것 같았습니다... 이상한 곳에 문제가 있다고 생각하고 계속 다른 데를 봤네요.

회사에서는 인프라팀이 이러한 일을 다 해주고 있어서 신경을 쓸 필요가 없었습니다. 사실 vpc 설정하고 서버를 배포해보는 것도 이번이 처음이네요. 이 기회로 제가 많이 부족하다는 것을 느낄 수 있었습니다.

반응형

책 클라우드 네이티브 go를 참고했습니다.

https://github.com/cloud-native-go/examples (코드)

 

 

분산 클라우드 네이티브의 도전 과제 중 하나는 '상태를 어떻게 유지할 것인가' 입니다.

 

1. 트랜잭션 로그 파일에 상태 저장. 리소스가 변경될 때마다 파일 기반의 트랜잭션 로그를 이용하여 기록. 트랜잭션 로그는 서비스가 트랜잭션을 다시 수행하여 원래의 상태를 쉽게 만들 수 있도록 함.

2. 외부 데이터베이스에 상태 저장. 트랜잭션 로그 저장을 외부 db에 함.

 

1. 트랜잭션 로그

트랜잭션 로그는 변경사항의 기록을 유지하는 로그 파일.

서비스에 문제 생기거나, 재시작 될 경우 트랜잭션을 다시 수행해서 복원할 수 있게 해줍니다.

 

트랜잭션 로그는 이벤트를 오래된 것부터 최신 순으로 읽어서 재연할 것입니다. 그래서 트랜잭션 로그에 필요한 것들은

- 순번(Sequence number) : 쓰기 작업에 대한 고유 ID. 항상 증가.

- 이벤트 타입 : PUT, DELETE 등 이벤트 타입들

- 키 : 트랜잭션에 의해 영향 받는 키(앞의 rest api를 구현할 때 key - value 저장소를 이용했으므로)

- 값 : 이벤트가 PUT이라면, 어느 데이터를 넣었는 지 알아야 하기 때문에 value가 필요.

 

 

2. 파일 기반 트랜잭션 로그

 

트랜잭션 로그를 파일에 저장하면,

장점 : 

  - 다운스트림에 대한 의존성 없음. (외부 서비스에 대한 의존성이 없음)

  - 기술적인 직관성(구현 간단함0

 

단점:

  - 확장 어려움(노드 간 상태 정보를 분산하여 공유하기 위한 방법 필요)]

  - 파일 증가(파일 크기 제한을 두어야 함)

 

이러한 장단점이 있습니다.

 

package main

import (
	"bufio"
	"fmt"
	"net/url"
	"os"
	"sync"
)

type EventType byte

//java의 enum 처럼 사용 가능.
const (
	_                     = iota // iota == 0; ignore this value
	EventDelete EventType = iota // iota == 1
	EventPut                     // iota == 2; implicitly repeat last
)

type Event struct {
	Sequence  uint64
	EventType EventType
	Key       string
	Value     string
}


// io.Writer를 이용하는 대신, 채널을 이용. io.Writer는 싱글 스레드 기반이라 io 시간에 많은 시간이 걸린다고 함.
type TransactionLogger struct {
	events       chan<- Event // Write-only channel for sending events
	errors       <-chan error
	lastSequence uint64   // The last used event sequence number
	file         *os.File // The location of the transaction log
	wg           *sync.WaitGroup //포인터로 선언하지 않으면 복사본을 이용하게 됨. 동기화가 제대로 일어나지 않음.
}

func (l *TransactionLogger) WritePut(key, value string) {
	l.wg.Add(1)
	l.events <- Event{EventType: EventPut, Key: key, Value: url.QueryEscape(value)}
}

func (l *TransactionLogger) WriteDelete(key string) {
	l.wg.Add(1)
	l.events <- Event{EventType: EventDelete, Key: key}
}

func (l *TransactionLogger) Err() <-chan error {
	return l.errors
}

//생성자
func NewTransactionLogger(filename string) (*TransactionLogger, error) {
	var err error
	var l TransactionLogger = TransactionLogger{wg: &sync.WaitGroup{}}

	// Open the transaction log file for reading and writing.
	l.file, err = os.OpenFile(filename, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0755)
	if err != nil {
		return nil, fmt.Errorf("cannot open transaction log file: %w", err)
	}

	return &l, nil
}

func (l *TransactionLogger) Run() {
	//채널 생성. 16개까지 받을 수 있음. 16개 이상으로 넣으려 하면 blocking 됨.
	events := make(chan Event, 16)
	l.events = events

	errors := make(chan error, 1)
	l.errors = errors

	// Start retrieving events from the events channel and writing them
	// to the transaction log
	go func() {
		for e := range events {
			l.lastSequence++

			_, err := fmt.Fprintf(
				l.file,
				"%d\t%d\t%s\t%s\n",
				l.lastSequence, e.EventType, e.Key, e.Value)

			if err != nil {
				errors <- fmt.Errorf("cannot write to log file: %w", err)
			}

			l.wg.Done()
		}
	}()
}

func (l *TransactionLogger) Wait() {
	l.wg.Wait()
}

func (l *TransactionLogger) Close() error {
	l.Wait()

	if l.events != nil {
		close(l.events) // Terminates Run loop and goroutine
	}

	return l.file.Close()
}


//로그 파일 읽기
func (l *TransactionLogger) ReadEvents() (<-chan Event, <-chan error) {
	scanner := bufio.NewScanner(l.file)
    
    //포인터가 아닌 Event 값 자체를 받는 채널.
	outEvent := make(chan Event)
	outError := make(chan error, 1)

	go func() {
		var e Event

		//끝날 때, 채널을 닫음으로써 더 이상 데이터를 못 넣게 함.(읽을 수는 있음)
		defer close(outEvent)
		defer close(outError)

		for scanner.Scan() {
			line := scanner.Text()
			
			fmt.Sscanf(
				line, "%d\t%d\t%s\t%s",
				&e.Sequence, &e.EventType, &e.Key, &e.Value)
			
            //일련 번호 증가 확인.
			if l.lastSequence >= e.Sequence {
				outError <- fmt.Errorf("transaction numbers out of sequence")
				return
			}

			uv, err := url.QueryUnescape(e.Value)
			if err != nil {
				outError <- fmt.Errorf("value decoding failure: %w", err)
				return
			}

			e.Value = uv
			l.lastSequence = e.Sequence

			//매번 event 값을 새로 생성하지 않음.
            //즉, 복사본을 넣어주는 것임.
			outEvent <- e
		}

		if err := scanner.Err(); err != nil {
			outError <- fmt.Errorf("transaction log read failure: %w", err)
		}
	}()

	return outEvent, outError
}

 

 

 

wait group을 포인터로 선언하지 않으면, 복사본을 이용하기 때문에 동기화가 제대로 일어나지 않습니다.

아래가 예시 입니다.

func main() {
	var wg sync.WaitGroup

	// Correct usage with pointer
	wg.Add(1)
	go func(wg *sync.WaitGroup) {
		defer wg.Done()
		// Do some work
	}(&wg)
	wg.Wait()

	// Incorrect usage with value (will cause panic)
	wg.Add(1)
	go func(wg sync.WaitGroup) {
		defer wg.Done() // This modifies a copy, not the original
		// Do some work
	}(wg)
	wg.Wait() // This will not wait for the goroutine because the Add was done on a copy
}

이런 식으로 데드락이 발생하게 됩니다.

 

 

이를 이제 이전 포스트에서 구현한 rest api 에 접목시킬 수 있습니다.

package main

import (
	"errors"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"

	"github.com/gorilla/mux"
)

var transact *TransactionLogger

func loggingMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		log.Println(r.Method, r.RequestURI)
		next.ServeHTTP(w, r)
	})
}

func notAllowedHandler(w http.ResponseWriter, r *http.Request) {
	http.Error(w, "Not Allowed", http.StatusMethodNotAllowed)
}

func keyValuePutHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	key := vars["key"]

	value, err := ioutil.ReadAll(r.Body)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	defer r.Body.Close()

	err = Put(key, string(value))
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	transact.WritePut(key, string(value))

	w.WriteHeader(http.StatusCreated)

	log.Printf("PUT key=%s value=%s\n", key, string(value))
}

func keyValueGetHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	key := vars["key"]

	value, err := Get(key)
	if errors.Is(err, ErrorNoSuchKey) {
		http.Error(w, err.Error(), http.StatusNotFound)
		return
	}
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	w.Write([]byte(value))

	log.Printf("GET key=%s\n", key)
}

func keyValueDeleteHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	key := vars["key"]

	err := Delete(key)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	transact.WriteDelete(key)

	log.Printf("DELETE key=%s\n", key)
}

//추가된 부분
func initializeTransactionLog() error {
	var err error
	
    //트랜잭션 로그 파일 경로
	transact, err = NewTransactionLogger("/tmp/transactions.log")
	if err != nil {
		return fmt.Errorf("failed to create transaction logger: %w", err)
	}

	//파일 읽어오는 부분. 채널 반환함.
	events, errors := transact.ReadEvents()
	count, ok, e := 0, true, Event{}

	for ok && err == nil {
		select {
		case err, ok = <-errors:

		//파일에서 읽은 이벤트. e는 Event. ok 는 채널 끝 여부 알려줌.
		case e, ok = <-events:
        
        	//기록된 이벤트들 읽고 실행.
			switch e.EventType {
			case EventDelete: // Got a DELETE event!
				err = Delete(e.Key)
				count++
			case EventPut: // Got a PUT event!
				err = Put(e.Key, e.Value)
				count++
			}
		}
	}

	log.Printf("%d events replayed\n", count)

	//이후 요청들(put, delete)에 대해서 트랜잭션 로그를 남김
	transact.Run()

	return err
}

func main() {
	// Initializes the transaction log and loads existing data, if any.
	// Blocks until all data is read.
	err := initializeTransactionLog()
	if err != nil {
		panic(err)
	}

	// Create a new mux router
	r := mux.NewRouter()

	r.Use(loggingMiddleware)

	r.HandleFunc("/v1/{key}", keyValueGetHandler).Methods("GET")
	r.HandleFunc("/v1/{key}", keyValuePutHandler).Methods("PUT")
	r.HandleFunc("/v1/{key}", keyValueDeleteHandler).Methods("DELETE")

	r.HandleFunc("/v1", notAllowedHandler)
	r.HandleFunc("/v1/{key}", notAllowedHandler)

	log.Fatal(http.ListenAndServe(":8080", r))
}

 

실제로 put을 해봤습니다.

첫 번째는 sequence id, 두 번째는 event type(1: delete, 2: put), 세 번째는 key, 네 번째는 value 입니다.

a : hi, b: hello, c: hello,d : "" 를 put 한 것을 확인할 수 있습니다.

 

그리고 종료 후, 다시 실행해봤습니다.

총 6번 작업 읽고 처리한 것 확인 가능

key a에 대한 데이터를 조회하면,

hi를 가져오는 것을 볼 수 있습니다.

 

또한 put 작업을 하면, trasactions.log 파일에 뒤에 7부터 들어가는 것을 확인할 수 있습니다.

 

 

이 코드의 개선해야 할 점으로는

  • 파일을 안전하게 닫기 위한 close 미존재
  • 쓰기 버퍼에 이벤트 남아 있을 때 서비스 종료되면, 이벤트가 사라짐.
  • 키와 값의 크기가 정해져 있지 않음.
  • 로그가 평문으로 저장되기 때문에, 디스크 공간을 많이 차지하게 됨.
  • 로그 영원히 보관.

이 있습니다

 

 

3. 데이터베이스에 상태 저장하기

데이터베이스에 저장할 경우,

 

장점

  - 분산된 state에 대해 덜 걱정해도 됨. -> 클라우드 네이티브에 가깝다고 함.

  - 용이한 확장성. 복제본들 사이에 공유할 데이터가 없으므로 확장이 더 쉬움.

 

단점

  - 병목 현상 발생. 만약 여러 복제본이 db에서 값을 읽으면 문제될 수 있음.

  - 업스트림 의존성 발생. 외부 시스템에 대한 의존성 생김.

package main

import (
	"database/sql"
	"fmt"
	"net/url"
	"sync"

	_ "github.com/lib/pq" // init 함수를 자동으로 호출하지만, 실제 패키지는 사용하지 않음.
)


//디비 연결
type PostgresDbParams struct {
	dbName   string
	host     string
	user     string
	password string
}


//파일이랑 다른 점은 Db 연결 추가, 마지막 seq number 관리 없앰.
type PostgresTransactionLogger struct {
	events chan<- Event
	errors <-chan error 
	db     *sql.DB
	wg     *sync.WaitGroup
}

func (l *PostgresTransactionLogger) WritePut(key, value string) {
	l.wg.Add(1)
	l.events <- Event{EventType: EventPut, Key: key, Value: url.QueryEscape(value)}
}

func (l *PostgresTransactionLogger) WriteDelete(key string) {
	l.wg.Add(1)
	l.events <- Event{EventType: EventDelete, Key: key}
}

func (l *PostgresTransactionLogger) Err() <-chan error {
	return l.errors
}

func (l *PostgresTransactionLogger) LastSequence() uint64 {
	return 0
}


func (l *PostgresTransactionLogger) Run() {
	events := make(chan Event, 16) // Make an events channel
	l.events = events

	errors := make(chan error, 1) // Make an errors channel
	l.errors = errors

	go func() { // The INSERT query
    	//db에서 sequence number 관리
		query := `INSERT INTO transactions
			 (event_type, key, value)
			 VALUES ($1, $2, $3)`

		for e := range events { // Retrieve the next Event
			_, err := l.db.Exec( // Execute the INSERT query
				query,
				e.EventType, e.Key, e.Value)

			if err != nil {
				errors <- err
			}

			l.wg.Done()
		}
	}()
}

func (l *PostgresTransactionLogger) Wait() {
	l.wg.Wait()
}

func (l *PostgresTransactionLogger) Close() error {
	l.wg.Wait()

	if l.events != nil {
		close(l.events) // Terminates Run loop and goroutine
	}

	return l.db.Close()
}

func (l *PostgresTransactionLogger) ReadEvents() (<-chan Event, <-chan error) {
	outEvent := make(chan Event)    // An unbuffered events channel
	outError := make(chan error, 1) // A buffered errors channel
    //unbuffer 이용할 경우, consume 부분이 준비될 때까지 produce하지 않음.

	query := "SELECT sequence, event_type, key, value FROM transactions"

	go func() {
		defer close(outEvent)
		defer close(outError)

		rows, err := l.db.Query(query)
		if err != nil {
			outError <- fmt.Errorf("sql query error: %w", err)
			return
		}

		defer rows.Close() // 커넥션 종료.

		var e Event // Create an empty Event

		for rows.Next() { // Iterate over the rows

			err = rows.Scan(
				&e.Sequence, &e.EventType,
				&e.Key, &e.Value)

			if err != nil {
				outError <- err
				return
			}

			outEvent <- e // Send e to the channel
		}

		err = rows.Err()
		if err != nil {
			outError <- fmt.Errorf("transaction log read failure: %w", err)
		}
	}()

	return outEvent, outError
}

func (l *PostgresTransactionLogger) verifyTableExists() (bool, error) {
	const table = "transactions"

	var result string

	rows, err := l.db.Query(fmt.Sprintf("SELECT to_regclass('public.%s');", table))
	if err != nil {
		return false, err
	}
	defer rows.Close()

	for rows.Next() && result != table {
		rows.Scan(&result)
	}

	return result == table, rows.Err()
}

func (l *PostgresTransactionLogger) createTable() error {
	var err error

	createQuery := `CREATE TABLE transactions (
		 sequence      BIGSERIAL PRIMARY KEY,
		 event_type    SMALLINT,
		 key 		  TEXT,
		 value         TEXT
	   );`

	_, err = l.db.Exec(createQuery)
	if err != nil {
		return err
	}

	return nil
}

func NewPostgresTransactionLogger(param PostgresDbParams) (TransactionLogger, error) {
	connStr := fmt.Sprintf("host=%s dbname=%s user=%s password=%s sslmode=disable",
		param.host, param.dbName, param.user, param.password)

	db, err := sql.Open("postgres", connStr)
	if err != nil {
		return nil, fmt.Errorf("failed to create db value: %w", err)
	}

	//대부분의 드라이버가 핑을 날릴 때 커넥션을 맺음.
	err = db.Ping() // Test the databases connection
	if err != nil {
		return nil, fmt.Errorf("failed to opendb connection: %w", err)
	}

	tl := &PostgresTransactionLogger{db: db, wg: &sync.WaitGroup{}}

	exists, err := tl.verifyTableExists()
	if err != nil {
		return nil, fmt.Errorf("failed to verify table exists: %w", err)
	}
    //테이블 존재하지 않으면 생성
	if !exists {
		if err = tl.createTable(); err != nil {
			return nil, fmt.Errorf("failed to create table: %w", err)
		}
	}

	return tl, nil
}

 

package main

import (
	"errors"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"

	"github.com/gorilla/mux"
)

var transact TransactionLogger

func loggingMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		log.Println(r.Method, r.RequestURI)
		next.ServeHTTP(w, r)
	})
}

func notAllowedHandler(w http.ResponseWriter, r *http.Request) {
	http.Error(w, "Not Allowed", http.StatusMethodNotAllowed)
}

func keyValuePutHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	key := vars["key"]

	value, err := ioutil.ReadAll(r.Body)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	defer r.Body.Close()

	err = Put(key, string(value))
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	transact.WritePut(key, string(value))

	w.WriteHeader(http.StatusCreated)

	log.Printf("PUT key=%s value=%s\n", key, string(value))
}

func keyValueGetHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	key := vars["key"]

	value, err := Get(key)
	if errors.Is(err, ErrorNoSuchKey) {
		http.Error(w, err.Error(), http.StatusNotFound)
		return
	}
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	w.Write([]byte(value))

	log.Printf("GET key=%s\n", key)
}

func keyValueDeleteHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	key := vars["key"]

	err := Delete(key)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	transact.WriteDelete(key)

	log.Printf("DELETE key=%s\n", key)
}

func initializeTransactionLog() error {
	var err error

	transact, err = NewPostgresTransactionLogger(PostgresDbParams{
		host:     "localhost",
		dbName:   "test",
		user:     "test",
		password: "test",
	})
	if err != nil {
		return fmt.Errorf("failed to create transaction logger: %w", err)
	}

	events, errors := transact.ReadEvents()
	count, ok, e := 0, true, Event{}

	for ok && err == nil {
		select {
		case err, ok = <-errors:

		case e, ok = <-events:
			switch e.EventType {
			case EventDelete: // Got a DELETE event!
				err = Delete(e.Key)
				count++
			case EventPut: // Got a PUT event!
				err = Put(e.Key, e.Value)
				count++
			}
		}
	}

	log.Printf("%d events replayed\n", count)

	transact.Run()

	go func() {
		for err := range transact.Err() {
			log.Print(err)
		}
	}()

	return err
}

func main() {
	// Initializes the transaction log and loads existing data, if any.
	// Blocks until all data is read.
	err := initializeTransactionLog()
	if err != nil {
    	//함수 멈추고, defer 함수 실행.
		panic(err)
	}

	// Create a new mux router
	r := mux.NewRouter()

	r.Use(loggingMiddleware)

	r.HandleFunc("/v1/{key}", keyValueGetHandler).Methods("GET")
	r.HandleFunc("/v1/{key}", keyValuePutHandler).Methods("PUT")
	r.HandleFunc("/v1/{key}", keyValueDeleteHandler).Methods("DELETE")

	r.HandleFunc("/v1", notAllowedHandler)
	r.HandleFunc("/v1/{key}", notAllowedHandler)

	log.Fatal(http.ListenAndServe(":8080", r))
}

파일을 이용한 것과 큰 차이는 없습니다.

 

실행했을 경우, put(2), delete(1)에 대한 작업을 잘 저장하는 것을 확인할 수 있습니다.

 

또한 재실행하고 key a를 조회하면,

이전 작업 읽음.

 

맨 마지막에 작업했던 값인 bye를 읽는 것을 확인할 수 있습니다.

반응형

'BackEnd > go' 카테고리의 다른 글

[GO] Rest API 구현  (0) 2024.07.06
[Go] 동시성 패턴 future  (0) 2024.06.29
[Go] struct 와 포인터(자바 클래스와 비교)  (0) 2024.06.25
[GO] go를 이용한 안정성 패턴 구현(서킷)  (0) 2024.06.12

https://github.com/cloud-native-go/examples 로 공부한 내용입니다.

 

1. net/http를 이용한 Rest API 

package main

import (
	"log"
	"net/http"
)

func helloGoHandler(w http.ResponseWriter, r *http.Request) {
	w.Write([]byte("Hello net/http!\n"))
}

func main() {
	http.HandleFunc("/", helloGoHandler)


	// 2번째 인자는 multiplexor. nil인 경우 DefaultServeMux 이용
	//ListenAndServe는 에러가 발생한 경우에만 반환.
    //log.Fatal은 에러가 발생할 경우, 에러메시지 반환하고 종료
	log.Fatal(http.ListenAndServe(":8080", nil))

}

 

multiplexor로 패턴(경로 등)이랑 함수를 매핑해줍니다. 

 

pattern 매칭

이러한 코드가 있고,

 

이런 식으로 등록을 하게 됩니다.

 

 

하지만 다른 mux를 사용하고 싶을 수 있습니다.

책에서는 gorllia/mux를 사용하는 예제를 설명합니다.

 

2. gorilla mux를 이용한 rest api

package main

import (
	"log"
	"net/http"
	//따로 go init {...} 해서 go.mod 만들어야 함.
	"github.com/gorilla/mux"
)

func helloGoHandler(w http.ResponseWriter, r *http.Request) {
	w.Write([]byte("Hello net/http!\n"))
}

func main() {
	r := mux.NewRouter()
	r.HandleFunc("/", helloGoHandler)

	log.Fatal(http.ListenAndServe(":8080", r))

}

 

 

앞에서 사용한 net/http에서 추가된 기능으로는

경로에 /{key} 같은 형태를 넣을 수 있고 정규표현식도 사용할 수 있습니다.

 

또한 이러한 파라미터를 추출할 수 있습니다.

 

이를 적용한 코드가 아래 코드입니다.

package main

import (
	"log"
	"net/http"

	"github.com/gorilla/mux"
)

func helloGoHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	var name string = vars["key"]

	w.Write([]byte("Hello " + name + "\n"))
}

func main() {
	r := mux.NewRouter()
	r.HandleFunc("/{key}", helloGoHandler)

	log.Fatal(http.ListenAndServe(":8080", r))

}

 

 

실제 출력

보시면 me를 출력한 것을 볼 수 있습니다.

쉽게 파라미터를 추출할 수 있는 것을 확인할 수 있습니다.

 

 

이를 이용해서 put과 get 기능을 만들어보겠습니다.

package main

import "errors"

//멀티 쓰레드로 동작하면?
var store = make(map[string]string)

var ErrorNoSuchKey = errors.New("no such key")

func Delete(key string) error {
	delete(store, key)

	return nil
}

func Get(key string) (string, error) {
	value, ok := store[key]

	if !ok {
		return "", ErrorNoSuchKey
	}

	return value, nil
}

func Put(key string, value string) error {
	store[key] = value

	return nil
}

 

package main

import (
	"errors"
	"io"
	"log"
	"net/http"

	"github.com/gorilla/mux"
)

func keyValuePutHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	key := vars["key"]

	value, err := io.ReadAll(r.Body)
	defer r.Body.Close()

	if err != nil {
		http.Error(w,
			err.Error(),
			http.StatusInternalServerError)
		return
	}

	err = Put(key, string(value))

	if err != nil {
		http.Error(w,
			err.Error(),
			http.StatusInternalServerError)
		return
	}

	w.WriteHeader(http.StatusCreated)

	log.Printf("PUT key=%s value=%s\n", key, string(value))
}

func keyValueGetHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	key := vars["key"]

	value, err := Get(key)
	if errors.Is(err, ErrorNoSuchKey) {
		http.Error(w, err.Error(), http.StatusNotFound)
		return
	}
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	w.Write([]byte(value))

	log.Printf("GET key=%s\n", key)
}

func keyValueDeleteHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	key := vars["key"]

	err := Delete(key)

	if err != nil {
		http.Error(w,
			err.Error(),
			http.StatusInternalServerError)
		return
	}

	w.WriteHeader(http.StatusOK)

	log.Printf("Delete key=%s\n", key)

}

func main() {
	r := mux.NewRouter()
	r.HandleFunc("/v1/{key}", keyValuePutHandler).Methods("PUT")
	r.HandleFunc("/v1/{key}", keyValueGetHandler).Methods("GET")
	r.HandleFunc("/v1/{key}", keyValueDeleteHandler).Methods("DELETE")

	log.Fatal(http.ListenAndServe(":8080", r))
}

 

테스트를 해보면,

1. PUT

key : a, value : hello

 

2. GET

value인 hello 출력되는 것 확인

3.DELETE

key가 a인 것 삭제

 

4. 삭제 후, GET

no such key 출력 확인

잘 동작하는 것을 확인할 수 있습니다.

 

 

3. 동시성 문제

하지만 앞에서 봤던 코드에서 멀티 쓰레드로 동작할 경우 동시성 문제가 생길 수 있습니다.

package main

import "errors"

//멀티 쓰레드로 동작하면?
var store = make(map[string]string)

var ErrorNoSuchKey = errors.New("no such key")

func Delete(key string) error {
	delete(store, key)

	return nil
}

func Get(key string) (string, error) {
	value, ok := store[key]

	if !ok {
		return "", ErrorNoSuchKey
	}

	return value, nil
}

func Put(key string, value string) error {
	store[key] = value

	return nil
}

key-value 저장소를 그냥 map으로 이용했기 때문입니다.

 

 

이를 해결하기 위해서 mutex를 이용합니다.

 

 

package main

import (
	"errors"
	"sync"
)

var store = struct {
	sync.RWMutex
	m map[string]string
}{m: make(map[string]string)}

var ErrorNoSuchKey = errors.New("no such key")

func Delete(key string) error {
	store.Lock()
	delete(store.m, key)
	store.Unlock()

	return nil
}

func Get(key string) (string, error) {
	store.RLock()
	value, ok := store.m[key]
	store.RUnlock()

	if !ok {
		return "", ErrorNoSuchKey
	}

	return value, nil
}

func Put(key string, value string) error {
	store.Lock()
	store.m[key] = value
	store.Unlock()

	return nil
}

 

코드를 보면, get은 read lock을 사용하는 것을 볼 수 있습니다.

 

반응형

k8s 및 go 관련해서 사내 스터디를 진행 중입니다.

future 에 관해 스터디를 했었습니다.

 

책 저자는 future는 비동기 프로세스에 의해 생성되는 값에 플레이스홀더를 제공하는 연산자라고 합니다.(문맥에 따라 다른 의미가 될 수 있다고 항의하지 말라고 합니다.)

 

자바에서는 future를 통해 다른 작업을 하다가, 값이 필요해지면 get()을 해서 기다릴 수 있습니다.

아래 코드도 비슷하게 동작합니다.

slow future를 호출해서 본 작업을 실행하고, Result()를 호출하면, 결과가 올 때 까지 기다립니다.

 

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type Future interface {
	Result() (string, error)
}

type InnerFuture struct {
	once sync.Once
	wg   sync.WaitGroup

	res string
	err error

	resCh <-chan string
	errCh <-chan error
}

func (f *InnerFuture) Result() (string, error) {
	
    //SlowFunction이 끝날 때 까지 기다림. 그리고 한 번만 실행되도록 보장.
    f.once.Do(func() {
		f.wg.Add(1)
		defer f.wg.Done()
		fmt.Println("hi")
		fmt.Println(time.Now())
		f.res = <-f.resCh
		f.err = <-f.errCh
		fmt.Println("bye")
	})

	// 결과 값 가져올 때까지 기다리기.
	f.wg.Wait()
	fmt.Println("end")

	return f.res, f.err
}

func SlowFunction(ctx context.Context) Future {
	resCh := make(chan string)
	errCh := make(chan error)

	// 비동기 작업 실행.
	go func() {
		fmt.Println("start async(future job)")
		select {
		case <-time.After(time.Second * 5):
			fmt.Println("end async")
			resCh <- time.Now().String()
			errCh <- nil
		case <-ctx.Done():
			resCh <- ""
			errCh <- ctx.Err()
		}
	}()

	return &InnerFuture{resCh: resCh, errCh: errCh}
}

func main() {
	ctx := context.Background()

	//비동기 작업 실행
	future := SlowFunction(ctx)

	//다른 작업 수행
	fmt.Println("other work start")
	select {
	case <-time.After(time.Second * 3):
		fmt.Println("done")
	}

	//결과 가져오기.
	res, err := future.Result()
	res2, _ := future.Result()

	if err != nil {
		fmt.Println("error : ", err)
		return
	}
	fmt.Println(res)
	fmt.Println(res2)
}

출처 : https://github.com/cloud-native-go/examples (살짝 수정한 코드)

 

 

이를 실행하면, 

 

other work start
start async(future job)
done
hi
2009-11-10 23:00:03 +0000 UTC m=+3.000000001
end async
bye
end
end
2009-11-10 23:00:05 +0000 UTC m=+5.000000001
2009-11-10 23:00:05 +0000 UTC m=+5.000000001

 

이 반환 됩니다.

 

1,2 째줄은 동시에 수행이 됩니다.

3번 째줄은 3초 뒤에 출력이 됩니다.(기존 작업이 3초 sleep이기 때뮨에)

4,5번째 줄은 아까 값을 기다리는 로직이 있었습니다.(once.Do(...)) 이 부분의 로직을 호출했다는 의미로 hi와 호출 시간을 출력했습니다. 앞에서 3초 기다렸기 때문에, +3초 찍히는 걸 볼 수 있습니다.

6번째 줄은 future 내부의 실제 작업이 끝난 것을 의미하고, 7번째 줄은  once.Do(...) 에서 해당 작업이 끝난 것을 감지했을 경우 출력이 됩니다.

7,8 번 째 end가 두 번 출력된 이유는 main 함수에서 future.Result()를 2번 호출하고 있기 때문입니다. 하지만 6번째의 end async는 2번 호출되지 않은 것을 확인할 수 있습니다. 이는 sysn.once 를 이용하기 때문에 이러한 결과가 나왔습니다.

실제로 .Result() 를 처음 호출했을 경우는 future의 비동기 작업이 끝날 때 까지 기다리지만, 그 이후 호출에서는 한 번 더 작업을 실행하지 않고 이전에 처리한 결과값을 바로 리턴합니다.

 

그래서 9,10 번째도 똑같은 시간이 5초가 나온 것을 확인할 수 있습니다. 5초가 나온 이유는 future의 비동기 작업에서 5초 sleep했기 때문입니다.

 

 

비동기 프로그래밍이 진짜 어려운 것 같습니다. webflux 느낌도 나고 재미있네요.

반응형

'BackEnd > go' 카테고리의 다른 글

[Go] 리소스 상태 유지  (0) 2024.07.09
[GO] Rest API 구현  (0) 2024.07.06
[Go] struct 와 포인터(자바 클래스와 비교)  (0) 2024.06.25
[GO] go를 이용한 안정성 패턴 구현(서킷)  (0) 2024.06.12

k8s 및 go 관련해서 사내 스터디를 진행 중입니다.

 

근데 struct가 어떻게 동작하는지 헷갈려서 자바와 비교를 하게 되었습니다.

 

 

우선 자바 같은 경우에는 class가 있습니다.

이런 식으로 하게 되면, test1 이랑 test2가 같은 객체(메모리 주소 값)를 가지게 됩니다.

test1의 필드를 수정해도 test2의 필드도 같이 수정이 되게 됩니다.

그래서 복사본을 따로 만들어주거나 dto를 사용하는 등 예상치 못한 수정을 막아야 합니다.

 

 

그럼 go의 struct도 똑같이 동작을 하는지 궁금했습니다.

 

 

s라는 student struct와, tt라는 stude struct가 있습니다.

tt := s 를 이용하여 주면, tt와 s가 같은 메모리 주소의 변수를 이용하고 있는지 궁금했습니다.

 

이를 실행하면,

이처럼 tt 다음에 ttt가 아닌 ss가 오게 됩니다.

즉, 자바와는 다르게 tt의 이름을 수정을 해도 s의 이름은 수정이 되지 않았습니다. 

 

또한 메모리 주소도 서로 다른 것을 확인할 수 있습니다.

 

 

반면에 getStudent() 메소드가 포인터를 반환하게 하면, 서로 같은 메모리를 참조하고 있습니다.

(printf에서 &이 사라진 이유는 포인터 변수가 가리키고 있는 메모리 주소(실제 이용하는 객체(?)의 위치)를 알기 위해서입니다.

 

이런 식으로 해야 자바처럼 똑같은 메모리의 변수를 바라보게 됩니다.

 

 

결론은 go의 struct는 referrence가 아닌 value로 할당을 해주게 되면, 그때마다 메모리에 새로운 값을 할당하고 이를 반환하는 방식인 것을 확인할 수 있었습니다,

반응형

'BackEnd > go' 카테고리의 다른 글

[Go] 리소스 상태 유지  (0) 2024.07.09
[GO] Rest API 구현  (0) 2024.07.06
[Go] 동시성 패턴 future  (0) 2024.06.29
[GO] go를 이용한 안정성 패턴 구현(서킷)  (0) 2024.06.12

클라우드 네이티브 Go 책을 참고했습니다.

 

 

 

go를 이용하여 안정성 패턴을 구현해봅니다.

(분산 애플리케이션에서의)

 

 

서킷 브레이커

서비스가 실패할 경우, 장애가 퍼지는 걸 막을 수 있습니다.

 

예를 들면, db lock 이나 리소스 부족 등으로 인해 timeout이 발생할 수 있습니다. time out이 계속해서 발생하게 될 경우에는 뒤의 요청에 대해서도 처리를 하지 못합니다. 만약 MSA 라면, 모든 서비스들이 영향을 받게 되고 이는 전체 서비스 장애와 이어 집니다.

 

또한 클라이언트가 retry까지 하게 되면 네트워크 단에 부하가 엄청나게 발생합니다.

서킷 브레이커는 이러한 장애를 막을 수 있습니다.

 

에러가 몇 번 이상 발생했을 경우 서킷을 open 합니다. 그러면 기본 로직은 동작하지 않고 바로 error를 클라이언트에 반환합니다.

시간이 지난 후에는 half open으로 요청 중 일부만 받아들입니다.

만약 이 과정에서 에러가 발생하지 않았다면 서킷이 close되고 기존 로직이 동작하게 됩니다.

 

이를 go로 구현해보겠습니다.

package circuit

// context는 thread safe.
// 즉, 다수의 go routine이 접근해도 괜찮음.

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

type Circuit func(context.Context) (string, error)

func Breaker(circuit Circuit, threshold int) Circuit {

	var last = time.Now()
	var m sync.RWMutex
	failures := 0

	return func(ctx context.Context) (string, error) {
		m.Lock() //mutex 를 이용한 read lock(기존 코드) -> Read lock 을 하면 서킷에 오픈이 안됨..?

		count := failures - threshold

		// 실패 개수가 더 많을 경우
		if count >= 0 {
			retryAt := last.Add(40 * time.MilliSecond)
			if !time.Now().After(retryAt) {
				m.Unlock()
				fmt.Println("open")
				return "open", errors.New("open")
			}
		}

		m.Unlock()

		res, err := circuit(ctx)

		m.Lock()
		defer m.Unlock() //끝날 때 lock

		last = time.Now() //이를 위해 lock이 필요함.

		if err != nil {
			fmt.Println(err)
			failures++ // need lock
			return res, err
		}

		if failures > 0 {
			failures -= 1
		}

		fmt.Println("200")
		return res, nil

	}

}

 

 

테스트 코드 입니다.

package circuit

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"testing"
	"time"
)

func failAfter(threshold int) Circuit {
	count := 0

	// Service function. Fails after 5 tries.
	return func(ctx context.Context) (string, error) {
		count++

		if count > threshold {
			return "", errors.New("INTENTIONAL FAIL!")
		}

		return "Success", nil
	}
}

func waitAndContinue() Circuit {
	return func(ctx context.Context) (string, error) {
		time.Sleep(time.Second)

		if rand.Int()%2 == 0 {
			return "success", nil
		}

		return "Failed", fmt.Errorf("forced failure")
	}
}

func TestCircuitBreakerFailAfter5(t *testing.T) {
	circuit := failAfter(5)
	ctx := context.Background()

	for count := 1; count <= 10; count++ {
		message, err := circuit(ctx)

		t.Logf("attempt %d: %v, %s", count, err, message)

		switch {
		case count <= 5 && err != nil:
			t.Error("expected no error; got", err)
		case count > 5 && err == nil && message != "open":
			t.Error("expected err and open")
		}
	}
}

func TestCircuitBreakerDataRace(t *testing.T) { //테스트 코드!
	ctx := context.Background()

	circuit := failAfter(5)
	breaker := Breaker(circuit, 1)

	wg := sync.WaitGroup{}

	for count := 1; count <= 20; count++ {
		wg.Add(1)
        
        

		go func(count int) {
			defer wg.Done()
			
			message, err := breaker(ctx)
			time.Sleep(10 * time.Millisecond)
			t.Logf("attempt %d: err=%v, message=%s", count, err, message)
		}(count)
	}

	wg.Wait()
}

 

5번 이상이 넘어가면 error가 발생하는 코드 입니다.

그리고 1번 에러가 발생하면, 그 다음에 circuit이 open 됩니다.

여기서 breaker에서는 40 밀리세컨드 동안 open이 됩니다. 그리고 다시 close 되고 error 발생한 후에 open이 되는 것을 볼 수 있습니다.

 

 

* half open 등이 세부 사항은 구현되지 않았습니다.

 

* read lock을 이용해도 잘 됩니다.

 

 

반응형

'BackEnd > go' 카테고리의 다른 글

[Go] 리소스 상태 유지  (0) 2024.07.09
[GO] Rest API 구현  (0) 2024.07.06
[Go] 동시성 패턴 future  (0) 2024.06.29
[Go] struct 와 포인터(자바 클래스와 비교)  (0) 2024.06.25

 

Persistence Volume(PV)

docker에도 volume이 있음.

-> 컨테이너의 데이터를 영구적으로 보관하기 위해서 이용함.

-> POD 에도 이를 지원하는 기능이 있음.

 

 

apiVersion: v1
kind: Pod
metadata:
  name: tmp
spec:
  containers:
  - image:
    ...
    volumeMounts:
    - mountPath: /opt  //컨테이너 내부 경로
      name: data-volume
    
  volumes:
  - name: data-volume
    hostPath:
      path:/data
      type: Directory

하지만 클러스터에서는 권장하지 않는 방법이라고 함.(단일 노드는 상관이 없음.)

-> 이는 모든 노드의 /data 에 모두 같은 데이터가 있기를 기대하기 때문임.

-> 이를 해결하기 위해서는 k8s가 지원하는 다양한 솔루션을 이용하면 됨.

(NFS, amazon web services 등)

 

volumes:
- name: data-volume
  awsElasticBlockStore:
    volumeID: <volume-id>
    fsType: ext4

이런 식으로 사용 가능.

 

apiVsersion: v1
kind : PersistentVolume
metadata:
  name: pv-voll
spec:
  accessModes:
    - ReadWriteOnce //ReadOnlyMany, ReadWriteMany 옵션도 있음.
  capacity:
    storage: 1Gi
  hostPath: // 앞에서 처럼 aws 등 다양한 솔루션 사용 가능.
    path: /tmp/data

 

access mode 관련 설명

https://kubernetes.io/ko/docs/concepts/storage/persistent-volumes/#%EC%A0%91%EA%B7%BC-%EB%AA%A8%EB%93%9C

 

 

 

Persistence Volume Claim

이는 하나의 PV 로 묶여 있음.

-> 더 큰 용량의 PV에 묶일 수도 있음.

-> 사용 가능한 볼륨이 없으면, 보류 상태가 됨.

 

 

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: cl
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    reqeusts:
      storage: 500Mi

 

이는 이전에 생성된(위의 yaml)에 bine 됨.

(위에서는 1기가이며 accessMode가 일치해서)

 

만약이 PVC가 삭제되면, 해당 볼륨에 어떠한 작업을 수행하는지?

- Retain : 리소스를 수동으로 삭제할 때 까지 다른 claim에서 사용하지 못함.

- Delete: 데이터, PV 삭제

- Recycle: 더 이상 사용하지 않는다고 함.(공식 홈페이지) 대신 동적 프로비저닝이라는 것을 사용한다고 함. 기존 데이터만 삭제

 

 

Pod - PVC - PV 를 통해 PVC 를 볼륨처럼 사용할 수 있음.

-> 이렇게 하는 이유는 파드의 각각 상황에 따라서 다양한 스토리지를 사용할 수 있게 해줌.

 

 

Storage

앞에서는 PVC를 이용하여 요청하려면, PV 를 계속 만들어줘야 하는 단점이 있음.

-> 이를 static provisioning 이라고 함.

 

반대는 dynamic provisioning. 이를 위해서 Storage 가 있음.

 

apiVersion: storage.k8s.io/v1
kind: StroageClass
metadata:
  name: google-storage
provisioner: kubernetis.io/gce-pd

parameters: 
  ... //프로비저너에 전달하는 파라미터들. 이는 홈페이지 참고

 

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: cl
spec:
  accessModes:
    - ReadWriteOnce
  storageClassName: google-storage // 추가된 부분
  resources:
    reqeusts:
      storage: 500Mi

 

StorageClass에 의해서 PV가 자동으로 생성이 됨.

(PV가 생성되지 않는 것은 아님!!)

 

 

 

ConfigMap

이를 이용하여 Pod에 값을 전달할 수 있음(ENV 등)

k create configmap <config-name> --from-literal=<key>=<value>


k create configmap <config-name> --from-file=<file>

 

yaml로도 생성 가능.

apiVersion: v1
kind: ConfigMap
metadata:
  name: game-demo
data:
  # 속성과 비슷한 키; 각 키는 간단한 값으로 매핑됨
  player_initial_lives: "3"
  ui_properties_file_name: "user-interface.properties"

  # 파일과 비슷한 키
  game.properties: |
    enemy.types=aliens,monsters
    player.maximum-lives=5    
  user-interface.properties: |
    color.good=purple
    color.bad=yellow
    allow.textmode=true

 

주의 사항은 이는 암호화를 제공하지 않음.

-> 이를 위해서 k8s의 Secret을 이용하거나 써드파티 도구를 이용하면 됨.

 

pod 에 해당 정보를 주기 위해서는

apiVersion: v1
kind: Pod
metadata:
  name: configmap-demo-pod
spec:
  containers:
    - name: demo
      image: alpine
      command: ["sleep", "3600"]
      envFrom:
        - configMapRef:
          name: app-config // config 이름

 

 

env 등 주입할 때 다양한 방법이 있음.

apiVersion: v1
kind: Pod
metadata:
  name: configmap-demo-pod
spec:
  containers:
    - name: demo
      image: alpine
      command: ["sleep", "3600"]
      env:
        # 환경 변수 정의
        - name: PLAYER_INITIAL_LIVES # 참고로 여기서는 컨피그맵의 키 이름과
          # 대소문자가 다르다.
          valueFrom:
            configMapKeyRef:
              name: game-demo           # 이 값의 컨피그맵.
              key: player_initial_lives # 가져올 키.
        - name: UI_PROPERTIES_FILE_NAME
          valueFrom:
            configMapKeyRef:
              name: game-demo
              key: ui_properties_file_name
      volumeMounts:
        - name: config
          mountPath: "/config"
          readOnly: true
  volumes:
    # 파드 레벨에서 볼륨을 설정한 다음, 해당 파드 내의 컨테이너에 마운트한다.
    - name: config
      configMap:
        # 마운트하려는 컨피그맵의 이름을 제공한다.
        name: game-demo
        # 컨피그맵에서 파일로 생성할 키 배열
        items:
          - key: "game.properties"
            path: "game.properties"
          - key: "user-interface.properties"
            path: "user-interface.properties"

 

반응형

+ Recent posts