From 460c727deae294bda698d67939082be8d4131b69 Mon Sep 17 00:00:00 2001 From: Vadim Vetrov Date: Tue, 30 Jul 2024 15:01:58 +0300 Subject: [PATCH] Support for multithreading. Cleanup code --- youtubeUnblock.c | 275 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 188 insertions(+), 87 deletions(-) diff --git a/youtubeUnblock.c b/youtubeUnblock.c index 5c258fa..8cb88e3 100644 --- a/youtubeUnblock.c +++ b/youtubeUnblock.c @@ -20,6 +20,8 @@ #include #include +#include + #ifndef NOUSE_GSO #define USE_GSO #endif @@ -29,15 +31,28 @@ #endif #define RAWSOCKET_MARK 0xfc70 +#define MAX_THREADS 16 + +#ifndef THREADS_NUM +#define THREADS_NUM 1 +#endif + +#if THREADS_NUM > MAX_THREADS +#error "Too much threads" +#endif + +#ifndef __linux__ +#error "The package is linux only!" +#endif static struct { - uint32_t queue_num; - struct mnl_socket *nl; - uint32_t portid; + uint32_t queue_start_num; int rawsocket; - + pthread_mutex_t rawsocket_lock; + int threads; } config = { - .rawsocket = -2 + .rawsocket = -2, + .threads=THREADS_NUM }; static int parse_args(int argc, const char *argv[]) { @@ -52,7 +67,7 @@ static int parse_args(int argc, const char *argv[]) { uint32_t queue_num = strtoul(argv[1], &end, 10); if (errno != 0 || *end != '\0') goto errormsg_help; - config.queue_num = queue_num; + config.queue_start_num = queue_num; return 0; errormsg_help: @@ -64,13 +79,7 @@ errormsg_help: return -1; } -static int open_socket(void) { - if (config.nl != NULL) { - errno = EALREADY; - perror("socket is already opened"); - return -1; - } - +static int open_socket(struct mnl_socket **_nl) { struct mnl_socket *nl = NULL; nl = mnl_socket_open(NETLINK_NETFILTER); @@ -85,21 +94,21 @@ static int open_socket(void) { return -1; } - config.nl = nl; - config.portid = mnl_socket_get_portid(nl); + *_nl = nl; return 0; } -static int close_socket(void) { - if (config.nl == NULL) return 1; - if (mnl_socket_close(config.nl) < 0) { +static int close_socket(struct mnl_socket **_nl) { + struct mnl_socket *nl = *_nl; + if (nl == NULL) return 1; + if (mnl_socket_close(nl) < 0) { perror("mnl_socket_close"); return -1; } - config.nl = NULL; + *_nl = NULL; return 0; } @@ -121,17 +130,27 @@ static int open_raw_socket(void) { const int *val = &one; if (setsockopt(config.rawsocket, IPPROTO_IP, IP_HDRINCL, val, sizeof(one)) < 0) { - printf("setsockopt(IP_HDRINCL, 1) failed\n"); + fprintf(stderr, "setsockopt(IP_HDRINCL, 1) failed\n"); return -1; } int mark = RAWSOCKET_MARK; if (setsockopt(config.rawsocket, SOL_SOCKET, SO_MARK, &mark, sizeof(mark)) < 0) { - printf("setsockopt(SO_MARK, %d) failed\n", mark); + fprintf(stderr, "setsockopt(SO_MARK, %d) failed\n", mark); return -1; } + int mst = pthread_mutex_init(&config.rawsocket_lock, NULL); + if (mst) { + fprintf(stderr, "Mutex err: %d\n", mst); + close(config.rawsocket); + errno = mst; + + return -1; + } + + return config.rawsocket; } @@ -144,9 +163,12 @@ static int close_raw_socket(void) { if (close(config.rawsocket)) { perror("Unable to close raw socket"); + pthread_mutex_destroy(&config.rawsocket_lock); return -1; } + pthread_mutex_destroy(&config.rawsocket_lock); + config.rawsocket = -2; return 0; } @@ -383,9 +405,14 @@ static int send_raw_socket(struct pkt_buff *pktb) { } }; + pthread_mutex_lock(&config.rawsocket_lock); + int sent = sendto(config.rawsocket, pktb_data(pktb), pktb_len(pktb), 0, (struct sockaddr *)&daddr, sizeof(daddr)); + + pthread_mutex_unlock(&config.rawsocket_lock); + return sent; } @@ -398,22 +425,6 @@ struct packet_data { uint16_t payload_len; }; -/** - * Used to accept unsupported packets (GSOs) - */ -static int fallback_accept_packet(uint32_t id) { - char buf[MNL_SOCKET_BUFFER_SIZE]; - struct nlmsghdr *verdnlh; - verdnlh = nfq_nlmsg_put(buf, NFQNL_MSG_VERDICT, config.queue_num); - nfq_nlmsg_verdict_put(verdnlh, id, NF_ACCEPT); - - if (mnl_socket_sendto(config.nl, verdnlh, verdnlh->nlmsg_len) < 0) { - perror("mnl_socket_send"); - return MNL_CB_ERROR; - } - - return MNL_CB_OK; -} #define TLS_CONTENT_TYPE_HANDSHAKE 0x16 #define TLS_HANDSHAKE_TYPE_CLIENT_HELLO 0x01 @@ -564,9 +575,30 @@ nextMessage: return vrd; } +// Per-queue data. Passed to queue_cb. +struct queue_data { + struct mnl_socket **_nl; + int queue_num; +}; +/** + * Used to accept unsupported packets (GSOs) + */ +static int fallback_accept_packet(uint32_t id, struct queue_data qdata) { + char buf[MNL_SOCKET_BUFFER_SIZE]; + struct nlmsghdr *verdnlh; + verdnlh = nfq_nlmsg_put(buf, NFQNL_MSG_VERDICT, qdata.queue_num); + nfq_nlmsg_verdict_put(verdnlh, id, NF_ACCEPT); + + if (mnl_socket_sendto(*qdata._nl, verdnlh, verdnlh->nlmsg_len) < 0) { + perror("mnl_socket_send"); + return MNL_CB_ERROR; + } + + return MNL_CB_OK; +} + +static int process_packet(const struct packet_data packet, struct queue_data qdata) { - -static int process_packet(const struct packet_data packet) { char buf[MNL_SOCKET_BUFFER_SIZE]; struct nlmsghdr *verdnlh; @@ -576,7 +608,7 @@ static int process_packet(const struct packet_data packet) { #endif if (packet.hw_proto != ETH_P_IP) { - return fallback_accept_packet(packet.id); + return fallback_accept_packet(packet.id, qdata); } const int family = AF_INET; @@ -609,7 +641,7 @@ static int process_packet(const struct packet_data packet) { struct verdict vrd = analyze_tls_data(data, data_len); - verdnlh = nfq_nlmsg_put(buf, NFQNL_MSG_VERDICT, config.queue_num); + verdnlh = nfq_nlmsg_put(buf, NFQNL_MSG_VERDICT, qdata.queue_num); nfq_nlmsg_verdict_put(verdnlh, packet.id, NF_ACCEPT); if (vrd.gvideo_hello) { @@ -690,7 +722,7 @@ static int process_packet(const struct packet_data packet) { } */ - if (mnl_socket_sendto(config.nl, verdnlh, verdnlh->nlmsg_len) < 0) { + if (mnl_socket_sendto(*qdata._nl, verdnlh, verdnlh->nlmsg_len) < 0) { perror("mnl_socket_send"); goto error; @@ -699,12 +731,15 @@ static int process_packet(const struct packet_data packet) { return MNL_CB_OK; fallback: - return fallback_accept_packet(packet.id); + return fallback_accept_packet(packet.id, qdata); error: return MNL_CB_ERROR; } static int queue_cb(const struct nlmsghdr *nlh, void *data) { + + struct queue_data *qdata = data; + struct nfqnl_msg_packet_hdr *ph = NULL; struct nlattr *attr[NFQA_MAX+1] = {0}; struct packet_data packet = {0}; @@ -730,45 +765,30 @@ static int queue_cb(const struct nlmsghdr *nlh, void *data) { if (attr[NFQA_CAP_LEN] != NULL && ntohl(mnl_attr_get_u32(attr[NFQA_CAP_LEN])) != packet.payload_len) { fprintf(stderr, "The packet was truncated! Skip!\n"); - return fallback_accept_packet(packet.id); + return fallback_accept_packet(packet.id, *qdata); } if (attr[NFQA_MARK] != NULL) { // Skip packets sent by rawsocket to escape infinity loop. if (ntohl(mnl_attr_get_u32(attr[NFQA_MARK])) == RAWSOCKET_MARK) { - return fallback_accept_packet(packet.id); + return fallback_accept_packet(packet.id, *qdata); } } - return process_packet(packet); + return process_packet(packet, *qdata); } -int main(int argc, const char *argv[]) -{ +int init_queue(int queue_num) { + struct mnl_socket *nl; - if (parse_args(argc, argv)) { - perror("Unable to parse args"); - exit(EXIT_FAILURE); - } - -#ifdef USE_TCP_SEGMENTATION - printf("Using TCP segmentation!\n"); -#else - printf("Using IP fragmentation!\n"); -#endif - - if (open_socket()) { + if (open_socket(&nl)) { perror("Unable to open socket"); - exit(EXIT_FAILURE); + return -1; } - if (open_raw_socket() < 0) { - perror("Unable to open raw socket"); - close_socket(); - exit(EXIT_FAILURE); - } + uint32_t portid = mnl_socket_get_portid(nl); struct nlmsghdr *nlh; char *buf; @@ -780,63 +800,144 @@ int main(int argc, const char *argv[]) } - nlh = nfq_nlmsg_put(buf, NFQNL_MSG_CONFIG, config.queue_num); + nlh = nfq_nlmsg_put(buf, NFQNL_MSG_CONFIG, queue_num); nfq_nlmsg_cfg_put_cmd(nlh, AF_INET, NFQNL_CFG_CMD_BIND); - if (mnl_socket_sendto(config.nl, nlh, nlh->nlmsg_len) < 0) { + if (mnl_socket_sendto(nl, nlh, nlh->nlmsg_len) < 0) { perror("mnl_socket_send"); - goto die_buf; + goto die; } - nlh = nfq_nlmsg_put(buf, NFQNL_MSG_CONFIG, config.queue_num); + nlh = nfq_nlmsg_put(buf, NFQNL_MSG_CONFIG, queue_num); nfq_nlmsg_cfg_put_params(nlh, NFQNL_COPY_PACKET, 0xffff); #ifdef USE_GSO - printf("GSO is enabled!\n"); - mnl_attr_put_u32(nlh, NFQA_CFG_FLAGS, htonl(NFQA_CFG_F_GSO)); mnl_attr_put_u32(nlh, NFQA_CFG_MASK, htonl(NFQA_CFG_F_GSO)); #endif - if (mnl_socket_sendto(config.nl, nlh, nlh->nlmsg_len) < 0) { + if (mnl_socket_sendto(nl, nlh, nlh->nlmsg_len) < 0) { perror("mnl_socket_send"); - goto die_buf; + goto die; } + /* ENOBUFS is signalled to userspace when packets were lost * on kernel side. In most cases, userspace isn't interested * in this information, so turn it off. */ int ret = 1; - mnl_socket_setsockopt(config.nl, NETLINK_NO_ENOBUFS, &ret, sizeof(int)); + mnl_socket_setsockopt(nl, NETLINK_NO_ENOBUFS, &ret, sizeof(int)); + + struct queue_data qdata = { + ._nl = &nl, + .queue_num = queue_num + }; + + printf("Queue %d started!\n", qdata.queue_num); while (1) { - ret = mnl_socket_recvfrom(config.nl, buf, buf_size); + ret = mnl_socket_recvfrom(nl, buf, buf_size); if (ret == -1) { perror("mnl_socket_recvfrom"); - goto die_buf; + continue; } - ret = mnl_cb_run(buf, ret, 0, config.portid, queue_cb, NULL); + ret = mnl_cb_run(buf, ret, 0, portid, queue_cb, &qdata); if (ret < 0) { perror("mnl_cb_run"); - // goto die_buf; } } - printf("%d\n", config.queue_num); - errno = 0; - free(buf); - close_socket(); + close_socket(&nl); return 0; -die_buf: +die: free(buf); die_sock: - close_raw_socket(); - close_socket(); - exit(EXIT_FAILURE); + close_socket(&nl); + return -1; +} + +// Per-queue config. Used to initialize a queue. Passed to wrapper +struct queue_conf { + uint16_t i; + int queue_num; +}; + +struct queue_res { + int status; +}; + +static struct queue_res threads_reses[MAX_THREADS]; + +void *init_queue_wrapper(void *qdconf) { + struct queue_conf *qconf = qdconf; + struct queue_res *thres = threads_reses + qconf->i; + + thres->status = init_queue(qconf->queue_num); + + fprintf(stderr, "Thread %d exited with status %d\n", qconf->i, thres->status); + + return thres; +} + +int main(int argc, const char *argv[]) { + if (parse_args(argc, argv)) { + perror("Unable to parse args"); + exit(EXIT_FAILURE); + } + +#ifdef USE_TCP_SEGMENTATION + printf("Using TCP segmentation!\n"); +#else + printf("Using IP fragmentation!\n"); +#endif + +#ifdef USE_GSO + printf("GSO is enabled!\n"); +#endif + + if (open_raw_socket() < 0) { + perror("Unable to open raw socket"); + exit(EXIT_FAILURE); + } + +#if THREADS_NUM == 1 + struct queue_conf tconf = { + .i = 0, + .queue_num = config.queue_start_num + }; + + struct queue_res *qres = init_queue_wrapper(&tconf); +#else + struct queue_conf thread_confs[MAX_THREADS]; + pthread_t threads[MAX_THREADS]; + for (int i = 0; i < config.threads; i++) { + struct queue_conf *tconf = thread_confs + i; + pthread_t *thr = threads + i; + + tconf->queue_num = config.queue_start_num + i; + tconf->i = i; + + pthread_create(thr, NULL, init_queue_wrapper, tconf); + } + + void *res; + for (int i = 0; i < config.threads; i++) { + pthread_join(threads[i], &res); + + struct queue_res *qres = res; + } +#endif + + if (close_raw_socket() < 0) { + perror("Unable to close raw socket"); + exit(EXIT_FAILURE); + } + + return qres->status; }