This project has retired. For details please refer to its
Attic page.
MemoryObserver xref
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.worker;
20
21 import org.apache.giraph.conf.BooleanConfOption;
22 import org.apache.giraph.conf.FloatConfOption;
23 import org.apache.giraph.conf.GiraphConfiguration;
24 import org.apache.giraph.conf.IntConfOption;
25 import org.apache.giraph.utils.MemoryUtils;
26 import org.apache.giraph.utils.ThreadUtils;
27 import org.apache.giraph.zk.ZooKeeperExt;
28 import org.apache.log4j.Logger;
29 import org.apache.zookeeper.CreateMode;
30 import org.apache.zookeeper.KeeperException;
31 import org.apache.zookeeper.ZooDefs;
32
33 import java.util.concurrent.atomic.AtomicLong;
34
35
36
37
38
39 public class MemoryObserver {
40
41 public static final BooleanConfOption USE_MEMORY_OBSERVER =
42 new BooleanConfOption("giraph.memoryObserver.enabled", false,
43 "Whether or not to use memory observer");
44
45 public static final FloatConfOption FREE_MEMORY_FRACTION_FOR_GC =
46 new FloatConfOption("giraph.memoryObserver.freeMemoryFractionForGc", 0.1f,
47 "For which fraction of free memory will we issue manual gc calls");
48
49 public static final IntConfOption MIN_MS_BETWEEN_FULL_GCS =
50 new IntConfOption("giraph.memoryObserver.minMsBetweenFullGcs", 60 * 1000,
51 "Minimum milliseconds between two manual gc calls");
52
53
54 private static final Logger LOG = Logger.getLogger(MemoryObserver.class);
55
56 private static final int MEMORY_OBSERVER_SLEEP_MS = 1000;
57
58
59 private final AtomicLong lastManualGc = new AtomicLong();
60
61 private final ZooKeeperExt zk;
62
63 private final String zkPath;
64
65 private final int minMsBetweenFullGcs;
66
67
68
69
70
71
72
73
74 public MemoryObserver(final ZooKeeperExt zk,
75 final String zkPath, GiraphConfiguration conf) {
76 this.zk = zk;
77 this.zkPath = zkPath;
78 minMsBetweenFullGcs = MIN_MS_BETWEEN_FULL_GCS.get(conf);
79
80 if (!USE_MEMORY_OBSERVER.get(conf)) {
81 return;
82 }
83
84 try {
85
86 zk.createOnceExt(zkPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
87 CreateMode.PERSISTENT, true);
88 } catch (KeeperException | InterruptedException e) {
89 LOG.info("Exception occurred", e);
90 }
91 setWatcher();
92
93 final float freeMemoryFractionForGc =
94 FREE_MEMORY_FRACTION_FOR_GC.get(conf);
95 ThreadUtils.startThread(new Runnable() {
96 @Override
97 public void run() {
98
99 while (true) {
100 double freeMemoryFraction = MemoryUtils.freeMemoryFraction();
101 long msFromLastGc = System.currentTimeMillis() - lastManualGc.get();
102 if (msFromLastGc > minMsBetweenFullGcs &&
103 freeMemoryFraction < freeMemoryFractionForGc) {
104 try {
105 if (LOG.isInfoEnabled()) {
106 LOG.info("Notifying others about low memory (" +
107 freeMemoryFraction + "% free)");
108 }
109 zk.createExt(
110 zkPath + "/" + System.currentTimeMillis(),
111 new byte[0],
112 ZooDefs.Ids.OPEN_ACL_UNSAFE,
113 CreateMode.EPHEMERAL,
114 false);
115 } catch (KeeperException | InterruptedException e) {
116 LOG.warn("Exception occurred", e);
117 }
118 }
119 if (!ThreadUtils.trySleep(MEMORY_OBSERVER_SLEEP_MS)) {
120 return;
121 }
122 }
123 }
124 }, "memory-observer");
125 }
126
127
128 private void setWatcher() {
129 try {
130
131 zk.getChildrenExt(zkPath, true, false, false);
132 } catch (KeeperException | InterruptedException e) {
133 LOG.warn("Exception occurred", e);
134 }
135 }
136
137
138 public void callGc() {
139 long last = lastManualGc.get();
140 if (System.currentTimeMillis() - last > minMsBetweenFullGcs &&
141 lastManualGc.compareAndSet(last, System.currentTimeMillis())) {
142 if (LOG.isInfoEnabled()) {
143 LOG.info("Calling gc manually");
144 }
145 System.gc();
146 if (LOG.isInfoEnabled()) {
147 LOG.info("Manual gc call done");
148 }
149 }
150 setWatcher();
151 }
152 }