1+ package scheduler
2+
3+ import (
4+ "flag"
5+ "fmt"
6+ "k8s.io/client-go/tools/clientcmd"
7+ "k8s.io/client-go/util/homedir"
8+ "log"
9+ "math/rand"
10+ "os/exec"
11+ "path/filepath"
12+ "time"
13+
14+ "errors"
15+ "k8s.io/api/core/v1"
16+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17+ "k8s.io/apimachinery/pkg/labels"
18+ "k8s.io/apimachinery/pkg/util/wait"
19+ "k8s.io/client-go/informers"
20+ "k8s.io/client-go/kubernetes"
21+ listersv1 "k8s.io/client-go/listers/core/v1"
22+ "k8s.io/client-go/tools/cache"
23+ )
24+
25+ const schedulerName = "my-scheduler"
26+
27+ type predicateFunc func (node * v1.Node , pod * v1.Pod ) bool
28+ type priorityFunc func (node * v1.Node , pod * v1.Pod ) int
29+
30+ type Scheduler struct {
31+ clientset * kubernetes.Clientset
32+ podQueue chan * v1.Pod
33+ nodeLister listersv1.NodeLister
34+ predicates []predicateFunc
35+ priorities []priorityFunc
36+ }
37+
38+ func NewScheduler (podQueue chan * v1.Pod , quit chan struct {}) Scheduler {
39+
40+ var kubeconfig * string
41+ if home := homedir .HomeDir (); home != "" {
42+ kubeconfig = flag .String ("kubeconfig" , filepath .Join (home , ".kube" , "config" ), "(optional) absolute path to the kubeconfig file" )
43+ } else {
44+ kubeconfig = flag .String ("kubeconfig" , "" , "absolute path to the kubeconfig file" )
45+ }
46+ flag .Parse ()
47+
48+
49+ config , err := clientcmd .BuildConfigFromFlags ("" , * kubeconfig )
50+ if err != nil {
51+ panic (err )
52+ }
53+
54+ clientset , err := kubernetes .NewForConfig (config )
55+ if err != nil {
56+ log .Fatal (err )
57+ }
58+
59+ return Scheduler {
60+ clientset : clientset ,
61+ podQueue : podQueue ,
62+ nodeLister : initInformers (clientset , podQueue , quit ),
63+ predicates : []predicateFunc {
64+ capacityPredicate ,
65+ },
66+ priorities : []priorityFunc {
67+ randomPriority ,
68+ },
69+ }
70+ }
71+
72+ func initInformers (clientset * kubernetes.Clientset , podQueue chan * v1.Pod , quit chan struct {}) listersv1.NodeLister {
73+ factory := informers .NewSharedInformerFactory (clientset , 0 )
74+
75+ nodeInformer := factory .Core ().V1 ().Nodes ()
76+ nodeInformer .Informer ().AddEventHandler (cache.ResourceEventHandlerFuncs {
77+ AddFunc : func (obj interface {}) {
78+ node , ok := obj .(* v1.Node )
79+ if ! ok {
80+ log .Println ("this is not a node" )
81+ return
82+ }
83+ log .Printf ("New Node Added to Store: %s" , node .GetName ())
84+ },
85+ })
86+
87+ podInformer := factory .Core ().V1 ().Pods ()
88+ podInformer .Informer ().AddEventHandler (cache.ResourceEventHandlerFuncs {
89+ AddFunc : func (obj interface {}) {
90+ pod , ok := obj .(* v1.Pod )
91+ if ! ok {
92+ log .Println ("this is not a pod" )
93+ return
94+ }
95+ if pod .Spec .NodeName == "" && pod .Spec .SchedulerName == schedulerName {
96+ podQueue <- pod
97+ }
98+ },
99+ UpdateFunc : nil ,
100+ DeleteFunc : nil ,
101+ })
102+
103+ factory .Start (quit )
104+ return nodeInformer .Lister ()
105+ }
106+
107+ func main () {
108+ fmt .Println ("I'm a scheduler!" )
109+
110+ rand .Seed (time .Now ().Unix ())
111+
112+ podQueue := make (chan * v1.Pod , 100000 )
113+ defer close (podQueue )
114+
115+ quit := make (chan struct {})
116+ defer close (quit )
117+
118+ scheduler := NewScheduler (podQueue , quit )
119+ scheduler .Run (quit )
120+ }
121+
122+ func (s * Scheduler ) Run (quit chan struct {}) {
123+ wait .Until (s .ScheduleOne , 0 , quit )
124+ }
125+
126+ func (s * Scheduler ) ScheduleOne () {
127+
128+ p := <- s .podQueue
129+ fmt .Println ("found a pod to schedule:" , p .Namespace , "/" , p .Name )
130+
131+ node , err := s .findFit (p )
132+ if err != nil {
133+ log .Println ("cannot find node that fits pod" , err .Error ())
134+ fmt .Println ("Creating New Node ... \n " )
135+
136+ //TODO: find the write name for command
137+ cmd := exec .Command ("source" ,"/home/ubuntu/openstack-api/create-server.sh" )
138+ out , err := cmd .Output ()
139+ log .Println ("Output\n " ,out ,"Error:\n " ,err )
140+ time .Sleep (10 )
141+ return
142+ }
143+
144+ err = s .bindPod (p , node )
145+ if err != nil {
146+ log .Println ("failed to bind pod" , err .Error ())
147+ return
148+ }
149+
150+ message := fmt .Sprintf ("Placed pod [%s/%s] on %s\n " , p .Namespace , p .Name , node )
151+
152+ err = s .emitEvent (p , message )
153+ if err != nil {
154+ log .Println ("failed to emit scheduled event" , err .Error ())
155+ return
156+ }
157+
158+ fmt .Println (message )
159+ }
160+
161+ func (s * Scheduler ) findFit (pod * v1.Pod ) (string , error ) {
162+ nodes , err := s .nodeLister .List (labels .Everything ())
163+ if err != nil {
164+ return "" , err
165+ }
166+
167+ filteredNodes := s .runPredicates (nodes , pod )
168+ if len (filteredNodes ) == 0 {
169+ s .podQueue <- pod //back to queue for rescheduling
170+ return "" , errors .New ("failed to find node that fits pod" )
171+ }
172+ priorities := s .prioritize (filteredNodes , pod )
173+ return s .findBestNode (priorities ), nil
174+ }
175+
176+ func (s * Scheduler ) bindPod (p * v1.Pod , node string ) error {
177+ return s .clientset .CoreV1 ().Pods (p .Namespace ).Bind (& v1.Binding {
178+ ObjectMeta : metav1.ObjectMeta {
179+ Name : p .Name ,
180+ Namespace : p .Namespace ,
181+ },
182+ Target : v1.ObjectReference {
183+ APIVersion : "v1" ,
184+ Kind : "Node" ,
185+ Name : node ,
186+ },
187+ })
188+ }
189+
190+ func (s * Scheduler ) emitEvent (p * v1.Pod , message string ) error {
191+ timestamp := time .Now ().UTC ()
192+ _ , err := s .clientset .CoreV1 ().Events (p .Namespace ).Create (& v1.Event {
193+ Count : 1 ,
194+ Message : message ,
195+ Reason : "Scheduled" ,
196+ LastTimestamp : metav1 .NewTime (timestamp ),
197+ FirstTimestamp : metav1 .NewTime (timestamp ),
198+ Type : "Normal" ,
199+ Source : v1.EventSource {
200+ Component : schedulerName ,
201+ },
202+ InvolvedObject : v1.ObjectReference {
203+ Kind : "Pod" ,
204+ Name : p .Name ,
205+ Namespace : p .Namespace ,
206+ UID : p .UID ,
207+ },
208+ ObjectMeta : metav1.ObjectMeta {
209+ GenerateName : p .Name + "-" ,
210+ },
211+ })
212+ if err != nil {
213+ return err
214+ }
215+ return nil
216+ }
217+
218+ func (s * Scheduler ) runPredicates (nodes []* v1.Node , pod * v1.Pod ) []* v1.Node {
219+ filteredNodes := make ([]* v1.Node , 0 )
220+ for _ , node := range nodes {
221+ if s .predicatesApply (node , pod ) {
222+ filteredNodes = append (filteredNodes , node )
223+ }
224+ }
225+ log .Println ("nodes that fit:" )
226+ for _ , n := range filteredNodes {
227+ log .Println (n .Name )
228+ }
229+ return filteredNodes
230+ }
231+
232+ func (s * Scheduler ) predicatesApply (node * v1.Node , pod * v1.Pod ) bool {
233+ for _ , predicate := range s .predicates {
234+ if ! predicate (node , pod ) {
235+ return false
236+ }
237+ }
238+ return true
239+ }
240+
241+ func capacityPredicate (node * v1.Node , pod * v1.Pod ) bool {
242+ nodeCPUQuantity := node .Status .Allocatable .Cpu ().MilliValue ()
243+ //if !ok {
244+ // panic("sth is wrong in capacity predicate check 1!")
245+ //}
246+ podCPUQuantity := pod .Spec .Containers [0 ].Resources .Limits .Cpu ().MilliValue ()
247+ //if !ok {
248+ // panic("sth is wrong in capacity predicate check 2!")
249+ //}
250+ nodeMemQuantity := node .Status .Allocatable .Memory ().MilliValue ()
251+ //if !ok {
252+ // panic("sth is wrong in capacity predicate check! 3 ")
253+ //}
254+ podMemQuantity := pod .Spec .Containers [0 ].Resources .Limits .Memory ().MilliValue ()
255+ //if !ok {
256+ // panic("sth is wrong in capacity predicate check! 4")
257+ //}
258+ return (nodeCPUQuantity > podCPUQuantity ) && (nodeMemQuantity > podMemQuantity )
259+ }
260+
261+ func randomPredicate (node * v1.Node , pod * v1.Pod ) bool {
262+ r := rand .Intn (2 )
263+ return r == 0
264+ }
265+
266+ func (s * Scheduler ) prioritize (nodes []* v1.Node , pod * v1.Pod ) map [string ]int {
267+ priorities := make (map [string ]int )
268+ for _ , node := range nodes {
269+ for _ , priority := range s .priorities {
270+ priorities [node .Name ] += priority (node , pod )
271+ }
272+ }
273+ log .Println ("calculated priorities:" , priorities )
274+ return priorities
275+ }
276+
277+ func (s * Scheduler ) findBestNode (priorities map [string ]int ) string {
278+ var maxP int
279+ var bestNode string
280+ for node , p := range priorities {
281+ if p > maxP {
282+ maxP = p
283+ bestNode = node
284+ }
285+ }
286+ return bestNode
287+ }
288+
289+ func randomPriority (node * v1.Node , pod * v1.Pod ) int {
290+ if node .Name == "master-node" {
291+ return - 1
292+ }
293+ return rand .Intn (100 )
294+ }
0 commit comments