#! /usr/bin/env python
# Copyright (c) 2011-2012, Raymond Sweha.
# Copyright (c) 2011-2012, Vatche Ishakian.
# See LICENSE for licensing information
#
# Tracker / registrar for a multi-tree streaming overlay.  The tracker keeps
# one cTree per stream tree, places joining clients (and stand-by "angel"
# helpers) into every tree and tells them whom to connect to.
#
# NOTE: this file is Python 2 code.  Every print below is written with a
# single parenthesised argument so it prints the same text under the print
# statement; the socket paths still assume Python 2 byte strings.

import socket
import sys
import threading
import os
import time
import datetime
import traceback
from time import sleep
import re
import operator

try:
    import urllib2
except ImportError:
    # Only reachable on interpreters without urllib2; nothing in the tree
    # logic below needs it.
    urllib2 = None

sTreeID = 0
lNodeID = 0
# "1" short-circuits every network operation (dry run).  main() assigns the
# real value; the default keeps the classes usable before main() has run.
DEBUG = "0"
LOGGING = True
ASFHeader = ""
LogFileName = "Registrar_Log.log"


def logToFile(msg):
    """Append msg to the log file and echo it to stdout when LOGGING is set."""
    if LOGGING:
        with open(LogFileName, 'a') as fhandler:
            fhandler.write(msg + "\n")
            print(msg)


def Static_TreeID():
    """Return the next tree id from the module-wide counter (starts at 1)."""
    global sTreeID
    sTreeID += 1
    return sTreeID


def Static_NodeID():
    """Return the next node id from the module-wide counter (starts at 1)."""
    global lNodeID
    lNodeID += 1
    return lNodeID


def _log_exc_info():
    """Log type, value and traceback object of the exception being handled."""
    info = sys.exc_info()
    logToFile(str(info[0]) + str(info[1]) + str(info[2]))


def _frame_tracker_msg(payload):
    """Prefix payload with "TRACKER" and its length as two big-endian bytes."""
    size = len(payload)
    return "TRACKER" + chr(size // 256) + chr(size % 256) + payload


def _send_with_retries(destinationIP, destinationPort, msgContent, maxTries, logText):
    """Send msgContent over a fresh TCP connection, retrying on failure.

    A new socket is opened for every attempt and always closed again.
    logText is what gets written to the log for the attempt.

    Returns 0 once the message was sent, -1 after maxTries failed attempts.
    """
    tries = 0
    while True:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            logToFile("Sending " + str(destinationIP) + " this msg " + logText)
            s.connect((destinationIP, int(destinationPort)))
            s.settimeout(300)  # Set Time out on the connection.
            # sendall: a plain send() may transmit only part of the message.
            s.sendall(msgContent)
            return 0
        except Exception:
            logToFile("Error in sedning " + str(destinationIP) + " this msg " + msgContent)
            _log_exc_info()
            tries = tries + 1
            if tries >= maxTries:
                return -1
        finally:
            s.close()
        time.sleep(0.1)


# This class represents a node in the tree, That is one node will be mapped
# to many of these TreeNodes, the "primary key" is TreeID, and TNodeID
class TreeNode(dict):
    """One client's (or angel's) position inside a single tree.

    Dictionary keys:
      TNodeID / TNodePort   address the node is reached on
      TreeID                tree this entry belongs to
      NbrChildAllowed       how many children the node may feed in this tree
      ChildList             TreeNodes fed by this node
      ParentList            holds at most one TreeNode; the root lists itself
      ClosestAdNode/Distance   nearest node below with a free child slot
      ClosestIntNode/Distance  nearest node below that has a leaf child
      height                1 + the largest child height (0 until first update)
      PacketSize            receive size used by Probe
    """

    def __init__(self, i_nbrChildAllowd, i_TreeID, i_NodeID, i_NodePort, i_PacketSize):
        self["TNodeID"] = i_NodeID
        self["TNodePort"] = int(i_NodePort)
        self["ClosestAdNode"] = None
        self["ClosestAdDistance"] = 100000
        self["ClosestIntNode"] = None
        self["ClosestIntDistance"] = 0
        self["NbrChildAllowed"] = i_nbrChildAllowd
        self["TreeID"] = i_TreeID
        self["ChildList"] = []
        self["ParentList"] = []  # Should be only one.
        self["height"] = 0
        self["PacketSize"] = int(i_PacketSize)

    def _parent_other_than_self(self):
        """Return the parent node, or None for the root / a detached node.

        The root is recognised by being its own parent (same TNodeID).
        """
        if not self["ParentList"]:
            return None
        parent = self["ParentList"][0]
        if parent["TNodeID"] == self["TNodeID"]:
            return None
        return parent

    def _remove_child(self, i_NodeID):
        """Drop the first child whose TNodeID is i_NodeID; True if one was found.

        Children are matched by id, never with ==, because TreeNodes reference
        each other cyclically.
        """
        children = self["ChildList"]
        for index, child in enumerate(children):
            if child["TNodeID"] == i_NodeID:
                children.pop(index)
                return True
        return False

    def AddChild(self, i_TreeNode):
        """Attach i_TreeNode below this node.

        With a free slot the node is simply adopted.  Otherwise the new node
        "intercepts": it takes the place of one leaf child of this node and
        that leaf is re-attached below the new node.  The request is logged
        and dropped when the new node cannot have children itself or this
        node has no leaf child.  Returns None in every case.
        """
        i_TreeNode["ParentList"] = []
        if self["NbrChildAllowed"] > len(self["ChildList"]):
            # Adapt self as a parent of the child
            i_TreeNode["ParentList"].append(self)
            self["ChildList"].append(i_TreeNode)
            # tell this node to send the stream from this tree to this child
            self.stream(i_TreeNode)
            # update the meta data
            i_TreeNode.Updateheight()
            # A re-attached node may already carry its final height, in which
            # case the call above does not propagate; refresh self explicitly.
            self.Updateheight()
            i_TreeNode.updateClosestIntercept()
            i_TreeNode.updateClosestAdopter()
            return

        # Adapt this node as an intercepting child
        if i_TreeNode["NbrChildAllowed"] < 1:
            logToFile("this node cant intercept, something went wrong 111111 "
                      + str(self["TreeID"]))
            return
        intercepted = None
        for oChildNode in self["ChildList"]:
            if not oChildNode["ChildList"]:
                # this child is a leaf, make it intercepted
                intercepted = oChildNode
                break
        if intercepted is None:
            complaint = "this node cant intercept, something went wrong %s" % (self["TreeID"],)
            print(complaint)
            logToFile(complaint)
            return

        # adopt child
        self.stream(i_TreeNode)
        i_TreeNode["ParentList"].append(self)
        self["ChildList"].append(i_TreeNode)
        # orphan intercepted
        self.unstream(intercepted)
        self._remove_child(intercepted["TNodeID"])
        # tell child to adopt intercepted; that call also refreshes the meta
        # data of the new child and, through it, of self.
        i_TreeNode.AddChild(intercepted)

    def updateClosestIntercept(self):
        """Recompute ClosestIntNode/Distance and propagate up to the root."""
        if self["NbrChildAllowed"] > len(self["ChildList"]):
            # this node can adopt one more child, assign self as closest
            # intercept
            self["ClosestIntNode"] = self
            self["ClosestIntDistance"] = 0
        else:
            minDistance = 10000
            intercept = None
            # find the child with the closest intercept
            for oChildNode in self["ChildList"]:
                if not oChildNode["ChildList"]:
                    # this child is a leaf, set self as an intercept
                    minDistance = -1
                    intercept = self
                if minDistance > oChildNode["ClosestIntDistance"]:
                    # this child has a closer intercept
                    minDistance = oChildNode["ClosestIntDistance"]
                    intercept = oChildNode["ClosestIntNode"]
            self["ClosestIntNode"] = intercept
            self["ClosestIntDistance"] = minDistance + 1
        parent = self._parent_other_than_self()
        if parent is not None:
            parent.updateClosestIntercept()

    def updateClosestAdopter(self):
        """Recompute ClosestAdNode/Distance and propagate up to the root."""
        # should we break ties in favor of the node with the shortest
        # distance to the root?
        if self["NbrChildAllowed"] > len(self["ChildList"]):
            # this node can adopt one more child, assign self as closest
            # adopter
            self["ClosestAdNode"] = self
            self["ClosestAdDistance"] = 0
        else:
            minDistance = 10000
            adaptor = None
            # find the child with the closest adopter
            for oChildNode in self["ChildList"]:
                if minDistance > oChildNode["ClosestAdDistance"]:
                    minDistance = oChildNode["ClosestAdDistance"]
                    adaptor = oChildNode["ClosestAdNode"]
            self["ClosestAdNode"] = adaptor
            self["ClosestAdDistance"] = minDistance + 1
        parent = self._parent_other_than_self()
        if parent is not None:
            parent.updateClosestAdopter()

    def Updateheight(self):
        """Set height to 1 + max child height; propagate upwards on change."""
        MaxHeight = 0
        for oChildNode in self["ChildList"]:
            if MaxHeight < oChildNode["height"]:
                MaxHeight = oChildNode["height"]
        if self["height"] != MaxHeight + 1:
            self["height"] = 1 + MaxHeight
            parent = self._parent_other_than_self()
            if parent is not None:
                parent.Updateheight()

    def unstream(self, i_ChildNode):
        """Tell i_ChildNode to disconnect from this node.  Returns 0.

        The deletion is logged even in DEBUG mode; the message is only sent
        when DEBUG is off.
        """
        logToFile("Node " + str(self["TNodeID"]) + " deleted node "
                  + str(i_ChildNode["TNodeID"]) + "from tree #" + str(self["TreeID"]))
        if DEBUG == "1":
            return 0
        # send to child "Disconnect|ParentNodeID|ParentNodePort|TreeID"
        sMsg = _frame_tracker_msg("Disconnect|" + str(self["TNodeID"]) + "|"
                                  + str(self["TNodePort"]) + "|" + str(i_ChildNode["TreeID"]))
        self.sendMessage(i_ChildNode["TNodeID"], str(i_ChildNode["TNodePort"]), sMsg)
        return 0

    def stream(self, i_ChildNode):
        """Tell i_ChildNode to connect to this node.  Returns 0.

        Nothing is logged or sent in DEBUG mode.
        """
        if DEBUG == "1":
            return 0
        logToFile("Node " + str(self["TNodeID"]) + " adopted node "
                  + str(i_ChildNode["TNodeID"]) + "in tree #" + str(self["TreeID"]))
        # send to child "ConnectTo|ParentNodeID|ParentNodePort|TreeID"
        sMsg = _frame_tracker_msg("ConnectTo|" + str(self["TNodeID"]) + "|"
                                  + str(self["TNodePort"]) + "|" + str(i_ChildNode["TreeID"]))
        self.sendMessage(i_ChildNode["TNodeID"], str(i_ChildNode["TNodePort"]), sMsg)
        return 0

    def Probe(self):
        """Check that the node answers an "ECHOMSG" request.

        Returns True when a reply was received (always True in DEBUG mode),
        False when connecting, sending or receiving raised.
        """
        if DEBUG == "1":
            return True
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.connect((self["TNodeID"], int(self["TNodePort"])))
            s.send("ECHOMSG")
            data = s.recv(int(self["PacketSize"]))
            logToFile("Successful probe to : " + str(self["TNodeID"]) + " saying " + str(data))
        except Exception:
            logToFile("EXCEPTION IN receiving an echoback; probe failed")
            _log_exc_info()
            return False
        finally:
            # Close on the failure path as well so the descriptor is not leaked.
            s.close()
        return True

    # this method send msgContent to the client identified by destinationIP,
    # destinationPort
    def sendMessage(self, destinationIP, destinationPort, msgContent):
        """Send msgContent to destinationIP:destinationPort (up to 3 tries).

        Returns 0 on success or in DEBUG mode, -1 when every try failed.
        """
        if DEBUG == "1":
            return 0
        print(msgContent)
        return _send_with_retries(destinationIP, destinationPort, msgContent, 3, msgContent)


# Define a Tree Structure.
# The Tree Contains two dictionaries, one is the NodeList, and the other being
# the AngelList. So The TreeNodes are simply a list in this tree,
class cTree(dict):
    """One stream tree: NodeList and AngelList map node id -> TreeNode."""

    def __init__(self, i_VacSpot, i_Tracker):
        self["TreeID"] = Static_TreeID()
        self["VacSpots"] = i_VacSpot
        self["NodeList"] = {}
        self["AngelList"] = {}
        self["RootID"] = 0  # TODO: Need to update the rootid of the tree during each operation.
        self["Tracker"] = i_Tracker

    def ChangeConnection(self, ParentNodeID, ChildNodeID):
        """Move ChildNodeID away from ParentNodeID to the root's closest adopter.

        The parent may be a regular node or an angel; the child must be a
        regular node.  When the closest adopter is the parent itself, or lies
        inside the child's own subtree, the parent is temporarily advertised
        as unusable so the child is not re-attached on the same path.
        """
        if ParentNodeID in self["NodeList"]:
            ParentNode = self["NodeList"][ParentNodeID]
        else:
            ParentNode = self["AngelList"][ParentNodeID]
        ChildNode = self["NodeList"][ChildNodeID]
        RootNode = self["NodeList"][self["RootID"]]

        rootAdopterID = RootNode["ClosestAdNode"]["TNodeID"]
        childAdopter = ChildNode["ClosestAdNode"]
        sameRoute = (rootAdopterID == ParentNode["TNodeID"]
                     or (childAdopter is not None
                         and rootAdopterID == childAdopter["TNodeID"]))

        oldparentDistance = None
        if sameRoute:
            # if we sever this connection and add the child to the root's
            # closest adopter, we will re-establish the same connection.
            # Avoid that by making sure the parent's closest adopter no longer
            # propagates upwards.
            if ParentNode["ParentList"][0]["TNodeID"] == ParentNode["TNodeID"]:
                print("Situation happend: The connection to the root is problematic, and I cannot change anything")
                return
            GrandParentNode = ParentNode["ParentList"][0]
            oldparentDistance = ParentNode["ClosestAdDistance"]
            ParentNode["ClosestAdDistance"] = 10000000
            GrandParentNode.updateClosestAdopter()

        # add the child to the closest adopter in the tree
        RootNode["ClosestAdNode"].AddChild(ChildNode)
        # disconnect child from the old parent and update all parameters
        ParentNode.unstream(ChildNode)
        if not ParentNode._remove_child(ChildNodeID):
            logToFile("unable to remove a child from a parent")
        if oldparentDistance is not None:
            ParentNode["ClosestAdDistance"] = oldparentDistance
        ParentNode.Updateheight()
        ParentNode.updateClosestIntercept()
        ParentNode.updateClosestAdopter()

    def DeleteFromTree(self, i_NodeID):
        """Remove node i_NodeID (regular node or angel) from this tree.

        The node's children are disconnected and, after the node itself has
        been detached from its parent, re-inserted one by one at the root's
        closest adopter, smallest ClosestAdDistance first.  Progress markers
        are appended to the "TrackerData" file.
        """
        with open("TrackerData", "a") as f:
            f.write("\n curent nodes in this tree " + str(list(self["NodeList"].keys())))

        if i_NodeID in self["NodeList"]:
            registry = self["NodeList"]
        elif i_NodeID in self["AngelList"]:
            registry = self["AngelList"]
        else:
            print(" client %s  doesn't exist in this tree" % (str(i_NodeID),))
            return

        TreeNodeInfo = registry[i_NodeID]
        # update the number of vacant spots in the tree.
        self["VacSpots"] = self["VacSpots"] + 1 - TreeNodeInfo["NbrChildAllowed"]

        childrenList = TreeNodeInfo["ChildList"]
        isLeaf = not childrenList
        if not isLeaf:
            with open("TrackerData", "a") as f:
                f.write("\nHere 2 " + str(TreeNodeInfo["TNodeID"]))
            # remove the children from the node
            for oChild in childrenList:
                TreeNodeInfo.unstream(oChild)
            TreeNodeInfo["ChildList"] = []

        # disconnect the node from its parent and update all parameters
        ParentNode = TreeNodeInfo["ParentList"][0]
        ParentNode.unstream(TreeNodeInfo)
        ParentNode._remove_child(TreeNodeInfo["TNodeID"])
        registry.pop(TreeNodeInfo["TNodeID"])
        ParentNode.Updateheight()
        ParentNode.updateClosestIntercept()
        ParentNode.updateClosestAdopter()

        if isLeaf:
            with open("TrackerData", "a") as f:
                f.write("\nHere " + str(TreeNodeInfo["TNodeID"]))
            return

        # insert the children one by one into the tree starting with the one
        # with the smallest closest-adopter distance (sorted() is stable, so
        # ties keep their ChildList order).
        orphans = sorted(childrenList, key=operator.itemgetter("ClosestAdDistance"))
        RootNode = self["NodeList"][self["RootID"]]
        self["Tracker"].CheckNeedAngels()
        for oChildToBeAdded in orphans:
            # The child being appended could be a complete subtree.
            ClosesAdNode = RootNode["ClosestAdNode"]
            # AddChild already sends the ConnectTo message, so no second
            # stream() call here.
            ClosesAdNode.AddChild(oChildToBeAdded)
            # update the meta data
            ClosesAdNode.Updateheight()
            ClosesAdNode.updateClosestIntercept()
            ClosesAdNode.updateClosestAdopter()


class Tracker(dict):
    """Registrar that places clients and angels into all stream trees.

    TreeInfo maps tree id -> cTree, NodeInfo maps node id -> node record
    (NodeID, NodePort, FileID, UpldCp, TreeIDs) and AngelInfo is a list of
    "ip|port|nbrChildren" strings describing angels that are not in use.
    """

    def __init__(self, i_Fanout, i_streamrate, i_SeederIP, i_SeederPort, i_NumofTrees,
                 i_SeederCapacity, i_AngelInfo, i_StartUpDelay, i_PacketSize,
                 i_MinThreshold, i_MaxThreshold, i_SequenceNbr, i_StartTime):
        self["NumTrees"] = i_NumofTrees
        self["IPAddress"] = ""
        self["PortNum"] = ""
        self["SeederIP"] = i_SeederIP
        self["SeederPort"] = int(i_SeederPort)
        self["SeederUpldCp"] = i_SeederCapacity
        self["Trees"] = []  # List of Trees which contain the summaries of the trees
        self["TreeInfo"] = {}  # each tree will occupy a cell in this
        self["NodeInfo"] = {}  # dictionary of node information
        self["StreamingRate"] = i_streamrate
        self["TotalVacantSpots"] = 0
        self["d_Fanout"] = i_Fanout  # d from the math.
        self["MinThreshold"] = int(i_MinThreshold)
        self["MaxThreshold"] = int(i_MaxThreshold)
        self["AngelInfo"] = i_AngelInfo  # the IPs and ports of angels
        self["StartTime"] = i_StartTime
        self["PacketSize"] = i_PacketSize
        self["StartUpDelay"] = float(i_StartUpDelay)
        self["SequenceNbr"] = i_SequenceNbr

        # Just create an extra NodeInfo based on the Seeder
        SeederNode = {}
        SeederNode["NodeID"] = i_SeederIP
        SeederNode["NodePort"] = int(i_SeederPort)
        SeederNode["FileID"] = ""
        SeederNode["UpldCp"] = i_SeederCapacity
        SeederNode["TreeIDs"] = []  # ids of the trees this node takes part in
        self["NodeInfo"][SeederNode["NodeID"]] = SeederNode
        self.Seeder(SeederNode)
        self.CheckNeedAngels()
        self.PrintTrees()

    def PrintTrees(self):
        """Print every tree; a failure is reported and stops the dump."""
        try:
            for iTreeKey in list(self["TreeInfo"].keys()):
                self.PrintTree(iTreeKey)
        except Exception:
            traceback.print_exc()
            _log_exc_info()

    def _PrintNode(self, oNode):
        """Print the fields of one TreeNode, followed by two blank lines."""
        print("\tPeerID %s" % (oNode["TNodeID"],))
        if oNode["ClosestAdNode"] is not None:
            print("\tClosestAdNode %s" % (oNode["ClosestAdNode"]["TNodeID"],))
        else:
            print("\tClosestAdNode  Empty")
        print("\tClosestAdDistance %s" % (oNode["ClosestAdDistance"],))
        if oNode["ClosestIntNode"] is not None:
            print("\tClosestIntNode %s" % (oNode["ClosestIntNode"]["TNodeID"],))
        else:
            print("\tClosestIntNode  Empty")
        print("\tClosestIntDistance %s" % (oNode["ClosestIntDistance"],))
        print("\tNbrChildAllowed %s" % (oNode["NbrChildAllowed"],))
        strChild = "[" + "".join(str(child["TNodeID"]) + "\t" for child in oNode["ChildList"]) + "]"
        print("\tChildList %s" % (strChild,))
        if oNode["ParentList"]:
            print("\tParentList %s" % (oNode["ParentList"][0]["TNodeID"],))
        else:
            # A node that was never attached has no parent to show.
            print("\tParentList  Empty")
        print("\theight %s" % (oNode["height"],))
        print("")
        print("")

    def PrintTree(self, TreeID):
        """Print the summary, the peers and the angels of tree TreeID."""
        oTree = self["TreeInfo"][TreeID]
        print("")
        print("")
        print("TreeID =  %s" % (TreeID,))
        print("VacSpots %s" % (oTree["VacSpots"],))
        print("RootID %s" % (oTree["RootID"],))
        print("Peer List:")
        for nodekey in list(oTree["NodeList"].keys()):
            self._PrintNode(oTree["NodeList"][nodekey])
        print("Angel List:")
        for oNodeKey in list(oTree["AngelList"].keys()):
            self._PrintNode(oTree["AngelList"][oNodeKey])

    def Seeder(self, SeederNode):
        """Create NumTrees trees with the seeder as the root of each one.

        The seeder's child slots (upload capacity / per-tree rate) are handed
        out tree by tree: every tree gets at least one slot and at most
        min(d_Fanout, MinThreshold + 2).
        """
        TreeStreamRate = float(self["StreamingRate"]) / self["NumTrees"]
        TotnbrofChildren = int(float(SeederNode["UpldCp"]) / TreeStreamRate)
        self["TotalVacantSpots"] = TotnbrofChildren
        # i is the excess after adding one spot in each tree
        i = TotnbrofChildren - self["NumTrees"]
        maxChildren = min(self["d_Fanout"], self["MinThreshold"] + 2)
        # assign vacant spots to each tree
        for j in range(0, self["NumTrees"]):
            if i >= maxChildren:
                k = maxChildren
            elif i > 0:
                k = i + 1
            else:
                k = 1
            i = i - (k - 1)
            oTree = cTree(k, self)
            self["TreeInfo"][oTree["TreeID"]] = oTree
            i_RootNode = TreeNode(k, oTree["TreeID"], SeederNode["NodeID"],
                                  int(SeederNode["NodePort"]), self["PacketSize"])
            oTree["NodeList"][SeederNode["NodeID"]] = i_RootNode
            # TreeIDs is a list (see SubscribeToStream); append, do not replace.
            SeederNode["TreeIDs"].append(oTree["TreeID"])
            oTree["RootID"] = SeederNode["NodeID"]
            # The root is its own parent; that is how it is recognised.
            i_RootNode["ParentList"].append(i_RootNode)
            i_RootNode.Updateheight()
            i_RootNode.updateClosestIntercept()
            i_RootNode.updateClosestAdopter()

    def CheckNeedAngels(self):
        """Add an angel to trees short of vacant spots, recycle surplus ones.

        A tree below MinThreshold gets the next angel from AngelInfo (if any);
        a tree above MaxThreshold gives one of its angels back to AngelInfo.
        To avoid adding angels set self["MinThreshold"] to 0.
        """
        for i_Treekey in list(self["TreeInfo"].keys()):
            oTree = self["TreeInfo"][i_Treekey]
            if oTree["VacSpots"] < self["MinThreshold"]:
                logToFile("Found that an angel is needed as vacantspost = " + str(oTree["VacSpots"])
                          + " and minthreshold = " + str(self["MinThreshold"])
                          + " available angels" + str(len(self["AngelInfo"])))
                if len(self["AngelInfo"]) > 0:
                    logToFile("Adding an angel in tree number " + str(oTree["TreeID"]))
                    AngelInfo_now = self["AngelInfo"].pop()
                    # angel record: IP|port|number of children
                    angelarg = AngelInfo_now.split("|")
                    angel = TreeNode(int(angelarg[2]), oTree["TreeID"], angelarg[0],
                                     angelarg[1], self["PacketSize"])
                    self.JoinTree(angel, i_Treekey)
                    oTree["AngelList"][angel["TNodeID"]] = angel
            elif oTree["VacSpots"] > self["MaxThreshold"]:
                if len(oTree["AngelList"]) > 0:
                    logToFile("Removing an angel from tree number " + str(oTree["TreeID"]))
                    # AngelList is a dict: pick any angel, let the tree detach
                    # it (this also drops it from AngelList), then put it back
                    # into the pool in the same IP|port|children format that
                    # the add branch parses.
                    angelID = next(iter(oTree["AngelList"]))
                    angel = oTree["AngelList"][angelID]
                    oTree.DeleteFromTree(angelID)
                    recycleAngel = "|".join([str(angel["TNodeID"]), str(angel["TNodePort"]),
                                             str(angel["NbrChildAllowed"])])
                    self["AngelInfo"].insert(0, recycleAngel)
        self.PrintTrees()

    def _WelcomeMessage(self, TimeNow):
        """Build the unframed Welcome message for a peer joining at TimeNow."""
        elapsed = time.mktime(TimeNow) - time.mktime(self["StartTime"])
        ChunkID = (int(self["SequenceNbr"])
                   + int((elapsed * float(self["StreamingRate"])) / int(self["PacketSize"])) + 1)
        return ("Welcome|" + str(self["NumTrees"]) + "|" + str(ChunkID) + "|"
                + str(self["StartUpDelay"]) + "|" + str(self["StreamingRate"]) + "|"
                + str(self["PacketSize"]))

    def ReceiveRequest(self, i_request):
        """Dispatch one "|"-separated request by its first field.

          0|nodeid|port|fileid|uploadcapacity   new client joins
          1|nodeid|port|treeid                  client cannot reach its parent
          2|nodeid|treeid|childid               client cannot reach a child
          3|nodeid                              client is leaving
          4|ip|port|<ignored>|uploadcapacity    angel offers help
          BEACON|sequencenbr|streamingrate      update from the source
        """
        arrRequest = i_request.split("|")
        msgType = arrRequest[0]
        if msgType == "1":
            # This means that we are unable to reach parents.
            self.unableToReachParent(arrRequest[1], arrRequest[3])
        elif msgType == "2":
            # This means that we are unable to reach child.
            self.unableToReachChild(arrRequest[1], arrRequest[2], arrRequest[3])
        elif msgType == "3":
            # This means that the client is finishing and he will leave.
            self.IamLeaving(self["NodeInfo"][arrRequest[1]])
        elif msgType == "0":
            # This means that the node is a new node, save the info in the
            # node info dictionary (key is the nodeID).
            NodeInfo = {}
            NodeInfo["NodeID"] = arrRequest[1]
            NodeInfo["NodePort"] = int(arrRequest[2])
            NodeInfo["FileID"] = arrRequest[3]
            NodeInfo["UpldCp"] = arrRequest[4]
            NodeInfo["TreeIDs"] = []  # ids of the trees this node takes part in
            if self["NodeInfo"].get(NodeInfo["NodeID"]) is not None:
                print("Node  %s already exists" % (NodeInfo["NodeID"],))
                return
            self["NodeInfo"][NodeInfo["NodeID"]] = NodeInfo
            self.SubscribeToStream(NodeInfo)
        elif msgType == "4":
            # this means an angel is offering help
            # angelData IP|port|#Children  (// keeps the count an integer)
            nbrChildren = int(arrRequest[4]) * int(self["NumTrees"]) // int(float(self["StreamingRate"]))
            angelData = arrRequest[1] + "|" + arrRequest[2] + "|" + str(nbrChildren)
            print("angelData %s" % (angelData,))
            self["AngelInfo"].insert(0, angelData)
            # reply with a welcome msg
            sMsg = _frame_tracker_msg(self._WelcomeMessage(time.gmtime()))
            self.sendMessage(arrRequest[1], arrRequest[2], sMsg)
            self.CheckNeedAngels()
        elif msgType == "BEACON":
            # This message is from the Source.
            self["SequenceNbr"] = arrRequest[1]
            self["StreamingRate"] = arrRequest[2]
            self["StartTime"] = time.gmtime()
            logToFile("Streaming rate: " + str(self["StreamingRate"]))
            print("Streaming rate:  %s" % (str(self["StreamingRate"]),))

    def unableToReachParent(self, ChildNodeID, TreeID):
        """Handle a child's complaint about its parent in tree TreeID.

        A parent that still answers a probe keeps its place and only the
        child is moved; a parent that does not answer is removed everywhere.
        """
        oTree = self["TreeInfo"][int(TreeID)]
        oChildTreeNode = oTree["NodeList"][ChildNodeID]
        oParentTreeNode = oChildTreeNode["ParentList"][0]
        ParentNodeID = oParentTreeNode["TNodeID"]
        # Log the id only: str() of the TreeNode would dump the linked tree.
        logToFile("child " + str(ChildNodeID) + " is unable to reach parent "
                  + str(ParentNodeID) + " in tree " + str(TreeID))
        if oParentTreeNode.Probe():
            oTree.ChangeConnection(ParentNodeID, ChildNodeID)
            logToFile("probe to " + str(ParentNodeID) + " sucesseded changing the connection")
        else:
            # NOTE(review): an angel parent has no NodeInfo entry, so this
            # lookup raises KeyError for it -- confirm the intended handling.
            self.IamLeaving(self["NodeInfo"][ParentNodeID])
            logToFile("probe to " + str(ParentNodeID) + " failed, deleting it")

    def unableToReachChild(self, ParentNodeID, TreeID, ChildNodeID):
        """Handle a parent's complaint about a child in tree TreeID.

        A child that still answers a probe is moved to another parent; a
        child that does not answer is removed everywhere.
        """
        oTree = self["TreeInfo"][int(TreeID)]
        oChildTreeNode = oTree["NodeList"][ChildNodeID]
        # Kept as a check that the parent is a regular node of this tree.
        oParentTreeNode = oTree["NodeList"][ParentNodeID]
        logToFile("parent " + str(ParentNodeID) + " is unable to reach child "
                  + str(ChildNodeID) + " in tree " + str(TreeID))
        if oChildTreeNode.Probe():
            oTree.ChangeConnection(ParentNodeID, ChildNodeID)
            logToFile("probe to " + str(ChildNodeID) + " sucesseded changing the connection")
        else:
            self.IamLeaving(self["NodeInfo"][ChildNodeID])
            logToFile("probe to " + str(ChildNodeID) + " failed, deleting it")

    def IamLeaving(self, i_NodeInfo):
        """Remove the node described by i_NodeInfo from every tree."""
        TreeStreamRate = float(self["StreamingRate"]) / self["NumTrees"]
        TotnbrofChildren = int(float(i_NodeInfo["UpldCp"]) / TreeStreamRate)
        # Exact inverse of SubscribeToStream: the node's child slots disappear
        # and the one spot it occupied in every tree becomes free again.
        self["TotalVacantSpots"] = (self["TotalVacantSpots"] - TotnbrofChildren
                                    + len(self["TreeInfo"]))
        with open("TrackerData", "a") as f:
            f.write("\nThis Node is leaving " + str(i_NodeInfo["NodeID"]))
        for i_Treekey in list(self["TreeInfo"].keys()):
            oTree = self["TreeInfo"][i_Treekey]
            oTree.DeleteFromTree(i_NodeInfo["NodeID"])
        self["NodeInfo"].pop(i_NodeInfo["NodeID"])
        # check the health of the swarm and decide if angels are needed
        self.CheckNeedAngels()
        with open("TrackerData", "a") as f:
            f.write("\ncurrentclients " + str(list(self["NodeInfo"].keys())))

    # this method send msgContent to the client identified by destinationIP,
    # destinationPort
    def sendMessage(self, destinationIP, destinationPort, msgContent):
        """Send msgContent to destinationIP:destinationPort (up to 10 tries).

        Returns 0 on success or in DEBUG mode, -1 when every try failed.
        """
        if DEBUG == "1":
            return 0
        print("%s %s" % (destinationIP, destinationPort))
        # NOTE(review): the log shows msgContent[200:], i.e. everything AFTER
        # the first 200 characters; [:200] may have been intended -- confirm.
        return _send_with_retries(destinationIP, destinationPort, msgContent, 10,
                                  msgContent[200:])

    def SubscribeToStream(self, i_Node):
        """Welcome a new client and insert it into every tree.

        Nothing is changed when the welcome message cannot be delivered.
        The client's child slots (upload capacity / per-tree rate) are spent
        on the trees with the fewest vacant spots first, at most d_Fanout
        slots per tree.
        """
        # send a msg to the node welcoming it and informing it how many trees
        # there are
        TimeNow = time.gmtime()
        print(" ".join(str(value) for value in (
            self["SequenceNbr"], time.mktime(TimeNow), time.mktime(self["StartTime"]),
            self["StreamingRate"], self["PacketSize"], self["SequenceNbr"])))
        sMsg = _frame_tracker_msg(self._WelcomeMessage(TimeNow)) + ASFHeader
        lError = self.sendMessage(i_Node["NodeID"], i_Node["NodePort"], sMsg)
        if lError < 0:
            return

        TreeStreamRate = float(self["StreamingRate"]) / self["NumTrees"]
        TotnbrofChildren = int(float(i_Node["UpldCp"]) / TreeStreamRate)
        self["TotalVacantSpots"] = self["TotalVacantSpots"] + TotnbrofChildren - self["NumTrees"]
        i = TotnbrofChildren

        SortedTreeList = {}
        for i_Treekey in list(self["TreeInfo"].keys()):
            SortedTreeList[i_Treekey] = self["TreeInfo"][i_Treekey]["VacSpots"]
        xSortedTreeList = sorted(SortedTreeList.items(), key=operator.itemgetter(1))
        for i_Treekey, _vacSpots in xSortedTreeList:
            oTree = self["TreeInfo"][i_Treekey]
            if i >= self["d_Fanout"]:
                k = self["d_Fanout"]
            elif i > 0:
                k = i
            else:
                k = 0
            i = i - k
            i_TreeNode = TreeNode(k, i_Treekey, i_Node["NodeID"], i_Node["NodePort"],
                                  self["PacketSize"])
            oTree["NodeList"][i_Node["NodeID"]] = i_TreeNode
            i_Node["TreeIDs"].append(i_Treekey)
            self.JoinTree(i_TreeNode, i_Treekey)
        self.CheckNeedAngels()

    def JoinTree(self, i_TreeNode, TreeID):
        """Attach the already created i_TreeNode to tree TreeID.

        A node without child slots always goes to the closest adopter.  A
        node with slots intercepts at the closest intercept node when that is
        more than one hop nearer to the root than the closest adopter.
        """
        oTree = self["TreeInfo"].get(TreeID)
        if oTree is None:
            print("There is a problem")
            logToFile("Error: No tree to add the treeNode to: Tree #" + str(TreeID))
            return
        oTree["VacSpots"] = int(oTree["VacSpots"]) + (int(i_TreeNode["NbrChildAllowed"]) - 1)
        if oTree["VacSpots"] < 0:
            # NOTE(review): the count stays negative after this return, as in
            # the original code -- confirm whether it should be rolled back.
            print("There is a problem")
            logToFile("Number of vacant spots is negative cant add client #"
                      + str(i_TreeNode["TNodeID"]) + " to tree #" + str(TreeID))
            return
        oRootNode = oTree["NodeList"][oTree["RootID"]]
        if (i_TreeNode["NbrChildAllowed"] != 0
                and oRootNode["ClosestAdDistance"] > (oRootNode["ClosestIntDistance"] + 1)):
            oClosestInter = oRootNode["ClosestIntNode"]
            oClosestInter.AddChild(i_TreeNode)
        else:
            oClosestAdapter = oRootNode["ClosestAdNode"]
            oClosestAdapter.AddChild(i_TreeNode)