Converting SOCKS5 Connection to SSL: SSL Error during Handshake Operation - Seeking Assistance

50 Views Asked by At

I have implemented a SOCKS5 server in Python and I want to intercept and handle messages over WSS (WebSocket over SSL) with my own certificate. However, I encountered an issue when executing .wrap_socket(sock, server_side=True) before the client hello. It is throwing an SSL error with the message “ssl-ssl-error-ssl-wrong-version-number”, followed by a timeout during the handshake operation (“_ssl.c:985: The handshake operation timed out”). But, I am certain that my certificate and TLS protocol version are correct. So, I wanted to ask if it is possible to convert a SOCKS5 connection to SSL. If it is possible, can you help me understand the reason for this error?

I have tried using all available TLS protocol versions, but none of them work. I would like to be able to intercept and handle WSS messages within the SOCKS5 server (with a valid certificate).

   def establish_socks5(self, sock):
       dest_host, dest_port = None, None
       try:
           ver, nmethods, methods = sock.recv(1), sock.recv(1), sock.recv(1)
           sock.sendall(VER + METHOD)
           ver, cmd, rsv, address_type = sock.recv(1), sock.recv(1), sock.recv(1), sock.recv(1)
           dst_addr = None
           dst_port = None
           if address_type == ADDR_TYPE_IPV4:
               dst_addr, dst_port = sock.recv(4), sock.recv(2)
               dst_addr = '.'.join([str(chr_to_int(i)) for i in dst_addr])
           elif address_type == ADDR_TYPE_DOMAIN:
               addr_len = ord(sock.recv(1))
               dst_addr, dst_port = sock.recv(addr_len), sock.recv(2)
               dst_addr = ''.join([chr(chr_to_int(i)) for i in dst_addr])
           elif address_type == ADDR_TYPE_IPV6:
               dst_addr, dst_port = sock.recv(16), sock.recv(2)
               tmp_addr = []
               for i in range(len(dst_addr) // 2):
                   tmp_addr.append(chr(dst_addr[2 * i] * 256 + dst_addr[2 * i + 1]))
               dst_addr = ':'.join(tmp_addr)
           dst_port = chr_to_int(dst_port[0]) * 256 + chr_to_int(dst_port[1])
           server_sock = sock
           server_ip = ''.join([chr(int(i)) for i in socket.gethostbyname("172.16.8.151").split('.')])
           if cmd == CMD_TYPE_TCP_BIND:
               print('TCP Bind requested, but is not supported by socks5_server')
               sock.close()
           elif cmd == CMD_TYPE_UDP:
               print('UDP requested, but is not supported by socks5_server')
               sock.close()
           elif cmd == CMD_TYPE_CONNECT:
               sock.sendall(VER + SUCCESS + b'\x00' + b'\x01' + encode_str(server_ip +
                                                                           chr(9001 // 256) + chr(
                   9001 % 256)))
               dest_host, dest_port = dst_addr, dst_port
           else:
               # Unsupport/unknown Command
               print('Unsupported/unknown SOCKS5 command requested')
               sock.sendall(VER + UNSUPPORTED_CMD + encode_str(server_ip + chr(9001 // 256) +
                                                               chr(9001 % 256)))
               sock.close()
       except KeyboardInterrupt as e:
           print('Error in SOCKS5 establishment: %s' % e)

       return dest_host, dest_port

   def decorateSocket(self, sock):
    try:
       sslsock = self.context.wrap_socket(sock, server_side=True)
       return sslsock
    except ssl.SSLError as e:
          print(e)
          return None
 
   sock, address = self.serversocket.accept()
   self.establish_socks5(sock)
   newsock = self.decorateSocket(sock)
   newsock.setblocking(0)

0

There are 0 best solutions below