This project has retired. For details please refer to its Attic page.
DefaultMessageClasses xref
View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.giraph.conf;
19  
20  import java.io.DataInput;
21  import java.io.DataOutput;
22  import java.io.IOException;
23  
24  import org.apache.giraph.combiner.MessageCombiner;
25  import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
26  import org.apache.giraph.factories.DefaultMessageValueFactory;
27  import org.apache.giraph.factories.MessageValueFactory;
28  import org.apache.giraph.utils.ReflectionUtils;
29  import org.apache.giraph.utils.WritableUtils;
30  import org.apache.hadoop.io.Writable;
31  import org.apache.hadoop.io.WritableComparable;
32  import com.google.common.base.Preconditions;
33  
34  /**
35   * Default implementation of MessageClasses
36   *
37   * @param <I> Vertex id type
38   * @param <M> Message type
39   */
40  public class DefaultMessageClasses
41      <I extends WritableComparable, M extends Writable>
42      implements MessageClasses<I, M> {
43    /** message class */
44    private Class<M> messageClass;
45    /** message value factory class */
46    private Class<? extends MessageValueFactory<M>>
47    messageValueFactoryClass;
48    /** message combiner class */
49    private Class<? extends MessageCombiner<? super I, M>> messageCombinerClass;
50    /** whether message class was manually modified in this superstep */
51    private boolean messageClassModified;
52    /** message encode and store type */
53    private MessageEncodeAndStoreType messageEncodeAndStoreType;
54  
55    /** Constructor */
56    public DefaultMessageClasses() {
57    }
58  
59    /**
60     * Constructor
61     * @param messageClass message class
62     * @param messageValueFactoryClass message value factory class
63     * @param messageCombinerClass message combiner class
64     * @param messageEncodeAndStoreType message encode and store type
65     */
66    public DefaultMessageClasses(
67        Class<M> messageClass,
68        Class<? extends MessageValueFactory<M>> messageValueFactoryClass,
69        Class<? extends MessageCombiner<? super I, M>> messageCombinerClass,
70          MessageEncodeAndStoreType messageEncodeAndStoreType) {
71      this.messageClass = messageClass;
72      this.messageValueFactoryClass = messageValueFactoryClass;
73      this.messageCombinerClass = messageCombinerClass;
74      this.messageClassModified = false;
75      this.messageEncodeAndStoreType = messageEncodeAndStoreType;
76    }
77  
78    @Override
79    public Class<M> getMessageClass() {
80      return messageClass;
81    }
82  
83    @Override
84    public MessageValueFactory<M> createMessageValueFactory(
85        ImmutableClassesGiraphConfiguration conf) {
86      if (messageValueFactoryClass.equals(DefaultMessageValueFactory.class)) {
87        return new DefaultMessageValueFactory<>(messageClass, conf);
88      }
89  
90      MessageValueFactory factory = ReflectionUtils.newInstance(
91          messageValueFactoryClass, conf);
92      if (!factory.newInstance().getClass().equals(messageClass)) {
93        throw new IllegalStateException("Message factory " +
94          messageValueFactoryClass + " creates " +
95          factory.newInstance().getClass().getName() + ", but message type is " +
96          messageClass.getName());
97      }
98      return factory;
99    }
100 
101   @Override
102   public MessageCombiner<? super I, M> createMessageCombiner(
103       ImmutableClassesGiraphConfiguration conf) {
104     if (messageCombinerClass == null) {
105       return null;
106     }
107 
108     MessageCombiner combiner =
109         ReflectionUtils.newInstance(messageCombinerClass, conf);
110     if (combiner != null) {
111       Preconditions.checkState(
112           combiner.createInitialMessage().getClass().equals(messageClass));
113     }
114     return combiner;
115   }
116 
117   @Override
118   public boolean useMessageCombiner() {
119     return messageCombinerClass != null;
120   }
121 
122   @Override
123   public boolean ignoreExistingVertices() {
124     return false;
125   }
126 
127   @Override
128   public MessageEncodeAndStoreType getMessageEncodeAndStoreType() {
129     return messageEncodeAndStoreType;
130   }
131 
132   @Override
133   public MessageClasses<I, M> createCopyForNewSuperstep() {
134     return new DefaultMessageClasses<>(messageClass, messageValueFactoryClass,
135         messageCombinerClass, messageEncodeAndStoreType);
136   }
137 
138   @Override
139   public void verifyConsistent(
140       ImmutableClassesGiraphConfiguration conf) {
141     Class<?>[] factoryTypes = ReflectionUtils.getTypeArguments(
142         MessageValueFactory.class, messageValueFactoryClass);
143     ReflectionUtils.verifyTypes(messageClass, factoryTypes[0],
144         "Message factory", messageValueFactoryClass);
145 
146     if (messageCombinerClass != null) {
147       Class<?>[] combinerTypes = ReflectionUtils.getTypeArguments(
148           MessageCombiner.class, messageCombinerClass);
149       ReflectionUtils.verifyTypes(conf.getVertexIdClass(), combinerTypes[0],
150           "Vertex id", messageCombinerClass);
151       ReflectionUtils.verifyTypes(messageClass, combinerTypes[1],
152           "Outgoing message", messageCombinerClass);
153     }
154   }
155 
156   /**
157    * Set message class
158    * @param messageClass message classs
159    */
160   public void setMessageClass(Class<M> messageClass) {
161     this.messageClassModified = true;
162     this.messageClass = messageClass;
163   }
164 
165   /**
166    * Set message class if not set already in this superstep
167    * @param messageClass message class
168    */
169   public void setIfNotModifiedMessageClass(Class<M> messageClass) {
170     if (!messageClassModified) {
171       this.messageClass = messageClass;
172     }
173   }
174 
175   public void setMessageCombinerClass(
176       Class<? extends MessageCombiner<? super I, M>> messageCombinerClass) {
177     this.messageCombinerClass = messageCombinerClass;
178   }
179 
180   public void setMessageValueFactoryClass(
181       Class<? extends MessageValueFactory<M>> messageValueFactoryClass) {
182     this.messageValueFactoryClass = messageValueFactoryClass;
183   }
184 
185   public void setMessageEncodeAndStoreType(
186       MessageEncodeAndStoreType messageEncodeAndStoreType) {
187     this.messageEncodeAndStoreType = messageEncodeAndStoreType;
188   }
189 
190   @Override
191   public void write(DataOutput out) throws IOException {
192     WritableUtils.writeClass(messageClass, out);
193     WritableUtils.writeClass(messageValueFactoryClass, out);
194     WritableUtils.writeClass(messageCombinerClass, out);
195     out.writeBoolean(messageClassModified);
196     out.writeByte(messageEncodeAndStoreType.ordinal());
197   }
198 
199   @Override
200   public void readFields(DataInput in) throws IOException {
201     messageClass = WritableUtils.readClass(in);
202     messageValueFactoryClass = WritableUtils.readClass(in);
203     messageCombinerClass = WritableUtils.readClass(in);
204     messageClassModified = in.readBoolean();
205     messageEncodeAndStoreType =
206         messageEncodeAndStoreType.values()[in.readByte()];
207   }
208 }