From 6555d60d6b17f2d3ac0a332132748064c57198a9 Mon Sep 17 00:00:00 2001 From: ruti <> Date: Sat, 11 May 2024 17:23:34 +0300 Subject: [PATCH] handle split request --- conev.h | 2 +- extend.c | 128 +++++++++++++++++++++++++++++++------------------------ extend.h | 2 +- proxy.c | 3 +- 4 files changed, 76 insertions(+), 59 deletions(-) diff --git a/conev.h b/conev.h index 73b8238..6df99a1 100644 --- a/conev.h +++ b/conev.h @@ -26,7 +26,7 @@ #endif #ifndef POLLRDHUP - #define POLLRDHUP POLLHUP + #define POLLRDHUP 0 #endif enum eid { diff --git a/extend.c b/extend.c index 46dc95f..38266dd 100644 --- a/extend.c +++ b/extend.c @@ -142,6 +142,7 @@ int reconnect(struct poolhd *pool, struct eval *val, int m) client->type = EV_IGNORE; client->attempt = m; client->cache = 1; + client->buff.offset = 0; return 0; } @@ -240,12 +241,23 @@ int on_response(struct poolhd *pool, struct eval *val, } +static inline void to_tunnel(struct eval *client) +{ + client->pair->type = EV_TUNNEL; + client->type = EV_TUNNEL; + + assert(client->buff.data); + free(client->buff.data); + client->buff.data = 0; + client->buff.size = 0; + client->buff.offset = 0; +} + + int on_tunnel_check(struct poolhd *pool, struct eval *val, char *buffer, size_t bfsize, int out) { - if (out) { - return on_tunnel(pool, val, buffer, bfsize, out); - } + assert(!out); ssize_t n = recv(val->fd, buffer, bfsize, 0); if (n < 1) { if (n) uniperror("recv"); @@ -270,13 +282,7 @@ int on_tunnel_check(struct poolhd *pool, struct eval *val, uniperror("send"); return -1; } - val->type = EV_TUNNEL; - pair->type = EV_TUNNEL; - - assert(pair->buff.data); - free(pair->buff.data); - pair->buff.data = 0; - pair->buff.size = 0; + to_tunnel(pair); if (params.timeout && set_timeout(val->fd, 0)) { @@ -297,8 +303,8 @@ int on_tunnel_check(struct poolhd *pool, struct eval *val, } -int on_desync(struct poolhd *pool, struct eval *val, - char *buffer, size_t bfsize) +int on_desync_again(struct poolhd *pool, + struct eval *val, char *buffer, size_t bfsize) { if (val->flag == FLAG_CONN) { if (mod_etype(pool, val, POLLIN)) { @@ -307,60 +313,24 @@ int on_desync(struct poolhd *pool, struct eval *val, } val = val->pair; } - ssize_t n = 0; int m = val->attempt; LOG((m ? LOG_S : LOG_L), "desync params index: %d\n", m); - if (!val->buff.data) { - n = recv(val->fd, buffer, bfsize, 0); - if (n <= 0) { - if (n) uniperror("recv data"); - return -1; - } - val->buff.size = n; - val->recv_count += n; - assert(val->buff.offset == 0); - - if (!(val->buff.data = malloc(n))) { - uniperror("malloc"); - return -1; - } - memcpy(val->buff.data, buffer, n); - - if (!m) for (; m < params.dp_count; m++) { - struct desync_params *dp = ¶ms.dp[m]; - if (!dp->detect && - (!dp->hosts || check_host(dp->hosts, val)) && - (!dp->proto || check_proto_tcp(dp->proto, val))) { - break; - } - } - if (m >= params.dp_count) { - return -1; - } - val->attempt = m; - - if (params.late_conn) { - return ext_connect(pool, val, - (struct sockaddr_ina *)&val->in6, EV_DESYNC, m); - } - } - else { - n = val->buff.size; - assert(n > 0 && n <= params.bfsize); - memcpy(buffer, val->buff.data, n); - } + ssize_t n = val->buff.size; + assert(n > 0 && n <= params.bfsize); + memcpy(buffer, val->buff.data, n); + if (params.timeout && set_timeout(val->pair->fd, params.timeout)) { return -1; } - ssize_t sn = desync(val->pair->fd, buffer, bfsize, - n, val->buff.offset, (struct sockaddr *)&val->pair->in6, m); + ssize_t sn = desync(val->pair->fd, buffer, bfsize, n, + val->buff.offset, (struct sockaddr *)&val->pair->in6, m); if (sn < 0) { return -1; } + val->buff.offset += sn; if (sn < n) { - val->buff.offset = sn; if (mod_etype(pool, val->pair, POLLOUT)) { uniperror("mod_etype"); return -1; @@ -368,11 +338,57 @@ int on_desync(struct poolhd *pool, struct eval *val, val->pair->type = EV_DESYNC; return 0; } - val->type = EV_TUNNEL; val->pair->type = EV_PRE_TUNNEL; return 0; } + +int on_desync(struct poolhd *pool, struct eval *val, + char *buffer, size_t bfsize, int out) +{ + if (out) { + return on_desync_again(pool, val, buffer, bfsize); + } + if (val->buff.size == bfsize) { + to_tunnel(val); + return 0; + } + ssize_t n = recv(val->fd, buffer, bfsize - val->buff.size, 0); + if (n <= 0) { + if (n) uniperror("recv data"); + return -1; + } + val->buff.size += n; + val->recv_count += n; + + val->buff.data = realloc(val->buff.data, val->buff.size); + if (val->buff.data == 0) { + uniperror("realloc"); + return -1; + } + memcpy(val->buff.data + val->buff.offset, buffer, n); + + int m = val->attempt; + if (!m) for (; m < params.dp_count; m++) { + struct desync_params *dp = ¶ms.dp[m]; + if (!dp->detect && + (!dp->hosts || check_host(dp->hosts, val)) && + (!dp->proto || check_proto_tcp(dp->proto, val))) { + break; + } + } + if (m >= params.dp_count) { + return -1; + } + val->attempt = m; + + if (params.late_conn && val->recv_count == n) { + return ext_connect(pool, val, + (struct sockaddr_ina *)&val->in6, EV_DESYNC, m); + } + return on_desync_again(pool, val, buffer, bfsize); +} + #ifdef __linux__ int protect(int conn_fd, const char *path) { diff --git a/extend.h b/extend.h index ef72bbf..245f31b 100644 --- a/extend.h +++ b/extend.h @@ -5,7 +5,7 @@ int on_tunnel_check(struct poolhd *pool, struct eval *val, char *buffer, size_t bfsize, int out); int on_desync(struct poolhd *pool, struct eval *val, - char *buffer, size_t bfsize); + char *buffer, size_t bfsize, int out); #ifdef __linux__ int protect(int conn_fd, const char *path); diff --git a/proxy.c b/proxy.c index 5e252fe..44a44f6 100644 --- a/proxy.c +++ b/proxy.c @@ -849,7 +849,8 @@ int event_loop(int srvfd) continue; case EV_DESYNC: - if (on_desync(pool, val, buffer, bfsize)) + if (on_desync(pool, val, + buffer, bfsize, etype & POLLOUT)) del_event(pool, val); continue;