View Javadoc

1   package org.apache.helix.webapp.resources;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.net.InetAddress;
24  import java.util.HashMap;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.UUID;
28  
29  import org.apache.helix.HelixDataAccessor;
30  import org.apache.helix.HelixException;
31  import org.apache.helix.InstanceType;
32  import org.apache.helix.PropertyPathConfig;
33  import org.apache.helix.PropertyType;
34  import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
35  import org.apache.helix.manager.zk.ZkClient;
36  import org.apache.helix.model.LiveInstance;
37  import org.apache.helix.model.Message;
38  import org.apache.helix.model.Message.MessageType;
39  import org.apache.helix.tools.ClusterSetup;
40  import org.apache.helix.webapp.RestAdminApplication;
41  import org.apache.log4j.Logger;
42  import org.codehaus.jackson.JsonGenerationException;
43  import org.codehaus.jackson.map.JsonMappingException;
44  import org.restlet.Context;
45  import org.restlet.data.Form;
46  import org.restlet.data.MediaType;
47  import org.restlet.data.Request;
48  import org.restlet.data.Response;
49  import org.restlet.data.Status;
50  import org.restlet.resource.Representation;
51  import org.restlet.resource.Resource;
52  import org.restlet.resource.StringRepresentation;
53  import org.restlet.resource.Variant;
54  
55  
56  /**
57   * This resource can be used to send scheduler tasks to the controller.
58   * 
59   * */
60  public class SchedulerTasksResource extends Resource
61  {
62    private final static Logger LOG = Logger.getLogger(SchedulerTasksResource.class);
63  
64    public static String CRITERIA = "Criteria";
65    public static String MESSAGETEMPLATE = "MessageTemplate";
66    public static String TASKQUEUENAME = "TaskQueueName";
67    public SchedulerTasksResource(Context context,
68            Request request,
69            Response response) 
70    {
71      super(context, request, response);
72      getVariants().add(new Variant(MediaType.TEXT_PLAIN));
73      getVariants().add(new Variant(MediaType.APPLICATION_JSON));
74    }
75  
76    @Override
77    public boolean allowGet()
78    {
79      return true;
80    }
81    
82    @Override
83    public boolean allowPost()
84    {
85      return true;
86    }
87    
88    @Override
89    public boolean allowPut()
90    {
91      return false;
92    }
93    
94    @Override
95    public boolean allowDelete()
96    {
97      return false;
98    }
99    
100   @Override
101   public Representation represent(Variant variant)
102   {
103     StringRepresentation presentation = null;
104     try
105     {
106       presentation = getSchedulerTasksRepresentation();
107     }
108     
109     catch(Exception e)
110     {
111       String error = ClusterRepresentationUtil.getErrorAsJsonStringFromException(e);
112       presentation = new StringRepresentation(error, MediaType.APPLICATION_JSON);
113 
114       LOG.error("", e);
115     }  
116     return presentation;
117   }
118   
119   StringRepresentation getSchedulerTasksRepresentation() throws JsonGenerationException, JsonMappingException, IOException
120   {
121     String clusterName = (String)getRequest().getAttributes().get("clusterName");
122     String instanceName = (String)getRequest().getAttributes().get("instanceName");
123     ZkClient zkClient = (ZkClient)getContext().getAttributes().get(RestAdminApplication.ZKCLIENT);
124     ClusterSetup setupTool = new ClusterSetup(zkClient);
125     List<String> instances = setupTool.getClusterManagementTool().getInstancesInCluster(clusterName);
126     
127     HelixDataAccessor accessor = ClusterRepresentationUtil.getClusterDataAccessor(zkClient, clusterName);
128     LiveInstance liveInstance = accessor.getProperty(accessor.keyBuilder().liveInstance(instanceName));
129     String sessionId = liveInstance.getSessionId();
130     
131     StringRepresentation representation = new StringRepresentation("");//(ClusterRepresentationUtil.ObjectToJson(instanceConfigs), MediaType.APPLICATION_JSON);
132     
133     return representation;
134   }
135   
136   @Override
137   public void acceptRepresentation(Representation entity)
138   {
139     try
140     {
141       String clusterName = (String)getRequest().getAttributes().get("clusterName");
142       Form form = new Form(entity);
143       ZkClient zkClient = (ZkClient)getContext().getAttributes().get(RestAdminApplication.ZKCLIENT);
144       
145       String msgTemplateString = ClusterRepresentationUtil.getFormJsonParameterString(form, MESSAGETEMPLATE);
146       if(msgTemplateString == null)
147       {
148         throw new HelixException("SchedulerTasksResource need to have MessageTemplate specified.");
149       }
150       Map<String, String> messageTemplate = ClusterRepresentationUtil.getFormJsonParameters(form, MESSAGETEMPLATE);
151       
152       String criteriaString = ClusterRepresentationUtil.getFormJsonParameterString(form, CRITERIA);
153       if(criteriaString == null)
154       {
155         throw new HelixException("SchedulerTasksResource need to have Criteria specified.");
156       }
157       HelixDataAccessor accessor = ClusterRepresentationUtil.getClusterDataAccessor(zkClient, clusterName);
158       LiveInstance leader = accessor.getProperty(accessor.keyBuilder().controllerLeader());
159       if(leader == null)
160       {
161         throw new HelixException("There is no leader for the cluster " + clusterName);
162       }
163       
164       Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG, UUID.randomUUID().toString());
165       schedulerMessage.getRecord().getSimpleFields().put(CRITERIA, criteriaString);
166       
167       schedulerMessage.getRecord().getMapFields().put(MESSAGETEMPLATE, messageTemplate);
168       
169       schedulerMessage.setTgtSessionId(leader.getSessionId());
170       schedulerMessage.setTgtName("CONTROLLER");
171       schedulerMessage.setSrcInstanceType(InstanceType.CONTROLLER);
172       String taskQueueName =  ClusterRepresentationUtil.getFormJsonParameterString(form, TASKQUEUENAME);
173       if(taskQueueName != null && taskQueueName.length() > 0)
174       {
175         schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, taskQueueName);
176       }
177       accessor.setProperty(accessor.keyBuilder().controllerMessage(schedulerMessage.getMsgId()), schedulerMessage);
178       
179       Map<String, String> resultMap = new HashMap<String, String>();
180       resultMap.put("StatusUpdatePath", PropertyPathConfig.getPath(PropertyType.STATUSUPDATES_CONTROLLER, clusterName, MessageType.SCHEDULER_MSG.toString(),schedulerMessage.getMsgId()));
181       resultMap.put("MessageType", Message.MessageType.SCHEDULER_MSG.toString());
182       resultMap.put("MsgId", schedulerMessage.getMsgId());
183       
184       // Assemble the rest URL for task status update
185       String ipAddress = InetAddress.getLocalHost().getCanonicalHostName();
186       String url = "http://" + ipAddress+":" + getContext().getAttributes().get(RestAdminApplication.PORT)
187           + "/clusters/" + clusterName+"/Controller/statusUpdates/SCHEDULER_MSG/" + schedulerMessage.getMsgId();
188       resultMap.put("statusUpdateUrl", url);
189       
190       getResponse().setEntity(ClusterRepresentationUtil.ObjectToJson(resultMap), MediaType.APPLICATION_JSON);
191       getResponse().setStatus(Status.SUCCESS_OK);
192     }
193     catch(Exception e)
194     {
195       getResponse().setEntity(ClusterRepresentationUtil.getErrorAsJsonStringFromException(e),
196           MediaType.APPLICATION_JSON);
197       getResponse().setStatus(Status.SUCCESS_OK);
198       LOG.error("", e);
199     }  
200   }
201 }