Another event type for transparent reconnect mode

This commit is contained in:
ruti 2024-10-19 15:37:02 +03:00
parent 225ccc8319
commit eb95748269
4 changed files with 142 additions and 93 deletions

View File

@ -38,7 +38,8 @@ enum eid {
EV_CONNECT, EV_CONNECT,
EV_IGNORE, EV_IGNORE,
EV_TUNNEL, EV_TUNNEL,
EV_UDP_TUNNEL EV_UDP_TUNNEL,
EV_FIRST_TUNNEL
}; };
#define FLAG_S4 1 #define FLAG_S4 1
@ -52,7 +53,8 @@ char *eid_name[] = {
"EV_CONNECT", "EV_CONNECT",
"EV_IGNORE", "EV_IGNORE",
"EV_TUNNEL", "EV_TUNNEL",
"EV_UDP_TUNNEL" "EV_UDP_TUNNEL",
"EV_FIRST_TUNNEL"
}; };
#endif #endif
@ -60,7 +62,6 @@ struct buffer {
size_t size; size_t size;
unsigned int offset; unsigned int offset;
char *data; char *data;
bool locked;
}; };
struct eval { struct eval {

166
extend.c
View File

@ -156,7 +156,7 @@ static int reconnect(struct poolhd *pool, struct eval *val, int m)
struct eval *client = val->pair; struct eval *client = val->pair;
if (create_conn(pool, client, if (create_conn(pool, client,
(struct sockaddr_ina *)&val->in6, EV_TUNNEL)) { (struct sockaddr_ina *)&val->in6, EV_FIRST_TUNNEL)) {
return -1; return -1;
} }
val->pair = 0; val->pair = 0;
@ -217,7 +217,8 @@ static int on_trigger(int type, struct poolhd *pool, struct eval *val)
int m = val->pair->attempt + 1; int m = val->pair->attempt + 1;
bool can_reconn = ( bool can_reconn = (
val->pair->buff.locked && !val->recv_count val->pair->buff.data && !val->recv_count
&& params.auto_level > AUTO_NOBUFF
); );
if (!can_reconn && params.auto_level <= AUTO_NOSAVE) { if (!can_reconn && params.auto_level <= AUTO_NOSAVE) {
return -1; return -1;
@ -303,13 +304,16 @@ static int on_response(struct poolhd *pool, struct eval *val,
static inline void free_first_req(struct eval *client) static inline void free_first_req(struct eval *client)
{ {
client->type = EV_TUNNEL;
client->pair->type = EV_TUNNEL;
assert(client->buff.data); assert(client->buff.data);
free(client->buff.data); free(client->buff.data);
memset(&client->buff, 0, sizeof(client->buff)); memset(&client->buff, 0, sizeof(client->buff));
} }
static ssize_t on_first_send(struct eval *client, char *buffer, ssize_t n, ssize_t bfsize) static int setup_conn(struct eval *client, char *buffer, ssize_t n)
{ {
int m = client->attempt; int m = client->attempt;
@ -335,16 +339,99 @@ static ssize_t on_first_send(struct eval *client, char *buffer, ssize_t n, ssize
&& set_timeout(client->pair->fd, params.timeout)) { && set_timeout(client->pair->fd, params.timeout)) {
return -1; return -1;
} }
if (client->buff.locked) { return 0;
assert(bfsize >= client->buff.size);
memcpy(buffer, client->buff.data, client->buff.size);
return client->buff.size;
}
return n;
} }
ssize_t tcp_send_hook(struct poolhd *pool, struct eval *remote, static int cancel_setup(struct eval *remote)
{
if (params.timeout && params.auto_level <= AUTO_NOSAVE &&
set_timeout(remote->fd, 0)) {
return -1;
}
if (post_desync(remote->fd, remote->pair->attempt)) {
return -1;
}
return 0;
}
int send_saved_req(struct poolhd *pool,
struct eval *client, char *buffer, ssize_t bfsize)
{
ssize_t offset = client->buff.offset;
ssize_t n = client->buff.size - offset;
assert(bfsize >= n);
memcpy(buffer, client->buff.data + offset, n);
ssize_t sn = tcp_send_hook(client->pair, buffer, bfsize, n);
if (sn < 0) {
return -1;
}
client->buff.offset += sn;
if (sn < n) {
if (mod_etype(pool, client->pair, POLLOUT) ||
mod_etype(pool, client, 0)) {
uniperror("mod_etype");
return -1;
}
}
return 0;
}
int on_first_tunnel(struct poolhd *pool,
struct eval *val, char *buffer, ssize_t bfsize, int etype)
{
if ((etype & POLLOUT) && val->flag == FLAG_CONN) {
if (mod_etype(pool, val, POLLIN) ||
mod_etype(pool, val->pair, POLLIN)) {
uniperror("mod_etype");
return -1;
}
return send_saved_req(pool, val->pair, buffer, bfsize);
}
ssize_t n = tcp_recv_hook(pool, val, buffer, bfsize);
if (n < 1) {
return n;
}
if (val->flag != FLAG_CONN) {
val->buff.size += n;
if (val->buff.size >= bfsize) {
free_first_req(val);
}
else {
val->buff.data = realloc(val->buff.data, val->buff.size);
if (val->buff.data == 0) {
uniperror("realloc");
return -1;
}
memcpy(val->buff.data + val->buff.size - n, buffer, n);
return send_saved_req(pool, val, buffer, bfsize);
}
}
else {
if (on_response(pool, val, buffer, n) == 0) {
return 0;
}
free_first_req(val->pair);
int m = val->pair->attempt;
if (val->pair->cache &&
cache_add((struct sockaddr_ina *)&val->in6, m) < 0) {
return -1;
}
}
if (tcp_send_hook(val->pair, buffer, bfsize, n) < n) {
return -1;
}
return 0;
}
ssize_t tcp_send_hook(struct eval *remote,
char *buffer, size_t bfsize, ssize_t n) char *buffer, size_t bfsize, ssize_t n)
{ {
ssize_t sn = -1; ssize_t sn = -1;
@ -355,15 +442,14 @@ ssize_t tcp_send_hook(struct poolhd *pool, struct eval *remote,
if (sn < 0 && get_e() == EAGAIN) { if (sn < 0 && get_e() == EAGAIN) {
return 0; return 0;
} }
remote->pair->round_sent += sn;
return sn; return sn;
} }
struct eval *client = remote->pair; struct eval *client = remote->pair;
if (client->recv_count == n) { if (client->recv_count == n
n = on_first_send(client, buffer, n, bfsize); && setup_conn(client, buffer, n) < 0) {
if (n < 0) { return -1;
return -1;
}
} }
int m = client->attempt; int m = client->attempt;
LOG((m ? LOG_S : LOG_L), "desync params index: %d\n", m); LOG((m ? LOG_S : LOG_L), "desync params index: %d\n", m);
@ -373,6 +459,8 @@ ssize_t tcp_send_hook(struct poolhd *pool, struct eval *remote,
sn = desync(remote->fd, buffer, bfsize, n, sn = desync(remote->fd, buffer, bfsize, n,
offset, (struct sockaddr *)&remote->in6, m); offset, (struct sockaddr *)&remote->in6, m);
remote->pair->round_sent += sn;
return sn; return sn;
} }
@ -403,47 +491,15 @@ ssize_t tcp_recv_hook(struct poolhd *pool, struct eval *val,
} }
return -1; return -1;
} }
val->recv_count += n;
if (val->flag != FLAG_CONN if (val->round_sent == 0) {
&& params.auto_level > AUTO_NOBUFF val->round_count++;
&& val->pair->recv_count == 0) val->pair->round_sent = 0;
{
val->buff.size += n;
if (val->buff.size >= bfsize) {
free_first_req(val);
return n;
}
val->buff.locked = 1;
val->buff.data = realloc(val->buff.data, val->buff.size);
if (val->buff.data == 0) {
uniperror("realloc");
return -1;
}
memcpy(val->buff.data + val->buff.size - n, buffer, n);
} }
else if (val->recv_count == 0 && val->flag == FLAG_CONN) if (val->flag == FLAG_CONN
{ && val->round_count >= params.repeats
if (val->pair->buff.locked) { && cancel_setup(val)) {
if (on_response(pool, val, buffer, n) == 0) { return -1;
return 0;
}
free_first_req(val->pair);
}
if (params.timeout && params.auto_level <= AUTO_NOSAVE &&
set_timeout(val->fd, 0)) {
return -1;
}
int m = val->pair->attempt;
if (post_desync(val->fd, m)) {
return -1;
}
if (val->pair->cache &&
cache_add((struct sockaddr_ina *)&val->in6, m) < 0) {
return -1;
}
} }
return n; return n;
} }

View File

@ -10,7 +10,7 @@ int socket_mod(int fd, struct sockaddr *dst);
int connect_hook(struct poolhd *pool, struct eval *val, int connect_hook(struct poolhd *pool, struct eval *val,
struct sockaddr_ina *dst, int next); struct sockaddr_ina *dst, int next);
ssize_t tcp_send_hook(struct poolhd *pool, struct eval *val, ssize_t tcp_send_hook(struct eval *val,
char *buffer, size_t bfsize, ssize_t n); char *buffer, size_t bfsize, ssize_t n);
ssize_t tcp_recv_hook(struct poolhd *pool, struct eval *val, ssize_t tcp_recv_hook(struct poolhd *pool, struct eval *val,
@ -19,6 +19,9 @@ ssize_t tcp_recv_hook(struct poolhd *pool, struct eval *val,
ssize_t udp_hook(struct eval *val, ssize_t udp_hook(struct eval *val,
char *buffer, size_t bfsize, ssize_t n, struct sockaddr_ina *dst); char *buffer, size_t bfsize, ssize_t n, struct sockaddr_ina *dst);
int on_first_tunnel(struct poolhd *pool,
struct eval *val, char *buffer, ssize_t bfsize, int etype);
#ifdef __linux__ #ifdef __linux__
int protect(int conn_fd, const char *path); int protect(int conn_fd, const char *path);
#else #else

57
proxy.c
View File

@ -641,31 +641,27 @@ int on_tunnel(struct poolhd *pool, struct eval *val,
LOG(LOG_S, "pollout (fd=%d)\n", val->fd); LOG(LOG_S, "pollout (fd=%d)\n", val->fd);
val = pair; val = pair;
pair = val->pair; pair = val->pair;
assert(val->buff.data); }
if (val->buff.data) {
if (etype & POLLHUP) { if (etype & POLLHUP) {
return -1; return -1;
} }
n = val->buff.size - val->buff.offset; n = val->buff.size - val->buff.offset;
ssize_t sn = tcp_send_hook(pool, pair, ssize_t sn = tcp_send_hook(pair,
val->buff.data + val->buff.offset, n, n); val->buff.data + val->buff.offset, n, n);
if (sn < 0) { if (sn < 0) {
uniperror("send"); uniperror("send");
return -1; return -1;
} }
val->round_sent += sn;
if (sn < n) { if (sn < n) {
val->buff.offset += sn; val->buff.offset += sn;
return 0; return 0;
} }
if (!val->buff.locked) { free(val->buff.data);
free(val->buff.data); val->buff.data = 0;
val->buff.data = 0; val->buff.size = 0;
val->buff.size = 0; val->buff.offset = 0;
val->buff.offset = 0;
}
if (mod_etype(pool, val, POLLIN) || if (mod_etype(pool, val, POLLIN) ||
mod_etype(pool, pair, POLLIN)) { mod_etype(pool, pair, POLLIN)) {
@ -673,9 +669,6 @@ int on_tunnel(struct poolhd *pool, struct eval *val,
return -1; return -1;
} }
} }
if (val->buff.data && !val->buff.locked) {
return 0;
}
do { do {
n = tcp_recv_hook(pool, val, buffer, bfsize); n = tcp_recv_hook(pool, val, buffer, bfsize);
//if (n < 0 && get_e() == EAGAIN) { //if (n < 0 && get_e() == EAGAIN) {
@ -685,32 +678,21 @@ int on_tunnel(struct poolhd *pool, struct eval *val,
if (n < 0) { if (n < 0) {
return -1; return -1;
} }
val->recv_count += n; ssize_t sn = tcp_send_hook(pair, buffer, bfsize, n);
if (val->round_sent == 0) {
val->round_count++;
pair->round_sent = 0;
}
ssize_t sn = tcp_send_hook(pool, pair, buffer, bfsize, n);
if (sn < 0) { if (sn < 0) {
uniperror("send"); uniperror("send");
return -1; return -1;
} }
val->round_sent += sn;
if (sn < n) { if (sn < n) {
LOG(LOG_S, "send: %zd != %zd (fd: %d)\n", sn, n, pair->fd); LOG(LOG_S, "send: %zd != %zd (fd: %d)\n", sn, n, pair->fd);
assert(val->buff.locked assert(!(val->buff.size || val->buff.offset));
|| !(val->buff.size || val->buff.offset));
if (!val->buff.locked) { val->buff.size = n - sn;
val->buff.size = n - sn; if (!(val->buff.data = malloc(n - sn))) {
if (!(val->buff.data = malloc(n - sn))) { uniperror("malloc");
uniperror("malloc"); return -1;
return -1;
}
memcpy(val->buff.data, buffer + sn, n - sn);
} }
else val->buff.offset += sn; memcpy(val->buff.data, buffer + sn, n - sn);
if (mod_etype(pool, val, 0) || if (mod_etype(pool, val, 0) ||
mod_etype(pool, pair, POLLOUT)) { mod_etype(pool, pair, POLLOUT)) {
@ -904,8 +886,10 @@ static inline int on_connect(struct poolhd *pool, struct eval *val, int e)
uniperror("mod_etype"); uniperror("mod_etype");
return -1; return -1;
} }
val->type = EV_TUNNEL; int t = params.auto_level <= AUTO_NOBUFF
val->pair->type = EV_TUNNEL; ? EV_TUNNEL : EV_FIRST_TUNNEL;
val->type = t;
val->pair->type = t;
} }
if (resp_error(val->pair->fd, if (resp_error(val->pair->fd,
error, val->pair->flag) < 0) { error, val->pair->flag) < 0) {
@ -975,6 +959,11 @@ int event_loop(int srvfd)
close_conn(pool, val); close_conn(pool, val);
continue; continue;
case EV_FIRST_TUNNEL:
if (on_first_tunnel(pool, val, buffer, bfsize, etype))
close_conn(pool, val);
continue;
case EV_TUNNEL: case EV_TUNNEL:
if (on_tunnel(pool, val, buffer, bfsize, etype)) if (on_tunnel(pool, val, buffer, bfsize, etype))
close_conn(pool, val); close_conn(pool, val);