1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */1819package org.apache.giraph.comm.messages;
2021import it.unimi.dsi.fastutil.longs.LongArrayList;
22import it.unimi.dsi.fastutil.longs.LongListIterator;
2324import java.io.IOException;
25import java.util.Iterator;
2627import org.apache.giraph.factories.MessageValueFactory;
28import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer;
29import org.apache.giraph.utils.ExtendedDataOutput;
30import org.apache.giraph.utils.UnsafeReusableByteArrayInput;
31import org.apache.hadoop.io.Writable;
3233/**34 * Create an iterable for messages based on a pointer list35 *36 * @param <M> messageType37 */38publicclass PointerListMessagesIterable<M extends Writable>
39implements Iterable<M> {
40/** Message class */41privatefinal MessageValueFactory<M> messageValueFactory;
42/** List of pointers to messages in byte array */43privatefinal LongArrayList pointers;
44/** Holds the byte arrays of serialized messages */45privatefinalExtendedByteArrayOutputBuffer msgBuffer;
46/** Reader to read data from byte buffer */47privatefinalUnsafeReusableByteArrayInput messageReader;
4849/**50 *51 * @param messageValueFactory message value factory52 * @param pointers pointers to messages in buffer53 * @param msgBuffer holds the byte arrays of serialized messages54 */55publicPointerListMessagesIterable(MessageValueFactory<M> messageValueFactory,
56 LongArrayList pointers, ExtendedByteArrayOutputBuffer msgBuffer) {
57this.messageValueFactory = messageValueFactory;
58this.pointers = pointers;
59this.msgBuffer = msgBuffer;
60// TODO - if needed implement same for Safe as well61 messageReader = newUnsafeReusableByteArrayInput();
62 }
6364/**65 * Create message from factory66 *67 * @return message instance68 */69protected M createMessage() {
70return messageValueFactory.newInstance();
71 }
7273 @Override
74public Iterator<M> iterator() {
75returnnew Iterator<M>() {
76privatefinal LongListIterator iterator = pointers.iterator();
77privatefinal M reusableMsg =
78 PointerListMessagesIterable.this.createMessage();
79 @Override
80publicboolean hasNext() {
81return iterator.hasNext();
82 }
8384 @Override
85public M next() {
86long pointer = iterator.nextLong();
87try {
88int index = (int) (pointer >>> 32);
89int offset = (int) pointer;
90ExtendedDataOutput buffer = msgBuffer.getDataOutput(index);
91 messageReader.initialize(buffer.getByteArray(), offset,
92 buffer.getPos());
93 reusableMsg.readFields(messageReader);
94 } catch (IOException e) {
95thrownew IllegalStateException("Got exception : " + e);
96 }
97return reusableMsg;
98 }
99100 @Override
101publicvoid remove() {
102thrownew UnsupportedOperationException();
103 }
104 };
105 }
106 }