#! /usr/bin/env python
# Copyright (c) 2011-2012, Raymond Sweha.
# Copyright (c) 2011-2012, Vatche Ishakian.
# See LICENSE for licensing information
"""Angel node: registers with a tracker, downloads chunks, serves children.

What this module does:

* ``cAngel`` sends a join request to the tracker and then listens on a
  TCP port.  Each incoming connection starts with a 7-byte tag
  (``TRACKER``, ``CLIENTS`` or ``ECHOMSG``).
* Tracker messages are ``|``-separated text (``Welcome``, ``ConnectTo``,
  ``Disconnect``, ``Echo``).
* A ``ConnectTo`` starts a ``downloadclient`` thread that pulls chunks
  from a parent into the per-tree ``cBuffer`` in ``ListofBuffers``.
* A connection tagged ``CLIENTS`` starts a ``srvr`` thread that streams
  the buffered chunks of one tree to that child.

The syntax is valid on both Python 2 and Python 3.  Socket payloads are
converted with ``toBytes`` / ``toText``, which are no-ops on Python 2
byte strings.
"""

import socket
import sys
import threading
import os
import time
import traceback
import datetime
from time import sleep
import random
import operator
import struct

# Shared state, keyed as noted; the threads below read and mutate these.
ListofBuffers = {}      # int tree id -> cBuffer
dwnclientMsgs = {}      # tree id as received from the tracker -> cMessage
NbrofFailures = 0
NbrofSuccess = 0
mythreads = {}          # "<tree>_<child ip>" -> srvr thread
mydwnclients = {}       # tree id as received from the tracker -> downloadclient
#FILENAME = ""
LogFileName = "Angel_" + socket.gethostname() + "_Log.log"
LOGGING = True

# Truncate the log file left over from a previous run.
f = open(LogFileName, 'w')
f.close()

# Size of the fixed header in front of every chunk (see parseChunkHeader).
CHUNK_HEADER_SIZE = 12
# Timeout, in seconds, applied to outgoing connections once established.
SOCKET_TIMEOUT = 300


def logToFile(msg):
    """Print *msg* and, when LOGGING is set, append it to LogFileName."""
    print(msg)
    if LOGGING:
        # `with` closes the handle even if the write fails.
        with open(LogFileName, 'a') as fhandler:
            fhandler.write(msg + "\n")


def logCurrentException():
    """Log type, value and traceback object of the exception being handled.

    Must be called from inside an ``except`` block.
    """
    exc_type, exc_value, exc_tb = sys.exc_info()
    logToFile(str(exc_type) + str(exc_value) + str(exc_tb))


def toBytes(message):
    """Return *message* as a byte string suitable for ``socket.sendall``.

    Byte strings (every Python 2 ``str``) are returned unchanged; text is
    encoded as UTF-8.
    """
    if isinstance(message, bytes):
        return message
    return message.encode("utf-8")


def toText(payload):
    """Return *payload* as the native ``str`` type.

    Native strings (every Python 2 ``str``) are returned unchanged; Python 3
    ``bytes`` are decoded as UTF-8, replacing undecodable bytes.
    """
    if isinstance(payload, str):
        return payload
    return payload.decode("utf-8", "replace")


def recvExact(sock, nbytes):
    """Read exactly *nbytes* bytes from *sock*.

    Returns an empty byte string when *nbytes* is zero or negative.

    Raises:
        EOFError: the peer closed the connection before *nbytes* arrived.
            Without this check an empty ``recv`` result would keep a read
            loop spinning forever.
    """
    pieces = []
    remaining = nbytes
    while remaining > 0:
        piece = sock.recv(remaining)
        if not piece:
            raise EOFError("connection closed with %d of %d bytes missing"
                           % (remaining, nbytes))
        pieces.append(piece)
        remaining -= len(piece)
    return b"".join(pieces)


def parseChunkHeader(header):
    """Decode the 12-byte chunk header.

    Bytes 2-3 hold a little-endian 16-bit size and bytes 4-7 a little-endian
    32-bit sequence number.

    Returns:
        ``(payload_size, sequence_number)`` where ``payload_size`` is the size
        field minus 8, i.e. the number of bytes that follow the header.
        NOTE(review): the meaning of the remaining header bytes and of the
        constant 8 is not visible in this file.
    """
    size_field, sequence_number = struct.unpack_from("<HI", header, 2)
    return size_field - 8, sequence_number


ASFHeader = ""


class cMessage(dict, threading.Thread):
    """Mailbox used to pass a control value to a download thread.

    Keys: "MessageLock" (lock guarding writes), "ClientID", and "Data", which
    starts as "0"; a downloadclient stops once it reads "1".
    """

    # dict sets __hash__ to None; restore identity hashing so thread
    # bookkeeping that keeps Thread objects in sets (Python 3) can hold us.
    __hash__ = object.__hash__

    def __init__(self, i_ClientID):
        threading.Thread.__init__(self)
        self["MessageLock"] = threading.Lock()
        self["ClientID"] = i_ClientID
        self["Data"] = "0"

    def SetMessage(self, i_Data):
        """Store *i_Data* under "Data" while holding the message lock."""
        with self["MessageLock"]:
            self["Data"] = i_Data


class cBuffer(dict, threading.Thread):
    """Chunk store for one tree.

    Keys: "Data" (chunk id -> chunk bytes), "Pointer", "LastPointerReceived"
    (highest chunk id stored so far), "NumofTrees" (distance between two
    consecutive chunk ids of this tree), "BufferID" and "Bufferlock".
    """

    # See cMessage.__hash__.
    __hash__ = object.__hash__

    def __init__(self, i_BufferID, i_StartPointer, i_NumOfTrees):
        threading.Thread.__init__(self)
        self["Bufferlock"] = threading.Lock()
        self["BufferID"] = i_BufferID
        self["Data"] = {}
        self["Pointer"] = int(i_StartPointer)
        self["LastPointerReceived"] = 0
        self["NumofTrees"] = int(i_NumOfTrees)

    def AddData(self, i_DataID, i_DataValue):
        """Store chunk *i_DataID* and update both pointers.

        "Pointer" advances by "NumofTrees" for as long as the next chunk id is
        already stored; "LastPointerReceived" is raised to *i_DataID* when that
        is larger.
        """
        chunk_id = int(i_DataID)
        stride = self["NumofTrees"]
        with self["Bufferlock"]:
            self["Data"][chunk_id] = i_DataValue
            while (int(self["Pointer"]) + stride) in self["Data"]:
                self["Pointer"] = int(self["Pointer"]) + stride
            if self["LastPointerReceived"] < chunk_id:
                self["LastPointerReceived"] = chunk_id


class downloadclient(threading.Thread):
    """Thread that pulls the chunks of one tree from a parent node."""

    def __init__(self, i_bufferid, i_parentIP, i_parentPort, i_TreeID, i_IP,
                 i_cThreshold, i_TrackerIP, i_TrackerPort, i_PortNum,
                 i_PacketSize):
        threading.Thread.__init__(self)
        self.bufferid = i_bufferid
        self.parentIP = i_parentIP
        self.parentPort = i_parentPort
        self.TreeID = i_TreeID
        self.IP = i_IP
        self.PortNum = i_PortNum
        self.cThreshold = i_cThreshold
        self.TrackerIP = i_TrackerIP
        self.TrackerPort = i_TrackerPort
        self.PacketSize = i_PacketSize

    def _connectToParent(self, stop_signal):
        """Retry every 0.1 s until connected; return None if told to stop."""
        while True:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                s.connect((self.parentIP, int(self.parentPort)))
                s.settimeout(SOCKET_TIMEOUT)
                return s
            except Exception:
                logToFile("Error, cant connect to parent " + str(self.parentIP)
                          + " in tree" + str(self.TreeID))
                logCurrentException()
                s.close()
            # A Disconnect for this tree ends the retry loop; otherwise an
            # unreachable parent would keep this thread alive forever.
            if stop_signal["Data"] == "1":
                return None
            time.sleep(0.1)

    def _receiveChunks(self, s, oBufferObj, stop_signal):
        """Read header+payload frames into *oBufferObj* until stopped.

        NOTE(review): part of the original loop (the payload read and the
        error branch of its ``try``) was missing from the source this was
        rebuilt from.  The payload read below mirrors the header read, and the
        error branch only logs and stops.  Confirm against the original
        whether it also notified the tracker.
        """
        while True:
            try:
                header = recvExact(s, CHUNK_HEADER_SIZE)
                totsize, SeqNumber = parseChunkHeader(header)
                print("downloading chunk " + str(SeqNumber))
                payload = recvExact(s, totsize)
            except (socket.error, EOFError):
                logToFile("Angel " + str(self.IP)
                          + " lost the connection to node " + str(self.parentIP)
                          + " at tree " + str(self.TreeID)
                          + " ending at chunk " + str(oBufferObj["Pointer"]))
                logCurrentException()
                return
            # The stored chunk keeps its header, so srvr can forward it as is.
            oBufferObj.AddData(SeqNumber, header + payload)
            if stop_signal["Data"] == "1":
                logToFile("Break signal was sent: Angel " + self.IP
                          + " stopped recieving data from node "
                          + str(self.parentIP) + " at tree " + str(self.TreeID)
                          + " ending at chunk " + str(oBufferObj["Pointer"]))
                return

    def run(self):
        """Connect to the parent, ask to be adopted, then download chunks."""
        # Capture our own mailbox now: a later ConnectTo for the same tree
        # replaces the dictionary entry, and this thread must keep seeing the
        # "1" that the preceding Disconnect wrote.
        stop_signal = dwnclientMsgs[self.bufferid]
        s = self._connectToParent(stop_signal)
        if s is None:
            return
        try:
            print("bufferID " + str(self.bufferid))
            oBufferObj = ListofBuffers[int(self.bufferid)]
            msg = ("CLIENTS" + str(self.TreeID) + "|"
                   + str(oBufferObj["Pointer"]) + "|" + str(self.IP))
            s.sendall(toBytes(msg))
            logToFile("Angel " + self.IP + " requests to be adopted by node "
                      + self.parentIP + " at tree " + str(self.TreeID)
                      + " starting at chunk " + str(oBufferObj["Pointer"]))
            self._receiveChunks(s, oBufferObj, stop_signal)
        finally:
            s.close()


class srvr(threading.Thread):
    """Thread that streams the buffered chunks of one tree to one child."""

    def __init__(self, clntsock, i_TreeID, i_StartPoint, i_ChildIP,
                 i_StreamRate, i_PacketSize, i_NumOfTrees, i_TrackerIP,
                 i_TrackerPort):
        threading.Thread.__init__(self)
        self.myclntsock = clntsock
        self.TreeID = int(i_TreeID)
        self.StartingPoint = int(i_StartPoint)
        self.ChildIP = i_ChildIP
        self.PiecesSent = {}
        self.StreamRate = i_StreamRate
        self.PacketSize = i_PacketSize
        self.NumOfTress = int(i_NumOfTrees)
        self.TrackerIP = i_TrackerIP
        self.TrackerPort = i_TrackerPort

    def GetNextChunkToSend(self):
        """Return the lowest stored, not yet sent chunk id, or -1 if none.

        Candidates start at StartingPoint and step by NumOfTress.
        """
        oBufferObj = ListofBuffers[self.TreeID]
        stored = oBufferObj["Data"]
        # One snapshot: re-reading this while scanning could let the scan end
        # past the old value and still pass the final comparison against a
        # newer one, returning an id that has no data.
        last_received = oBufferObj["LastPointerReceived"]
        lPointer = self.StartingPoint
        while lPointer <= last_received:
            if lPointer in stored and lPointer not in self.PiecesSent:
                return lPointer
            lPointer = lPointer + self.NumOfTress
        return -1

    def _idle(self):
        """Sleep for a hundredth of one packet time at the stream rate."""
        time.sleep(float(self.PacketSize) / float(self.StreamRate) / 100)

    def _complainToTracker(self):
        """Tell the tracker (message type "2") that the child is unreachable."""
        s2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s2.connect((self.TrackerIP, int(self.TrackerPort)))
            s2.settimeout(SOCKET_TIMEOUT)
            msg = ("2|" + str(socket.gethostname()) + "|" + str(self.TreeID)
                   + "|" + str(self.ChildIP))
            s2.sendall(toBytes(msg))
        except Exception:
            logToFile("Uncable to conenct to the tracker")
            logCurrentException()
        finally:
            s2.close()

    def run(self):
        """Send chunks in order until sending to the child fails."""
        oBufferObj = ListofBuffers[self.TreeID]
        logToFile("Angel " + socket.gethostname()
                  + " started sending data to node " + self.ChildIP
                  + " at tree " + str(self.TreeID) + " starting at chunk "
                  + str(self.StartingPoint))
        while True:
            lPointer = self.GetNextChunkToSend()
            if lPointer == -1:
                # Nothing new buffered yet.
                self._idle()
                continue
            if lPointer not in oBufferObj["Data"]:
                logToFile("ERROR: Server " + socket.gethostname()
                          + " stopped sending data to node " + self.ChildIP
                          + " at tree " + str(self.TreeID)
                          + " ending at chunk " + str(lPointer))
                print("Something is wrong with the logic. "
                      "I don't have data to send to the client...")
                self._idle()  # do not spin while the inconsistency lasts
                continue
            try:
                # sendall: a plain send may transmit only part of the chunk,
                # which would break the child's header/payload framing.
                self.myclntsock.sendall(oBufferObj["Data"][lPointer])
            except Exception:
                logToFile("Angel " + socket.gethostname()
                          + " stopped sending data to node " + self.ChildIP
                          + " at tree " + str(self.TreeID)
                          + " ending at chunk " + str(lPointer))
                logCurrentException()
                if self.myclntsock is not None:
                    self.myclntsock.close()
                self.myclntsock = None
                self._complainToTracker()
                break
            self.PiecesSent[lPointer] = lPointer
            # Skip over everything already sent so later scans start later.
            while self.StartingPoint in self.PiecesSent:
                self.StartingPoint = self.StartingPoint + self.NumOfTress
        return


class cAngel(dict):
    """The Angel node: joins via the tracker and dispatches incoming messages."""

    def __init__(self, i_trackerIP, i_trackerPort, i_fileID, i_UploadCapacity,
                 i_IPAddress, i_portNumber):
        self.trackerIP = i_trackerIP            # Tracker IP address
        self.trackerPort = int(i_trackerPort)   # Tracker port number
        self.fileID = i_fileID                  # ID of the file to download
        self.UploadCapacity = i_UploadCapacity  # upload capacity of this client
        self.IPAddress = i_IPAddress            # address of this client
        self.portNumber = i_portNumber          # listening port of this client
        # The fields below are overwritten by the tracker's Welcome message.
        self.numberOfTrees = 0
        self.StreamRate = 0
        self.PacketSize = 3496
        self.Threshold = 0

    def sendJoinRequest(self):
        """Send the join message (type "4") to the tracker.

        Retries every 0.1 s; after more than 100 failed attempts it logs and
        calls ``sys.exit(0)``.
        """
        msg = ("4|" + str(self.IPAddress) + "|" + str(self.portNumber) + "|"
               + str(self.fileID) + "|" + str(self.UploadCapacity))
        count = 0
        while True:
            count = count + 1
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                s.connect((self.trackerIP, self.trackerPort))
                s.settimeout(SOCKET_TIMEOUT)
                s.sendall(toBytes(msg))
                return
            except Exception:
                logCurrentException()
                logToFile("Error in sendJoinRequest, connection to the tracker"
                          " cant be established tial number" + str(count))
            finally:
                s.close()
            if count > 100:
                logToFile(" Giving up and aborting")
                sys.exit(0)
            time.sleep(0.1)

    def _readFirstMessage(self, incomingSocket, incomingIP):
        """Read the tag and first message of a new connection.

        Returns the message text, or None when the connection needs no further
        processing (echo probe answered, or unknown tag).
        """
        clientMessage = toText(recvExact(incomingSocket, 7))
        if clientMessage == "TRACKER":
            # Two-byte big-endian length, then that many bytes of message.
            (totsize,) = struct.unpack(">H", recvExact(incomingSocket, 2))
            return toText(recvExact(incomingSocket, totsize))
        if clientMessage == "CLIENTS":
            # The adoption request carries no length prefix; one read of up
            # to PacketSize bytes is taken as the whole message.
            strFirstMessage = toText(incomingSocket.recv(int(self.PacketSize)))
            logToFile("an adoption request from another client was received")
            return strFirstMessage
        if clientMessage == "ECHOMSG":
            incomingSocket.sendall(toBytes("echoback"))
            return None
        logToFile("Error: An unkown msg was received " + clientMessage
                  + " from node " + str(incomingIP))
        return None

    def listenForIncomingConnections(self):
        """Accept connections forever and dispatch their first message.

        A failure while handling one connection is logged and that connection
        is closed; the listener keeps running.
        """
        lstn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        lstn.bind(('', self.portNumber))
        lstn.listen(100)
        while True:
            (incomingSocket, incomingIP) = lstn.accept()
            try:
                strFirstMessage = self._readFirstMessage(incomingSocket,
                                                         incomingIP)
                if strFirstMessage is None:
                    incomingSocket.close()
                    continue
                self.processesMessage(strFirstMessage, incomingSocket,
                                      incomingIP)
            except Exception:
                logToFile("Error: could not handle the connection from node "
                          + str(incomingIP))
                logCurrentException()
                incomingSocket.close()

    def processTrackerMessage(self, incomingMessage, incomingSocket):
        """Act on one ``|``-separated tracker message.

        Welcome:    fields 1, 2, 4, 5 = number of trees, start chunk, stream
                    rate, packet size; creates one cBuffer per tree.
        ConnectTo:  fields 1, 2, 3 = parent address, parent port, tree id;
                    starts a downloadclient for that tree.
        Disconnect: field 3 = tree id; signals that tree's downloadclient to
                    stop.
        Echo:       replies "CLIENTSEchoBack" on *incomingSocket*.
        """
        arrRequest = incomingMessage.split("|")
        command = arrRequest[0]

        if command == "Welcome":
            self.numberOfTrees = arrRequest[1]
            trees = int(self.numberOfTrees)
            # Round the start chunk down to a multiple of the tree count;
            # // keeps this an integer on Python 3 as well.
            base = (int(arrRequest[2]) // trees) * trees
            for i in range(1, trees + 1):
                ListofBuffers[i] = cBuffer(i, base + i, self.numberOfTrees)
            self.StreamRate = int(arrRequest[4])
            self.PacketSize = int(arrRequest[5])
            self.Threshold = ((float(self.PacketSize) / float(self.StreamRate))
                              * trees)

        elif command == "ConnectTo":
            treeID = arrRequest[3]
            if treeID in dwnclientMsgs:
                # Give a previous download client of this tree a moment to
                # notice its stop signal.
                time.sleep(0.2)
            dwnclientMsgs[treeID] = cMessage(treeID)
            dwnclient = downloadclient(treeID, arrRequest[1], arrRequest[2],
                                       treeID, self.IPAddress, self.Threshold,
                                       self.trackerIP, self.trackerPort,
                                       self.portNumber, self.PacketSize)
            mydwnclients[treeID] = dwnclient
            dwnclient.start()

        elif command == "Disconnect":
            # The download client polls this value after every chunk.
            dwnclientMsgs[arrRequest[3]].SetMessage("1")

        elif command == "Echo":
            # Probe from the tracker.
            incomingSocket.sendall(toBytes("CLIENTSEchoBack"))

    def checkLegetimateChild(self, incomingMessage, incomingIP):
        """Return 1; every child is currently accepted (TODO: real check)."""
        return 1

    def processesMessage(self, incomingMessage, incomingSocket, incomingIP):
        """Route a first message to the tracker handler or start a srvr.

        Tracker connections are closed after processing.  A child connection
        is handed to the new srvr thread, which owns the socket from then on.
        """
        arrMessage = incomingMessage.split("|")
        if str(arrMessage[0]) in ("Welcome", "ConnectTo", "Disconnect", "Echo"):
            try:
                self.processTrackerMessage(incomingMessage, incomingSocket)
            finally:
                incomingSocket.close()
            return

        # Otherwise the message is "<tree>|<start chunk>|<child address>".
        if self.checkLegetimateChild(incomingMessage, incomingIP) != 1:
            incomingSocket.close()
            return

        # Marker file recording that this Angel has been called upon.
        with open("CalledAngel" + self.IPAddress, 'w') as marker:
            marker.write("Being Called Upon\n")
        print("Arrival message from the child " + str(incomingMessage))
        ct = srvr(incomingSocket, arrMessage[0], arrMessage[1], arrMessage[2],
                  self.StreamRate, self.PacketSize, self.numberOfTrees,
                  self.trackerIP, self.trackerPort)
        mythreads[arrMessage[0] + "_" + arrMessage[2]] = ct
        ct.start()


def main():
    """Parse ``sys.argv``, join via the tracker and serve until failure.

    Arguments: upload capacity, listening port, tracker address, tracker port.
    """
    try:
        UploadCapacity = sys.argv[1]        # upload capacity of this client
        IPAddress = socket.gethostname()    # address announced to the tracker
        portNumber = int(sys.argv[2])
        trackerIP = sys.argv[3]
        trackerPort = int(sys.argv[4])
        fileID = 1
        oClient = cAngel(trackerIP, trackerPort, fileID, UploadCapacity,
                         IPAddress, portNumber)
        oClient.sendJoinRequest()
        oClient.listenForIncomingConnections()
    except Exception:
        # SystemExit from sendJoinRequest is deliberately not caught here.
        logToFile("A global error accured, aborting!")
        logCurrentException()
        traceback.print_exc()


if __name__ == "__main__":
    if len(sys.argv) < 5:
        # The usage text lists exactly the four arguments main() reads.
        print("USAGE: python angel.py AngelUp AngelPort RegistrarAddress RegistrarPort")
        print("\t AngelUp: The uplink capacity of this angel")
        print("\t AngelPort: The port at which the angel receive connections")
        print("\t RegistrarAddress: The address of the registrar (IP or domain name)")
        print("\t RegistrarPort: The port at which the registrar receive connections")
        print("Example: python angel.py 3000000 9123 registrar.com 9123")
        sys.exit(0)
    main()