1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.comm.netty;
2021import java.net.InetSocketAddress;
22import java.util.List;
23import com.google.common.collect.Lists;
2425import io.netty.channel.Channel;
26import io.netty.channel.ChannelFuture;
27import io.netty.channel.ChannelFutureListener;
28import org.apache.log4j.Logger;
293031/**32 * Maintains multiple channels and rotates between them. This is thread-safe.33 */34publicclassChannelRotater {
35/** Logger */36privatestaticfinal Logger LOG = Logger.getLogger(ChannelRotater.class);
37/** Index of last used channel */38privateint index = 0;
39/** Channel list */40privatefinal List<Channel> channelList = Lists.newArrayList();
41/** Task id of this channel */42privatefinal Integer taskId;
43/** Address these channels are associated with */44privatefinal InetSocketAddress address;
4546/**47 * Constructor48 *49 * @param taskId Id of the task these channels as associated with50 * @param address Address these channels are associated with51 */52publicChannelRotater(Integer taskId, InetSocketAddress address) {
53this.taskId = taskId;
54this.address = address;
55 }
5657public Integer getTaskId() {
58return taskId;
59 }
6061/**62 * Add a channel to the rotation63 *64 * @param channel Channel to add65 */66publicsynchronizedvoid addChannel(Channel channel) {
67synchronized (channelList) {
68 channelList.add(channel);
69 }
70 }
7172/**73 * Get the next channel74 *75 * @return Next channel76 */77publicsynchronized Channel nextChannel() {
78if (channelList.isEmpty()) {
79 LOG.warn("nextChannel: No channels exist for hostname " +
80 address.getHostName());
81returnnull;
82 }
8384 ++index;
85if (index >= channelList.size()) {
86 index = 0;
87 }
88return channelList.get(index);
89 }
9091/**92 * Remove the a channel93 *94 * @param channel Channel to remove95 * @return Return true if successful, false otherwise96 */97publicsynchronizedboolean removeChannel(Channel channel) {
98boolean success = channelList.remove(channel);
99if (index >= channelList.size()) {
100 index = 0;
101 }
102return success;
103 }
104105/**106 * Get the number of channels in this object107 *108 * @return Number of channels109 */110publicsynchronizedint size() {
111return channelList.size();
112 }
113114/**115 * Close the channels116 *117 * @param channelFutureListener If desired, pass in a channel future listener118 */119publicsynchronizedvoid closeChannels(
120 ChannelFutureListener channelFutureListener) {
121for (Channel channel : channelList) {
122 ChannelFuture channelFuture = channel.close();
123if (channelFutureListener != null) {
124 channelFuture.addListener(channelFutureListener);
125 }
126 }
127 }
128129/**130 * Get a copy of the channels131 *132 * @return Copy of the channels133 */134publicsynchronized Iterable<Channel> getChannels() {
135return Lists.newArrayList(channelList);
136 }
137 }