Skip to content

Commit 3ce3fa3

Browse files
authored
Add files via upload
1 parent f5b7955 commit 3ce3fa3

1 file changed

Lines changed: 294 additions & 0 deletions

File tree

scheduler.go

Lines changed: 294 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,294 @@
1+
package scheduler
2+
3+
import (
4+
"flag"
5+
"fmt"
6+
"k8s.io/client-go/tools/clientcmd"
7+
"k8s.io/client-go/util/homedir"
8+
"log"
9+
"math/rand"
10+
"os/exec"
11+
"path/filepath"
12+
"time"
13+
14+
"errors"
15+
"k8s.io/api/core/v1"
16+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17+
"k8s.io/apimachinery/pkg/labels"
18+
"k8s.io/apimachinery/pkg/util/wait"
19+
"k8s.io/client-go/informers"
20+
"k8s.io/client-go/kubernetes"
21+
listersv1 "k8s.io/client-go/listers/core/v1"
22+
"k8s.io/client-go/tools/cache"
23+
)
24+
25+
// schedulerName is the scheduler name a Pod must request via
// spec.schedulerName for this custom scheduler to pick it up.
const schedulerName = "my-scheduler"

// predicateFunc reports whether pod can run on node (filtering step).
type predicateFunc func(node *v1.Node, pod *v1.Pod) bool

// priorityFunc scores node for pod; higher is better (scoring step).
type priorityFunc func(node *v1.Node, pod *v1.Pod) int

// Scheduler is a minimal custom Kubernetes scheduler: it filters nodes
// with predicates, scores the survivors with priorities, and binds each
// pod to the highest-scoring node.
type Scheduler struct {
	clientset  *kubernetes.Clientset // API client used for binding and event emission
	podQueue   chan *v1.Pod          // pods waiting to be scheduled (fed by the pod informer)
	nodeLister listersv1.NodeLister  // cached node listing backed by the shared informer
	predicates []predicateFunc       // filtering functions applied to every node
	priorities []priorityFunc        // scoring functions summed per node
}
37+
38+
func NewScheduler(podQueue chan *v1.Pod, quit chan struct{}) Scheduler {
39+
40+
var kubeconfig *string
41+
if home := homedir.HomeDir(); home != "" {
42+
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
43+
} else {
44+
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
45+
}
46+
flag.Parse()
47+
48+
49+
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
50+
if err != nil {
51+
panic(err)
52+
}
53+
54+
clientset, err := kubernetes.NewForConfig(config)
55+
if err != nil {
56+
log.Fatal(err)
57+
}
58+
59+
return Scheduler{
60+
clientset: clientset,
61+
podQueue: podQueue,
62+
nodeLister: initInformers(clientset, podQueue, quit),
63+
predicates: []predicateFunc{
64+
capacityPredicate,
65+
},
66+
priorities: []priorityFunc{
67+
randomPriority,
68+
},
69+
}
70+
}
71+
72+
func initInformers(clientset *kubernetes.Clientset, podQueue chan *v1.Pod, quit chan struct{}) listersv1.NodeLister {
73+
factory := informers.NewSharedInformerFactory(clientset, 0)
74+
75+
nodeInformer := factory.Core().V1().Nodes()
76+
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
77+
AddFunc: func(obj interface{}) {
78+
node, ok := obj.(*v1.Node)
79+
if !ok {
80+
log.Println("this is not a node")
81+
return
82+
}
83+
log.Printf("New Node Added to Store: %s", node.GetName())
84+
},
85+
})
86+
87+
podInformer := factory.Core().V1().Pods()
88+
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
89+
AddFunc: func(obj interface{}) {
90+
pod, ok := obj.(*v1.Pod)
91+
if !ok {
92+
log.Println("this is not a pod")
93+
return
94+
}
95+
if pod.Spec.NodeName == "" && pod.Spec.SchedulerName == schedulerName {
96+
podQueue <- pod
97+
}
98+
},
99+
UpdateFunc: nil,
100+
DeleteFunc: nil,
101+
})
102+
103+
factory.Start(quit)
104+
return nodeInformer.Lister()
105+
}
106+
107+
func main() {
108+
fmt.Println("I'm a scheduler!")
109+
110+
rand.Seed(time.Now().Unix())
111+
112+
podQueue := make(chan *v1.Pod, 100000)
113+
defer close(podQueue)
114+
115+
quit := make(chan struct{})
116+
defer close(quit)
117+
118+
scheduler := NewScheduler(podQueue, quit)
119+
scheduler.Run(quit)
120+
}
121+
122+
// Run blocks, scheduling one pod at a time until quit is closed.
// A period of 0 makes wait.Until re-invoke ScheduleOne immediately
// after each return; ScheduleOne itself blocks on the pod queue.
func (s *Scheduler) Run(quit chan struct{}) {
	wait.Until(s.ScheduleOne, 0, quit)
}
125+
126+
func (s *Scheduler) ScheduleOne() {
127+
128+
p := <-s.podQueue
129+
fmt.Println("found a pod to schedule:", p.Namespace, "/", p.Name)
130+
131+
node, err := s.findFit(p)
132+
if err != nil {
133+
log.Println("cannot find node that fits pod", err.Error())
134+
fmt.Println("Creating New Node ... \n")
135+
136+
//TODO: find the write name for command
137+
cmd := exec.Command("source","/home/ubuntu/openstack-api/create-server.sh")
138+
out, err := cmd.Output()
139+
log.Println("Output\n",out,"Error:\n",err)
140+
time.Sleep(10)
141+
return
142+
}
143+
144+
err = s.bindPod(p, node)
145+
if err != nil {
146+
log.Println("failed to bind pod", err.Error())
147+
return
148+
}
149+
150+
message := fmt.Sprintf("Placed pod [%s/%s] on %s\n", p.Namespace, p.Name, node)
151+
152+
err = s.emitEvent(p, message)
153+
if err != nil {
154+
log.Println("failed to emit scheduled event", err.Error())
155+
return
156+
}
157+
158+
fmt.Println(message)
159+
}
160+
161+
func (s *Scheduler) findFit(pod *v1.Pod) (string, error) {
162+
nodes, err := s.nodeLister.List(labels.Everything())
163+
if err != nil {
164+
return "", err
165+
}
166+
167+
filteredNodes := s.runPredicates(nodes, pod)
168+
if len(filteredNodes) == 0 {
169+
s.podQueue <- pod //back to queue for rescheduling
170+
return "", errors.New("failed to find node that fits pod")
171+
}
172+
priorities := s.prioritize(filteredNodes, pod)
173+
return s.findBestNode(priorities), nil
174+
}
175+
176+
func (s *Scheduler) bindPod(p *v1.Pod, node string) error {
177+
return s.clientset.CoreV1().Pods(p.Namespace).Bind(&v1.Binding{
178+
ObjectMeta: metav1.ObjectMeta{
179+
Name: p.Name,
180+
Namespace: p.Namespace,
181+
},
182+
Target: v1.ObjectReference{
183+
APIVersion: "v1",
184+
Kind: "Node",
185+
Name: node,
186+
},
187+
})
188+
}
189+
190+
func (s *Scheduler) emitEvent(p *v1.Pod, message string) error {
191+
timestamp := time.Now().UTC()
192+
_, err := s.clientset.CoreV1().Events(p.Namespace).Create(&v1.Event{
193+
Count: 1,
194+
Message: message,
195+
Reason: "Scheduled",
196+
LastTimestamp: metav1.NewTime(timestamp),
197+
FirstTimestamp: metav1.NewTime(timestamp),
198+
Type: "Normal",
199+
Source: v1.EventSource{
200+
Component: schedulerName,
201+
},
202+
InvolvedObject: v1.ObjectReference{
203+
Kind: "Pod",
204+
Name: p.Name,
205+
Namespace: p.Namespace,
206+
UID: p.UID,
207+
},
208+
ObjectMeta: metav1.ObjectMeta{
209+
GenerateName: p.Name + "-",
210+
},
211+
})
212+
if err != nil {
213+
return err
214+
}
215+
return nil
216+
}
217+
218+
func (s *Scheduler) runPredicates(nodes []*v1.Node, pod *v1.Pod) []*v1.Node {
219+
filteredNodes := make([]*v1.Node, 0)
220+
for _, node := range nodes {
221+
if s.predicatesApply(node, pod) {
222+
filteredNodes = append(filteredNodes, node)
223+
}
224+
}
225+
log.Println("nodes that fit:")
226+
for _, n := range filteredNodes {
227+
log.Println(n.Name)
228+
}
229+
return filteredNodes
230+
}
231+
232+
func (s *Scheduler) predicatesApply(node *v1.Node, pod *v1.Pod) bool {
233+
for _, predicate := range s.predicates {
234+
if !predicate(node, pod) {
235+
return false
236+
}
237+
}
238+
return true
239+
}
240+
241+
func capacityPredicate(node *v1.Node, pod *v1.Pod) bool{
242+
nodeCPUQuantity := node.Status.Allocatable.Cpu().MilliValue()
243+
//if !ok {
244+
// panic("sth is wrong in capacity predicate check 1!")
245+
//}
246+
podCPUQuantity := pod.Spec.Containers[0].Resources.Limits.Cpu().MilliValue()
247+
//if !ok {
248+
// panic("sth is wrong in capacity predicate check 2!")
249+
//}
250+
nodeMemQuantity := node.Status.Allocatable.Memory().MilliValue()
251+
//if !ok {
252+
// panic("sth is wrong in capacity predicate check! 3 ")
253+
//}
254+
podMemQuantity := pod.Spec.Containers[0].Resources.Limits.Memory().MilliValue()
255+
//if !ok {
256+
// panic("sth is wrong in capacity predicate check! 4")
257+
//}
258+
return (nodeCPUQuantity > podCPUQuantity) && (nodeMemQuantity > podMemQuantity)
259+
}
260+
261+
func randomPredicate(node *v1.Node, pod *v1.Pod) bool {
262+
r := rand.Intn(2)
263+
return r == 0
264+
}
265+
266+
func (s *Scheduler) prioritize(nodes []*v1.Node, pod *v1.Pod) map[string]int {
267+
priorities := make(map[string]int)
268+
for _, node := range nodes {
269+
for _, priority := range s.priorities {
270+
priorities[node.Name] += priority(node, pod)
271+
}
272+
}
273+
log.Println("calculated priorities:", priorities)
274+
return priorities
275+
}
276+
277+
func (s *Scheduler) findBestNode(priorities map[string]int) string {
278+
var maxP int
279+
var bestNode string
280+
for node, p := range priorities {
281+
if p > maxP {
282+
maxP = p
283+
bestNode = node
284+
}
285+
}
286+
return bestNode
287+
}
288+
289+
func randomPriority(node *v1.Node, pod *v1.Pod) int {
290+
if node.Name == "master-node"{
291+
return -1
292+
}
293+
return rand.Intn(100)
294+
}

0 commit comments

Comments
 (0)