# Copyright (c) 2006 by Seo Sanghyeon
# Copyright (c) Python Software Foundation

# 2006-01-31 sanxiyn Created
# 2006-02-03 sanxiyn Added DNS functions and getsockname/getpeername
# 2006-03-15 sanxiyn Added socket.error, convert number to constant
# 2006-03-17 sanxiyn Added sendall, makefile
# 2006-03-18 sanxiyn Added recvfrom, use array slicing
#                    Added getsockopt/setsockopt, getfqdn
# 2006-04-13 sanxiyn Added error handling with _handle_error decorator
# 2006-04-19 sanxiyn Added SSL support
# 2006-04-26 sanxiyn Added sendto, fixed close
# 2006-05-01 sanxiyn Added inet_aton/inet_ntoa
# 2006-11-11 baus Added _fileobject class

# Overview: a Python-style socket API implemented on top of the .NET
# System.Net / System.Net.Sockets classes imported below.  Python strings
# are converted to and from .NET byte arrays with the `raw` encoding.

from System import Array, Byte, Enum
from System.Net import IPAddress, IPEndPoint
from System.Net.Sockets import Socket, SocketException, NetworkStream
from System.Net.Sockets import AddressFamily, ProtocolType, SocketType
from System.Net.Sockets import SocketOptionLevel, SocketOptionName
from System.Text import Encoding

# ISO-8859-1 encoder/decoder used for every string <-> byte-array
# conversion in this module (GetBytes on the way out, GetString on the
# way in).
raw = Encoding.GetEncoding('iso-8859-1')

def _make_buffer(size):
    """Return a newly created .NET Byte array holding `size` elements."""
    return Array.CreateInstance(Byte, size)

# Python-style constant names bound to the corresponding .NET enum members.
AF_INET = AddressFamily.InterNetwork
AF_UNSPEC = AddressFamily.Unspecified
SOCK_STREAM = SocketType.Stream
SOCK_DGRAM = SocketType.Dgram
IPPROTO_IP = ProtocolType.IP
SOL_SOCKET = SocketOptionLevel.Socket
SO_REUSEADDR = SocketOptionName.ReuseAddress

class error(Exception):
    """Exception raised by `_handle_error` in place of a SocketException.

    It is raised with two arguments: the .NET SocketErrorCode and the
    exception message.
    """
    pass

def _handle_error(function):
    """Decorator translating SocketException into this module's `error`.

    The returned wrapper calls `function` with the arguments it was given
    and returns its result.  If a SocketException is raised, it is replaced
    by `error(code, message)` built from the exception's SocketErrorCode
    and Message.  Any other exception propagates unchanged.

    NOTE(review): the wrapper does not copy `function`'s name or docstring.
    """
    def wrapper(*args, **kwargs):
        try:
            return function(*args, **kwargs)
        except SocketException, e:
            code = e.SocketErrorCode
            message = e.Message
            raise error(code, message)
    return wrapper

def _address_to_endpoint(address):
    """Convert a `(host, port)` pair into a .NET IPEndPoint.

    The host is resolved through `gethostbyname` (defined further down in
    this module) and the resulting IP string is parsed with
    IPAddress.Parse.
    """
    host, port = address
    ip = gethostbyname(host)
    return IPEndPoint(IPAddress.Parse(ip), port)

def _endpoint_to_address(endpoint):
    """Return `(str(endpoint.Address), endpoint.Port)` for an endpoint."""
    return str(endpoint.Address), endpoint.Port

def _make_endpoint():
    """Return an IPEndPoint for IPAddress.Any and port 0.

    Used by `PythonSocket.recvfrom` as the endpoint argument handed to
    ReceiveFrom.
    """
    return IPEndPoint(IPAddress.Any, 0)

class PythonSocket:
    """Python-style socket object wrapping a .NET Socket.

    The wrapped object is stored in the `socket` attribute.

    NOTE(review): only `bind` and `connect` are decorated with
    `_handle_error`; the other methods let a SocketException raised by the
    underlying Socket propagate as-is.  Confirm whether that is intended.
    """

    def __init__(self, socket):
        """Store the .NET Socket instance to delegate to."""
        self.socket = socket

    @_handle_error
    def bind(self, address):
        """Bind the socket to the `(host, port)` pair `address`."""
        endpoint = _address_to_endpoint(address)
        self.socket.Bind(endpoint)

    def listen(self, backlog):
        """Call Listen(backlog) on the underlying socket."""
        self.socket.Listen(backlog)

    def accept(self):
        """Accept a connection.

        Returns a pair `(PythonSocket, address)`, where the new
        PythonSocket wraps the accepted .NET socket and `address` is
        derived from its RemoteEndPoint.
        """
        conn = self.socket.Accept()
        address = _endpoint_to_address(conn.RemoteEndPoint)
        return PythonSocket(conn), address

    @_handle_error
    def connect(self, address):
        """Connect the socket to the `(host, port)` pair `address`."""
        endpoint = _address_to_endpoint(address)
        self.socket.Connect(endpoint)

    def send(self, string):
        """Encode `string` with `raw` and pass the bytes to Socket.Send.

        Returns whatever Send returns.
        """
        bytes = raw.GetBytes(string)
        return self.socket.Send(bytes)

    # sendall is a plain alias of send; no retry loop is added here.
    # NOTE(review): this relies on Socket.Send transmitting the whole
    # buffer in one call -- confirm for the socket modes in use.
    sendall = send

    def sendto(self, string, address):
        """Encode `string` and send it to the `(host, port)` pair `address`.

        Returns whatever Socket.SendTo returns.
        """
        bytes = raw.GetBytes(string)
        endpoint = _address_to_endpoint(address)
        return self.socket.SendTo(bytes, endpoint)

    def recv(self, bufsize):
        """Receive into a `bufsize`-byte buffer and return the data as a string."""
        buffer = _make_buffer(bufsize)
        received = self.socket.Receive(buffer)
        # Decode only the first `received` elements of the buffer.
        string = raw.GetString(buffer[:received])
        return string

    def recvfrom(self, bufsize):
        """Receive into a `bufsize`-byte buffer.

        Returns `(string, address)`, where `address` is derived from the
        endpoint handed back by ReceiveFrom.
        """
        buffer = _make_buffer(bufsize)
        endpoint = _make_endpoint()
        # The call result is unpacked as (count, endpoint).
        received, endpoint = self.socket.ReceiveFrom(buffer, endpoint)
        string = raw.GetString(buffer[:received])
        address = _endpoint_to_address(endpoint)
        return string, address

    def close(self):
        """Do nothing; the underlying socket is deliberately left open."""
        # There may be NetworkStream's still open
        #self.socket.Close()
        pass

    def getsockopt(self, level, name):
        """Return the result of GetSocketOption(level, name)."""
        return self.socket.GetSocketOption(level, name)

    def setsockopt(self, level, name, value):
        """Call SetSocketOption(level, name, value) on the underlying socket."""
        self.socket.SetSocketOption(level, name, value)

    def gettimeout(self):
        """Always return None; no timeout is tracked by this class."""
        return None

    def settimeout(self, value):
        """Accept `value` and ignore it (stub)."""
        pass

    def setblocking(self, flag):
        """Accept `flag` and ignore it (stub)."""
        pass

    def getsockname(self):
        """Return the address pair for the socket's LocalEndPoint."""
        endpoint = self.socket.LocalEndPoint
        return _endpoint_to_address(endpoint)

    def getpeername(self):
        """Return the address pair for the socket's RemoteEndPoint."""
        endpoint = self.socket.RemoteEndPoint
        return _endpoint_to_address(endpoint)

    def makefile(self, mode='r', bufsize=-1):
        """Return `file(stream, mode)` for a NetworkStream over this socket.

        NOTE(review): `bufsize` is accepted but never used.
        """
        stream = NetworkStream(self.socket)
        return file(stream, mode)

def socket(family=AF_INET, type=SOCK_STREAM, proto=IPPROTO_IP):
    """Create a .NET Socket and return it wrapped in a PythonSocket.

    Each argument is first passed through Enum.ToObject for its enum type
    (presumably so plain numbers are accepted as well as enum members --
    see the 2006-03-15 changelog entry).
    """
    family = Enum.ToObject(AddressFamily, family)
    type = Enum.ToObject(SocketType, type)
    proto = Enum.ToObject(ProtocolType, proto)
    socket = Socket(family, type, proto)
    return PythonSocket(socket)

# --------------------------------------------------------------------
# IP address functions

def inet_aton(string):
    """Parse the IP address `string` and return its address bytes as a string.

    The bytes from GetAddressBytes are decoded with `raw`.
    """
    ip = IPAddress.Parse(string)
    packed = raw.GetString(ip.GetAddressBytes())
    return packed

def inet_ntoa(packed):
    """Return `str()` of the IPAddress built from the packed string `packed`.

    `packed` is encoded to bytes with `raw` before being handed to the
    IPAddress constructor.
    """
    bytes = raw.GetBytes(packed)
    ip = IPAddress(bytes)
    return str(ip)

# --------------------------------------------------------------------
# DNS functions

from System.Net import Dns

def _safe_gethostbyname(hostname):
    """Return Dns.GetHostByName for `hostname`.

    A false `hostname` (for example '' or None) is replaced by
    `str(IPAddress.Any)` before the lookup.
    """
    if not hostname:
        hostname = str(IPAddress.Any)
    return Dns.GetHostByName(hostname)

def _entry_to_triple(entry):
    """Convert a host entry into `(hostname, aliaslist, ipaddrlist)`.

    `aliaslist` is a list of the entry's Aliases and `ipaddrlist` holds
    `str()` of each item in its AddressList.
    """
    hostname = entry.HostName
    aliaslist = list(entry.Aliases)
    ipaddrlist = map(str, entry.AddressList)
    return hostname, aliaslist, ipaddrlist

def gethostname():
    """Return the result of Dns.GetHostName()."""
    return Dns.GetHostName()

def gethostbyname(hostname):
    """Return `str()` of the first address in the host entry for `hostname`."""
    entry = _safe_gethostbyname(hostname)
    return str(entry.AddressList[0])

def gethostbyname_ex(hostname):
    """Return `(hostname, aliaslist, ipaddrlist)` for `hostname`."""
    entry = _safe_gethostbyname(hostname)
    return _entry_to_triple(entry)

def gethostbyaddr(address):
    """Return `(hostname, aliaslist, ipaddrlist)` via Dns.GetHostByAddress."""
    entry = Dns.GetHostByAddress(address)
    return _entry_to_triple(entry)

def getfqdn(hostname=''):
    """Return the HostName of the host entry looked up for `hostname`.

    A false `hostname` is replaced by Dns.GetHostName() first.
    """
    if not hostname:
        hostname = Dns.GetHostName()
    entry = Dns.GetHostByName(hostname)
    return entry.HostName

def getaddrinfo(host, port, family=AF_INET, socktype=SOCK_STREAM,
                proto=IPPROTO_IP, flags=None):
    """Return a list of `(family, socktype, proto, '', (ip, port))` tuples.

    One tuple is produced for each address in the host entry for `host`.
    `family`, `socktype` and `proto` are passed through Enum.ToObject, and
    AF_UNSPEC is replaced by AF_INET.  `port` is returned unchanged, and
    the fourth element is always the empty string.

    NOTE(review): `flags` is accepted but never used.
    """
    entry = _safe_gethostbyname(host)
    family = Enum.ToObject(AddressFamily, family)
    socktype = Enum.ToObject(SocketType, socktype)
    proto = Enum.ToObject(ProtocolType, proto)
    if family == AF_UNSPEC:
        family = AF_INET
    return [
        (family, socktype, proto, '', (str(ip), port))
        for ip in entry.AddressList
    ]

# --------------------------------------------------------------------
# SSL

# _make_ssl_stream comes from a separate `ssl` module not shown here.
from ssl import _make_ssl_stream

class SSL:
    """String-based read/write wrapper around a stream object.

    The stream is stored in the `stream` attribute; instances are created
    by the module-level `ssl` function.
    """

    def __init__(self, stream):
        """Store the stream to read from and write to."""
        self.stream = stream

    def write(self, string):
        """Encode `string` with `raw`, write it, and return the byte count."""
        bytes = raw.GetBytes(string)
        self.stream.Write(bytes)
        return len(bytes)

    def _read_once(self, bufsize):
        """Issue one Read of at most `bufsize` bytes.

        Returns the decoded data, or '' when Read reports a zero count.
        """
        buffer = _make_buffer(bufsize)
        count = self.stream.Read(buffer, 0, bufsize)
        if count:
            return raw.GetString(buffer[:count])
        else:
            return ''

    def _read_to_end(self):
        """Read in 4096-byte requests until an empty result, then join."""
        bufsize = 4096
        all = []
        while True:
            string = self._read_once(bufsize)
            if not string:
                break
            all.append(string)
        return ''.join(all)

    def read(self, bufsize=None):
        """Read from the stream.

        With `bufsize` None, read until an empty result is returned;
        otherwise perform a single read of at most `bufsize` bytes.
        """
        if bufsize is None:
            return self._read_to_end()
        else:
            return self._read_once(bufsize)

def ssl(socket, keyfile=None, certfile=None):
    """Return an SSL object for the PythonSocket `socket`.

    A NetworkStream is opened over `socket.socket` and passed through
    `_make_ssl_stream`; the result is wrapped in SSL.

    NOTE(review): `keyfile` and `certfile` are accepted but never used.
    """
    stream = NetworkStream(socket.socket)
    ssl = _make_ssl_stream(stream)
    return SSL(ssl)

#
# The following class has been taken directly from the Python
# Library source.  It is licensed with the Python License.
# It is provided for the other standard libraries which make
# direct use of this class (for instance urllib2).

class _fileobject(object):
    """Faux file object attached to a socket object."""

    default_bufsize = 8192
    name = ""

    __slots__ = ["mode", "bufsize", "softspace",
                 # "closed" is a property, see below
                 "_sock", "_rbufsize", "_wbufsize", "_rbuf", "_wbuf"]

    def __init__(self, sock, mode='rb', bufsize=-1):
        """Attach to `sock` and derive the read and write buffer sizes.

        A negative `bufsize` selects `default_bufsize`.  For reading,
        bufsize 0 gives a read size of 1, bufsize 1 gives
        `default_bufsize`, and any other value is used as-is.  The write
        buffer size is `bufsize` itself.
        """
        self._sock = sock
        self.mode = mode # Not actually used in this version
        if bufsize < 0:
            bufsize = self.default_bufsize
        self.bufsize = bufsize
        self.softspace = False
        if bufsize == 0:
            self._rbufsize = 1
        elif bufsize == 1:
            self._rbufsize = self.default_bufsize
        else:
            self._rbufsize = bufsize
        self._wbufsize = bufsize
        self._rbuf = "" # A string
        self._wbuf = [] # A list of strings

    def _getclosed(self):
        """Return True once `_sock` has been set to None."""
        return self._sock is None
    closed = property(_getclosed, doc="True if the file is closed")

    def close(self):
        """Flush pending writes if a socket is attached, then drop it.

        `_sock` is set to None even when the flush raises.
        """
        try:
            if self._sock:
                self.flush()
        finally:
            self._sock = None

    def __del__(self):
        """Attempt `close()` on destruction, ignoring any failure."""
        try:
            self.close()
        except:
            # close() may fail if __init__ didn't complete
            pass

    def flush(self):
        """Send any buffered write data with `sendall` and empty the buffer."""
        if self._wbuf:
            buffer = "".join(self._wbuf)
            # The buffer is cleared before the send is attempted.
            self._wbuf = []
            self._sock.sendall(buffer)

    def fileno(self):
        """Return `self._sock.fileno()`.

        NOTE(review): PythonSocket in this module defines no `fileno`
        method, so this works only for socket objects that provide one.
        """
        return self._sock.fileno()

    def write(self, data):
        """Buffer `str(data)` and flush when the buffering policy requires.

        Empty data is ignored.  A flush happens when the write buffer size
        is 0, when it is 1 and `data` contains a newline, or when the
        buffered length has reached the write buffer size.
        """
        data = str(data) # XXX Should really reject non-string non-buffers
        if not data:
            return
        self._wbuf.append(data)
        if (self._wbufsize == 0 or
            self._wbufsize == 1 and '\n' in data or
            self._get_wbuf_len() >= self._wbufsize):
            self.flush()

    def writelines(self, list):
        """Buffer the non-empty `str()` of each item, then maybe flush.

        A flush happens when the write buffer size is 0 or 1, or when the
        buffered length has reached the write buffer size.
        """
        # XXX We could do better here for very long lists
        # XXX Should really reject non-string non-buffers
        self._wbuf.extend(filter(None, map(str, list)))
        if (self._wbufsize <= 1 or
            self._get_wbuf_len() >= self._wbufsize):
            self.flush()

    def _get_wbuf_len(self):
        """Return the total length of the strings in the write buffer."""
        buf_len = 0
        for x in self._wbuf:
            buf_len += len(x)
        return buf_len

    def read(self, size=-1):
        """Read and return data as a string.

        With a negative `size`, read until `recv` returns an empty string.
        Otherwise return at most `size` bytes; data received beyond `size`
        is kept in the read buffer for later calls.
        """
        data = self._rbuf
        if size < 0:
            # Read until EOF
            buffers = []
            if data:
                buffers.append(data)
            self._rbuf = ""
            if self._rbufsize <= 1:
                recv_size = self.default_bufsize
            else:
                recv_size = self._rbufsize
            while True:
                data = self._sock.recv(recv_size)
                if not data:
                    break
                buffers.append(data)
            return "".join(buffers)
        else:
            # Read until size bytes or EOF seen, whichever comes first
            buf_len = len(data)
            if buf_len >= size:
                # The read buffer alone satisfies the request.
                self._rbuf = data[size:]
                return data[:size]
            buffers = []
            if data:
                buffers.append(data)
            self._rbuf = ""
            while True:
                left = size - buf_len
                recv_size = max(self._rbufsize, left)
                data = self._sock.recv(recv_size)
                if not data:
                    break
                buffers.append(data)
                n = len(data)
                if n >= left:
                    # Keep the surplus for the next call and trim the
                    # last chunk to exactly what is still needed.
                    self._rbuf = data[left:]
                    buffers[-1] = data[:left]
                    break
                buf_len += n
            return "".join(buffers)

    def readline(self, size=-1):
        """Read and return one line, including its trailing newline if any.

        With a negative `size`, read until a newline or until `recv`
        returns an empty string.  Otherwise additionally stop once `size`
        bytes have been collected.  Data received past the returned line
        is kept in the read buffer.
        """
        data = self._rbuf
        if size < 0:
            # Read until \n or EOF, whichever comes first
            if self._rbufsize <= 1:
                # Speed up unbuffered case
                assert data == ""
                buffers = []
                recv = self._sock.recv
                while data != "\n":
                    data = recv(1)
                    if not data:
                        break
                    buffers.append(data)
                return "".join(buffers)
            nl = data.find('\n')
            if nl >= 0:
                # A full line is already in the read buffer.
                nl += 1
                self._rbuf = data[nl:]
                return data[:nl]
            buffers = []
            if data:
                buffers.append(data)
            self._rbuf = ""
            while True:
                data = self._sock.recv(self._rbufsize)
                if not data:
                    break
                buffers.append(data)
                nl = data.find('\n')
                if nl >= 0:
                    nl += 1
                    self._rbuf = data[nl:]
                    buffers[-1] = data[:nl]
                    break
            return "".join(buffers)
        else:
            # Read until size bytes or \n or EOF seen, whichever comes first
            nl = data.find('\n', 0, size)
            if nl >= 0:
                nl += 1
                self._rbuf = data[nl:]
                return data[:nl]
            buf_len = len(data)
            if buf_len >= size:
                self._rbuf = data[size:]
                return data[:size]
            buffers = []
            if data:
                buffers.append(data)
            self._rbuf = ""
            while True:
                data = self._sock.recv(self._rbufsize)
                if not data:
                    break
                buffers.append(data)
                left = size - buf_len
                # Only look for a newline within the bytes still allowed.
                nl = data.find('\n', 0, left)
                if nl >= 0:
                    nl += 1
                    self._rbuf = data[nl:]
                    buffers[-1] = data[:nl]
                    break
                n = len(data)
                if n >= left:
                    self._rbuf = data[left:]
                    buffers[-1] = data[:left]
                    break
                buf_len += n
            return "".join(buffers)

    def readlines(self, sizehint=0):
        """Return a list of lines read with `readline()`.

        Reading stops at the first empty line result, or, when `sizehint`
        is non-zero, once the total length read reaches `sizehint`.
        """
        total = 0
        list = []
        while True:
            line = self.readline()
            if not line:
                break
            list.append(line)
            total += len(line)
            if sizehint and total >= sizehint:
                break
        return list

    # Iterator protocols

    def __iter__(self):
        """Return self; the object is its own iterator."""
        return self

    def next(self):
        """Return the next line, raising StopIteration on an empty read."""
        line = self.readline()
        if not line:
            raise StopIteration
        return line