Skip to content

FEATURE: Add CompletableFuture Pipeline API#1035

Open
oliviarla wants to merge 1 commit intonaver:developfrom
oliviarla:pipenewapi
Open

FEATURE: Add CompletableFuture Pipeline API#1035
oliviarla wants to merge 1 commit intonaver:developfrom
oliviarla:pipenewapi

Conversation

@oliviarla
Copy link
Collaborator

@oliviarla oliviarla commented Dec 23, 2025

🔗 Related Issue

  • Pipeline API를 추가합니다.
  • 사용 방법
    • Pipeline 객체를 생성하여 하나의 파이프라인으로 포함할 명령들을 조합한다.
    • 파이프라인에 포함할 수 있는 명령들을 제한할 수 있다. (명령어의 종류, 개수)
    • 명령어의 개수는 최대 500개로 제한한다. 이를 통해 한 노드에는 최대 하나의 파이프라인이 보내진다.
    public class Pipeline<V> {
    
        // 다양한 명령을 포함할 수 있는 PipelineOperationImpl 타입을 value에 저장한다.
        private Map<MemcachedNode, Operation> ops;
        
        public Pipeline() {
          // ...
        }
        
        public Pipeline<V> lopInsert(String key, int index, V value) {
          // ...
          return this;
        }
        
        public Pipeline<V> mopInsert(String key, String mkey, V value) {
          // ...
          return this;
        }
        
        // ...
        
    }
    • 명령의 조합 순서대로 파이프라인이 만들어지지만, 서버에서 처리 도중 다른 연산이 끼어들 수 있다.
    • 서로 다른 여러 키를 파이프라인에 포함하는 경우, 각 노드 별로 파이프라인이 생성되어 노드에 전달된다.
    • Pipeline 객체를 execute 메서드에 넘겨야 파이프라인이 수행되며, List<Object> 에 각 명령에 대한 응답이 담긴다.
    ArcusFuture<List<Object>> execute(Pipeline<T> pipeline)

⌨️ What I did

  • 예외 처리
    • 모든 응답이 올 때 까지 기다린 후 여러 요청에서 발생한 실패/에러들을 하나의 CompositeException 타입으로 모아 던진다.
  • switchover 처리
    • responseIndex를 사용하여 아직 응답이 오지 않은 operation들만 새로운 master 노드에서 initialize할 때 포함되도록 한다.
  • migration 처리
    • gotStatus 메서드의 인자로 Operation을 받아, 여러 Operation으로 분리되더라도 원본 콜백에서 원본 index에 값을 추가할 수 있도록 한다.

@oliviarla oliviarla force-pushed the pipenewapi branch 2 times, most recently from 7a9e90a to 0d98bc8 Compare January 22, 2026 06:42
@oliviarla oliviarla requested a review from uhm0311 January 22, 2026 06:42
* NOTHING_TO_UPDATE, NOT_SUPPORTED
* or unknown statement
*/
atomicReference.get().set(index, status);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

List<Object>로 반환하는 것보다 PipelineResult 클래스를 만들어서 List<Boolean> 필드와 Exception 필드를 넣어서 반환하는 것이 낫지 않나요?
결과를 받아서 사용하는 쪽에서 매번 타입 체크하고 타입 변환을 하는 것보다 나을 것 같습니다.

Copy link
Collaborator Author

@oliviarla oliviarla Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

반환 타입에 대해 논의해보는 게 좋을 것 같습니다.
제시해주신 것처럼 별도 클래스를 만들기보다는 Map<Integer, CollectionOperationStatus>타입을 반환해서 실패 시 cause만을 저장하는건 어떤가요? 모두 성공한 경우 Map은 empty가 됩니다.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

다른 API에서는 CollectionOperationStatus가 아닌 Exception을 반환하는 것으로 되어 있습니다.
여기서도 Map<Integer, Exception> 타입으로 반환하는 것은 어떤가요?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pipe는 multi key 연산이 될 수도 있으며 이로 인해 원자성, 순서를 보장할 수 없습니다.
예를 들어 500개의 명령을 파이프로 묶어 execute 메서드로 실행시키고자 하고, 5개의 노드에서 각각 100개씩 명령을 처리하던 도중, 특정 한 노드에서 50번째 명령에 에러가 발생해 이후 명령을 실행하지 않았을 때 이를 어떻게 표현할 지 결정해야 합니다.

오프라인으로 논의했을 때 다음과 같은 클래스를 반환하는 것을 제안했는데, 여기서 실행되지 않은 명령 인덱스에는 NOT_EXECUTED OperationStatus를 저장해두는게 좋을 것 같습니다.

public class PipelineResult {
    private final Map<Integer, OperationStatus> statusFailures;
    private final Map<Integer, Throwable> exceptions;
}

@uhm0311 다른 제안이 있다면 코멘트 부탁드립니다.

코드 수정은 @jhpark816 님까지 확인한 후 진행하겠습니다.

Copy link
Collaborator

@jhpark816 jhpark816 Feb 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@oliviarla @uhm0311
List<Object> 응답과 PipelineResult 응답 형식에서 아래 오류 시에 어떻게 표현하게 되나요?

  • ERROR|CLIENT_ERROR|SERVER_ERROR
  • PIPE_ERROR
  • opertion timeout
  • cancel (by worker thread or io thread)

Copy link
Collaborator Author

@oliviarla oliviarla Feb 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • operation timeout의 경우 complete 로직과 무관하므로 이 논의와 연관이 없습니다.
  • cancel의 경우 항상 complete exceptionally되기 때문에 예외랑 동일시합니다.

List<Object>

  • 특정 노드에 보낸 응답에서 에러가 발생하지 않았다면, 응답 결과(true/false/OperationStatus)가 List에 저장됩니다.
  • 특정 노드에 보낸 응답에서 에러가 발생한 경우(cancel 포함), 해당 노드로 보냈던 모든 요청에 대해 null이 List에 저장됩니다. 각 노드로 요청을 보내는 메서드에서 ArcusFutureImpl를 반환하기 때문에, exceptionally complete된 경우 결과 값을 확인할 수 없기 때문이었습니다.

PipelineResult

  • 특정 노드에 보낸 응답에서 에러가 발생하지 않았다면, 실패 응답 결과(OperationStatus)가 status Map에 저장됩니다.
  • 특정 노드에 보낸 응답에서 에러가 발생한 경우(cancel 포함), 에러 내부에 담아둔 <실패 응답 결과와 에러가 발생한 위치>를 토대로 실패 응답 결과와 NOT_EXECUTED 상태를 status Map에 저장하고, 에러 자체도 throwable Map에 저장합니다.

Copy link
Collaborator

@jhpark816 jhpark816 Feb 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

응답 결과를 true, false가 아닌 모두 OperationStatus로 리턴하는 것은 어떤지?
따라서, List<OperationStatus> 리턴하는 것은 어떤가요?
오류로 수행되지 않았거나 수행 결과를 알 수 없으면, null 저장하고요.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

제안해주신 방법이 더 단순하니 괜찮아보입니다.

});
}

private Map<MemcachedNode, Map.Entry<List<KeyedOperation>, List<Integer>>> getOpsAndIndexesByNode(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

이 메소드와 getFutureToOriginIndexes()에서 사용하는 자료구조들이 복잡한 것 같습니다.
각 Entry의 Key와 Value를 임시 변수에 담아서 사용하고, 어떤 역할을 하는 변수인지 알아볼 수 있으면 좋겠습니다.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants