2017-02-14 2 views
2

항목이 대기열에있을 때 트리거되는 Azure에 Function app가 있습니다. 다음과 같이 표시됩니다 (크게 단순화).Azure에서 동시 작업 수 제한 queue

public static async Task Run(string myQueueItem, TraceWriter log) 
{ 
    using (var client = new HttpClient()) 
    { 
     client.BaseAddress = new Uri(Config.APIUri); 
     client.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json")); 

     StringContent httpContent = new StringContent(myQueueItem, Encoding.UTF8, "application/json"); 
     HttpResponseMessage response = await client.PostAsync("/api/devices/data", httpContent); 
     response.EnsureSuccessStatusCode(); 

     string json = await response.Content.ReadAsStringAsync(); 
     ApiResponse apiResponse = JsonConvert.DeserializeObject<ApiResponse>(json); 

     log.Info($"Activity data successfully sent to platform in {apiResponse.elapsed}ms. Tracking number: {apiResponse.tracking}"); 
    } 
} 

이 모든 것이 훌륭하게 작동합니다. 항목이 대기열에 놓일 때마다 우리는 우리 측의 일부 API로 데이터를 보내고 응답을 기록합니다. 시원한.

"대기열 메시지를 생성하는 것"에 큰 영향이있을 때 문제가 발생하며 많은 항목이 한 번에 대기열에 저장됩니다. 이것은 분당 약 1,000 ~ 1,500 개의 항목에서 발생하는 경향이 있습니다. Functions.SendToLimeade : 기능을 실행하는 동안 예외 :

2017-02-14T01 : 45 : 31.692 mscorlib에 오류 로그는 다음과 같이해야합니다. f-SendToLimeade __- 1078179529 : 요청을 보내는 동안 오류 이 발생했습니다. 시스템 : 원격 서버에 연결할 수 없습니다. 시스템 : 각 소켓 주소 (프로토콜/네트워크 주소/포트)의 하나의 사용 만 정상적으로 허용됩니다. 123.123.123.123:443.

처음에는 로컬 소켓이 부족한 Azure Function 앱에 문제가 있다고 생각했습니다 (illustrated here). 그러나 그때 나는 IP 주소를 알아 차렸다. IP 주소 123.123.123.123 (물론이 예의 경우 변경됨)은 HttpClient가 게시하는 IP 주소입니다. 그래서, 지금은 그들이 우리의 서버가 이러한 요청을 처리하기 위해 실행되는 궁금 해서요.

어느 쪽이든, 우리는 여기서 스케일링 문제가 발생합니다. 나는 그것을 해결하는 최선의 방법을 찾아 내려고 노력하고있다.

몇 가지 아이디어 :

  1. 는 로컬 소켓 제한의 경우, article aboveReq.ServicePoint.BindIPEndPointDelegate를 사용하여 로컬 포트 ​​범위를 증가의 예를 가지고있다. 이것은 유망한 것처럼 보이지만, 당신이 할 때 당신은 무엇을합니까 진정한 규모가 필요합니까? 나는이 문제가 2 년 후에 돌아 오기를 원하지 않는다.
  2. 원격 제한 사항 인 경우 함수 런타임에서 한 번에 처리 할 메시지 수를 제어 할 수있는 것처럼 보입니다. 여기에 흥미로운 기사가 ​​있습니다. serviceBus.maxConcurrentCalls을 1로 설정하면 한 번에 하나의 메시지 만 처리됩니다. 어쩌면 이것을 상대적으로 낮은 숫자로 설정할 수 있습니다. 이제는 대기열이 처리 할 수있는 것보다 빠르게 채워질 수 있지만, 그 시점에서 우리는 서버를 추가 할 것입니다.
  3. 여러 개의 Azure 함수 apps? 둘 이상의 Azure 함수가 있고 모두 같은 큐에서 트리거하면 어떻게됩니까? Azure는 기능 앱간에 작업을 분배 할만큼 똑똑하고 대기열을 처리하는 군대를 가질 수 있습니다. 필요에 따라 확장 또는 축소 할 수 있습니까?
  4. 나는 또한 keep-alives를 발견했습니다. 큐 메시지가 넘쳐 흐르고있는 상황에서 어떻게 든 소켓을 열어 둘 수 있다면 아마도 도움이 될 것입니다. 이게 가능 한가요?이 일을 어떻게 할 것인지에 대한 조언이 있습니까?

이러한 유형의 시스템에 권장되는 (확장 가능한!) 디자인에 대한 통찰력은 대단히 감사하겠습니다!

답변

1

이 문제에 대한 해결책을 찾은 것 같습니다. 나는 지난번이 변경 사항을 3 시간 6 시간 동안 실행했습니다. 소켓 오류는 없었습니다. 전에 30 분마다 큰 오류로 이러한 오류가 발생합니다.

먼저, 새로운 클래스를 추가하여 HttpClient를 관리했습니다.

public static class Connection 
{ 
    public static HttpClient Client { get; private set; } 

    static Connection() 
    { 
     Client = new HttpClient(); 

     Client.BaseAddress = new Uri(Config.APIUri); 
     Client.DefaultRequestHeaders.Add("Connection", "Keep-Alive"); 
     Client.DefaultRequestHeaders.Add("Keep-Alive", "timeout=600"); 
     Client.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json")); 
    } 
} 

이제는 함수를 호출 할 때마다 사용하는 정적 인스턴스 인 HttpClient이 있습니다. 내 연구에서 가능한 한 오랫동안 HttpClient 인스턴스를 유지하는 것이 좋습니다. 모든 것이 스레드 안전이며 HttpClient는 요청을 큐에 넣고 동일한 호스트에 요청을 최적화합니다. 알림 Keep-Alive 헤더도 설정합니다 (기본값이라고 생각하지만 암시적인 것으로 생각했습니다). 내 기능에서

, 난 그냥 같은 정적 HttpClient를 예를 잡아 : 정말 소켓 수준에서 무슨 일이 일어나고 있는지의 심층 분석을 수행하지 않은

var client = Connection.Client; 
StringContent httpContent = new StringContent(myQueueItem, Encoding.UTF8, "application/json"); 
HttpResponseMessage response = await client.PostAsync("/api/devices/data", httpContent); 
response.EnsureSuccessStatusCode(); 

(나는 물어해야 우리 로드 밸런서에서이 트래픽을 볼 수 있다면 IT 담당자가 될 수 있습니다.)하지만 단일 소켓을 서버에 열어두고 대기열 항목이 처리 될 때 많은 HTTP 호출을 생성하기를 바랍니다. 어쨌든, 그게 뭐든지간에 효과가있는 것 같습니다. 어쩌면 누군가 개선 할 생각을 가지고 있을지도 모른다.

1

전용 웹 응용 프로그램에서 기능 대신 소비 계획을 사용하면 3 번 정도 더 많은 기능을 사용할 수 있습니다. 함수는 큰 메시지 대기열을 감지하고 대기열 길이가 안정화 될 때까지 인스턴스를 추가합니다.

maxConcurrentCalls은 인스턴스별로 적용되므로 인스턴스 별 동시성을 제한 할 수 있습니다. 기본적으로 처리율은 입니다.

글로벌 처리량을 제어하는 ​​유일한 방법은 선택한 크기의 전용 웹 응용 프로그램에서 기능을 사용하는 것입니다. 각 응용 프로그램은 대기열을 폴링하고 필요에 따라 작업을 가져옵니다.

최상의 확장 솔루션은 123.123.123.123의로드 밸런싱을 개선하여 대기열 압력에 맞게 기능 확장/축소 기능의 모든 요청을 처리 할 수 ​​있습니다.

지속 연결 afaik는 영구 연결에 유용하지만 함수 실행은 영구 연결로 간주되지 않습니다. 미래에 우리는 함수에 '자신 만의 바인딩'을 추가하려고합니다. 원하는 경우 연결 풀링을 구현할 수 있습니다.

+0

는 지금, 그것은 보인다. 따라서 새로운 기능 앱을 추가하는 부분은 필요하지 않습니다. 그것은 이미 이것을합니다. 소켓 오류 메시지가 Azure의 로컬 인스턴스에 소켓이 없거나 여기에있는 서버에 소켓이 없다는 것이 확실하지 않은지 나는 여전히 불분명합니다. –

+0

내 대답도 아래에 추가되었습니다. 이것은 지금까지 우리를 위해 일하는 것 같습니다! –

+0

이 비슷한 질문/답변도 관심의 대상 일 수 있습니다. https://stackoverflow.com/questions/40094041/throttling-azure-storage-queue-processing-in-azure-function-app – Alex

0

나는 오래 전에 대답 한 질문을 알고 있지만, 마이크로 소프트는 당신이 사용하고 있던 반 패턴을 문서화했다. 이미 스케일링을 처리하기 위해 (새로운 유물에 따라) 네 개의 서로 다른 호스트를 사용하는 것처럼

Improper Instantiation antipattern