1/*2 * Licensed to the Apache Software Foundation (ASF) under one3 * or more contributor license agreements. See the NOTICE file4 * distributed with this work for additional information5 * regarding copyright ownership. The ASF licenses this file6 * to you under the Apache License, Version 2.0 (the7 * "License"); you may not use this file except in compliance8 * with the License. You may obtain a copy of the License at9 *10 * http://www.apache.org/licenses/LICENSE-2.011 *12 * Unless required by applicable law or agreed to in writing, software13 * distributed under the License is distributed on an "AS IS" BASIS,14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.15 * See the License for the specific language governing permissions and16 * limitations under the License.17 */18package org.apache.giraph.writable.kryo;
1920import java.io.DataInput;
21import java.io.DataOutput;
22import java.util.Arrays;
23import java.util.Collections;
24import java.util.LinkedHashMap;
25import java.util.Map;
26import java.util.Map.Entry;
27import java.util.Random;
2829import com.esotericsoftware.kryo.util.DefaultClassResolver;
30import de.javakaffee.kryoserializers.guava.ImmutableMapSerializer;
31import org.apache.giraph.conf.GiraphConfigurationSettable;
32import com.esotericsoftware.kryo.ClassResolver;
33import com.esotericsoftware.kryo.ReferenceResolver;
34import com.esotericsoftware.kryo.util.MapReferenceResolver;
35import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
36import org.apache.giraph.types.ops.collections.BasicSet;
37import org.apache.giraph.writable.kryo.markers.NonKryoWritable;
38import org.apache.giraph.writable.kryo.markers.KryoIgnoreWritable;
39import org.apache.giraph.writable.kryo.serializers.ArraysAsListSerializer;
40import org.apache.giraph.writable.kryo.serializers.CollectionsNCopiesSerializer;
41import org.apache.giraph.writable.kryo.serializers.DirectWritableSerializer;
42import org.apache.giraph.writable.kryo.serializers.FastUtilSerializer;
43import org.apache.giraph.writable.kryo.serializers.ImmutableBiMapSerializerUtils;
44import org.apache.giraph.writable.kryo.serializers.ReusableFieldSerializer;
45import org.apache.hadoop.conf.Configurable;
46import org.apache.hadoop.conf.Configuration;
47import org.apache.hadoop.io.Writable;
48import org.apache.log4j.Logger;
49import org.objenesis.strategy.StdInstantiatorStrategy;
5051import com.esotericsoftware.kryo.Kryo;
52import com.esotericsoftware.kryo.Serializer;
53import com.esotericsoftware.kryo.factories.SerializerFactory;
54import com.esotericsoftware.kryo.io.Input;
55import com.esotericsoftware.kryo.io.InputChunked;
56import com.esotericsoftware.kryo.io.Output;
57import com.esotericsoftware.kryo.io.OutputChunked;
58import com.esotericsoftware.kryo.pool.KryoCallback;
59import com.esotericsoftware.kryo.pool.KryoFactory;
60import com.esotericsoftware.kryo.pool.KryoPool;
61import com.esotericsoftware.kryo.serializers.ClosureSerializer;
62import com.esotericsoftware.kryo.serializers.FieldSerializer;
63import com.esotericsoftware.kryo.util.ObjectMap;
64import com.google.common.base.Preconditions;
6566import de.javakaffee.kryoserializers.guava.ImmutableListSerializer;
6768/**69 * Kryo instance that provides serialization through DataInput/DataOutput70 * that org.apache.hadoop.io.Writable uses.71 *72 * All public APIs are static.73 *74 * It extends Kryo to reuse KryoPool functionality, but have additional needed75 * objects cached as well. If we move to ThreadLocal or other caching76 * technique, we can use composition, instead of inheritance here.77 *78 * TODO: Refactor this class into two separate classes depending on79 * whether the reference tracking is enabled or disabled.80 */81publicclassHadoopKryoextends Kryo {
/**
 * Shared pool of Kryo objects, kept because they are expensive to create.
 * Every pooled instance comes from {@code createKryo(true, true)}, i.e. it
 * tracks references and owns interim buffers.
 */
private static final KryoPool KRYO_POOL;

static {
  final KryoFactory pooledKryoFactory = new KryoFactory() {
    @Override
    public Kryo create() {
      return createKryo(true, true);
    }
  };
  KRYO_POOL = new KryoPool.Builder(pooledKryoFactory).build();
}
90/** Thread local HadoopKryo object */91privatestaticfinal ThreadLocal<HadoopKryo> KRYO =
92new ThreadLocal<HadoopKryo>() {
93 @Override protectedHadoopKryo initialValue() {
94return createKryo(false, false);
95 }
96 };
9798/**99 * List of interfaces/parent classes that will not be allowed to be100 * serialized, together with explanation of why, that will be shown101 * when throwing such exception102 */103privatestaticfinal Map<Class<?>, String> NON_SERIALIZABLE;
104105static {
106 NON_SERIALIZABLE = new LinkedHashMap<>();
107 NON_SERIALIZABLE.put(
108 NonKryoWritable.class,
109"it is marked to not allow serialization, " +
110"look at the class for more details");
111 NON_SERIALIZABLE.put(
112 KryoWritableWrapper.class, "recursion is disallowed");
113 NON_SERIALIZABLE.put(
114 Configuration.class,
115"it cannot be supported since it contains ClassLoader");
116 NON_SERIALIZABLE.put(
117 GiraphConfigurationSettable.class, "configuration cannot be set");
118 NON_SERIALIZABLE.put(
119 Configurable.class, "configuration cannot be set");
120 NON_SERIALIZABLE.put(
121 Random.class,
122"it should be rarely serialized, since it would create same stream " +
123"of numbers everywhere, use TransientRandom instead");
124 NON_SERIALIZABLE.put(
125 Logger.class,
126"Logger must be a static field");
127 }
128129/** Reusable Input object */130private InputChunked input;
131/** Reusable Output object */132private OutputChunked output;
133134/** Reusable DataInput wrapper stream */135privateDataInputWrapperStream dataInputWrapperStream;
136/** Reusable DataOutput wrapper stream */137privateDataOutputWrapperStream dataOutputWrapperStream;
138139/**140 * Map of already initialized serializers used141 * for readIntoObject/writeOutOfObject pair of methods142 */143privatefinal ObjectMap<Class<?>, ReusableFieldSerializer<Object>>
144 classToIntoSerializer = new ObjectMap<>();
/**
 * Private on purpose: instances are only handed out through the cached
 * objects managed by this class.
 */
private HadoopKryo() {
  super();
}
/**
 * Private constructor for instances that need a custom class resolver and
 * reference resolver; both are passed straight to the Kryo constructor.
 *
 * @param classResolver Class resolver
 * @param referenceResolver Reference resolver
 */
private HadoopKryo(
    ClassResolver classResolver, ReferenceResolver referenceResolver) {
  super(classResolver, referenceResolver);
}
159160// Public API:161162/**163 * Write type of given object and the object itself to the output stream.164 * Inverse of readClassAndObj.165 *166 * @param out Output stream167 * @param object Object to write168 */169publicstaticvoid writeClassAndObj(
170final DataOutput out, final Object object) {
171 writeInternal(out, object, false);
172 }
173174/**175 * Read object from the input stream, by reading first type of the object,176 * and then all of its fields.177 * Inverse of writeClassAndObject.178 *179 * @param in Input stream180 * @return Deserialized object181 * @param <T> Type of the object being read182 */183publicstatic <T> T readClassAndObj(DataInput in) {
184return readInternal(in, null, false);
185 }
186187/**188 * Write an object to output, in a way that can be read by readIntoObject.189 *190 * @param out Output stream191 * @param object Object to be written192 */193publicstaticvoid writeOutOfObject(
194final DataOutput out, final Object object) {
195 writeInternal(out, object, true);
196 }
197198/**199 * Reads an object, from input, into a given object,200 * allowing object reuse.201 * Inverse of writeOutOfObject.202 *203 * @param in Input stream204 * @param object Object to fill from input205 */206publicstaticvoid readIntoObject(DataInput in, Object object) {
207 readInternal(in, object, true);
208 }
209210/**211 * Writes class and object to specified output stream with specified212 * Kryo object. It does not use interim buffers.213 * @param kryo Kryo object214 * @param out Output stream215 * @param object Object216 */217publicstaticvoid writeWithKryo(
218finalHadoopKryo kryo, final Output out,
219final Object object) {
220 kryo.writeClassAndObject(out, object);
221 out.close();
222 }
223224/**225 * Write out of object with given kryo226 * @param kryo Kryo object227 * @param out Output228 * @param object Object to write229 */230publicstaticvoid writeWithKryoOutOfObject(
231finalHadoopKryo kryo, final Output out,
232final Object object) {
233 kryo.writeOutOfObject(out, object);
234 out.close();
235 }
236237/**238 * Reads class and object from specified input stream with239 * specified kryo object.240 * it does not use interim buffers.241 * @param kryo Kryo object242 * @param in Input buffer243 * @param <T> Object type parameter244 * @return Object245 */246publicstatic <T> T readWithKryo(
247finalHadoopKryo kryo, final Input in) {
248 T object;
249 object = (T) kryo.readClassAndObject(in);
250 in.close();
251return object;
252 }
253254/**255 * Read into object with given kryo.256 * @param kryo Kryo object257 * @param in Input258 * @param object Object to read into259 */260publicstaticvoid readWithKryoIntoObject(
261finalHadoopKryo kryo, final Input in, Object object) {
262 kryo.readIntoObject(in, object);
263 in.close();
264 }
265266/**267 * Create copy of the object, by magically recursively copying268 * all of its fields, keeping reference structures (like cycles)269 *270 * @param object Object to be copied271 * @return Copy of the object.272 * @param <T> Type of the object273 */274publicstatic <T> T createCopy(final T object) {
275return KRYO_POOL.run(new KryoCallback<T>() {
276 @Override
277public T execute(Kryo kryo) {
278return kryo.copy(object);
279 }
280 });
281 }
282283/**284 * Returns a kryo which doesn't track objects, hence285 * serialization of recursive/nested objects is not286 * supported.287 *288 * Reference tracking significantly degrades the performance289 * since kryo has to store all serialized objects and search290 * the history to check if an object has been already serialized.291 *292 * @return Hadoop kryo which doesn't track objects.293 */294publicstaticHadoopKryo getNontrackingKryo() {
295return KRYO.get();
296 }
297298// Private implementation:299300/**301 * Create new instance of HadoopKryo, properly initialized.302 * @param trackReferences if true, object references are tracked.303 * @param hasBuffer if true, an interim buffer is used.304 * @return new HadoopKryo instance305 */306privatestaticHadoopKryo createKryo(boolean trackReferences,
307boolean hasBuffer) {
308HadoopKryo kryo;
309if (trackReferences) {
310 kryo = newHadoopKryo();
311 } else {
312// Only use GiraphClassResolver if it is properly initialized.313// This is to enable test cases which use KryoSimpleWrapper314// but don't start ZK.315 kryo = newHadoopKryo(
316 GiraphClassResolver.isInitialized() ? new GiraphClassResolver() :
317new DefaultClassResolver(),
318new MapReferenceResolver());
319 }
320321try {
322 kryo.register(Class.forName("java.lang.invoke.SerializedLambda"));
323 kryo.register(Class.forName(
324"com.esotericsoftware.kryo.serializers.ClosureSerializer$Closure"),
325new ClosureSerializer());
326 } catch (ClassNotFoundException e) {
327thrownew IllegalStateException(
328"Trying to use Kryo on Java version " +
329 System.getProperty("java.version") +
330", but unable to find needed classes", e);
331 }
332333 kryo.register(Arrays.asList().getClass(), new ArraysAsListSerializer());
334 kryo.register(Collections.nCopies(1, new Object()).getClass(),
335newCollectionsNCopiesSerializer());
336337 ImmutableListSerializer.registerSerializers(kryo);
338 ImmutableMapSerializer.registerSerializers(kryo);
339 ImmutableBiMapSerializerUtils.registerSerializers(kryo);
340341// There are many fastutil classes, register them at the end,342// so they don't use up small registration numbers343 FastUtilSerializer.registerAll(kryo);
344345 kryo.setInstantiatorStrategy(new DefaultInstantiatorStrategy(
346new StdInstantiatorStrategy()));
347348 SerializerFactory customSerializerFactory = new SerializerFactory() {
349 @SuppressWarnings("rawtypes")
350 @Override
351public Serializer makeSerializer(Kryo kryo, final Class<?> type) {
352for (final Entry<Class<?>, String> entry :
353 NON_SERIALIZABLE.entrySet()) {
354if (entry.getKey().isAssignableFrom(type)) {
355// Allow Class object to be serialized, but not a live instance.356returnnew Serializer() {
357 @Override
358public Object read(Kryo kryo, Input input, Class type) {
359thrownew RuntimeException("Cannot serialize " + type +
360". Objects being serialized cannot capture " +
361 entry.getKey() + " because " + entry.getValue() +
362". Either remove field in question" +
363", or make it transient (so that it isn't serialized)");
364 }
365366 @Override
367publicvoid write(Kryo kryo, Output output, Object object) {
368thrownew RuntimeException("Cannot serialize " + type +
369". Objects being serialized cannot capture " +
370 entry.getKey() + " because " + entry.getValue() +
371". Either remove field in question" +
372", or make it transient (so that it isn't serialized)");
373 }
374 };
375 }
376 }
377378if (Writable.class.isAssignableFrom(type) &&
379 !KryoIgnoreWritable.class.isAssignableFrom(type) &&
380// remove BasicSet, BasicArrayList and Basic2ObjectMap temporarily,381// for lack of constructors382 !BasicSet.class.isAssignableFrom(type) &&
383 !Basic2ObjectMap.class.isAssignableFrom(type)) {
384// use the Writable method defined by the type385DirectWritableSerializer serializer = newDirectWritableSerializer();
386return serializer;
387 } else {
388 FieldSerializer serializer = new FieldSerializer<>(kryo, type);
389 serializer.setIgnoreSyntheticFields(false);
390return serializer;
391 }
392 }
393 };
394395 kryo.addDefaultSerializer(Writable.class, customSerializerFactory);
396 kryo.setDefaultSerializer(customSerializerFactory);
397398if (hasBuffer) {
399 kryo.input = new InputChunked(4096);
400 kryo.output = new OutputChunked(4096);
401 kryo.dataInputWrapperStream = newDataInputWrapperStream();
402 kryo.dataOutputWrapperStream = newDataOutputWrapperStream();
403 }
404405if (!trackReferences) {
406 kryo.setReferences(false);
407408// Auto reset can only be disabled if the GiraphClassResolver is409// properly initialized.410if (GiraphClassResolver.isInitialized()) {
411 kryo.setAutoReset(false);
412 }
413 }
414return kryo;
415 }
416417/**418 * Register serializer for class with class name419 *420 * @param kryo HadoopKryo421 * @param className Name of the class for which to register serializer422 * @param serializer Serializer to use423 */424privatestaticvoid registerSerializer(HadoopKryo kryo, String className,
425 Serializer serializer) {
426try {
427 kryo.register(Class.forName(className), serializer);
428 } catch (ClassNotFoundException e) {
429thrownew IllegalStateException("Class " + className + " is missing", e);
430 }
431 }
432433/**434 * Initialize reusable objects for reading from given DataInput.435 *436 * @param in Input stream437 */438privatevoid setDataInput(DataInput in) {
439 dataInputWrapperStream.setDataInput(in);
440 input.setInputStream(dataInputWrapperStream);
441 }
442443/**444 * Initialize reusable objects for writing into given DataOutput.445 *446 * @param out Output stream447 */448privatevoid setDataOutput(DataOutput out) {
449 dataOutputWrapperStream.setDataOutput(out);
450 output.setOutputStream(dataOutputWrapperStream);
451 }
452453/**454 * Get or create reusable serializer for given class.455 *456 * @param type Type of the object457 * @return Serializer458 */459private ReusableFieldSerializer<Object> getOrCreateReusableSerializer(
460 Class<?> type) {
461 ReusableFieldSerializer<Object> serializer =
462 classToIntoSerializer.get(type);
463if (serializer == null) {
464 serializer = new ReusableFieldSerializer<>(this, type);
465 classToIntoSerializer.put(type, serializer);
466 }
467return serializer;
468 }
469470/**471 * Internal write implementation, that reuses HadoopKryo objects472 * from the pool.473 *474 * @param out Output stream475 * @param object Object to be written476 * @param outOf whether we are writing reusable objects,477 * or full objects with class name478 */479privatestaticvoid writeInternal(
480final DataOutput out, final Object object, finalboolean outOf) {
481 KRYO_POOL.run(new KryoCallback<Void>() {
482 @Override
483public Void execute(Kryo kryo) {
484HadoopKryo hkryo = (HadoopKryo) kryo;
485 hkryo.setDataOutput(out);
486487if (outOf) {
488 hkryo.writeOutOfObject(hkryo.output, object);
489 } else {
490 hkryo.writeClassAndObject(hkryo.output, object);
491 }
492493 hkryo.output.endChunks();
494 hkryo.output.close();
495496returnnull;
497 }
498 });
499 }
500501/**502 * Internal read implementation, that reuses HadoopKryo objects503 * from the pool.504 *505 * @param in Input stream506 * @param outObject Object to fill from input (if not null)507 * @param into whether we are reading reusable objects,508 * or full objects with class name509 * @return Read object (new one, or same passed in if we use reusable)510 * @param <T> Type of the object to read511 */512 @SuppressWarnings("unchecked")
513privatestatic <T> T readInternal(
514final DataInput in, final T outObject, finalboolean into) {
515return KRYO_POOL.run(new KryoCallback<T>() {
516 @Override
517public T execute(Kryo kryo) {
518HadoopKryo hkryo = (HadoopKryo) kryo;
519 hkryo.setDataInput(in);
520521 T object;
522if (into) {
523 hkryo.readIntoObject(hkryo.input, outObject);
524 object = outObject;
525 } else {
526 object = (T) hkryo.readClassAndObject(hkryo.input);
527 }
528 hkryo.input.nextChunks();
529530 hkryo.input.close();
531return object;
532 }
533 });
534 }
535536/**537 * Reads an object, from input, into a given object,538 * allowing object reuse.539 *540 * @param input Input stream541 * @param object Object to fill from input542 */543privatevoid readIntoObject(Input input, Object object) {
544 Preconditions.checkNotNull(object);
545546 Class<?> type = object.getClass();
547 ReusableFieldSerializer<Object> serializer =
548 getOrCreateReusableSerializer(type);
549550 serializer.setReadIntoObject(object);
551 Object result = readObject(input, type, serializer);
552553 Preconditions.checkState(result == object);
554 }
555556/**557 * Write an object to output, in a way that can be read558 * using readIntoObject.559 * @param output Output stream560 * @param object Object to be written561 */562privatevoid writeOutOfObject(Output output, Object object) {
563 ReusableFieldSerializer<Object> serializer =
564 getOrCreateReusableSerializer(object.getClass());
565 writeObject(output, object, serializer);
566 }
567568 }