Kafka를 통해 데이터를 송신해야하는 케이스가 생겼다.

비지니스 로직상 빈도는 낮으나(한달에 한번도 호출 안한다.) 한번 발생하면 한번에 약 3만건의 데이터를 전송해야한다.

 

개발 후 consumer측과 테스트를 진행하던 도중 문제가 생겼는데

데이터를 단건 씩 보낼때는 이상이 없었으나 3만건을 반복문으로 계속 보냈더니 수신한 데이터의 건은 1,2000건이었다.....

 

LAG나 다른 kafak의 상태를  보고싶었지만 kafkacat과 같은 툴을 반입할 수 없고, kafka 서버에 직접 터미널로 접속할 수 없는 상태였다.

 

일단 송신과정에서 누락이 된것인가 싶어서 kafka producer의 ACKS옵션을 변경하였다.

 

acks

application.properties

spring.kafka.producer.acks=-1

 

acks의 옵션은 총 3가지로 -1, 0, 1이다.

  • 0 : 프로듀서가 kafka에 메시지를 전송하고 kafka로부터 잘 전송되었는지 응답을 받지 않는다.
  • 1 :  프로듀서가 kafka에 메시지를 전송하고 kafka의 leader가 잘 받았는지 응답을 받는다.
  • -1(all) : 프로듀서가 kafka에 메시지를 전송하고 kafka의 leader, follower가 잘 받았는지 응답을 받는다.

옵션 1, -1은 응답으로 실패를 받는다면 spring kafka가 재전송을 한다.

 


akcs를 적용후에도 동일한 증상이 발생.

단건 전송시에는 문제가 없었으니 데이터를 100건마다 Thread.sleep을 적용해보았다.

 

if (count % 100 == 0){
	Thread.sleep(1000)
}

 

수신측에서 모든 데이터가 들어왔다고 한다.

하지만 kafka 전송하는 로직이 @Service로 싱글톤 내부에 있는 메소드다.

 

싱글톤 내부의 메소드에서 Thread.sleep을 사용하게 되면 해당 로직이 끝나기 전까지는 블록상태가 된다.(맞나...?)

 


일단 kafka로 send하는 로직에 시간을 늘리면 해결된다는 부분은 확인을 했으니 방법을 찾아봤다.

 

Excuters로 별도의 스레드에 할당 할까 하였지만?

Kafka Producer의 다른 옵션들이 있는것을 확인

 


batch.size

메시지보관 사이즈로 linger의 옵션 시간동안 메시지를 쌓는다.

쌓인 메시지가 size보다 작으면 linger의 옵션까지 대기했다가 전송한다.

 

디폴트:16384

 

linger

linger.ms는 데이터 전송까지의 시간을 의미한다.

기본값은 0으로 데이터를 즉시 전송한다.

 

linger.ms = 10라면 10ms 마다 send한다.

 

단 batch.size만큼 데이터가 쌓여있다면 linger옵션을 무시하고 전송한다.

즉 batch.size가 꽉 찰 때 까지 기다리는 옵션

 

spring.kafka.producer.properties.linger.ms=10
// 혹은
spring.kafka.producer.linger.ms=10

Spring Kafka의 버전마다 해당 옵션 키값이 다른 듯 하다.

 

compression

메시지를 압축하는 옵션이다.

 

메시지를 압축하면 하지 않는 것 보다 발송 지연이 생기지만 비지니스 로직상 허용 가능한 부분이기에 추가하였다.

압축을 통하여 batch.size에 쌓이는 메세지의 수가 늘어나도록 설정하였다.

 

spring.kafka.producer.compression-type=snappy

 


 

리액트 교과서 Chapter10(툴팁 컴포넌트)

class Tooltip extends React.Component {
  constructor(props) {
    super(props)
    this.state = {opacity :false}
    this.toggle = this.toggle.bind(this)
  }
  // 토글 함수
  toggle() {
    const tooltipNode = ReactDOM.findDOMNode(this)
    this.setState({
      opacity: !this.state.opacity
    })
  }
  render() {
    // 토글 여부에 따른 스타일 변경
    const style = {
      "zIndex": this.state.opacity ? 1000 : -1000,
      "opacity": +this.state.opacity,
      "top": (this.state.top || 0) + 20,
      "left": (this.state.left || 0) - 30
  };
    return (
    <div style={{display: "inline"}}  >
    // 마우스 오버, 아웃 시 토글 함수 동작
      <span onMouseOver={this.toggle} onMouseOut={this.toggle} style={{color: 'blue'}}>{this.props.children}</span>
      <div style={style} >{this.props.text}</div>
    </div>
    )
  } 
}



let tooltip = document.getElementById('tooltip')
let root = ReactDOM.createRoot(tooltip)

root.render(
  <Tooltip text="The Book you're reading now">"React Quickly" </Tooltip>
)

 


 

 

 

빌드를 간편하게 하기위한 스크립트 정리

packge.json에 빌드 스크립트를 추가
npm build를 사용하여 jsx -> js로 변환한다.

"scripts": {
    "test": "echo \"Error: no test specified\" && exit 1",
    "build": ".\\node_modules\\.bin\\babel .\\jsx -d .\\js --extensions .jsx --copy-files -w"
  },

 

메뉴 컴포넌트 작성

menu.jsx

class Menu extends React.Component {

  render() {
    const menu = [ 'Home',
      'About',
      'Service',
      'Portfolio',
      'Contact us'
    ]
    return (
      <div>
        {menu.map((x, i) => {
          return <div key={i}>
            <Link label={x}></Link>
            <br></br>
          </div>
        })}
      </div>

    ) 
  }
}

 

Link 컴포넌트 작성

Link.jsx

class Link extends React.Component{
  render() {
    const url = '/' + this.props.label.toLowerCase().trim().replace(' ','-')
    return (
      <a href={url}>{this.props.label}</a>
    )
  }
}

 

script 작성

script.jsx

let menu = document.getElementById('menu')
let root = ReactDOM.createRoot(menu)

root.render(
  <Menu></Menu>
)

 

해당 컴포넌트들을 html에서 로드

/menu/index.html
만약 menu가 하나의 jsx에 작성되어 있다면 하나의 js만 불러와도 될듯.

    <script src="../js/components/menu/Menu.js" type="text/javascript"></script>
    <script src="../js/components/common/Link.js" type="text/javascript"></script>
    <script src="../js/components/script.js" type="text/javascript"></script>

 


https://github.com/Kmmanki/react_note/tree/master/chapter09

 

 

GitHub - Kmmanki/react_note: 리액트 교과서 따라하기

리액트 교과서 따라하기. Contribute to Kmmanki/react_note development by creating an account on GitHub.

github.com

 

참고 사이트

 

 

Spring Integration Overview

스프링 인티그레이션의 핵심 개념과 메인 컴포넌트 소개

godekdls.github.io

 

Main Component

스프링 인테그레이션은 파이프 필터 모델로 작성되며 구성은 “파이프”, “필터”, “메시지”로 이루어져 있다.


Message

스프링 인테그레이션에서 사용되는 데이터를 의미하며 사용하는 데이터를 감싼 래퍼(wrapper)이다.
즉 Message와 같은 wrapper형식으로 사용한다.


Message Channel(Pipe)

메세지 채널은 Pipe의 역할을 하며 Message를 Filter로 전달하는 역할을 한다.
메시지를 전달하는 프로듀서 채널, 메시지를 받아가는 컨슈머 채널이 ****분리되어 있어 채널간 인터셉터 등을 사용 할 수 있다.

Message Channel Interface

  • MessageChannel
  • PollableChannel
  • SubscribeChannel

Message Channel Impliment

위의 메시지 인터페이스를 구현한 구연체들이 존재한다.

  • publishSubscribeChannel: 전달받은 Message를 자신을 구독한 모든 Handler에게 전달한다.
  • QueueChannel: Point to Point로 해당 채널의 컨슈머가 여럿 있더라도 하나의 컨슈머에 전달한다.
  • 큐가 가득차면 send 메소드를 호출 시 sender를 블록킹한다*.
  • 큐가 비어있다면 recevie호출이 블록킹 된다.*
  • PriorityChannel: 큐 채널은 First -In-First-Out을 따르지만 PriorityChannel은 메시지마다 우선순위를 부여할 수 있다.
  • RendezvousChannel: receive를 호출 하기 전까지 sender를 블록한다.
  • DirectChannel: point-to-pint로 동작하며 PollableChannel을 구현한것이 아닌 SubscribeChannel을 구현하였기에 메시지를 직접 Handdlrt에 전달한다.
    Poller방식이 아니기 때무에 스레드를 스케줄링 하지 않아 오버헤드가 없기 때문에 Spring Integration의 디폴트 채널이다.
    구독하고 있는 MessageHandler를 호출하는 일을 메시지 디스페처에 위임한다.
  • ExcutorChannel: DirectChannel과 같지만 MessageHandler를 호출하는 디스패치 수행을 TastExcutor에게 위임한다는 것이다.
  • FlusMessageChannel: 리액티브 구독자가 온맨드로 컨슘해갈 수 있도록 전달받은 메시지를 변경하는 채널

MessageEndpoint(Filter)

Filter는 메시지를 컨슘하는 모든 구성요소를 말한다.

Message Transformer

메시지의 내용 및 구성요소를 변환하고 수정한 메시지를 반환하는 일을 한다.
Ex) Byte를 String으로 변환한다던지.

MessageFilter

메시지 필터는 해당 메시지를 다른 채널로 전달할지 말지를 결정한다. 필터에서는 간단한 boolean 테스트 메소드가 있으면 된다. Ex) header 값에 따른 전달 여부 판단.

MessageRouter

메시지 라우터는 해당 메시지를 여러 채널중 분기처리를 할 때 사용된다.
Ex) 콜 시작 메시지 → callStartChannel, 콜 종료 메시지 → callEndChannel

Spliter

입력 받은 메시지를 여러 메시지로 분할하는 역할을 담당하며 일반적으로 “복합”페이로드를 세분화하여 여러 메시지로 나누는데 사용한다.

Aggregator

Spliter와 반대로 여러 메시지를 하나의 메시지로 통합할 때 사용된다.

ServiceActivator

서비스 인스턴스를 메시지 시스템에 연결하기 위한 범용 엔드포인트이다. 입력 채널을 반드시 구성해야하며 반환할 수 있는 경우 출력 체널도 지정할 수 있다.

ChannelAdeptor

채널 어뎁터는 메시지 채널을 다른 시스템이나 전송 구성요소에 연결해주는 엔드포인트다. 어뎁터는 인바운드와 아웃바운드로 구성된다.


간단한 예제

https://github.com/Kmmanki/spring_integration_study_1

 

GitHub - Kmmanki/spring_integration_study_1

Contribute to Kmmanki/spring_integration_study_1 development by creating an account on GitHub.

github.com

해당 예제는 pythonSender - Spring Integration - pythonReceiver 로 구성되어 있으며

 

python과 Integration은 TCP 소켓으로 연결한다.

python sender에서 1 이라는 입력을 주면 spring integration에서 1 > 2 메시지를 만들어 pythonReceiver 전달하고 pythonReceiver 는 전달 받은 메시지에 > 3을 붙여 1 > 2 > 3을 만들어 integration으로 reply한다. integration은 전달받은 1 > 2 > 3에 > 4를 붙여 1 > 2 > 3 > 4를 다시 reply하여 sender로 전송한다.

 

예제코드 도식화

 

 

 

'기타 > 기타의 기타' 카테고리의 다른 글

자격증 정리  (0) 2023.12.28
c++ 포인터와 referense를 이용한 두 변수 값 바꾸기  (1) 2023.12.26
JVM Runtime Data Area  (1) 2023.12.23

+ Recent posts