handle split request

This commit is contained in:
ruti 2024-05-11 17:23:34 +03:00
parent e3be71ea46
commit 6555d60d6b
4 changed files with 76 additions and 59 deletions

View File

@ -26,7 +26,7 @@
#endif
#ifndef POLLRDHUP
#define POLLRDHUP POLLHUP
#define POLLRDHUP 0
#endif
enum eid {

112
extend.c
View File

@ -142,6 +142,7 @@ int reconnect(struct poolhd *pool, struct eval *val, int m)
client->type = EV_IGNORE;
client->attempt = m;
client->cache = 1;
client->buff.offset = 0;
return 0;
}
@ -240,12 +241,23 @@ int on_response(struct poolhd *pool, struct eval *val,
}
static inline void to_tunnel(struct eval *client)
{
client->pair->type = EV_TUNNEL;
client->type = EV_TUNNEL;
assert(client->buff.data);
free(client->buff.data);
client->buff.data = 0;
client->buff.size = 0;
client->buff.offset = 0;
}
int on_tunnel_check(struct poolhd *pool, struct eval *val,
char *buffer, size_t bfsize, int out)
{
if (out) {
return on_tunnel(pool, val, buffer, bfsize, out);
}
assert(!out);
ssize_t n = recv(val->fd, buffer, bfsize, 0);
if (n < 1) {
if (n) uniperror("recv");
@ -270,13 +282,7 @@ int on_tunnel_check(struct poolhd *pool, struct eval *val,
uniperror("send");
return -1;
}
val->type = EV_TUNNEL;
pair->type = EV_TUNNEL;
assert(pair->buff.data);
free(pair->buff.data);
pair->buff.data = 0;
pair->buff.size = 0;
to_tunnel(pair);
if (params.timeout &&
set_timeout(val->fd, 0)) {
@ -297,8 +303,8 @@ int on_tunnel_check(struct poolhd *pool, struct eval *val,
}
int on_desync(struct poolhd *pool, struct eval *val,
char *buffer, size_t bfsize)
int on_desync_again(struct poolhd *pool,
struct eval *val, char *buffer, size_t bfsize)
{
if (val->flag == FLAG_CONN) {
if (mod_etype(pool, val, POLLIN)) {
@ -307,26 +313,62 @@ int on_desync(struct poolhd *pool, struct eval *val,
}
val = val->pair;
}
ssize_t n = 0;
int m = val->attempt;
LOG((m ? LOG_S : LOG_L), "desync params index: %d\n", m);
if (!val->buff.data) {
n = recv(val->fd, buffer, bfsize, 0);
ssize_t n = val->buff.size;
assert(n > 0 && n <= params.bfsize);
memcpy(buffer, val->buff.data, n);
if (params.timeout &&
set_timeout(val->pair->fd, params.timeout)) {
return -1;
}
ssize_t sn = desync(val->pair->fd, buffer, bfsize, n,
val->buff.offset, (struct sockaddr *)&val->pair->in6, m);
if (sn < 0) {
return -1;
}
val->buff.offset += sn;
if (sn < n) {
if (mod_etype(pool, val->pair, POLLOUT)) {
uniperror("mod_etype");
return -1;
}
val->pair->type = EV_DESYNC;
return 0;
}
val->pair->type = EV_PRE_TUNNEL;
return 0;
}
int on_desync(struct poolhd *pool, struct eval *val,
char *buffer, size_t bfsize, int out)
{
if (out) {
return on_desync_again(pool, val, buffer, bfsize);
}
if (val->buff.size == bfsize) {
to_tunnel(val);
return 0;
}
ssize_t n = recv(val->fd, buffer, bfsize - val->buff.size, 0);
if (n <= 0) {
if (n) uniperror("recv data");
return -1;
}
val->buff.size = n;
val->buff.size += n;
val->recv_count += n;
assert(val->buff.offset == 0);
if (!(val->buff.data = malloc(n))) {
uniperror("malloc");
val->buff.data = realloc(val->buff.data, val->buff.size);
if (val->buff.data == 0) {
uniperror("realloc");
return -1;
}
memcpy(val->buff.data, buffer, n);
memcpy(val->buff.data + val->buff.offset, buffer, n);
int m = val->attempt;
if (!m) for (; m < params.dp_count; m++) {
struct desync_params *dp = &params.dp[m];
if (!dp->detect &&
@ -340,37 +382,11 @@ int on_desync(struct poolhd *pool, struct eval *val,
}
val->attempt = m;
if (params.late_conn) {
if (params.late_conn && val->recv_count == n) {
return ext_connect(pool, val,
(struct sockaddr_ina *)&val->in6, EV_DESYNC, m);
}
}
else {
n = val->buff.size;
assert(n > 0 && n <= params.bfsize);
memcpy(buffer, val->buff.data, n);
}
if (params.timeout &&
set_timeout(val->pair->fd, params.timeout)) {
return -1;
}
ssize_t sn = desync(val->pair->fd, buffer, bfsize,
n, val->buff.offset, (struct sockaddr *)&val->pair->in6, m);
if (sn < 0) {
return -1;
}
if (sn < n) {
val->buff.offset = sn;
if (mod_etype(pool, val->pair, POLLOUT)) {
uniperror("mod_etype");
return -1;
}
val->pair->type = EV_DESYNC;
return 0;
}
val->type = EV_TUNNEL;
val->pair->type = EV_PRE_TUNNEL;
return 0;
return on_desync_again(pool, val, buffer, bfsize);
}
#ifdef __linux__

View File

@ -5,7 +5,7 @@ int on_tunnel_check(struct poolhd *pool, struct eval *val,
char *buffer, size_t bfsize, int out);
int on_desync(struct poolhd *pool, struct eval *val,
char *buffer, size_t bfsize);
char *buffer, size_t bfsize, int out);
#ifdef __linux__
int protect(int conn_fd, const char *path);

View File

@ -849,7 +849,8 @@ int event_loop(int srvfd)
continue;
case EV_DESYNC:
if (on_desync(pool, val, buffer, bfsize))
if (on_desync(pool, val,
buffer, bfsize, etype & POLLOUT))
del_event(pool, val);
continue;